libceph: change how "safe" callback is used
authorAlex Elder <elder@inktank.com>
Mon, 15 Apr 2013 16:20:42 +0000 (11:20 -0500)
committerSage Weil <sage@inktank.com>
Thu, 2 May 2013 04:18:52 +0000 (21:18 -0700)
An osd request currently has two callbacks.  They inform the
initiator of the request when we've received confirmation for the
target osd that a request was received, and when the osd indicates
all changes described by the request are durable.

The only time the second callback is used is in the ceph file system
for a synchronous write.  There's a race that makes some handling of
this case unsafe.  This patch addresses this problem.  The error
handling for this callback is also kind of gross, and this patch
changes that as well.

In ceph_sync_write(), if a safe callback is requested we want to add
the request on the ceph inode's unsafe items list.  Because items on
this list must have their tid set (by ceph_osd_start_request()), the
request added *after* the call to that function returns.  The
problem with this is that there's a race between starting the
request and adding it to the unsafe items list; the request may
already be complete before ceph_sync_write() even begins to put it
on the list.

To address this, we change the way the "safe" callback is used.
Rather than just calling it when the request is "safe", we use it to
notify the initiator the bounds (start and end) of the period during
which the request is *unsafe*.  So the initiator gets notified just
before the request gets sent to the osd (when it is "unsafe"), and
again when it's known the results are durable (it's no longer
unsafe).  The first call will get made in __send_request(), just
before the request message gets sent to the messenger for the first
time.  That function is only called by __send_queued(), which is
always called with the osd client's request mutex held.

We then have this callback function insert the request on the ceph
inode's unsafe list when we're told the request is unsafe.  This
will avoid the race because this call will be made under protection
of the osd client's request mutex.  It also nicely groups the setup
and cleanup of the state associated with managing unsafe requests.

The name of the "safe" callback field is changed to "unsafe" to
better reflect its new purpose.  It has a Boolean "unsafe" parameter
to indicate whether the request is becoming unsafe or is now safe.
Because the "msg" parameter wasn't used, we drop that.

This resolves the original problem reportedin:
    http://tracker.ceph.com/issues/4706

Reported-by: Yan, Zheng <zheng.z.yan@intel.com>
Signed-off-by: Alex Elder <elder@inktank.com>
Reviewed-by: Yan, Zheng <zheng.z.yan@intel.com>
Reviewed-by: Sage Weil <sage@inktank.com>
fs/ceph/file.c
include/linux/ceph/osd_client.h
net/ceph/osd_client.c

index ae23e31a8f383e39c8427f02015fa2e45b5057a9..a65acf355384ef4da4fcecc8503243b4606349de 100644 (file)
@@ -446,19 +446,35 @@ done:
 }
 
 /*
- * Write commit callback, called if we requested both an ACK and
- * ONDISK commit reply from the OSD.
+ * Write commit request unsafe callback, called to tell us when a
+ * request is unsafe (that is, in flight--has been handed to the
+ * messenger to send to its target osd).  It is called again when
+ * we've received a response message indicating the request is
+ * "safe" (its CEPH_OSD_FLAG_ONDISK flag is set), or when a request
+ * is completed early (and unsuccessfully) due to a timeout or
+ * interrupt.
+ *
+ * This is used if we requested both an ACK and ONDISK commit reply
+ * from the OSD.
  */
