]> git.baikalelectronics.ru Git - kernel.git/commitdiff
ceph: using hash value to compose dentry offset
authorYan, Zheng <zyan@redhat.com>
Fri, 29 Apr 2016 03:27:30 +0000 (11:27 +0800)
committerIlya Dryomov <idryomov@gmail.com>
Wed, 25 May 2016 23:15:36 +0000 (01:15 +0200)
If MDS sorts dentries in dirfrag in hash order, we use hash value to
compose dentry offset. dentry offset is:

  (0xff << 52) | ((24 bits hash) << 28) |
  (the nth entry hash hash collision)

This offset is stable across directory fragmentation. This alos means
there is no need to reset readdir offset if directory get fragmented
in the middle of readdir.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
fs/ceph/dir.c
fs/ceph/inode.c
fs/ceph/mds_client.c
fs/ceph/mds_client.h
fs/ceph/super.h
include/linux/ceph/ceph_fs.h

index e954ea2fb71056df9d9351370430acdb0f6acd8c..4850c3624a873c024b7ce11aa5ab0a89711b51da 100644 (file)
@@ -69,16 +69,42 @@ out_unlock:
 }
 
 /*
- * for readdir, we encode the directory frag and offset within that
- * frag into f_pos.
+ * for f_pos for readdir:
+ * - hash order:
+ *     (0xff << 52) | ((24 bits hash) << 28) |
+ *     (the nth entry has hash collision);
+ * - frag+name order;
+ *     ((frag value) << 28) | (the nth entry in frag);
  */
+#define OFFSET_BITS    28
+#define OFFSET_MASK    ((1 << OFFSET_BITS) - 1)
+#define HASH_ORDER     (0xffull << (OFFSET_BITS + 24))
+loff_t ceph_make_fpos(unsigned high, unsigned off, bool hash_order)
+{
+       loff_t fpos = ((loff_t)high << 28) | (loff_t)off;
+       if (hash_order)
+               fpos |= HASH_ORDER;
+       return fpos;
+}
+
+static bool is_hash_order(loff_t p)
+{
+       return (p & HASH_ORDER) == HASH_ORDER;
+}
+
 static unsigned fpos_frag(loff_t p)
 {
-       return p >> 32;
+       return p >> OFFSET_BITS;
 }
+
+static unsigned fpos_hash(loff_t p)
+{
+       return ceph_frag_value(fpos_frag(p));
+}
+
 static unsigned fpos_off(loff_t p)
 {
-       return p & 0xffffffff;
+       return p & OFFSET_MASK;
 }
 
 static int fpos_cmp(loff_t l, loff_t r)
