libceph: fix osd request queuing on osdmap updates
authorSage Weil <sage@newdream.net>
Tue, 18 Jan 2011 04:34:08 +0000 (20:34 -0800)
committerSage Weil <sage@newdream.net>
Mon, 21 Mar 2011 19:24:19 +0000 (12:24 -0700)
If we send a request to osd A, and the request's pg remaps to osd B and
then back to A in quick succession, we need to resend the request to A. The
old code was only calling kick_requests after processing all incremental
maps in a message, so it was very possible to not resend a request that
needed to be resent.  This would make the osd eventually time out (at least
with the current default of osd timeouts enabled).

The correct approach is to scan requests on every map incremental.  This
patch refactors the kick code in a few ways:
 - all requests are either on req_lru (in flight), req_unsent (ready to
   send), or req_notarget (currently map to no up osd)
 - mapping always done by map_request (previous map_osds)
 - if the mapping changes, we requeue.  requests are resent only after all
   map incrementals are processed.
 - some osd reset code is moved out of kick_requests into a separate
   function
 - the "kick this osd" functionality is moved to kick_osd_requests, as it
   is unrelated to scanning for request->pg->osd mapping changes

Signed-off-by: Sage Weil <sage@newdream.net>
include/linux/ceph/osd_client.h
net/ceph/osd_client.c

