Fix common misspellings
[GitHub/mt8127/android_kernel_alcatel_ttab.git] / net / ceph / osd_client.c
index b85ed5a5503dde0640c3eb17003c5e736ffef7c5..8d4ee7e01793b6646f6e72abc6b4f0f26d8264d8 100644 (file)
@@ -25,6 +25,12 @@ static const struct ceph_connection_operations osd_con_ops;
 
 static void send_queued(struct ceph_osd_client *osdc);
 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
+static void __register_request(struct ceph_osd_client *osdc,
+                              struct ceph_osd_request *req);
+static void __unregister_linger_request(struct ceph_osd_client *osdc,
+                                       struct ceph_osd_request *req);
+static int __send_request(struct ceph_osd_client *osdc,
+                         struct ceph_osd_request *req);
 
 static int op_needs_trail(int op)
 {
@@ -33,6 +39,7 @@ static int op_needs_trail(int op)
        case CEPH_OSD_OP_SETXATTR:
        case CEPH_OSD_OP_CMPXATTR:
        case CEPH_OSD_OP_CALL:
+       case CEPH_OSD_OP_NOTIFY:
                return 1;
        default:
                return 0;
@@ -208,6 +215,8 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
        init_completion(&req->r_completion);
        init_completion(&req->r_safe_completion);
        INIT_LIST_HEAD(&req->r_unsafe_item);
+       INIT_LIST_HEAD(&req->r_linger_item);
+       INIT_LIST_HEAD(&req->r_linger_osd);
        req->r_flags = flags;
 
        WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
@@ -314,6 +323,24 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
                break;
        case CEPH_OSD_OP_STARTSYNC:
                break;
+       case CEPH_OSD_OP_NOTIFY:
+               {
+                       __le32 prot_ver = cpu_to_le32(src->watch.prot_ver);
+                       __le32 timeout = cpu_to_le32(src->watch.timeout);
+
+                       BUG_ON(!req->r_trail);
+
+                       ceph_pagelist_append(req->r_trail,
+                                               &prot_ver, sizeof(prot_ver));
+                       ceph_pagelist_append(req->r_trail,
+                                               &timeout, sizeof(timeout));
+               }
+       case CEPH_OSD_OP_NOTIFY_ACK:
+       case CEPH_OSD_OP_WATCH:
+               dst->watch.cookie = cpu_to_le64(src->watch.cookie);
+               dst->watch.ver = cpu_to_le64(src->watch.ver);
+               dst->watch.flag = src->watch.flag;
+               break;
        default:
                pr_err("unrecognized osd opcode %d\n", dst->op);
                WARN_ON(1);
@@ -534,7 +561,7 @@ __lookup_request_ge(struct ceph_osd_client *osdc,
 static void __kick_osd_requests(struct ceph_osd_client *osdc,
                                struct ceph_osd *osd)
 {
-       struct ceph_osd_request *req;
+       struct ceph_osd_request *req, *nreq;
        int err;
 
        dout("__kick_osd_requests osd%d\n", osd->o_osd);
@@ -546,7 +573,17 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
                list_move(&req->r_req_lru_item, &osdc->req_unsent);
                dout("requeued %p tid %llu osd%d\n", req, req->r_tid,
                     osd->o_osd);
-               req->r_flags |= CEPH_OSD_FLAG_RETRY;
+               if (!req->r_linger)
+                       req->r_flags |= CEPH_OSD_FLAG_RETRY;
+       }
+
+       list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
+                                r_linger_osd) {
+               __unregister_linger_request(osdc, req);
+               __register_request(osdc, req);
+               list_move(&req->r_req_lru_item, &osdc->req_unsent);
+               dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid,
+                    osd->o_osd);
        }
 }
 
@@ -590,6 +627,7 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
        atomic_set(&osd->o_ref, 1);
        osd->o_osdc = osdc;
        INIT_LIST_HEAD(&osd->o_requests);
+       INIT_LIST_HEAD(&osd->o_linger_requests);
        INIT_LIST_HEAD(&osd->o_osd_lru);
        osd->o_incarnation = 1;
 
@@ -679,7 +717,8 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
        int ret = 0;
 
        dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
