libceph: add lingering request and watch/notify event framework
authorYehuda Sadeh <yehuda@hq.newdream.net>
Mon, 21 Mar 2011 22:07:16 +0000 (15:07 -0700)
committerSage Weil <sage@newdream.net>
Tue, 22 Mar 2011 18:33:55 +0000 (11:33 -0700)
Lingering requests are requests that are sent to the OSD normally but
tracked also after we get a successful request.  This keeps the OSD
connection open and resends the original request if the object moves to
another OSD.  The OSD can then send notification messages back to us
if another client initiates a notify.

This framework will be used by RBD so that the client gets notification
when a snapshot is created by another node or tool.

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
Signed-off-by: Sage Weil <sage@newdream.net>
include/linux/ceph/osd_client.h
net/ceph/ceph_common.c
net/ceph/osd_client.c

index e791b8e4635391ecd1b628d7edb8b93c9c2f72f9..f88eacb111d4151dc5491797fe0e03faaadc26a9 100644 (file)
@@ -32,6 +32,7 @@ struct ceph_osd {
        struct rb_node o_node;
        struct ceph_connection o_con;
        struct list_head o_requests;
+       struct list_head o_linger_requests;
        struct list_head o_osd_lru;
        struct ceph_authorizer *o_authorizer;
        void *o_authorizer_buf, *o_authorizer_reply_buf;
@@ -47,6 +48,8 @@ struct ceph_osd_request {
        struct rb_node  r_node;
        struct list_head r_req_lru_item;
        struct list_head r_osd_item;
+       struct list_head r_linger_item;
+       struct list_head r_linger_osd;
        struct ceph_osd *r_osd;
        struct ceph_pg   r_pgid;
        int              r_pg_osds[CEPH_PG_MAX_SIZE];
@@ -59,6 +62,7 @@ struct ceph_osd_request {
        int               r_flags;     /* any additional flags for the osd */
        u32               r_sent;      /* >0 if r_request is sending/sent */
        int               r_got_reply;
+       int               r_linger;
 
        struct ceph_osd_client *r_osdc;
        struct kref       r_kref;
@@ -89,6 +93,26 @@ struct ceph_osd_request {
        struct ceph_pagelist *r_trail;        /* trailing part of the data */
 };
 
+struct ceph_osd_event {
+       u64 cookie;
+       int one_shot;
+       struct ceph_osd_client *osdc;
+       void (*cb)(u64, u64, u8, void *);
+       void *data;
+       struct rb_node node;
+       struct list_head osd_node;
+       struct kref kref;
+       struct completion completion;
+};
+
+struct ceph_osd_event_work {
+       struct work_struct work;
+       struct ceph_osd_event *event;
+        u64 ver;
+        u64 notify_id;
+        u8 opcode;
+};
+
 struct ceph_osd_client {
        struct ceph_client     *client;
 
@@ -106,6 +130,7 @@ struct ceph_osd_client {
        struct list_head       req_lru;       /* in-flight lru */
        struct list_head       req_unsent;    /* unsent/need-resend queue */
        struct list_head       req_notarget;  /* map to no osd */
+       struct list_head       req_linger;    /* lingering requests */
        int                    num_requests;
        struct delayed_work    timeout_work;
        struct delayed_work    osds_timeout_work;
@@ -117,6 +142,12 @@ struct ceph_osd_client {
 
        struct ceph_msgpool     msgpool_op;
        struct ceph_msgpool     msgpool_op_reply;
+
+       spinlock_t              event_lock;
+       struct rb_root          event_tree;
+       u64                     event_count;
+
+       struct workqueue_struct *notify_wq;
 };
 
 struct ceph_osd_req_op {
@@ -151,6 +182,13 @@ struct ceph_osd_req_op {
                struct {
                        u64 snapid;
                } snap;
+               struct {
+                       u64 cookie;
+                       u64 ver;
+                       __u8 flag;
+                       u32 prot_ver;
+                       u32 timeout;
+               } watch;
        };
        u32 payload_len;
 };
@@ -199,6 +237,11 @@ extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
                                      bool use_mempool, int num_reply,
                                      int page_align);
 
+extern void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
+                                        struct ceph_osd_request *req);
+extern void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
+                                               struct ceph_osd_request *req);
+
 static inline void ceph_osdc_get_request(struct ceph_osd_request *req)
 {
        kref_get(&req->r_kref);
@@ -234,5 +277,14 @@ extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
                                struct page **pages, int nr_pages,
                                int flags, int do_sync, bool nofail);
 
+/* watch/notify events */
+extern int ceph_osdc_create_event(struct ceph_osd_client *osdc,
+                                 void (*event_cb)(u64, u64, u8, void *),
+                                 int one_shot, void *data,
+                                 struct ceph_osd_event **pevent);
+extern void ceph_osdc_cancel_event(struct ceph_osd_event *event);
+extern int ceph_osdc_wait_event(struct ceph_osd_event *event,
+                               unsigned long timeout);
+extern void ceph_osdc_put_event(struct ceph_osd_event *event);
 #endif
 
index f3e4a13fea0c8a5337ce5383951fb389ea24144a..95f96ab94bba12b57052705d385be749d6183c53 100644 (file)
@@ -62,6 +62,7 @@ const char *ceph_msg_type_name(int type)
        case CEPH_MSG_OSD_MAP: return "osd_map";
        case CEPH_MSG_OSD_OP: return "osd_op";
        case CEPH_MSG_OSD_OPREPLY: return "osd_opreply";
+       case CEPH_MSG_WATCH_NOTIFY: return "watch_notify";
        default: return "unknown";
        }
 }