index a1af29648fb54be5c0b5ac1cfc62879314145d56..e791b8e4635391ecd1b628d7edb8b93c9c2f72f9 100644 (file)
@@ -74,7 +74,6 @@ struct ceph_osd_request {
        char              r_oid[40];          /* object name */
        int               r_oid_len;
        unsigned long     r_stamp;            /* send OR check time */
-       bool              r_resend;           /* msg send failed, needs retry */
 
        struct ceph_file_layout r_file_layout;
        struct ceph_snap_context *r_snapc;    /* snap context for writes */
@@ -104,7 +103,9 @@ struct ceph_osd_client {
        u64                    timeout_tid;   /* tid of timeout triggering rq */
        u64                    last_tid;      /* tid of last request */
        struct rb_root         requests;      /* pending requests */
-       struct list_head       req_lru;       /* pending requests lru */
+       struct list_head       req_lru;       /* in-flight lru */
+       struct list_head       req_unsent;    /* unsent/need-resend queue */
+       struct list_head       req_notarget;  /* map to no osd */
        int                    num_requests;
        struct delayed_work    timeout_work;
        struct delayed_work    osds_timeout_work;
index 3e20a122ffa2f2bf40a91596fe12bb93e6874b64..b85ed5a5503dde0640c3eb17003c5e736ffef7c5 100644 (file)
 #define OSD_OPREPLY_FRONT_LEN  512
 
 static const struct ceph_connection_operations osd_con_ops;
-static int __kick_requests(struct ceph_osd_client *osdc,
-                         struct ceph_osd *kickosd);
 
-static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
+static void send_queued(struct ceph_osd_client *osdc);
+static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
 
 static int op_needs_trail(int op)
 {
@@ -529,6 +528,35 @@ __lookup_request_ge(struct ceph_osd_client *osdc,
        return NULL;
 }
 
+/*
+ * Resubmit requests pending on the given osd.
+ */
+static void __kick_osd_requests(struct ceph_osd_client *osdc,
+                               struct ceph_osd *osd)
+{
+       struct ceph_osd_request *req;
+       int err;
+
+       dout("__kick_osd_requests osd%d\n", osd->o_osd);
+       err = __reset_osd(osdc, osd);
+       if (err == -EAGAIN)
+               return;
+
+       list_for_each_entry(req, &osd->o_requests, r_osd_item) {
+               list_move(&req->r_req_lru_item, &osdc->req_unsent);
+               dout("requeued %p tid %llu osd%d\n", req, req->r_tid,
+                    osd->o_osd);
+               req->r_flags |= CEPH_OSD_FLAG_RETRY;
+       }
+}
+
+static void kick_osd_requests(struct ceph_osd_client *osdc,
+                             struct ceph_osd *kickosd)
+{
+       mutex_lock(&osdc->request_mutex);
+       __kick_osd_requests(osdc, kickosd);
+       mutex_unlock(&osdc->request_mutex);
+}
 
 /*
  * If the osd connection drops, we need to resubmit all requests.
@@ -543,7 +571,8 @@ static void osd_reset(struct ceph_connection *con)
        dout("osd_reset osd%d\n", osd->o_osd);
        osdc = osd->o_osdc;
        down_read(&osdc->map_sem);
-       kick_requests(osdc, osd);
+       kick_osd_requests(osdc, osd);
+       send_queued(osdc);
        up_read(&osdc->map_sem);
 }
 
@@ -781,20 +810,20 @@ static void __cancel_request(struct ceph_osd_request *req)
                ceph_con_revoke(&req->r_osd->o_con, req->r_request);
                req->r_sent = 0;
        }
-       list_del_init(&req->r_req_lru_item);
 }
 
 /*
  * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
  * (as needed), and set the request r_osd appropriately.  If there is
- * no up osd, set r_osd to NULL.
+ * no up osd, set r_osd to NULL.  Move the request to the appropiate list
+ * (unsent, homeless) or leave on in-flight lru.
  *
  * Return 0 if unchanged, 1 if changed, or negative on error.
  *
  * Caller should hold map_sem for read and request_mutex.
  */
-static int __map_osds(struct ceph_osd_client *osdc,
-                     struct ceph_osd_request *req)
+static int __map_request(struct ceph_osd_client *osdc,
+                        struct ceph_osd_request *req)
 {
        struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
        struct ceph_pg pgid;
@@ -802,11 +831,13 @@ static int __map_osds(struct ceph_osd_client *osdc,
        int o = -1, num = 0;
        int err;
 
-       dout("map_osds %p tid %lld\n", req, req->r_tid);
+       dout("map_request %p tid %lld\n", req, req->r_tid);
        err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
                                      &req->r_file_layout, osdc->osdmap);
-       if (err)
+       if (err) {
+               list_move(&req->r_req_lru_item, &osdc->req_notarget);
                return err;
+       }
        pgid = reqhead->layout.ol_pgid;
        req->r_pgid = pgid;
 
@@ -823,7 +854,7 @@ static int __map_osds(struct ceph_osd_client *osdc,
            (req->r_osd == NULL && o == -1))
                return 0;  /* no change */
 
-       dout("map_osds tid %llu pgid %d.%x osd%d (was osd%d)\n",
+       dout("map_request tid %llu pgid %d.%x osd%d (was osd%d)\n",
             req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
             req->r_osd ? req->r_osd->o_osd : -1);
 
@@ -841,10 +872,12 @@ static int __map_osds(struct ceph_osd_client *osdc,
        if (!req->r_osd && o >= 0) {
                err = -ENOMEM;
                req->r_osd = create_osd(osdc);
-               if (!req->r_osd)
+               if (!req->r_osd) {
+                       list_move(&req->r_req_lru_item, &osdc->req_notarget);
                        goto out;
+               }
 
-               dout("map_osds osd %p is osd%d\n", req->r_osd, o);
+               dout("map_request osd %p is osd%d\n", req->r_osd, o);
                req->r_osd->o_osd = o;
                req->r_osd->o_con.peer_name.num = cpu_to_le64(o);
                __insert_osd(osdc, req->r_osd);
@@ -855,6 +888,9 @@ static int __map_osds(struct ceph_osd_client *osdc,
        if (req->r_osd) {
                __remove_osd_from_lru(req->r_osd);
                list_add(&req->r_osd_item, &req->r_osd->o_requests);
+               list_move(&req->r_req_lru_item, &osdc->req_unsent);
+       } else {
+               list_move(&req->r_req_lru_item, &osdc->req_notarget);
        }
        err = 1;   /* osd or pg changed */
 
@@ -869,16 +905,6 @@ static int __send_request(struct ceph_osd_client *osdc,
                          struct ceph_osd_request *req)
 {
        struct ceph_osd_request_head *reqhead;
-       int err;
-
-       err = __map_osds(osdc, req);
-       if (err < 0)
-               return err;
-       if (req->r_osd == NULL) {
-               dout("send_request %p no up osds in pg\n", req);
-               ceph_monc_request_next_osdmap(&osdc->client->monc);
-               return 0;
-       }
 
        dout("send_request %p tid %llu to osd%d flags %d\n",
             req, req->r_tid, req->r_osd->o_osd, req->r_flags);
@@ -897,6 +923,21 @@ static int __send_request(struct ceph_osd_client *osdc,
        return 0;
 }
 
+/*
+ * Send any requests in the queue (req_unsent).
+ */
+static void send_queued(struct ceph_osd_client *osdc)
+{
+       struct ceph_osd_request *req, *tmp;
+
+       dout("send_queued\n");
+       mutex_lock(&osdc->request_mutex);
+       list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) {
+               __send_request(osdc, req);
+       }
+       mutex_unlock(&osdc->request_mutex);
+}
+
 /*
  * Timeout callback, called every N seconds when 1 or more osd
  * requests has been active for more than N seconds.  When this
@@ -916,7 +957,6 @@ static void handle_timeout(struct work_struct *work)
        unsigned long keepalive =
                osdc->client->options->osd_keepalive_timeout * HZ;
        unsigned long last_stamp = 0;
-       struct rb_node *p;
        struct list_head slow_osds;
 
        dout("timeout\n");
@@ -925,21 +965,6 @@ static void handle_timeout(struct work_struct *work)
        ceph_monc_request_next_osdmap(&osdc->client->monc);
 
        mutex_lock(&osdc->request_mutex);
-       for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
-               req = rb_entry(p, struct ceph_osd_request, r_node);
-
-               if (req->r_resend) {
-                       int err;
-
-                       dout("osdc resending prev failed %lld\n", req->r_tid);
-                       err = __send_request(osdc, req);
-                       if (err)
-                               dout("osdc failed again on %lld\n", req->r_tid);
-                       else
-                               req->r_resend = false;
-                       continue;
-               }
-       }
 
        /*
         * reset osds that appear to be _really_ unresponsive.  this
@@ -963,7 +988,7 @@ static void handle_timeout(struct work_struct *work)
                BUG_ON(!osd);
                pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
                           req->r_tid, osd->o_osd);
-               __kick_requests(osdc, osd);
+               __kick_osd_requests(osdc, osd);
        }
 
        /*
@@ -991,7 +1016,7 @@ static void handle_timeout(struct work_struct *work)
 
        __schedule_osd_timeout(osdc);
        mutex_unlock(&osdc->request_mutex);
-
+       send_queued(osdc);
        up_read(&osdc->map_sem);
 }
 
@@ -1109,108 +1134,61 @@ bad:
        ceph_msg_dump(msg);
 }
 
-
-static int __kick_requests(struct ceph_osd_client *osdc,
-                         struct ceph_osd *kickosd)
+static void reset_changed_osds(struct ceph_osd_client *osdc)
 {
-       struct ceph_osd_request *req;
        struct rb_node *p, *n;
-       int needmap = 0;
-       int err;
-
-       dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1);
-       if (kickosd) {
-               err = __reset_osd(osdc, kickosd);
-               if (err == -EAGAIN)
-                       return 1;
-       } else {
-               for (p = rb_first(&osdc->osds); p; p = n) {
-                       struct ceph_osd *osd =
-                               rb_entry(p, struct ceph_osd, o_node);
-
-                       n = rb_next(p);
-                       if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
-                           memcmp(&osd->o_con.peer_addr,
-                                  ceph_osd_addr(osdc->osdmap,
-                                                osd->o_osd),
-                                  sizeof(struct ceph_entity_addr)) != 0)
-                               __reset_osd(osdc, osd);
-               }
-       }
-
-       for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
-               req = rb_entry(p, struct ceph_osd_request, r_node);
-
-               if (req->r_resend) {
-                       dout(" r_resend set on tid %llu\n", req->r_tid);
-                       __cancel_request(req);
-                       goto kick;
-               }
-               if (req->r_osd && kickosd == req->r_osd) {
-                       __cancel_request(req);
-                       goto kick;
-               }
 
-               err = __map_osds(osdc, req);
-               if (err == 0)
-                       continue;  /* no change */
-               if (err < 0) {
-                       /*
-                        * FIXME: really, we should set the request
-                        * error and fail if this isn't a 'nofail'
-                        * request, but that's a fair bit more
-                        * complicated to do.  So retry!
-                        */
-                       dout(" setting r_resend on %llu\n", req->r_tid);
-                       req->r_resend = true;
-                       continue;
-               }
-               if (req->r_osd == NULL) {
-                       dout("tid %llu maps to no valid osd\n", req->r_tid);
-                       needmap++;  /* request a newer map */
-                       continue;
-               }
+       for (p = rb_first(&osdc->osds); p; p = n) {
+               struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node);
 
-kick:
-               dout("kicking %p tid %llu osd%d\n", req, req->r_tid,
-                    req->r_osd ? req->r_osd->o_osd : -1);
-               req->r_flags |= CEPH_OSD_FLAG_RETRY;
-               err = __send_request(osdc, req);
-               if (err) {
-                       dout(" setting r_resend on %llu\n", req->r_tid);
-                       req->r_resend = true;
-               }
+               n = rb_next(p);
+               if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
+                   memcmp(&osd->o_con.peer_addr,
+                          ceph_osd_addr(osdc->osdmap,
+                                        osd->o_osd),
+                          sizeof(struct ceph_entity_addr)) != 0)
+                       __reset_osd(osdc, osd);
        }
-
-       return needmap;
 }
 
 /*
- * Resubmit osd requests whose osd or osd address has changed.  Request
- * a new osd map if osds are down, or we are otherwise unable to determine
- * how to direct a request.
- *
- * Close connections to down osds.
- *
- * If @who is specified, resubmit requests for that specific osd.
+ * Requeue requests whose mapping to an OSD has changed.  If requests map to
+ * no osd, request a new map.
  *
  * Caller should hold map_sem for read and request_mutex.
  */
-static void kick_requests(struct ceph_osd_client *osdc,
-                         struct ceph_osd *kickosd)
+static void kick_requests(struct ceph_osd_client *osdc)
 {
-       int needmap;
+       struct ceph_osd_request *req;
+       struct rb_node *p;
+       int needmap = 0;
+       int err;
 
+       dout("kick_requests\n");
        mutex_lock(&osdc->request_mutex);
-       needmap = __kick_requests(osdc, kickosd);
+       for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
+               req = rb_entry(p, struct ceph_osd_request, r_node);
+               err = __map_request(osdc, req);
+               if (err < 0)
+                       continue;  /* error */
+               if (req->r_osd == NULL) {
+                       dout("%p tid %llu maps to no osd\n", req, req->r_tid);
+                       needmap++;  /* request a newer map */
+               } else if (err > 0) {
+                       dout("%p tid %llu requeued on osd%d\n", req, req->r_tid,
+                            req->r_osd ? req->r_osd->o_osd : -1);
+                       req->r_flags |= CEPH_OSD_FLAG_RETRY;
+               }
+       }
        mutex_unlock(&osdc->request_mutex);
 
        if (needmap) {
                dout("%d requests for down osds, need new map\n", needmap);
                ceph_monc_request_next_osdmap(&osdc->client->monc);
        }
-
 }
+
+
 /*
  * Process updated osd map.
  *
@@ -1263,6 +1241,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
                                ceph_osdmap_destroy(osdc->osdmap);
                                osdc->osdmap = newmap;
                        }
+                       kick_requests(osdc);
+                       reset_changed_osds(osdc);
                } else {
                        dout("ignoring incremental map %u len %d\n",
                             epoch, maplen);
@@ -1300,6 +1280,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
                        osdc->osdmap = newmap;
                        if (oldmap)
                                ceph_osdmap_destroy(oldmap);
+                       kick_requests(osdc);
                }
                p += maplen;
                nr_maps--;
@@ -1308,8 +1289,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
 done:
        downgrade_write(&osdc->map_sem);
        ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
-       if (newmap)
-               kick_requests(osdc, NULL);
+       send_queued(osdc);
        up_read(&osdc->map_sem);
        wake_up_all(&osdc->client->auth_wq);
        return;
@@ -1347,15 +1327,22 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
         * the request still han't been touched yet.
         */
        if (req->r_sent == 0) {
-               rc = __send_request(osdc, req);
-               if (rc) {
-                       if (nofail) {
-                               dout("osdc_start_request failed send, "
-                                    " marking %lld\n", req->r_tid);
-                               req->r_resend = true;
-                               rc = 0;
-                       } else {
-                               __unregister_request(osdc, req);
+               rc = __map_request(osdc, req);
+               if (rc < 0)
+                       return rc;
+               if (req->r_osd == NULL) {
+                       dout("send_request %p no up osds in pg\n", req);
+                       ceph_monc_request_next_osdmap(&osdc->client->monc);
+               } else {
+                       rc = __send_request(osdc, req);
+                       if (rc) {
+                               if (nofail) {
+                                       dout("osdc_start_request failed send, "
+                                            " will retry %lld\n", req->r_tid);
+                                       rc = 0;
+                               } else {
+                                       __unregister_request(osdc, req);
+                               }
                        }
                }
        }
@@ -1441,6 +1428,8 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
        INIT_LIST_HEAD(&osdc->osd_lru);
        osdc->requests = RB_ROOT;
        INIT_LIST_HEAD(&osdc->req_lru);
+       INIT_LIST_HEAD(&osdc->req_unsent);
+       INIT_LIST_HEAD(&osdc->req_notarget);
        osdc->num_requests = 0;
        INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
        INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);