-       if (list_empty(&osd->o_requests)) {
+       if (list_empty(&osd->o_requests) &&
+           list_empty(&osd->o_linger_requests)) {
                __remove_osd(osdc, osd);
        } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
                          &osd->o_con.peer_addr,
@@ -752,10 +791,9 @@ static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
  * Register request, assign tid.  If this is the first request, set up
  * the timeout event.
  */
-static void register_request(struct ceph_osd_client *osdc,
-                            struct ceph_osd_request *req)
+static void __register_request(struct ceph_osd_client *osdc,
+                              struct ceph_osd_request *req)
 {
-       mutex_lock(&osdc->request_mutex);
        req->r_tid = ++osdc->last_tid;
        req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
        INIT_LIST_HEAD(&req->r_req_lru_item);
@@ -769,6 +807,13 @@ static void register_request(struct ceph_osd_client *osdc,
                dout(" first request, scheduling timeout\n");
                __schedule_osd_timeout(osdc);
        }
+}
+
+static void register_request(struct ceph_osd_client *osdc,
+                            struct ceph_osd_request *req)
+{
+       mutex_lock(&osdc->request_mutex);
+       __register_request(osdc, req);
        mutex_unlock(&osdc->request_mutex);
 }
 
@@ -787,9 +832,14 @@ static void __unregister_request(struct ceph_osd_client *osdc,
                ceph_con_revoke(&req->r_osd->o_con, req->r_request);
 
                list_del_init(&req->r_osd_item);
-               if (list_empty(&req->r_osd->o_requests))
+               if (list_empty(&req->r_osd->o_requests) &&
+                   list_empty(&req->r_osd->o_linger_requests)) {
+                       dout("moving osd to %p lru\n", req->r_osd);
                        __move_osd_to_lru(osdc, req->r_osd);
-               req->r_osd = NULL;
+               }
+               if (list_empty(&req->r_osd_item) &&
+                   list_empty(&req->r_linger_item))
+                       req->r_osd = NULL;
        }
 
        ceph_osdc_put_request(req);
@@ -812,10 +862,62 @@ static void __cancel_request(struct ceph_osd_request *req)
        }
 }
 
+static void __register_linger_request(struct ceph_osd_client *osdc,
+                                   struct ceph_osd_request *req)
+{
+       dout("__register_linger_request %p\n", req);
+       list_add_tail(&req->r_linger_item, &osdc->req_linger);
+       list_add_tail(&req->r_linger_osd, &req->r_osd->o_linger_requests);
+}
+
+static void __unregister_linger_request(struct ceph_osd_client *osdc,
+                                       struct ceph_osd_request *req)
+{
+       dout("__unregister_linger_request %p\n", req);
+       if (req->r_osd) {
+               list_del_init(&req->r_linger_item);
+               list_del_init(&req->r_linger_osd);
+
+               if (list_empty(&req->r_osd->o_requests) &&
+                   list_empty(&req->r_osd->o_linger_requests)) {
+                       dout("moving osd to %p lru\n", req->r_osd);
+                       __move_osd_to_lru(osdc, req->r_osd);
+               }
+               req->r_osd = NULL;
+       }
+}
+
+void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
+                                        struct ceph_osd_request *req)
+{
+       mutex_lock(&osdc->request_mutex);
+       if (req->r_linger) {
+               __unregister_linger_request(osdc, req);
+               ceph_osdc_put_request(req);
+       }
+       mutex_unlock(&osdc->request_mutex);
+}
+EXPORT_SYMBOL(ceph_osdc_unregister_linger_request);
+
+void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
+                                 struct ceph_osd_request *req)
+{
+       if (!req->r_linger) {
+               dout("set_request_linger %p\n", req);
+               req->r_linger = 1;
+               /*
+                * caller is now responsible for calling
+                * unregister_linger_request
+                */
+               ceph_osdc_get_request(req);
+       }
+}
+EXPORT_SYMBOL(ceph_osdc_set_request_linger);
+
 /*
  * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
  * (as needed), and set the request r_osd appropriately.  If there is
- * no up osd, set r_osd to NULL.  Move the request to the appropiate list
+ * no up osd, set r_osd to NULL.  Move the request to the appropriate list
  * (unsent, homeless) or leave on in-flight lru.
  *
  * Return 0 if unchanged, 1 if changed, or negative on error.
@@ -958,7 +1060,6 @@ static void handle_timeout(struct work_struct *work)
                osdc->client->options->osd_keepalive_timeout * HZ;
        unsigned long last_stamp = 0;
        struct list_head slow_osds;
-
        dout("timeout\n");
        down_read(&osdc->map_sem);
 
@@ -1060,7 +1161,6 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
            numops * sizeof(struct ceph_osd_op))
                goto bad;
        dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result);
-
        /* lookup */
        mutex_lock(&osdc->request_mutex);
        req = __lookup_request(osdc, tid);