index b85ed5a5503dde0640c3eb17003c5e736ffef7c5..02212ed50852eaa41b5d0c75c7f121b77f074ce3 100644 (file)
@@ -25,6 +25,12 @@ static const struct ceph_connection_operations osd_con_ops;
 
 static void send_queued(struct ceph_osd_client *osdc);
 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
+static void __register_request(struct ceph_osd_client *osdc,
+                              struct ceph_osd_request *req);
+static void __unregister_linger_request(struct ceph_osd_client *osdc,
+                                       struct ceph_osd_request *req);
+static int __send_request(struct ceph_osd_client *osdc,
+                         struct ceph_osd_request *req);
 
 static int op_needs_trail(int op)
 {
@@ -33,6 +39,7 @@ static int op_needs_trail(int op)
        case CEPH_OSD_OP_SETXATTR:
        case CEPH_OSD_OP_CMPXATTR:
        case CEPH_OSD_OP_CALL:
+       case CEPH_OSD_OP_NOTIFY:
                return 1;
        default:
                return 0;
@@ -208,6 +215,8 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
        init_completion(&req->r_completion);
        init_completion(&req->r_safe_completion);
        INIT_LIST_HEAD(&req->r_unsafe_item);
+       INIT_LIST_HEAD(&req->r_linger_item);
+       INIT_LIST_HEAD(&req->r_linger_osd);
        req->r_flags = flags;
 
        WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
@@ -314,6 +323,24 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
                break;
        case CEPH_OSD_OP_STARTSYNC:
                break;
+       case CEPH_OSD_OP_NOTIFY:
+               {
+                       __le32 prot_ver = cpu_to_le32(src->watch.prot_ver);
+                       __le32 timeout = cpu_to_le32(src->watch.timeout);
+
+                       BUG_ON(!req->r_trail);
+
+                       ceph_pagelist_append(req->r_trail,
+                                               &prot_ver, sizeof(prot_ver));
+                       ceph_pagelist_append(req->r_trail,
+                                               &timeout, sizeof(timeout));
+               }
+       case CEPH_OSD_OP_NOTIFY_ACK:
+       case CEPH_OSD_OP_WATCH:
+               dst->watch.cookie = cpu_to_le64(src->watch.cookie);
+               dst->watch.ver = cpu_to_le64(src->watch.ver);
+               dst->watch.flag = src->watch.flag;
+               break;
        default:
                pr_err("unrecognized osd opcode %d\n", dst->op);
                WARN_ON(1);
@@ -534,7 +561,7 @@ __lookup_request_ge(struct ceph_osd_client *osdc,
 static void __kick_osd_requests(struct ceph_osd_client *osdc,
                                struct ceph_osd *osd)
 {
-       struct ceph_osd_request *req;
+       struct ceph_osd_request *req, *nreq;
        int err;
 
        dout("__kick_osd_requests osd%d\n", osd->o_osd);
@@ -546,7 +573,17 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
                list_move(&req->r_req_lru_item, &osdc->req_unsent);
                dout("requeued %p tid %llu osd%d\n", req, req->r_tid,
                     osd->o_osd);
-               req->r_flags |= CEPH_OSD_FLAG_RETRY;
+               if (!req->r_linger)
+                       req->r_flags |= CEPH_OSD_FLAG_RETRY;
+       }
+
+       list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
+                                r_linger_osd) {
+               __unregister_linger_request(osdc, req);
+               __register_request(osdc, req);
+               list_move(&req->r_req_lru_item, &osdc->req_unsent);
+               dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid,
+                    osd->o_osd);
        }
 }
 