-static void sync_write_commit(struct ceph_osd_request *req,
-                             struct ceph_msg *msg)
+static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
 {
        struct ceph_inode_info *ci = ceph_inode(req->r_inode);
 
-       dout("sync_write_commit %p tid %llu\n", req, req->r_tid);
-       spin_lock(&ci->i_unsafe_lock);
-       list_del_init(&req->r_unsafe_item);
-       spin_unlock(&ci->i_unsafe_lock);
-       ceph_put_cap_refs(ci, CEPH_CAP_FILE_WR);
+       dout("%s %p tid %llu %ssafe\n", __func__, req, req->r_tid,
+               unsafe ? "un" : "");
+       if (unsafe) {
+               ceph_get_cap_refs(ci, CEPH_CAP_FILE_WR);
+               spin_lock(&ci->i_unsafe_lock);
+               list_add_tail(&req->r_unsafe_item,
+                             &ci->i_unsafe_writes);
+               spin_unlock(&ci->i_unsafe_lock);
+       } else {
+               spin_lock(&ci->i_unsafe_lock);
+               list_del_init(&req->r_unsafe_item);
+               spin_unlock(&ci->i_unsafe_lock);
+               ceph_put_cap_refs(ci, CEPH_CAP_FILE_WR);
+       }
 }
 
 /*
@@ -570,7 +586,8 @@ more:
 
                if ((file->f_flags & O_SYNC) == 0) {
                        /* get a second commit callback */
-                       req->r_safe_callback = sync_write_commit;
+                       req->r_unsafe_callback = ceph_sync_write_unsafe;
+                       req->r_inode = inode;
                        own_pages = true;
                }
        }
@@ -581,21 +598,8 @@ more:
        ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
 
        ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
-       if (!ret) {
-               if (req->r_safe_callback) {
-                       /*
-                        * Add to inode unsafe list only after we
-                        * start_request so that a tid has been assigned.
-                        */
-                       spin_lock(&ci->i_unsafe_lock);
-                       list_add_tail(&req->r_unsafe_item,
-                                     &ci->i_unsafe_writes);
-                       spin_unlock(&ci->i_unsafe_lock);
-                       ceph_get_cap_refs(ci, CEPH_CAP_FILE_WR);
-               }
-               
+       if (!ret)
                ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
-       }
 
        if (file->f_flags & O_DIRECT)
                ceph_put_page_vector(pages, num_pages, false);
index 2a68a7465c18db56be9e5563e902bf4216defe41..0d3358ef5285cb2bb17b5246b7737937edda3cb3 100644 (file)
@@ -29,6 +29,7 @@ struct ceph_authorizer;
  */
 typedef void (*ceph_osdc_callback_t)(struct ceph_osd_request *,
                                     struct ceph_msg *);
+typedef void (*ceph_osdc_unsafe_callback_t)(struct ceph_osd_request *, bool);
 
 /* a given osd we're communicating with */
 struct ceph_osd {
@@ -149,7 +150,8 @@ struct ceph_osd_request {
        struct kref       r_kref;
        bool              r_mempool;
        struct completion r_completion, r_safe_completion;
-       ceph_osdc_callback_t r_callback, r_safe_callback;
+       ceph_osdc_callback_t r_callback;
+       ceph_osdc_unsafe_callback_t r_unsafe_callback;
        struct ceph_eversion r_reassert_version;
        struct list_head  r_unsafe_item;
 
index 939be67199ca5fed987313fbb183eb16d66f58d0..0c5bf2fb50756df44a4f9904a5dc2be0b5c35178 100644 (file)
@@ -1314,8 +1314,14 @@ static void __send_request(struct ceph_osd_client *osdc,
        list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
 
        ceph_msg_get(req->r_request); /* send consumes a ref */
-       ceph_con_send(&req->r_osd->o_con, req->r_request);
+
+       /* Mark the request unsafe if this is the first timet's being sent. */
+
+       if (!req->r_sent && req->r_unsafe_callback)
+               req->r_unsafe_callback(req, true);
        req->r_sent = req->r_osd->o_incarnation;
+
+       ceph_con_send(&req->r_osd->o_con, req->r_request);
 }
 
 /*
@@ -1403,8 +1409,8 @@ static void handle_osds_timeout(struct work_struct *work)
 
 static void complete_request(struct ceph_osd_request *req)
 {
-       if (req->r_safe_callback)
-               req->r_safe_callback(req, NULL);
+       if (req->r_unsafe_callback)
+               req->r_unsafe_callback(req, false);
        complete_all(&req->r_safe_completion);  /* fsync waiter */
 }