@@ -1104,6 +1204,9 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
 
        dout("handle_reply tid %llu flags %d\n", tid, flags);
 
+       if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK))
+               __register_linger_request(osdc, req);
+
        /* either this is a read, or we got the safe response */
        if (result < 0 ||
            (flags & CEPH_OSD_FLAG_ONDISK) ||
@@ -1124,6 +1227,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
        }
 
 done:
+       dout("req=%p req->r_linger=%d\n", req, req->r_linger);
        ceph_osdc_put_request(req);
        return;
 
@@ -1159,7 +1263,7 @@ static void reset_changed_osds(struct ceph_osd_client *osdc)
  */
 static void kick_requests(struct ceph_osd_client *osdc)
 {
-       struct ceph_osd_request *req;
+       struct ceph_osd_request *req, *nreq;
        struct rb_node *p;
        int needmap = 0;
        int err;
@@ -1177,8 +1281,30 @@ static void kick_requests(struct ceph_osd_client *osdc)
                } else if (err > 0) {
                        dout("%p tid %llu requeued on osd%d\n", req, req->r_tid,
                             req->r_osd ? req->r_osd->o_osd : -1);
-                       req->r_flags |= CEPH_OSD_FLAG_RETRY;
+                       if (!req->r_linger)
+                               req->r_flags |= CEPH_OSD_FLAG_RETRY;
+               }
+       }
+
+       list_for_each_entry_safe(req, nreq, &osdc->req_linger,
+                                r_linger_item) {
+               dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);
+
+               err = __map_request(osdc, req);
+               if (err == 0)
+                       continue;  /* no change and no osd was specified */
+               if (err < 0)
+                       continue;  /* hrm! */
+               if (req->r_osd == NULL) {
+                       dout("tid %llu maps to no valid osd\n", req->r_tid);
+                       needmap++;  /* request a newer map */
+                       continue;
                }
+
+               dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid,
+                    req->r_osd ? req->r_osd->o_osd : -1);
+               __unregister_linger_request(osdc, req);
+               __register_request(osdc, req);
        }
        mutex_unlock(&osdc->request_mutex);
 
@@ -1301,6 +1427,223 @@ bad:
        return;
 }
 