@@ -590,6 +627,7 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
        atomic_set(&osd->o_ref, 1);
        osd->o_osdc = osdc;
        INIT_LIST_HEAD(&osd->o_requests);
+       INIT_LIST_HEAD(&osd->o_linger_requests);
        INIT_LIST_HEAD(&osd->o_osd_lru);
        osd->o_incarnation = 1;
 
@@ -679,7 +717,8 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
        int ret = 0;
 
        dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
-       if (list_empty(&osd->o_requests)) {
+       if (list_empty(&osd->o_requests) &&
+           list_empty(&osd->o_linger_requests)) {
                __remove_osd(osdc, osd);
        } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
                          &osd->o_con.peer_addr,
@@ -752,10 +791,9 @@ static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
  * Register request, assign tid.  If this is the first request, set up
  * the timeout event.
  */
-static void register_request(struct ceph_osd_client *osdc,
-                            struct ceph_osd_request *req)
+static void __register_request(struct ceph_osd_client *osdc,
+                              struct ceph_osd_request *req)
 {
-       mutex_lock(&osdc->request_mutex);
        req->r_tid = ++osdc->last_tid;
        req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
        INIT_LIST_HEAD(&req->r_req_lru_item);
@@ -769,6 +807,13 @@ static void register_request(struct ceph_osd_client *osdc,
                dout(" first request, scheduling timeout\n");
                __schedule_osd_timeout(osdc);
        }
+}
+
+static void register_request(struct ceph_osd_client *osdc,
+                            struct ceph_osd_request *req)
+{
+       mutex_lock(&osdc->request_mutex);
+       __register_request(osdc, req);
        mutex_unlock(&osdc->request_mutex);
 }
 
@@ -787,9 +832,14 @@ static void __unregister_request(struct ceph_osd_client *osdc,
                ceph_con_revoke(&req->r_osd->o_con, req->r_request);
 
                list_del_init(&req->r_osd_item);
-               if (list_empty(&req->r_osd->o_requests))
+               if (list_empty(&req->r_osd->o_requests) &&
+                   list_empty(&req->r_osd->o_linger_requests)) {
+                       dout("moving osd to %p lru\n", req->r_osd);
                        __move_osd_to_lru(osdc, req->r_osd);
-               req->r_osd = NULL;
+               }
+               if (list_empty(&req->r_osd_item) &&
+                   list_empty(&req->r_linger_item))
+                       req->r_osd = NULL;
        }
 
        ceph_osdc_put_request(req);
@@ -812,6 +862,58 @@ static void __cancel_request(struct ceph_osd_request *req)
        }
 }
 
