ceph: control access to page vector for incoming data
authorSage Weil <sage@newdream.net>
Tue, 22 Dec 2009 18:45:45 +0000 (10:45 -0800)
committerSage Weil <sage@newdream.net>
Wed, 23 Dec 2009 16:17:20 +0000 (08:17 -0800)
When we issue an OSD read, we specify a vector of pages that the data is to
be read into.  The request may be sent multiple times, to multiple OSDs, if
the osdmap changes, which means we can get more than one reply.

Only read data into the page vector if the reply is coming from the
OSD we last sent the request to.  Keep track of which connection is using
the vector by taking a reference.  If another connection was already
using the vector before and a new reply comes in on the right connection,
revoke the pages from the other connection.

Signed-off-by: Sage Weil <sage@newdream.net>
fs/ceph/messenger.c
fs/ceph/messenger.h
fs/ceph/osd_client.c
fs/ceph/osd_client.h

index c03b4185c143c3deb74a24baacced90ee760ae80..506b638a023b207c32e621733a0169731661b372 100644 (file)
@@ -1975,6 +1975,35 @@ void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
        mutex_unlock(&con->mutex);
 }
 
+/*
+ * Revoke a page vector that we may be reading data into
+ */
+void ceph_con_revoke_pages(struct ceph_connection *con, struct page **pages)
+{
+       mutex_lock(&con->mutex);
+       if (con->in_msg && con->in_msg->pages == pages) {
+               unsigned data_len = le32_to_cpu(con->in_hdr.data_len);
+
+               /* skip rest of message */
+               dout("con_revoke_pages %p msg %p pages %p revoked\n", con,
+                    con->in_msg, pages);
+               if (con->in_msg_pos.data_pos < data_len)
+                       con->in_base_pos = con->in_msg_pos.data_pos - data_len;
+               else
+                       con->in_base_pos = con->in_base_pos -
+                               sizeof(struct ceph_msg_header) -
+                               sizeof(struct ceph_msg_footer);
+               con->in_msg->pages = NULL;
+               ceph_msg_put(con->in_msg);
+               con->in_msg = NULL;
+               con->in_tag = CEPH_MSGR_TAG_READY;
+       } else {
+               dout("con_revoke_pages %p msg %p pages %p no-op\n",
+                    con, con->in_msg, pages);
+       }
+       mutex_unlock(&con->mutex);
+}
+
 /*
  * Queue a keepalive byte to ensure the tcp connection is alive.
  */
index 94b55de90331aa29f2f5eb58162e7285da1ff46a..7e2aab1d3ce2bd9ff13025cec401df4d2a5c880a 100644 (file)
@@ -230,6 +230,8 @@ extern void ceph_con_open(struct ceph_connection *con,
 extern void ceph_con_close(struct ceph_connection *con);
 extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg);
 extern void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg);
+extern void ceph_con_revoke_pages(struct ceph_connection *con,
+                                 struct page **pages);
 extern void ceph_con_keepalive(struct ceph_connection *con);
 extern struct ceph_connection *ceph_con_get(struct ceph_connection *con);
 extern void ceph_con_put(struct ceph_connection *con);
index a1800fb63237d6fcffbc8bb82e6cf952d3e6b313..374f0013956cb7522f7e361ca8be65694a208fa0 100644 (file)
@@ -87,6 +87,13 @@ void ceph_osdc_release_request(struct kref *kref)
                ceph_msg_put(req->r_request);
        if (req->r_reply)
                ceph_msg_put(req->r_reply);
+       if (req->r_con_filling_pages) {
+               dout("release_request revoking pages %p from con %p\n",
+                    req->r_pages, req->r_con_filling_pages);
+               ceph_con_revoke_pages(req->r_con_filling_pages,
+                                     req->r_pages);
+               ceph_con_put(req->r_con_filling_pages);
+       }
        if (req->r_own_pages)
                ceph_release_page_vector(req->r_pages,
                                         req->r_num_pages);
@@ -687,7 +694,8 @@ static void handle_timeout(struct work_struct *work)
  * handle osd op reply.  either call the callback if it is specified,
  * or do the completion to wake up the waiting thread.
  */
-static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
+static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
+                        struct ceph_connection *con)
 {
        struct ceph_osd_reply_head *rhead = msg->front.iov_base;
        struct ceph_osd_request *req;
@@ -715,6 +723,16 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
        ceph_osdc_get_request(req);
        flags = le32_to_cpu(rhead->flags);
 
+       /*
+        * if this connection filled our pages, drop our reference now, to
+        * avoid a (safe but slower) revoke later.
+        */
+       if (req->r_con_filling_pages == con && req->r_pages == msg->pages) {
+               dout(" got pages, dropping con_filling_pages ref %p\n", con);
+               req->r_con_filling_pages = NULL;
+               ceph_con_put(con);
+       }
+
        if (req->r_reply) {
                /*
                 * once we see the message has been received, we don't
@@ -1007,14 +1025,20 @@ static int prepare_pages(struct ceph_connection *con, struct ceph_msg *m,
        }
        dout("prepare_pages tid %llu has %d pages, want %d\n",
             tid, req->r_num_pages, want);
-       if (likely(req->r_num_pages >= want && !req->r_prepared_pages)) {
-               m->pages = req->r_pages;
-               m->nr_pages = req->r_num_pages;
-               req->r_reply = m;  /* only for duration of read over socket */
-               ceph_msg_get(m);
-               req->r_prepared_pages = 1;
-               ret = 0; /* success */
+       if (unlikely(req->r_num_pages < want))
+               goto out;
+
+       if (req->r_con_filling_pages) {
+               dout("revoking pages %p from old con %p\n", req->r_pages,
+                    req->r_con_filling_pages);
+               ceph_con_revoke_pages(req->r_con_filling_pages, req->r_pages);
+               ceph_con_put(req->r_con_filling_pages);
        }
+       req->r_con_filling_pages = ceph_con_get(con);
+       req->r_reply = ceph_msg_get(m); /* for duration of read over socket */
+       m->pages = req->r_pages;
+       m->nr_pages = req->r_num_pages;
+       ret = 0; /* success */
 out:
        mutex_unlock(&osdc->request_mutex);
        return ret;
@@ -1269,7 +1293,7 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
                ceph_osdc_handle_map(osdc, msg);
                break;
        case CEPH_MSG_OSD_OPREPLY:
-               handle_reply(osdc, msg);
+               handle_reply(osdc, msg, con);
                break;
 
        default:
index 2e4cfd1e9f1079b45e717aed668bdaa781957bfe..8fef71cc4457ed70a0a801a54e6263cd12b7ec81 100644 (file)
@@ -43,11 +43,13 @@ struct ceph_osd_request {
        struct list_head r_osd_item;
        struct ceph_osd *r_osd;
 
+       struct ceph_connection *r_con_filling_pages;
+
        struct ceph_msg  *r_request, *r_reply;
        int               r_result;
        int               r_flags;     /* any additional flags for the osd */
        u32               r_sent;      /* >0 if r_request is sending/sent */
-       int r_prepared_pages, r_got_reply;
+       int               r_got_reply;
        int               r_num_prealloc_reply;
 
        struct ceph_osd_client *r_osdc;