+/*
+ * watch/notify callback event infrastructure
+ *
+ * These callbacks are used both for watch and notify operations.
+ */
+static void __release_event(struct kref *kref)
+{
+       struct ceph_osd_event *event =
+               container_of(kref, struct ceph_osd_event, kref);
+
+       dout("__release_event %p\n", event);
+       kfree(event);
+}
+
+static void get_event(struct ceph_osd_event *event)
+{
+       kref_get(&event->kref);
+}
+
+void ceph_osdc_put_event(struct ceph_osd_event *event)
+{
+       kref_put(&event->kref, __release_event);
+}
+EXPORT_SYMBOL(ceph_osdc_put_event);
+
+static void __insert_event(struct ceph_osd_client *osdc,
+                            struct ceph_osd_event *new)
+{
+       struct rb_node **p = &osdc->event_tree.rb_node;
+       struct rb_node *parent = NULL;
+       struct ceph_osd_event *event = NULL;
+
+       while (*p) {
+               parent = *p;
+               event = rb_entry(parent, struct ceph_osd_event, node);
+               if (new->cookie < event->cookie)
+                       p = &(*p)->rb_left;
+               else if (new->cookie > event->cookie)
+                       p = &(*p)->rb_right;
+               else
+                       BUG();
+       }
+
+       rb_link_node(&new->node, parent, p);
+       rb_insert_color(&new->node, &osdc->event_tree);
+}
+
+static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc,
+                                               u64 cookie)
+{
+       struct rb_node **p = &osdc->event_tree.rb_node;
+       struct rb_node *parent = NULL;
+       struct ceph_osd_event *event = NULL;
+
+       while (*p) {
+               parent = *p;
+               event = rb_entry(parent, struct ceph_osd_event, node);
+               if (cookie < event->cookie)
+                       p = &(*p)->rb_left;
+               else if (cookie > event->cookie)
+                       p = &(*p)->rb_right;
+               else
+                       return event;
+       }
+       return NULL;
+}
+
+static void __remove_event(struct ceph_osd_event *event)
+{
+       struct ceph_osd_client *osdc = event->osdc;
+
+       if (!RB_EMPTY_NODE(&event->node)) {
+               dout("__remove_event removed %p\n", event);
+               rb_erase(&event->node, &osdc->event_tree);
+               ceph_osdc_put_event(event);
+       } else {
+               dout("__remove_event didn't remove %p\n", event);
+       }
+}
+
+int ceph_osdc_create_event(struct ceph_osd_client *osdc,
+                          void (*event_cb)(u64, u64, u8, void *),
+                          int one_shot, void *data,
+                          struct ceph_osd_event **pevent)
+{
+       struct ceph_osd_event *event;
+
+       event = kmalloc(sizeof(*event), GFP_NOIO);
+       if (!event)
+               return -ENOMEM;
+
+       dout("create_event %p\n", event);
+       event->cb = event_cb;
+       event->one_shot = one_shot;
+       event->data = data;
+       event->osdc = osdc;
+       INIT_LIST_HEAD(&event->osd_node);
+       kref_init(&event->kref);   /* one ref for us */
+       kref_get(&event->kref);    /* one ref for the caller */
+       init_completion(&event->completion);
+
+       spin_lock(&osdc->event_lock);
+       event->cookie = ++osdc->event_count;
+       __insert_event(osdc, event);
+       spin_unlock(&osdc->event_lock);
+
+       *pevent = event;
+       return 0;
+}
+EXPORT_SYMBOL(ceph_osdc_create_event);
+
+void ceph_osdc_cancel_event(struct ceph_osd_event *event)
+{
+       struct ceph_osd_client *osdc = event->osdc;
+
+       dout("cancel_event %p\n", event);
+       spin_lock(&osdc->event_lock);
+       __remove_event(event);
+       spin_unlock(&osdc->event_lock);
+       ceph_osdc_put_event(event); /* caller's */
+}
+EXPORT_SYMBOL(ceph_osdc_cancel_event);
+
+
+static void do_event_work(struct work_struct *work)
+{
+       struct ceph_osd_event_work *event_work =
+               container_of(work, struct ceph_osd_event_work, work);
+       struct ceph_osd_event *event = event_work->event;
+       u64 ver = event_work->ver;
+       u64 notify_id = event_work->notify_id;
+       u8 opcode = event_work->opcode;
+
+       dout("do_event_work completing %p\n", event);
+       event->cb(ver, notify_id, opcode, event->data);
+       complete(&event->completion);
+       dout("do_event_work completed %p\n", event);
+       ceph_osdc_put_event(event);
+       kfree(event_work);
+}
+
+
+/*
+ * Process osd watch notifications
+ */
+void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg)
+{
+       void *p, *end;
+       u8 proto_ver;
+       u64 cookie, ver, notify_id;
+       u8 opcode;
+       struct ceph_osd_event *event;
+       struct ceph_osd_event_work *event_work;
+
+       p = msg->front.iov_base;
+       end = p + msg->front.iov_len;
+
+       ceph_decode_8_safe(&p, end, proto_ver, bad);
+       ceph_decode_8_safe(&p, end, opcode, bad);
+       ceph_decode_64_safe(&p, end, cookie, bad);
+       ceph_decode_64_safe(&p, end, ver, bad);
+       ceph_decode_64_safe(&p, end, notify_id, bad);
+
+       spin_lock(&osdc->event_lock);
+       event = __find_event(osdc, cookie);
+       if (event) {
+               get_event(event);
+               if (event->one_shot)
+                       __remove_event(event);
+       }
+       spin_unlock(&osdc->event_lock);
+       dout("handle_watch_notify cookie %lld ver %lld event %p\n",
+            cookie, ver, event);
+       if (event) {
+               event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
+               INIT_WORK(&event_work->work, do_event_work);
+               if (!event_work) {
+                       dout("ERROR: could not allocate event_work\n");
+                       goto done_err;
+               }
+               event_work->event = event;
+               event_work->ver = ver;
+               event_work->notify_id = notify_id;
+               event_work->opcode = opcode;
+               if (!queue_work(osdc->notify_wq, &event_work->work)) {
+                       dout("WARNING: failed to queue notify event work\n");
+                       goto done_err;
+               }
+       }
+
+       return;
+
+done_err:
+       complete(&event->completion);
+       ceph_osdc_put_event(event);
+       return;
+
+bad:
+       pr_err("osdc handle_watch_notify corrupt msg\n");
+       return;
+}
+
+int ceph_osdc_wait_event(struct ceph_osd_event *event, unsigned long timeout)
+{
+       int err;
+
+       dout("wait_event %p\n", event);
+       err = wait_for_completion_interruptible_timeout(&event->completion,
+                                                       timeout * HZ);
+       ceph_osdc_put_event(event);
+       if (err > 0)
+               err = 0;
+       dout("wait_event %p returns %d\n", event, err);
+       return err;
+}
+EXPORT_SYMBOL(ceph_osdc_wait_event);
+
 /*
  * Register request, send initial attempt.
  */