+static void __register_linger_request(struct ceph_osd_client *osdc,
+                                   struct ceph_osd_request *req)
+{
+       dout("__register_linger_request %p\n", req);
+       list_add_tail(&req->r_linger_item, &osdc->req_linger);
+       list_add_tail(&req->r_linger_osd, &req->r_osd->o_linger_requests);
+}
+
+static void __unregister_linger_request(struct ceph_osd_client *osdc,
+                                       struct ceph_osd_request *req)
+{
+       dout("__unregister_linger_request %p\n", req);
+       if (req->r_osd) {
+               list_del_init(&req->r_linger_item);
+               list_del_init(&req->r_linger_osd);
+
+               if (list_empty(&req->r_osd->o_requests) &&
+                   list_empty(&req->r_osd->o_linger_requests)) {
+                       dout("moving osd to %p lru\n", req->r_osd);
+                       __move_osd_to_lru(osdc, req->r_osd);
+               }
+               req->r_osd = NULL;
+       }
+}
+
+void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
+                                        struct ceph_osd_request *req)
+{
+       mutex_lock(&osdc->request_mutex);
+       if (req->r_linger) {
+               __unregister_linger_request(osdc, req);
+               ceph_osdc_put_request(req);
+       }
+       mutex_unlock(&osdc->request_mutex);
+}
+EXPORT_SYMBOL(ceph_osdc_unregister_linger_request);
+
+void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
+                                 struct ceph_osd_request *req)
+{
+       if (!req->r_linger) {
+               dout("set_request_linger %p\n", req);
+               req->r_linger = 1;
+               /*
+                * caller is now responsible for calling
+                * unregister_linger_request
+                */
+               ceph_osdc_get_request(req);
+       }
+}
+EXPORT_SYMBOL(ceph_osdc_set_request_linger);
+
 /*
  * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
  * (as needed), and set the request r_osd appropriately.  If there is
@@ -958,7 +1060,6 @@ static void handle_timeout(struct work_struct *work)
                osdc->client->options->osd_keepalive_timeout * HZ;
        unsigned long last_stamp = 0;
        struct list_head slow_osds;
-
        dout("timeout\n");
        down_read(&osdc->map_sem);
 
@@ -1060,7 +1161,6 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
            numops * sizeof(struct ceph_osd_op))
                goto bad;
        dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result);
-
        /* lookup */
        mutex_lock(&osdc->request_mutex);
        req = __lookup_request(osdc, tid);
@@ -1104,6 +1204,9 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
 
        dout("handle_reply tid %llu flags %d\n", tid, flags);
 
+       if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK))
+               __register_linger_request(osdc, req);
+
        /* either this is a read, or we got the safe response */
        if (result < 0 ||
            (flags & CEPH_OSD_FLAG_ONDISK) ||
@@ -1124,6 +1227,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
        }
 
 done:
+       dout("req=%p req->r_linger=%d\n", req, req->r_linger);
        ceph_osdc_put_request(req);
        return;
 
@@ -1159,7 +1263,7 @@ static void reset_changed_osds(struct ceph_osd_client *osdc)
  */
 static void kick_requests(struct ceph_osd_client *osdc)
 {
-       struct ceph_osd_request *req;
+       struct ceph_osd_request *req, *nreq;
        struct rb_node *p;
        int needmap = 0;
        int err;
@@ -1177,8 +1281,30 @@ static void kick_requests(struct ceph_osd_client *osdc)
                } else if (err > 0) {
                        dout("%p tid %llu requeued on osd%d\n", req, req->r_tid,
                             req->r_osd ? req->r_osd->o_osd : -1);
-                       req->r_flags |= CEPH_OSD_FLAG_RETRY;
+                       if (!req->r_linger)
+                               req->r_flags |= CEPH_OSD_FLAG_RETRY;
+               }
+       }
+
+       list_for_each_entry_safe(req, nreq, &osdc->req_linger,
+                                r_linger_item) {
+               dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);
+
+               err = __map_request(osdc, req);
+               if (err == 0)
+                       continue;  /* no change and no osd was specified */
+               if (err < 0)
+                       continue;  /* hrm! */
+               if (req->r_osd == NULL) {
+                       dout("tid %llu maps to no valid osd\n", req->r_tid);
+                       needmap++;  /* request a newer map */
+                       continue;
                }
+
+               dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid,
+                    req->r_osd ? req->r_osd->o_osd : -1);
+               __unregister_linger_request(osdc, req);
+               __register_request(osdc, req);
        }
        mutex_unlock(&osdc->request_mutex);
 
@@ -1301,6 +1427,223 @@ bad:
        return;
 }
 