@@ -177,7 +203,7 @@ static int __dcache_readdir(struct file *file,  struct dir_context *ctx,
        u64 idx = 0;
        int err = 0;
 
-       dout("__dcache_readdir %p v%u at %llu\n", dir, shared_gen, ctx->pos);
+       dout("__dcache_readdir %p v%u at %llx\n", dir, shared_gen, ctx->pos);
 
        /* search start position */
        if (ctx->pos > 2) {
@@ -234,7 +260,7 @@ static int __dcache_readdir(struct file *file,  struct dir_context *ctx,
                spin_unlock(&dentry->d_lock);
 
                if (emit_dentry) {
-                       dout(" %llu (%llu) dentry %p %pd %p\n", di->offset, ctx->pos,
+                       dout(" %llx dentry %p %pd %p\n", di->offset,
                             dentry, dentry, d_inode(dentry));
                        ctx->pos = di->offset;
                        if (!dir_emit(ctx, dentry->d_name.name,
@@ -269,6 +295,16 @@ out:
        return err;
 }
 
+static bool need_send_readdir(struct ceph_file_info *fi, loff_t pos)
+{
+       if (!fi->last_readdir)
+               return true;
+       if (is_hash_order(pos))
+               return !ceph_frag_contains_value(fi->frag, fpos_hash(pos));
+       else
+               return fi->frag != fpos_frag(pos);
+}
+
 static int ceph_readdir(struct file *file, struct dir_context *ctx)
 {
        struct ceph_file_info *fi = file->private_data;
@@ -276,7 +312,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        struct ceph_mds_client *mdsc = fsc->mdsc;
-       unsigned frag = fpos_frag(ctx->pos);
        int i;
        int err;
        u32 ftype;
@@ -317,7 +352,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
                err = __dcache_readdir(file, ctx, shared_gen);
                if (err != -EAGAIN)
                        return err;
-               frag = fpos_frag(ctx->pos);
        } else {
                spin_unlock(&ci->i_ceph_lock);
        }
@@ -325,8 +359,9 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
        /* proceed with a normal readdir */
 more:
        /* do we have the correct frag content buffered? */
-       if (fi->frag != frag || fi->last_readdir == NULL) {
+       if (need_send_readdir(fi, ctx->pos)) {
                struct ceph_mds_request *req;
+               unsigned frag;
                int op = ceph_snap(inode) == CEPH_SNAPDIR ?
                        CEPH_MDS_OP_LSSNAP : CEPH_MDS_OP_READDIR;
 
@@ -336,6 +371,13 @@ more:
                        fi->last_readdir = NULL;
                }
 
+               if (is_hash_order(ctx->pos)) {
+                       frag = ceph_choose_frag(ci, fpos_hash(ctx->pos),
+                                               NULL, NULL);
+               } else {
+                       frag = fpos_frag(ctx->pos);
+               }
+
                dout("readdir fetching %llx.%llx frag %x offset '%s'\n",
                     ceph_vinop(inode), frag, fi->last_name);
                req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
@@ -373,19 +415,23 @@ more:
                        ceph_mdsc_put_request(req);
                        return err;
                }
-               dout("readdir got and parsed readdir result=%d"
-                    " on frag %x, end=%d, complete=%d\n", err, frag,
+               dout("readdir got and parsed readdir result=%d on "
+                    "frag %x, end=%d, complete=%d, hash_order=%d\n",
+                    err, frag,
                     (int)req->r_reply_info.dir_end,
-                    (int)req->r_reply_info.dir_complete);
-
+                    (int)req->r_reply_info.dir_complete,
+                    (int)req->r_reply_info.hash_order);
 
-               /* note next offset and last dentry name */
                rinfo = &req->r_reply_info;
                if (le32_to_cpu(rinfo->dir_dir->frag) != frag) {
                        frag = le32_to_cpu(rinfo->dir_dir->frag);
-                       fi->next_offset = req->r_readdir_offset;
-                       /* adjust ctx->pos to beginning of frag */
-                       ctx->pos = ceph_make_fpos(frag, fi->next_offset);
+                       if (!rinfo->hash_order) {
+                               fi->next_offset = req->r_readdir_offset;
+                               /* adjust ctx->pos to beginning of frag */
+                               ctx->pos = ceph_make_fpos(frag,
+                                                         fi->next_offset,
+                                                         false);
+                       }
                }
 
                fi->frag = frag;
@@ -411,23 +457,25 @@ more:
                        fi->dir_release_count = 0;
                }
 
-               if (req->r_reply_info.dir_end) {
-                       kfree(fi->last_name);
-                       fi->last_name = NULL;
-                       fi->next_offset = 2;
-               } else {
+               /* note next offset and last dentry name */
+               if (rinfo->dir_nr > 0) {
                        struct ceph_mds_reply_dir_entry *rde =
                                        rinfo->dir_entries + (rinfo->dir_nr-1);
+                       unsigned next_offset = req->r_reply_info.dir_end ?
+                                       2 : (fpos_off(rde->offset) + 1);
                        err = note_last_dentry(fi, rde->name, rde->name_len,
-                                              fpos_off(rde->offset) + 1);
+                                              next_offset);
                        if (err)
                                return err;
+               } else if (req->r_reply_info.dir_end) {
+                       fi->next_offset = 2;
+                       /* keep last name */
                }
        }
 
        rinfo = &fi->last_readdir->r_reply_info;
        dout("readdir frag %x num %d pos %llx chunk first %llx\n",
-            frag, rinfo->dir_nr, ctx->pos,
+            fi->frag, rinfo->dir_nr, ctx->pos,
             rinfo->dir_nr ? rinfo->dir_entries[0].offset : 0LL);
 
        i = 0;
@@ -470,16 +518,26 @@ more:
                ctx->pos++;
        }
 
-       if (fi->last_name) {
+       if (fi->next_offset > 2) {
                ceph_mdsc_put_request(fi->last_readdir);
                fi->last_readdir = NULL;
                goto more;
        }
 
        /* more frags? */
-       if (!ceph_frag_is_rightmost(frag)) {
-               frag = ceph_frag_next(frag);
-               ctx->pos = ceph_make_fpos(frag, 2);
+       if (!ceph_frag_is_rightmost(fi->frag)) {
+               unsigned frag = ceph_frag_next(fi->frag);
+               if (is_hash_order(ctx->pos)) {
+                       loff_t new_pos = ceph_make_fpos(ceph_frag_value(frag),
+                                                       fi->next_offset, true);
+                       if (new_pos > ctx->pos)
+                               ctx->pos = new_pos;
+                       /* keep last_name */
+               } else {
+                       ctx->pos = ceph_make_fpos(frag, fi->next_offset, false);
+                       kfree(fi->last_name);
+                       fi->last_name = NULL;
+               }
                dout("readdir next frag is %x\n", frag);
                goto more;
        }
@@ -532,14 +590,21 @@ static void reset_readdir(struct ceph_file_info *fi)
 static bool need_reset_readdir(struct ceph_file_info *fi, loff_t new_pos)
 {
        struct ceph_mds_reply_info_parsed *rinfo;
+       loff_t chunk_offset;
        if (new_pos == 0)
                return true;
-       if (fpos_frag(new_pos) != fi->frag)
+       if (is_hash_order(new_pos)) {
+               /* no need to reset last_name for a forward seek when
+                * dentries are sotred in hash order */
+       } else if (fi->frag |= fpos_frag(new_pos)) {
                return true;
+       }
        rinfo = fi->last_readdir ? &fi->last_readdir->r_reply_info : NULL;
        if (!rinfo || !rinfo->dir_nr)
                return true;
-       return new_pos < rinfo->dir_entries[0].offset;;
+       chunk_offset = rinfo->dir_entries[0].offset;
+       return new_pos < chunk_offset ||
+              is_hash_order(new_pos) != is_hash_order(chunk_offset);
 }
 
 static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
@@ -562,17 +627,22 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
        }
 
        if (offset >= 0) {
+               if (need_reset_readdir(fi, offset)) {
+                       dout("dir_llseek dropping %p content\n", file);
+                       reset_readdir(fi);
+               } else if (is_hash_order(offset) && offset > file->f_pos) {
+                       /* for hash offset, we don't know if a forward seek
+                        * is within same frag */
+                       fi->dir_release_count = 0;
+                       fi->readdir_cache_idx = -1;
+               }
+
                if (offset != file->f_pos) {
                        file->f_pos = offset;
                        file->f_version = 0;
                        fi->flags &= ~CEPH_F_ATEND;
                }
                retval = offset;
-
-               if (need_reset_readdir(fi, offset)) {
-                       dout("dir_llseek dropping %p content\n", file);
-                       reset_readdir(fi);
-               }
        }
 out:
        inode_unlock(inode);
index b53c95903aebb83e1b7c85971541d7f6aee14cdb..f51b6fd5f570e891d1f16a793f6aa57a5d5573e8 100644 (file)
@@ -1387,6 +1387,7 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
                             struct ceph_mds_session *session)
 {
        struct dentry *parent = req->r_dentry;
+       struct ceph_inode_info *ci = ceph_inode(d_inode(parent));
        struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
        struct qstr dname;
        struct dentry *dn;
@@ -1394,19 +1395,27 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
        int err = 0, skipped = 0, ret, i;
        struct inode *snapdir = NULL;
        struct ceph_mds_request_head *rhead = req->r_request->front.iov_base;
-       struct ceph_dentry_info *di;
        u32 frag = le32_to_cpu(rhead->args.readdir.frag);
+       u32 last_hash = 0;
+       u32 fpos_offset;
        struct ceph_readdir_cache_control cache_ctl = {};
 
        if (req->r_aborted)
                return readdir_prepopulate_inodes_only(req, session);
 
+       if (rinfo->hash_order && req->r_path2) {
+               last_hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash,
+                                         req->r_path2, strlen(req->r_path2));
+               last_hash = ceph_frag_value(last_hash);
+       }
+
        if (rinfo->dir_dir &&
            le32_to_cpu(rinfo->dir_dir->frag) != frag) {
                dout("readdir_prepopulate got new frag %x -> %x\n",
                     frag, le32_to_cpu(rinfo->dir_dir->frag));
                frag = le32_to_cpu(rinfo->dir_dir->frag);
-               req->r_readdir_offset = 2;
+               if (!rinfo->hash_order)
+                       req->r_readdir_offset = 2;
        }
 
        if (le32_to_cpu(rinfo->head->op) == CEPH_MDS_OP_LSSNAP) {
@@ -1424,13 +1433,13 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
        if (ceph_frag_is_leftmost(frag) && req->r_readdir_offset == 2) {
                /* note dir version at start of readdir so we can tell
                 * if any dentries get dropped */
-               struct ceph_inode_info *ci = ceph_inode(d_inode(parent));
                req->r_dir_release_cnt = atomic64_read(&ci->i_release_count);
                req->r_dir_ordered_cnt = atomic64_read(&ci->i_ordered_count);
                req->r_readdir_cache_idx = 0;
        }
 
        cache_ctl.index = req->r_readdir_cache_idx;
+       fpos_offset = req->r_readdir_offset;
 
        /* FIXME: release caps/leases if error occurs */
        for (i = 0; i < rinfo->dir_nr; i++) {
@@ -1444,6 +1453,18 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
                vino.ino = le64_to_cpu(rde->inode.in->ino);
                vino.snap = le64_to_cpu(rde->inode.in->snapid);
 
+               if (rinfo->hash_order) {
+                       u32 hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash,
+                                                rde->name, rde->name_len);
+                       hash = ceph_frag_value(hash);
+                       if (hash != last_hash)
+                               fpos_offset = 2;
+                       last_hash = hash;
+                       rde->offset = ceph_make_fpos(hash, fpos_offset++, true);
+               } else {
+                       rde->offset = ceph_make_fpos(frag, fpos_offset++, false);
+               }
+
 retry_lookup:
                dn = d_lookup(parent, &dname);
                dout("d_lookup on parent=%p name=%.*s got %p\n",
@@ -1521,9 +1542,7 @@ retry_lookup:
                        dn = realdn;
                }
 
-               di = dn->d_fsdata;
-               di->offset = ceph_make_fpos(frag, i + req->r_readdir_offset);
-               rde->offset = di->offset;
+               ceph_dentry(dn)->offset = rde->offset;
 
                update_dentry_lease(dn, rde->lease, req->r_session,
                                    req->r_request_started);
index 48def22fc7b940e0742756836a4be7e918c4f229..7ad31283d510bf8036c7160e85f262da4a417eae 100644 (file)
@@ -185,6 +185,7 @@ static int parse_reply_info_dir(void **p, void *end,
                u16 flags = ceph_decode_16(p);
                info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
                info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
+               info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
        }
        if (num == 0)
                goto done;
index 4ce19d852657caf37c8491f146dda99a93fb6011..e7d38aac71093f41bc4c50bfc339cffa3c7069b7 100644 (file)
@@ -81,7 +81,9 @@ struct ceph_mds_reply_info_parsed {
                        struct ceph_mds_reply_dirfrag *dir_dir;
                        size_t                        dir_buf_size;
                        int                           dir_nr;
-                       bool                          dir_complete, dir_end;
+                       bool                          dir_complete;
+                       bool                          dir_end;
+                       bool                          hash_order;
                        struct ceph_mds_reply_dir_entry  *dir_entries;
                };
 
index 0628099ba1f2dec3b026f14f818153be75e38fdd..c9b671dfff810b8789b56724e2cc9884160acd85 100644 (file)
@@ -540,11 +540,6 @@ static inline struct ceph_dentry_info *ceph_dentry(struct dentry *dentry)
        return (struct ceph_dentry_info *)dentry->d_fsdata;
 }
 
-static inline loff_t ceph_make_fpos(unsigned frag, unsigned off)
-{
-       return ((loff_t)frag << 32) | (loff_t)off;
-}
-
 /*
  * caps helpers
  */
@@ -949,6 +944,7 @@ extern const struct inode_operations ceph_snapdir_iops;
 extern const struct dentry_operations ceph_dentry_ops, ceph_snap_dentry_ops,
        ceph_snapdir_dentry_ops;
 
+extern loff_t ceph_make_fpos(unsigned high, unsigned off, bool hash_order);
 extern int ceph_handle_notrace_create(struct inode *dir, struct dentry *dentry);
 extern int ceph_handle_snapdir(struct ceph_mds_request *req,
                               struct dentry *dentry, int err);
index a811c5e98bfa37425dc46df0eed5bc10761e1583..dfce616002ad97337d2c9f0ad9cff3d3a62485e2 100644 (file)
@@ -357,6 +357,7 @@ extern const char *ceph_mds_op_name(int op);
  */
 #define CEPH_READDIR_FRAG_END          (1<<0)
 #define CEPH_READDIR_FRAG_COMPLETE     (1<<8)
+#define CEPH_READDIR_HASH_ORDER                (1<<9)
 
 union ceph_mds_request_args {
        struct {