@@ -1430,9 +1773,13 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
        INIT_LIST_HEAD(&osdc->req_lru);
        INIT_LIST_HEAD(&osdc->req_unsent);
        INIT_LIST_HEAD(&osdc->req_notarget);
+       INIT_LIST_HEAD(&osdc->req_linger);
        osdc->num_requests = 0;
        INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
        INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
+       spin_lock_init(&osdc->event_lock);
+       osdc->event_tree = RB_ROOT;
+       osdc->event_count = 0;
 
        schedule_delayed_work(&osdc->osds_timeout_work,
           round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ));
@@ -1452,6 +1799,13 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
                                "osd_op_reply");
        if (err < 0)
                goto out_msgpool;
+
+       osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
+       if (IS_ERR(osdc->notify_wq)) {
+               err = PTR_ERR(osdc->notify_wq);
+               osdc->notify_wq = NULL;
+               goto out_msgpool;
+       }
        return 0;
 
 out_msgpool:
@@ -1465,6 +1819,8 @@ EXPORT_SYMBOL(ceph_osdc_init);
 
 void ceph_osdc_stop(struct ceph_osd_client *osdc)
 {
+       flush_workqueue(osdc->notify_wq);
+       destroy_workqueue(osdc->notify_wq);
        cancel_delayed_work_sync(&osdc->timeout_work);
        cancel_delayed_work_sync(&osdc->osds_timeout_work);
        if (osdc->osdmap) {
@@ -1472,6 +1828,7 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
                osdc->osdmap = NULL;
        }
        remove_old_osds(osdc, 1);
+       WARN_ON(!RB_EMPTY_ROOT(&osdc->osds));
        mempool_destroy(osdc->req_mempool);
        ceph_msgpool_destroy(&osdc->msgpool_op);
        ceph_msgpool_destroy(&osdc->msgpool_op_reply);
@@ -1580,6 +1937,9 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
        case CEPH_MSG_OSD_OPREPLY:
                handle_reply(osdc, msg, con);
                break;
+       case CEPH_MSG_WATCH_NOTIFY:
+               handle_watch_notify(osdc, msg);
+               break;
 
        default:
                pr_err("received unknown message type %d %s\n", type,
@@ -1673,6 +2033,7 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
 
        switch (type) {
        case CEPH_MSG_OSD_MAP:
+       case CEPH_MSG_WATCH_NOTIFY:
                return ceph_msg_new(type, front, GFP_NOFS);
        case CEPH_MSG_OSD_OPREPLY:
                return get_reply(con, hdr, skip);