+/*
+ * watch/notify callback event infrastructure
+ *
+ * These callbacks are used both for watch and notify operations.
+ */
+static void __release_event(struct kref *kref)
+{
+       struct ceph_osd_event *event =
+               container_of(kref, struct ceph_osd_event, kref);
+
+       dout("__release_event %p\n", event);
+       kfree(event);
+}
+
+static void get_event(struct ceph_osd_event *event)
+{
+       kref_get(&event->kref);
+}
+
+void ceph_osdc_put_event(struct ceph_osd_event *event)
+{
+       kref_put(&event->kref, __release_event);
+}
+EXPORT_SYMBOL(ceph_osdc_put_event);
+
+static void __insert_event(struct ceph_osd_client *osdc,
+                            struct ceph_osd_event *new)
+{
+       struct rb_node **p = &osdc->event_tree.rb_node;
+       struct rb_node *parent = NULL;
+       struct ceph_osd_event *event = NULL;
+
+       while (*p) {
+               parent = *p;
+               event = rb_entry(parent, struct ceph_osd_event, node);
+               if (new->cookie < event->cookie)
+                       p = &(*p)->rb_left;
+               else if (new->cookie > event->cookie)
+                       p = &(*p)->rb_right;
+               else
+                       BUG();
+       }
+
+       rb_link_node(&new->node, parent, p);
+       rb_insert_color(&new->node, &osdc->event_tree);
+}
+
+static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc,
+                                               u64 cookie)
+{
+       struct rb_node **p = &osdc->event_tree.rb_node;
+       struct rb_node *parent = NULL;
+       struct ceph_osd_event *event = NULL;
+
+       while (*p) {
+               parent = *p;
+               event = rb_entry(parent, struct ceph_osd_event, node);
+               if (cookie < event->cookie)
+                       p = &(*p)->rb_left;
+               else if (cookie > event->cookie)
+                       p = &(*p)->rb_right;
+               else
+                       return event;
+       }
+       return NULL;
+}
+
+static void __remove_event(struct ceph_osd_event *event)
+{
+       struct ceph_osd_client *osdc = event->osdc;
+
+       if (!RB_EMPTY_NODE(&event->node)) {
+               dout("__remove_event removed %p\n", event);
+               rb_erase(&event->node, &osdc->event_tree);
+               ceph_osdc_put_event(event);
+       } else {
+               dout("__remove_event didn't remove %p\n", event);
+       }
+}
+
+int ceph_osdc_create_event(struct ceph_osd_client *osdc,
+                          void (*event_cb)(u64, u64, u8, void *),
+                          int one_shot, void *data,
+                          struct ceph_osd_event **pevent)
+{
+       struct ceph_osd_event *event;
+
+       event = kmalloc(sizeof(*event), GFP_NOIO);
+       if (!event)
+               return -ENOMEM;
+
+       dout("create_event %p\n", event);
+       event->cb = event_cb;
+       event->one_shot = one_shot;
+       event->data = data;
+       event->osdc = osdc;
+       INIT_LIST_HEAD(&event->osd_node);
+       kref_init(&event->kref);   /* one ref for us */
+       kref_get(&event->kref);    /* one ref for the caller */
+       init_completion(&event->completion);
+
+       spin_lock(&osdc->event_lock);
+       event->cookie = ++osdc->event_count;
+       __insert_event(osdc, event);
+       spin_unlock(&osdc->event_lock);
+
+       *pevent = event;
+       return 0;
+}
+EXPORT_SYMBOL(ceph_osdc_create_event);
+
+void ceph_osdc_cancel_event(struct ceph_osd_event *event)
+{
+       struct ceph_osd_client *osdc = event->osdc;
+
+       dout("cancel_event %p\n", event);
+       spin_lock(&osdc->event_lock);
+       __remove_event(event);
+       spin_unlock(&osdc->event_lock);
+       ceph_osdc_put_event(event); /* caller's */
+}
+EXPORT_SYMBOL(ceph_osdc_cancel_event);
+
+
+static void do_event_work(struct work_struct *work)
+{
+       struct ceph_osd_event_work *event_work =
+               container_of(work, struct ceph_osd_event_work, work);
+       struct ceph_osd_event *event = event_work->event;
+       u64 ver = event_work->ver;
+       u64 notify_id = event_work->notify_id;
+       u8 opcode = event_work->opcode;
+
+       dout("do_event_work completing %p\n", event);
+       event->cb(ver, notify_id, opcode, event->data);
+       complete(&event->completion);
+       dout("do_event_work completed %p\n", event);
+       ceph_osdc_put_event(event);
+       kfree(event_work);
+}
+
+
+/*
+ * Process osd watch notifications
+ */
+void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg)
+{
+       void *p, *end;
+       u8 proto_ver;
+       u64 cookie, ver, notify_id;
+       u8 opcode;
+       struct ceph_osd_event *event;
+       struct ceph_osd_event_work *event_work;
+
+       p = msg->front.iov_base;
+       end = p + msg->front.iov_len;
+
+       ceph_decode_8_safe(&p, end, proto_ver, bad);
+       ceph_decode_8_safe(&p, end, opcode, bad);
+       ceph_decode_64_safe(&p, end, cookie, bad);
+       ceph_decode_64_safe(&p, end, ver, bad);
+       ceph_decode_64_safe(&p, end, notify_id, bad);
+
+       spin_lock(&osdc->event_lock);
+       event = __find_event(osdc, cookie);
+       if (event) {
+               get_event(event);
+               if (event->one_shot)
+                       __remove_event(event);
+       }
+       spin_unlock(&osdc->event_lock);
+       dout("handle_watch_notify cookie %lld ver %lld event %p\n",
+            cookie, ver, event);
+       if (event) {
+               event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
+               INIT_WORK(&event_work->work, do_event_work);
+               if (!event_work) {
+                       dout("ERROR: could not allocate event_work\n");
+                       goto done_err;
+               }
+               event_work->event = event;
+               event_work->ver = ver;
+               event_work->notify_id = notify_id;
+               event_work->opcode = opcode;
+               if (!queue_work(osdc->notify_wq, &event_work->work)) {
+                       dout("WARNING: failed to queue notify event work\n");
+                       goto done_err;
+               }
+       }
+
+       return;
+
+done_err:
+       complete(&event->completion);
+       ceph_osdc_put_event(event);
+       return;
+
+bad:
+       pr_err("osdc handle_watch_notify corrupt msg\n");
+       return;
+}
+
+int ceph_osdc_wait_event(struct ceph_osd_event *event, unsigned long timeout)
+{
+       int err;
+
+       dout("wait_event %p\n", event);
+       err = wait_for_completion_interruptible_timeout(&event->completion,
+                                                       timeout * HZ);
+       ceph_osdc_put_event(event);
+       if (err > 0)
+               err = 0;
+       dout("wait_event %p returns %d\n", event, err);
+       return err;
+}
+EXPORT_SYMBOL(ceph_osdc_wait_event);
+
 /*
  * Register request, send initial attempt.
  */
@@ -1430,9 +1773,13 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
        INIT_LIST_HEAD(&osdc->req_lru);
        INIT_LIST_HEAD(&osdc->req_unsent);
        INIT_LIST_HEAD(&osdc->req_notarget);
+       INIT_LIST_HEAD(&osdc->req_linger);
        osdc->num_requests = 0;
        INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
        INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
+       spin_lock_init(&osdc->event_lock);
+       osdc->event_tree = RB_ROOT;
+       osdc->event_count = 0;
 
        schedule_delayed_work(&osdc->osds_timeout_work,
           round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ));
@@ -1452,6 +1799,13 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
                                "osd_op_reply");
        if (err < 0)
                goto out_msgpool;
+
+       osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
+       if (IS_ERR(osdc->notify_wq)) {
+               err = PTR_ERR(osdc->notify_wq);
+               osdc->notify_wq = NULL;
+               goto out_msgpool;
+       }
        return 0;
 
 out_msgpool:
@@ -1465,6 +1819,8 @@ EXPORT_SYMBOL(ceph_osdc_init);
 
 void ceph_osdc_stop(struct ceph_osd_client *osdc)
 {
+       flush_workqueue(osdc->notify_wq);
+       destroy_workqueue(osdc->notify_wq);
        cancel_delayed_work_sync(&osdc->timeout_work);
        cancel_delayed_work_sync(&osdc->osds_timeout_work);
        if (osdc->osdmap) {
@@ -1472,6 +1828,7 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
                osdc->osdmap = NULL;
        }
        remove_old_osds(osdc, 1);
+       WARN_ON(!RB_EMPTY_ROOT(&osdc->osds));
        mempool_destroy(osdc->req_mempool);
        ceph_msgpool_destroy(&osdc->msgpool_op);
        ceph_msgpool_destroy(&osdc->msgpool_op_reply);
@@ -1580,6 +1937,9 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
        case CEPH_MSG_OSD_OPREPLY:
                handle_reply(osdc, msg, con);
                break;
+       case CEPH_MSG_WATCH_NOTIFY:
+               handle_watch_notify(osdc, msg);
+               break;
 
        default:
                pr_err("received unknown message type %d %s\n", type,
@@ -1673,6 +2033,7 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
 
        switch (type) {
        case CEPH_MSG_OSD_MAP:
+       case CEPH_MSG_WATCH_NOTIFY:
                return ceph_msg_new(type, front, GFP_NOFS);
        case CEPH_MSG_OSD_OPREPLY:
                return get_reply(con, hdr, skip);