From 922dab6134178cae317ae00de86376cba59f3147 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 26 May 2016 01:15:02 +0200 Subject: [PATCH] libceph, rbd: ceph_osd_linger_request, watch/notify v2 This adds support and switches rbd to a new, more reliable version of watch/notify protocol. As with the OSD client update, this is mostly about getting the right structures linked into the right places so that reconnects are properly sent when needed. watch/notify v2 also requires sending regular pings to the OSDs - send_linger_ping(). A major change from the old watch/notify implementation is the introduction of ceph_osd_linger_request - linger requests no longer piggy back on ceph_osd_request. ceph_osd_event has been merged into ceph_osd_linger_request. All the details are now hidden within libceph, the interface consists of a simple pair of watch/unwatch functions and ceph_osdc_notify_ack(). ceph_osdc_watch() does return ceph_osd_linger_request, but only to keep the lifetime management simple. ceph_osdc_notify_ack() accepts an optional data payload, which is relayed back to the notifier. Portions of this patch are loosely based on work by Douglas Fuller and Mike Christie . Signed-off-by: Ilya Dryomov --- drivers/block/rbd.c | 179 ++--- include/linux/ceph/ceph_fs.h | 5 +- include/linux/ceph/osd_client.h | 97 +-- include/linux/ceph/rados.h | 17 +- net/ceph/ceph_strings.c | 16 + net/ceph/debugfs.c | 36 + net/ceph/osd_client.c | 1148 ++++++++++++++++++++++++------- 7 files changed, 1067 insertions(+), 431 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index fce23dc908e3..d0834c477f96 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -351,11 +351,11 @@ struct rbd_device { struct rbd_options *opts; struct ceph_object_id header_oid; + struct ceph_object_locator header_oloc; struct ceph_file_layout layout; - struct ceph_osd_event *watch_event; - struct rbd_obj_request *watch_request; + struct ceph_osd_linger_request *watch_handle; struct rbd_spec *parent_spec; u64 parent_overlap; @@ -1596,12 +1596,6 @@ static int rbd_obj_request_wait(struct rbd_obj_request *obj_request) return __rbd_obj_request_wait(obj_request, 0); } -static int rbd_obj_request_wait_timeout(struct rbd_obj_request *obj_request, - unsigned long timeout) -{ - return __rbd_obj_request_wait(obj_request, timeout); -} - static void rbd_img_request_complete(struct rbd_img_request *img_request) { @@ -1751,12 +1745,6 @@ static void rbd_obj_request_complete(struct rbd_obj_request *obj_request) complete_all(&obj_request->completion); } -static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request) -{ - dout("%s: obj %p\n", __func__, obj_request); - obj_request_done_set(obj_request); -} - static void rbd_osd_read_callback(struct rbd_obj_request *obj_request) { struct rbd_img_request *img_request = NULL; @@ -1877,10 +1865,6 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req) case CEPH_OSD_OP_CALL: rbd_osd_call_callback(obj_request); break; - case CEPH_OSD_OP_NOTIFY_ACK: - case CEPH_OSD_OP_WATCH: - rbd_osd_trivial_callback(obj_request); - break; default: rbd_warn(NULL, "%s: unsupported op %hu", obj_request->object_name, (unsigned short) opcode); @@ -3100,45 +3084,18 @@ out_err: obj_request_done_set(obj_request); } -static int rbd_obj_notify_ack_sync(struct rbd_device *rbd_dev, u64 notify_id) -{ - struct rbd_obj_request *obj_request; - struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; - int ret; - - obj_request = rbd_obj_request_create(rbd_dev->header_oid.name, 0, 0, - OBJ_REQUEST_NODATA); - if (!obj_request) - return -ENOMEM; +static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev); +static void __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev); - ret = -ENOMEM; - obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1, - obj_request); - if (!obj_request->osd_req) - goto out; - - osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK, - notify_id, 0, 0); - rbd_osd_req_format_read(obj_request); - - ret = rbd_obj_request_submit(osdc, obj_request); - if (ret) - goto out; - ret = rbd_obj_request_wait(obj_request); -out: - rbd_obj_request_put(obj_request); - - return ret; -} - -static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data) +static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie, + u64 notifier_id, void *data, size_t data_len) { - struct rbd_device *rbd_dev = (struct rbd_device *)data; + struct rbd_device *rbd_dev = arg; + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; int ret; - dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__, - rbd_dev->header_oid.name, (unsigned long long)notify_id, - (unsigned int)opcode); + dout("%s rbd_dev %p cookie %llu notify_id %llu\n", __func__, rbd_dev, + cookie, notify_id); /* * Until adequate refresh error handling is in place, there is @@ -3150,63 +3107,31 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data) if (ret) rbd_warn(rbd_dev, "refresh failed: %d", ret); - ret = rbd_obj_notify_ack_sync(rbd_dev, notify_id); + ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid, + &rbd_dev->header_oloc, notify_id, cookie, + NULL, 0); if (ret) rbd_warn(rbd_dev, "notify_ack ret %d", ret); } -/* - * Send a (un)watch request and wait for the ack. Return a request - * with a ref held on success or error. - */ -static struct rbd_obj_request *rbd_obj_watch_request_helper( - struct rbd_device *rbd_dev, - bool watch) +static void rbd_watch_errcb(void *arg, u64 cookie, int err) { - struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; - struct ceph_options *opts = osdc->client->options; - struct rbd_obj_request *obj_request; + struct rbd_device *rbd_dev = arg; int ret; - obj_request = rbd_obj_request_create(rbd_dev->header_oid.name, 0, 0, - OBJ_REQUEST_NODATA); - if (!obj_request) - return ERR_PTR(-ENOMEM); - - obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_WRITE, 1, - obj_request); - if (!obj_request->osd_req) { - ret = -ENOMEM; - goto out; - } - - osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH, - rbd_dev->watch_event->cookie, 0, watch); - rbd_osd_req_format_write(obj_request); - - if (watch) - ceph_osdc_set_request_linger(osdc, obj_request->osd_req); - - ret = rbd_obj_request_submit(osdc, obj_request); - if (ret) - goto out; + rbd_warn(rbd_dev, "encountered watch error: %d", err); - ret = rbd_obj_request_wait_timeout(obj_request, opts->mount_timeout); - if (ret) - goto out; + __rbd_dev_header_unwatch_sync(rbd_dev); - ret = obj_request->result; + ret = rbd_dev_header_watch_sync(rbd_dev); if (ret) { - if (watch) - rbd_obj_request_end(obj_request); - goto out; + rbd_warn(rbd_dev, "failed to reregister watch: %d", ret); + return; } - return obj_request; - -out: - rbd_obj_request_put(obj_request); - return ERR_PTR(ret); + ret = rbd_dev_refresh(rbd_dev); + if (ret) + rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret); } /* @@ -3215,57 +3140,33 @@ out: static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev) { struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; - struct rbd_obj_request *obj_request; - int ret; + struct ceph_osd_linger_request *handle; - rbd_assert(!rbd_dev->watch_event); - rbd_assert(!rbd_dev->watch_request); - - ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev, - &rbd_dev->watch_event); - if (ret < 0) - return ret; - - obj_request = rbd_obj_watch_request_helper(rbd_dev, true); - if (IS_ERR(obj_request)) { - ceph_osdc_cancel_event(rbd_dev->watch_event); - rbd_dev->watch_event = NULL; - return PTR_ERR(obj_request); - } + rbd_assert(!rbd_dev->watch_handle); - /* - * A watch request is set to linger, so the underlying osd - * request won't go away until we unregister it. We retain - * a pointer to the object request during that time (in - * rbd_dev->watch_request), so we'll keep a reference to it. - * We'll drop that reference after we've unregistered it in - * rbd_dev_header_unwatch_sync(). - */ - rbd_dev->watch_request = obj_request; + handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid, + &rbd_dev->header_oloc, rbd_watch_cb, + rbd_watch_errcb, rbd_dev); + if (IS_ERR(handle)) + return PTR_ERR(handle); + rbd_dev->watch_handle = handle; return 0; } static void __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev) { - struct rbd_obj_request *obj_request; - - rbd_assert(rbd_dev->watch_event); - rbd_assert(rbd_dev->watch_request); + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + int ret; - rbd_obj_request_end(rbd_dev->watch_request); - rbd_obj_request_put(rbd_dev->watch_request); - rbd_dev->watch_request = NULL; + if (!rbd_dev->watch_handle) + return; - obj_request = rbd_obj_watch_request_helper(rbd_dev, false); - if (!IS_ERR(obj_request)) - rbd_obj_request_put(obj_request); - else - rbd_warn(rbd_dev, "unable to tear down watch request (%ld)", - PTR_ERR(obj_request)); + ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle); + if (ret) + rbd_warn(rbd_dev, "failed to unwatch: %d", ret); - ceph_osdc_cancel_event(rbd_dev->watch_event); - rbd_dev->watch_event = NULL; + rbd_dev->watch_handle = NULL; } /* @@ -4081,6 +3982,7 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, init_rwsem(&rbd_dev->header_rwsem); ceph_oid_init(&rbd_dev->header_oid); + ceph_oloc_init(&rbd_dev->header_oloc); rbd_dev->dev.bus = &rbd_bus_type; rbd_dev->dev.type = &rbd_device_type; @@ -5285,6 +5187,7 @@ static int rbd_dev_header_name(struct rbd_device *rbd_dev) rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); + rbd_dev->header_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout); if (rbd_dev->image_format == 1) ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s", spec->image_name, RBD_SUFFIX); diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h index 37f28bf55ce4..3b911ff889dd 100644 --- a/include/linux/ceph/ceph_fs.h +++ b/include/linux/ceph/ceph_fs.h @@ -153,8 +153,9 @@ struct ceph_dir_layout { /* watch-notify operations */ enum { - WATCH_NOTIFY = 1, /* notifying watcher */ - WATCH_NOTIFY_COMPLETE = 2, /* notifier notified when done */ + CEPH_WATCH_EVENT_NOTIFY = 1, /* notifying watcher */ + CEPH_WATCH_EVENT_NOTIFY_COMPLETE = 2, /* notifier notified when done */ + CEPH_WATCH_EVENT_DISCONNECT = 3, /* we were disconnected */ }; diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index 342f22f1f040..cd2dcb8939de 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -34,7 +34,7 @@ struct ceph_osd { struct rb_node o_node; struct ceph_connection o_con; struct rb_root o_requests; - struct list_head o_linger_requests; + struct rb_root o_linger_requests; struct list_head o_osd_lru; struct ceph_auth_handshake o_auth; unsigned long lru_ttl; @@ -108,11 +108,12 @@ struct ceph_osd_req_op { } cls; struct { u64 cookie; - u64 ver; - u32 prot_ver; - u32 timeout; - __u8 flag; + __u8 op; /* CEPH_OSD_WATCH_OP_ */ + u32 gen; } watch; + struct { + struct ceph_osd_data request_data; + } notify_ack; struct { u64 expected_object_size; u64 expected_write_size; @@ -145,8 +146,6 @@ struct ceph_osd_request_target { struct ceph_osd_request { u64 r_tid; /* unique for this client */ struct rb_node r_node; - struct list_head r_linger_item; - struct list_head r_linger_osd_item; struct ceph_osd *r_osd; struct ceph_osd_request_target r_t; @@ -162,7 +161,6 @@ struct ceph_osd_request { int r_result; bool r_got_reply; - int r_linger; struct ceph_osd_client *r_osdc; struct kref r_kref; @@ -181,6 +179,7 @@ struct ceph_osd_request { struct ceph_snap_context *r_snapc; /* for writes */ struct timespec r_mtime; /* ditto */ u64 r_data_offset; /* ditto */ + bool r_linger; /* don't resend on failure */ /* internal */ unsigned long r_stamp; /* jiffies, send or check time */ @@ -195,23 +194,40 @@ struct ceph_request_redirect { struct ceph_object_locator oloc; }; -struct ceph_osd_event { - u64 cookie; - int one_shot; +typedef void (*rados_watchcb2_t)(void *arg, u64 notify_id, u64 cookie, + u64 notifier_id, void *data, size_t data_len); +typedef void (*rados_watcherrcb_t)(void *arg, u64 cookie, int err); + +struct ceph_osd_linger_request { struct ceph_osd_client *osdc; - void (*cb)(u64, u64, u8, void *); - void *data; - struct rb_node node; - struct list_head osd_node; + u64 linger_id; + bool committed; + + struct ceph_osd *osd; + struct ceph_osd_request *reg_req; + struct ceph_osd_request *ping_req; + unsigned long ping_sent; + + struct ceph_osd_request_target t; + u32 last_force_resend; + + struct timespec mtime; + struct kref kref; -}; + struct mutex lock; + struct rb_node node; /* osd */ + struct rb_node osdc_node; /* osdc */ + struct list_head scan_item; + + struct completion reg_commit_wait; + int reg_commit_error; + int last_error; + + u32 register_gen; -struct ceph_osd_event_work { - struct work_struct work; - struct ceph_osd_event *event; - u64 ver; - u64 notify_id; - u8 opcode; + rados_watchcb2_t wcb; + rados_watcherrcb_t errcb; + void *data; }; struct ceph_osd_client { @@ -223,9 +239,10 @@ struct ceph_osd_client { struct rb_root osds; /* osds */ struct list_head osd_lru; /* idle osds */ spinlock_t osd_lru_lock; - struct list_head req_linger; /* lingering requests */ struct ceph_osd homeless_osd; atomic64_t last_tid; /* tid of last request */ + u64 last_linger_id; + struct rb_root linger_requests; /* lingering requests */ atomic_t num_requests; atomic_t num_homeless; struct delayed_work timeout_work; @@ -239,10 +256,6 @@ struct ceph_osd_client { struct ceph_msgpool msgpool_op; struct ceph_msgpool msgpool_op_reply; - spinlock_t event_lock; - struct rb_root event_tree; - u64 event_count; - struct workqueue_struct *notify_wq; }; @@ -314,9 +327,6 @@ extern void osd_req_op_cls_init(struct ceph_osd_request *osd_req, extern int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which, u16 opcode, const char *name, const void *value, size_t size, u8 cmp_op, u8 cmp_mode); -extern void osd_req_op_watch_init(struct ceph_osd_request *osd_req, - unsigned int which, u16 opcode, - u64 cookie, u64 version, int flag); extern void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req, unsigned int which, u64 expected_object_size, @@ -339,9 +349,6 @@ extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *, u32 truncate_seq, u64 truncate_size, bool use_mempool); -extern void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc, - struct ceph_osd_request *req); - extern void ceph_osdc_get_request(struct ceph_osd_request *req); extern void ceph_osdc_put_request(struct ceph_osd_request *req); @@ -372,11 +379,23 @@ extern int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct timespec *mtime, struct page **pages, int nr_pages); -/* watch/notify events */ -extern int ceph_osdc_create_event(struct ceph_osd_client *osdc, - void (*event_cb)(u64, u64, u8, void *), - void *data, struct ceph_osd_event **pevent); -extern void ceph_osdc_cancel_event(struct ceph_osd_event *event); -extern void ceph_osdc_put_event(struct ceph_osd_event *event); +/* watch/notify */ +struct ceph_osd_linger_request * +ceph_osdc_watch(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + rados_watchcb2_t wcb, + rados_watcherrcb_t errcb, + void *data); +int ceph_osdc_unwatch(struct ceph_osd_client *osdc, + struct ceph_osd_linger_request *lreq); + +int ceph_osdc_notify_ack(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + u64 notify_id, + u64 cookie, + void *payload, + size_t payload_len); #endif diff --git a/include/linux/ceph/rados.h b/include/linux/ceph/rados.h index 28740a58f32c..204c8c944703 100644 --- a/include/linux/ceph/rados.h +++ b/include/linux/ceph/rados.h @@ -427,7 +427,17 @@ enum { CEPH_OSD_CMPXATTR_MODE_U64 = 2 }; -#define RADOS_NOTIFY_VER 1 +enum { + CEPH_OSD_WATCH_OP_UNWATCH = 0, + CEPH_OSD_WATCH_OP_LEGACY_WATCH = 1, + /* note: use only ODD ids to prevent pre-giant code from + interpreting the op as UNWATCH */ + CEPH_OSD_WATCH_OP_WATCH = 3, + CEPH_OSD_WATCH_OP_RECONNECT = 5, + CEPH_OSD_WATCH_OP_PING = 7, +}; + +const char *ceph_osd_watch_op_name(int o); /* * an individual object operation. each may be accompanied by some data @@ -462,8 +472,9 @@ struct ceph_osd_op { } __attribute__ ((packed)) snap; struct { __le64 cookie; - __le64 ver; - __u8 flag; /* 0 = unwatch, 1 = watch */ + __le64 ver; /* no longer used */ + __u8 op; /* CEPH_OSD_WATCH_OP_* */ + __le32 gen; /* registration generation */ } __attribute__ ((packed)) watch; struct { __le64 offset, length; diff --git a/net/ceph/ceph_strings.c b/net/ceph/ceph_strings.c index 139a9cb19b0c..3773a4fa11e3 100644 --- a/net/ceph/ceph_strings.c +++ b/net/ceph/ceph_strings.c @@ -27,6 +27,22 @@ __CEPH_FORALL_OSD_OPS(GENERATE_CASE) } } +const char *ceph_osd_watch_op_name(int o) +{ + switch (o) { + case CEPH_OSD_WATCH_OP_UNWATCH: + return "unwatch"; + case CEPH_OSD_WATCH_OP_WATCH: + return "watch"; + case CEPH_OSD_WATCH_OP_RECONNECT: + return "reconnect"; + case CEPH_OSD_WATCH_OP_PING: + return "ping"; + default: + return "???"; + } +} + const char *ceph_osd_state_name(int s) { switch (s) { diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c index 61dbd9de4650..e64cb8583533 100644 --- a/net/ceph/debugfs.c +++ b/net/ceph/debugfs.c @@ -177,6 +177,9 @@ static void dump_request(struct seq_file *s, struct ceph_osd_request *req) seq_printf(s, "%s%s", (i == 0 ? "\t" : ","), ceph_osd_op_name(op->op)); + if (op->op == CEPH_OSD_OP_WATCH) + seq_printf(s, "-%s", + ceph_osd_watch_op_name(op->watch.op)); } seq_putc(s, '\n'); @@ -197,6 +200,31 @@ static void dump_requests(struct seq_file *s, struct ceph_osd *osd) mutex_unlock(&osd->lock); } +static void dump_linger_request(struct seq_file *s, + struct ceph_osd_linger_request *lreq) +{ + seq_printf(s, "%llu\t", lreq->linger_id); + dump_target(s, &lreq->t); + + seq_printf(s, "\t%u\t%s/%d\n", lreq->register_gen, + lreq->committed ? "C" : "", lreq->last_error); +} + +static void dump_linger_requests(struct seq_file *s, struct ceph_osd *osd) +{ + struct rb_node *n; + + mutex_lock(&osd->lock); + for (n = rb_first(&osd->o_linger_requests); n; n = rb_next(n)) { + struct ceph_osd_linger_request *lreq = + rb_entry(n, struct ceph_osd_linger_request, node); + + dump_linger_request(s, lreq); + } + + mutex_unlock(&osd->lock); +} + static int osdc_show(struct seq_file *s, void *pp) { struct ceph_client *client = s->private; @@ -214,6 +242,14 @@ static int osdc_show(struct seq_file *s, void *pp) } dump_requests(s, &osdc->homeless_osd); + seq_puts(s, "LINGER REQUESTS\n"); + for (n = rb_first(&osdc->osds); n; n = rb_next(n)) { + struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node); + + dump_linger_requests(s, osd); + } + dump_linger_requests(s, &osdc->homeless_osd); + up_read(&osdc->lock); return 0; } diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index ef1bcbe9af2d..ca0a7b58ba4f 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -45,6 +45,10 @@ static const struct ceph_connection_operations osd_con_ops; static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req); static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req); +static void link_linger(struct ceph_osd *osd, + struct ceph_osd_linger_request *lreq); +static void unlink_linger(struct ceph_osd *osd, + struct ceph_osd_linger_request *lreq); #if 1 static inline bool rwsem_is_wrlocked(struct rw_semaphore *sem) @@ -74,10 +78,15 @@ static inline void verify_osd_locked(struct ceph_osd *osd) rwsem_is_locked(&osdc->lock)) && !rwsem_is_wrlocked(&osdc->lock)); } +static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq) +{ + WARN_ON(!mutex_is_locked(&lreq->lock)); +} #else static inline void verify_osdc_locked(struct ceph_osd_client *osdc) { } static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc) { } static inline void verify_osd_locked(struct ceph_osd *osd) { } +static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq) { } #endif /* @@ -322,6 +331,9 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req, case CEPH_OSD_OP_STAT: ceph_osd_data_release(&op->raw_data_in); break; + case CEPH_OSD_OP_NOTIFY_ACK: + ceph_osd_data_release(&op->notify_ack.request_data); + break; default: break; } @@ -345,6 +357,29 @@ static void target_init(struct ceph_osd_request_target *t) t->osd = CEPH_HOMELESS_OSD; } +static void target_copy(struct ceph_osd_request_target *dest, + const struct ceph_osd_request_target *src) +{ + ceph_oid_copy(&dest->base_oid, &src->base_oid); + ceph_oloc_copy(&dest->base_oloc, &src->base_oloc); + ceph_oid_copy(&dest->target_oid, &src->target_oid); + ceph_oloc_copy(&dest->target_oloc, &src->target_oloc); + + dest->pgid = src->pgid; /* struct */ + dest->pg_num = src->pg_num; + dest->pg_num_mask = src->pg_num_mask; + ceph_osds_copy(&dest->acting, &src->acting); + ceph_osds_copy(&dest->up, &src->up); + dest->size = src->size; + dest->min_size = src->min_size; + dest->sort_bitwise = src->sort_bitwise; + + dest->flags = src->flags; + dest->paused = src->paused; + + dest->osd = src->osd; +} + static void target_destroy(struct ceph_osd_request_target *t) { ceph_oid_destroy(&t->base_oid); @@ -357,8 +392,6 @@ static void target_destroy(struct ceph_osd_request_target *t) static void request_release_checks(struct ceph_osd_request *req) { WARN_ON(!RB_EMPTY_NODE(&req->r_node)); - WARN_ON(!list_empty(&req->r_linger_item)); - WARN_ON(!list_empty(&req->r_linger_osd_item)); WARN_ON(!list_empty(&req->r_unsafe_item)); WARN_ON(req->r_osd); } @@ -419,13 +452,48 @@ static void request_init(struct ceph_osd_request *req) init_completion(&req->r_completion); init_completion(&req->r_safe_completion); RB_CLEAR_NODE(&req->r_node); - INIT_LIST_HEAD(&req->r_linger_item); - INIT_LIST_HEAD(&req->r_linger_osd_item); INIT_LIST_HEAD(&req->r_unsafe_item); target_init(&req->r_t); } +/* + * This is ugly, but it allows us to reuse linger registration and ping + * requests, keeping the structure of the code around send_linger{_ping}() + * reasonable. Setting up a min_nr=2 mempool for each linger request + * and dealing with copying ops (this blasts req only, watch op remains + * intact) isn't any better. + */ +static void request_reinit(struct ceph_osd_request *req) +{ + struct ceph_osd_client *osdc = req->r_osdc; + bool mempool = req->r_mempool; + unsigned int num_ops = req->r_num_ops; + u64 snapid = req->r_snapid; + struct ceph_snap_context *snapc = req->r_snapc; + bool linger = req->r_linger; + struct ceph_msg *request_msg = req->r_request; + struct ceph_msg *reply_msg = req->r_reply; + + dout("%s req %p\n", __func__, req); + WARN_ON(atomic_read(&req->r_kref.refcount) != 1); + request_release_checks(req); + + WARN_ON(atomic_read(&request_msg->kref.refcount) != 1); + WARN_ON(atomic_read(&reply_msg->kref.refcount) != 1); + target_destroy(&req->r_t); + + request_init(req); + req->r_osdc = osdc; + req->r_mempool = mempool; + req->r_num_ops = num_ops; + req->r_snapid = snapid; + req->r_snapc = snapc; + req->r_linger = linger; + req->r_request = request_msg; + req->r_reply = reply_msg; +} + struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, struct ceph_snap_context *snapc, unsigned int num_ops, @@ -681,21 +749,19 @@ int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which, } EXPORT_SYMBOL(osd_req_op_xattr_init); -void osd_req_op_watch_init(struct ceph_osd_request *osd_req, - unsigned int which, u16 opcode, - u64 cookie, u64 version, int flag) +/* + * @watch_opcode: CEPH_OSD_WATCH_OP_* + */ +static void osd_req_op_watch_init(struct ceph_osd_request *req, int which, + u64 cookie, u8 watch_opcode) { - struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, - opcode, 0); - - BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH); + struct ceph_osd_req_op *op; + op = _osd_req_op_init(req, which, CEPH_OSD_OP_WATCH, 0); op->watch.cookie = cookie; - op->watch.ver = version; - if (opcode == CEPH_OSD_OP_WATCH && flag) - op->watch.flag = (u8)1; + op->watch.op = watch_opcode; + op->watch.gen = 0; } -EXPORT_SYMBOL(osd_req_op_watch_init); void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req, unsigned int which, @@ -771,11 +837,13 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst, break; case CEPH_OSD_OP_STARTSYNC: break; - case CEPH_OSD_OP_NOTIFY_ACK: case CEPH_OSD_OP_WATCH: dst->watch.cookie = cpu_to_le64(src->watch.cookie); - dst->watch.ver = cpu_to_le64(src->watch.ver); - dst->watch.flag = src->watch.flag; + dst->watch.ver = cpu_to_le64(0); + dst->watch.op = src->watch.op; + dst->watch.gen = cpu_to_le32(src->watch.gen); + break; + case CEPH_OSD_OP_NOTIFY_ACK: break; case CEPH_OSD_OP_SETALLOCHINT: dst->alloc_hint.expected_object_size = @@ -915,7 +983,7 @@ static void osd_init(struct ceph_osd *osd) atomic_set(&osd->o_ref, 1); RB_CLEAR_NODE(&osd->o_node); osd->o_requests = RB_ROOT; - INIT_LIST_HEAD(&osd->o_linger_requests); + osd->o_linger_requests = RB_ROOT; INIT_LIST_HEAD(&osd->o_osd_lru); INIT_LIST_HEAD(&osd->o_keepalive_item); osd->o_incarnation = 1; @@ -926,7 +994,7 @@ static void osd_cleanup(struct ceph_osd *osd) { WARN_ON(!RB_EMPTY_NODE(&osd->o_node)); WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests)); - WARN_ON(!list_empty(&osd->o_linger_requests)); + WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests)); WARN_ON(!list_empty(&osd->o_osd_lru)); WARN_ON(!list_empty(&osd->o_keepalive_item)); @@ -996,7 +1064,7 @@ static void __move_osd_to_lru(struct ceph_osd *osd) static void maybe_move_osd_to_lru(struct ceph_osd *osd) { if (RB_EMPTY_ROOT(&osd->o_requests) && - list_empty(&osd->o_linger_requests)) + RB_EMPTY_ROOT(&osd->o_linger_requests)) __move_osd_to_lru(osd); } @@ -1036,6 +1104,17 @@ static void close_osd(struct ceph_osd *osd) unlink_request(osd, req); link_request(&osdc->homeless_osd, req); } + for (n = rb_first(&osd->o_linger_requests); n; ) { + struct ceph_osd_linger_request *lreq = + rb_entry(n, struct ceph_osd_linger_request, node); + + n = rb_next(n); /* unlink_linger() */ + + dout(" reassigning lreq %p linger_id %llu\n", lreq, + lreq->linger_id); + unlink_linger(osd, lreq); + link_linger(&osdc->homeless_osd, lreq); + } __remove_osd_from_lru(osd); erase_osd(&osdc->osds, osd); @@ -1052,7 +1131,7 @@ static int reopen_osd(struct ceph_osd *osd) dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd); if (RB_EMPTY_ROOT(&osd->o_requests) && - list_empty(&osd->o_linger_requests)) { + RB_EMPTY_ROOT(&osd->o_linger_requests)) { close_osd(osd); return -ENODEV; } @@ -1148,52 +1227,6 @@ static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req) atomic_dec(&osd->o_osdc->num_homeless); } -static void __register_linger_request(struct ceph_osd *osd, - struct ceph_osd_request *req) -{ - dout("%s %p tid %llu\n", __func__, req, req->r_tid); - WARN_ON(!req->r_linger); - - ceph_osdc_get_request(req); - list_add_tail(&req->r_linger_item, &osd->o_osdc->req_linger); - list_add_tail(&req->r_linger_osd_item, &osd->o_linger_requests); - __remove_osd_from_lru(osd); - req->r_osd = osd; -} - -static void __unregister_linger_request(struct ceph_osd_client *osdc, - struct ceph_osd_request *req) -{ - WARN_ON(!req->r_linger); - - if (list_empty(&req->r_linger_item)) { - dout("%s %p tid %llu not registered\n", __func__, req, - req->r_tid); - return; - } - - dout("%s %p tid %llu\n", __func__, req, req->r_tid); - list_del_init(&req->r_linger_item); - - if (req->r_osd) { - list_del_init(&req->r_linger_osd_item); - maybe_move_osd_to_lru(req->r_osd); - if (RB_EMPTY_ROOT(&req->r_osd->o_requests)) - req->r_osd = NULL; - } - ceph_osdc_put_request(req); -} - -void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc, - struct ceph_osd_request *req) -{ - if (!req->r_linger) { - dout("set_request_linger %p\n", req); - req->r_linger = 1; - } -} -EXPORT_SYMBOL(ceph_osdc_set_request_linger); - static bool __pool_full(struct ceph_pg_pool_info *pi) { return pi->flags & CEPH_POOL_FLAG_FULL; @@ -1379,6 +1412,10 @@ static void setup_request_data(struct ceph_osd_request *req, op->xattr.value_len); ceph_osdc_msg_data_add(msg, &op->xattr.osd_data); break; + case CEPH_OSD_OP_NOTIFY_ACK: + ceph_osdc_msg_data_add(msg, + &op->notify_ack.request_data); + break; /* reply */ case CEPH_OSD_OP_STAT: @@ -1683,6 +1720,460 @@ static void cancel_request(struct ceph_osd_request *req) finish_request(req); } +/* + * lingering requests, watch/notify v2 infrastructure + */ +static void linger_release(struct kref *kref) +{ + struct ceph_osd_linger_request *lreq = + container_of(kref, struct ceph_osd_linger_request, kref); + + dout("%s lreq %p reg_req %p ping_req %p\n", __func__, lreq, + lreq->reg_req, lreq->ping_req); + WARN_ON(!RB_EMPTY_NODE(&lreq->node)); + WARN_ON(!RB_EMPTY_NODE(&lreq->osdc_node)); + WARN_ON(!list_empty(&lreq->scan_item)); + WARN_ON(lreq->osd); + + if (lreq->reg_req) + ceph_osdc_put_request(lreq->reg_req); + if (lreq->ping_req) + ceph_osdc_put_request(lreq->ping_req); + target_destroy(&lreq->t); + kfree(lreq); +} + +static void linger_put(struct ceph_osd_linger_request *lreq) +{ + if (lreq) + kref_put(&lreq->kref, linger_release); +} + +static struct ceph_osd_linger_request * +linger_get(struct ceph_osd_linger_request *lreq) +{ + kref_get(&lreq->kref); + return lreq; +} + +static struct ceph_osd_linger_request * +linger_alloc(struct ceph_osd_client *osdc) +{ + struct ceph_osd_linger_request *lreq; + + lreq = kzalloc(sizeof(*lreq), GFP_NOIO); + if (!lreq) + return NULL; + + kref_init(&lreq->kref); + mutex_init(&lreq->lock); + RB_CLEAR_NODE(&lreq->node); + RB_CLEAR_NODE(&lreq->osdc_node); + INIT_LIST_HEAD(&lreq->scan_item); + init_completion(&lreq->reg_commit_wait); + + lreq->osdc = osdc; + target_init(&lreq->t); + + dout("%s lreq %p\n", __func__, lreq); + return lreq; +} + +DEFINE_RB_INSDEL_FUNCS(linger, struct ceph_osd_linger_request, linger_id, node) +DEFINE_RB_FUNCS(linger_osdc, struct ceph_osd_linger_request, linger_id, osdc_node) + +/* + * Create linger request <-> OSD session relation. + * + * @lreq has to be registered, @osd may be homeless. + */ +static void link_linger(struct ceph_osd *osd, + struct ceph_osd_linger_request *lreq) +{ + verify_osd_locked(osd); + WARN_ON(!lreq->linger_id || lreq->osd); + dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd, + osd->o_osd, lreq, lreq->linger_id); + + if (!osd_homeless(osd)) + __remove_osd_from_lru(osd); + else + atomic_inc(&osd->o_osdc->num_homeless); + + get_osd(osd); + insert_linger(&osd->o_linger_requests, lreq); + lreq->osd = osd; +} + +static void unlink_linger(struct ceph_osd *osd, + struct ceph_osd_linger_request *lreq) +{ + verify_osd_locked(osd); + WARN_ON(lreq->osd != osd); + dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd, + osd->o_osd, lreq, lreq->linger_id); + + lreq->osd = NULL; + erase_linger(&osd->o_linger_requests, lreq); + put_osd(osd); + + if (!osd_homeless(osd)) + maybe_move_osd_to_lru(osd); + else + atomic_dec(&osd->o_osdc->num_homeless); +} + +static bool __linger_registered(struct ceph_osd_linger_request *lreq) +{ + verify_osdc_locked(lreq->osdc); + + return !RB_EMPTY_NODE(&lreq->osdc_node); +} + +static bool linger_registered(struct ceph_osd_linger_request *lreq) +{ + struct ceph_osd_client *osdc = lreq->osdc; + bool registered; + + down_read(&osdc->lock); + registered = __linger_registered(lreq); + up_read(&osdc->lock); + + return registered; +} + +static void linger_register(struct ceph_osd_linger_request *lreq) +{ + struct ceph_osd_client *osdc = lreq->osdc; + + verify_osdc_wrlocked(osdc); + WARN_ON(lreq->linger_id); + + linger_get(lreq); + lreq->linger_id = ++osdc->last_linger_id; + insert_linger_osdc(&osdc->linger_requests, lreq); +} + +static void linger_unregister(struct ceph_osd_linger_request *lreq) +{ + struct ceph_osd_client *osdc = lreq->osdc; + + verify_osdc_wrlocked(osdc); + + erase_linger_osdc(&osdc->linger_requests, lreq); + linger_put(lreq); +} + +static void cancel_linger_request(struct ceph_osd_request *req) +{ + struct ceph_osd_linger_request *lreq = req->r_priv; + + WARN_ON(!req->r_linger); + cancel_request(req); + linger_put(lreq); +} + +struct linger_work { + struct work_struct work; + struct ceph_osd_linger_request *lreq; + + union { + struct { + u64 notify_id; + u64 notifier_id; + void *payload; /* points into @msg front */ + size_t payload_len; + + struct ceph_msg *msg; /* for ceph_msg_put() */ + } notify; + struct { + int err; + } error; + }; +}; + +static struct linger_work *lwork_alloc(struct ceph_osd_linger_request *lreq, + work_func_t workfn) +{ + struct linger_work *lwork; + + lwork = kzalloc(sizeof(*lwork), GFP_NOIO); + if (!lwork) + return NULL; + + INIT_WORK(&lwork->work, workfn); + lwork->lreq = linger_get(lreq); + + return lwork; +} + +static void lwork_free(struct linger_work *lwork) +{ + struct ceph_osd_linger_request *lreq = lwork->lreq; + + linger_put(lreq); + kfree(lwork); +} + +static void lwork_queue(struct linger_work *lwork) +{ + struct ceph_osd_linger_request *lreq = lwork->lreq; + struct ceph_osd_client *osdc = lreq->osdc; + + verify_lreq_locked(lreq); + queue_work(osdc->notify_wq, &lwork->work); +} + +static void do_watch_notify(struct work_struct *w) +{ + struct linger_work *lwork = container_of(w, struct linger_work, work); + struct ceph_osd_linger_request *lreq = lwork->lreq; + + if (!linger_registered(lreq)) { + dout("%s lreq %p not registered\n", __func__, lreq); + goto out; + } + + dout("%s lreq %p notify_id %llu notifier_id %llu payload_len %zu\n", + __func__, lreq, lwork->notify.notify_id, lwork->notify.notifier_id, + lwork->notify.payload_len); + lreq->wcb(lreq->data, lwork->notify.notify_id, lreq->linger_id, + lwork->notify.notifier_id, lwork->notify.payload, + lwork->notify.payload_len); + +out: + ceph_msg_put(lwork->notify.msg); + lwork_free(lwork); +} + +static void do_watch_error(struct work_struct *w) +{ + struct linger_work *lwork = container_of(w, struct linger_work, work); + struct ceph_osd_linger_request *lreq = lwork->lreq; + + if (!linger_registered(lreq)) { + dout("%s lreq %p not registered\n", __func__, lreq); + goto out; + } + + dout("%s lreq %p err %d\n", __func__, lreq, lwork->error.err); + lreq->errcb(lreq->data, lreq->linger_id, lwork->error.err); + +out: + lwork_free(lwork); +} + +static void queue_watch_error(struct ceph_osd_linger_request *lreq) +{ + struct linger_work *lwork; + + lwork = lwork_alloc(lreq, do_watch_error); + if (!lwork) { + pr_err("failed to allocate error-lwork\n"); + return; + } + + lwork->error.err = lreq->last_error; + lwork_queue(lwork); +} + +static void linger_reg_commit_complete(struct ceph_osd_linger_request *lreq, + int result) +{ + if (!completion_done(&lreq->reg_commit_wait)) { + lreq->reg_commit_error = (result <= 0 ? result : 0); + complete_all(&lreq->reg_commit_wait); + } +} + +static void linger_commit_cb(struct ceph_osd_request *req) +{ + struct ceph_osd_linger_request *lreq = req->r_priv; + + mutex_lock(&lreq->lock); + dout("%s lreq %p linger_id %llu result %d\n", __func__, lreq, + lreq->linger_id, req->r_result); + WARN_ON(!__linger_registered(lreq)); + linger_reg_commit_complete(lreq, req->r_result); + lreq->committed = true; + + mutex_unlock(&lreq->lock); + linger_put(lreq); +} + +static int normalize_watch_error(int err) +{ + /* + * Translate ENOENT -> ENOTCONN so that a delete->disconnection + * notification and a failure to reconnect because we raced with + * the delete appear the same to the user. + */ + if (err == -ENOENT) + err = -ENOTCONN; + + return err; +} + +static void linger_reconnect_cb(struct ceph_osd_request *req) +{ + struct ceph_osd_linger_request *lreq = req->r_priv; + + mutex_lock(&lreq->lock); + dout("%s lreq %p linger_id %llu result %d last_error %d\n", __func__, + lreq, lreq->linger_id, req->r_result, lreq->last_error); + if (req->r_result < 0) { + if (!lreq->last_error) { + lreq->last_error = normalize_watch_error(req->r_result); + queue_watch_error(lreq); + } + } + + mutex_unlock(&lreq->lock); + linger_put(lreq); +} + +static void send_linger(struct ceph_osd_linger_request *lreq) +{ + struct ceph_osd_request *req = lreq->reg_req; + struct ceph_osd_req_op *op = &req->r_ops[0]; + + verify_osdc_wrlocked(req->r_osdc); + dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id); + + if (req->r_osd) + cancel_linger_request(req); + + request_reinit(req); + ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid); + ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc); + req->r_flags = lreq->t.flags; + req->r_mtime = lreq->mtime; + + mutex_lock(&lreq->lock); + if (lreq->committed) { + WARN_ON(op->op != CEPH_OSD_OP_WATCH || + op->watch.cookie != lreq->linger_id); + op->watch.op = CEPH_OSD_WATCH_OP_RECONNECT; + op->watch.gen = ++lreq->register_gen; + dout("lreq %p reconnect register_gen %u\n", lreq, + op->watch.gen); + req->r_callback = linger_reconnect_cb; + } else { + WARN_ON(op->watch.op != CEPH_OSD_WATCH_OP_WATCH); + dout("lreq %p register\n", lreq); + req->r_callback = linger_commit_cb; + } + mutex_unlock(&lreq->lock); + + req->r_priv = linger_get(lreq); + req->r_linger = true; + + submit_request(req, true); +} + +static void linger_ping_cb(struct ceph_osd_request *req) +{ + struct ceph_osd_linger_request *lreq = req->r_priv; + + mutex_lock(&lreq->lock); + dout("%s lreq %p linger_id %llu result %d ping_sent %lu last_error %d\n", + __func__, lreq, lreq->linger_id, req->r_result, lreq->ping_sent, + lreq->last_error); + if (lreq->register_gen == req->r_ops[0].watch.gen) { + if (req->r_result && !lreq->last_error) { + lreq->last_error = normalize_watch_error(req->r_result); + queue_watch_error(lreq); + } + } else { + dout("lreq %p register_gen %u ignoring old pong %u\n", lreq, + lreq->register_gen, req->r_ops[0].watch.gen); + } + + mutex_unlock(&lreq->lock); + linger_put(lreq); +} + +static void send_linger_ping(struct ceph_osd_linger_request *lreq) +{ + struct ceph_osd_client *osdc = lreq->osdc; + struct ceph_osd_request *req = lreq->ping_req; + struct ceph_osd_req_op *op = &req->r_ops[0]; + + if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD)) { + dout("%s PAUSERD\n", __func__); + return; + } + + lreq->ping_sent = jiffies; + dout("%s lreq %p linger_id %llu ping_sent %lu register_gen %u\n", + __func__, lreq, lreq->linger_id, lreq->ping_sent, + lreq->register_gen); + + if (req->r_osd) + cancel_linger_request(req); + + request_reinit(req); + target_copy(&req->r_t, &lreq->t); + + WARN_ON(op->op != CEPH_OSD_OP_WATCH || + op->watch.cookie != lreq->linger_id || + op->watch.op != CEPH_OSD_WATCH_OP_PING); + op->watch.gen = lreq->register_gen; + req->r_callback = linger_ping_cb; + req->r_priv = linger_get(lreq); + req->r_linger = true; + + ceph_osdc_get_request(req); + account_request(req); + req->r_tid = atomic64_inc_return(&osdc->last_tid); + link_request(lreq->osd, req); + send_request(req); +} + +static void linger_submit(struct ceph_osd_linger_request *lreq) +{ + struct ceph_osd_client *osdc = lreq->osdc; + struct ceph_osd *osd; + + calc_target(osdc, &lreq->t, &lreq->last_force_resend, false); + osd = lookup_create_osd(osdc, lreq->t.osd, true); + link_linger(osd, lreq); + + send_linger(lreq); +} + +/* + * @lreq has to be both registered and linked. + */ +static void __linger_cancel(struct ceph_osd_linger_request *lreq) +{ + if (lreq->ping_req->r_osd) + cancel_linger_request(lreq->ping_req); + if (lreq->reg_req->r_osd) + cancel_linger_request(lreq->reg_req); + unlink_linger(lreq->osd, lreq); + linger_unregister(lreq); +} + +static void linger_cancel(struct ceph_osd_linger_request *lreq) +{ + struct ceph_osd_client *osdc = lreq->osdc; + + down_write(&osdc->lock); + if (__linger_registered(lreq)) + __linger_cancel(lreq); + up_write(&osdc->lock); +} + +static int linger_reg_commit_wait(struct ceph_osd_linger_request *lreq) +{ + int ret; + + dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id); + ret = wait_for_completion_interruptible(&lreq->reg_commit_wait); + return ret ?: lreq->reg_commit_error; +} + /* * Timeout callback, called every N seconds. When 1 or more OSD * requests has been active for more than N seconds, we send a keepalive @@ -1720,6 +2211,19 @@ static void handle_timeout(struct work_struct *work) found = true; } } + for (p = rb_first(&osd->o_linger_requests); p; p = rb_next(p)) { + struct ceph_osd_linger_request *lreq = + rb_entry(p, struct ceph_osd_linger_request, node); + + dout(" lreq %p linger_id %llu is served by osd%d\n", + lreq, lreq->linger_id, osd->o_osd); + found = true; + + mutex_lock(&lreq->lock); + if (lreq->committed && !lreq->last_error) + send_linger_ping(lreq); + mutex_unlock(&lreq->lock); + } if (found) list_move_tail(&osd->o_keepalive_item, &slow_osds); @@ -1756,7 +2260,7 @@ static void handle_osds_timeout(struct work_struct *work) break; WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests)); - WARN_ON(!list_empty(&osd->o_linger_requests)); + WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests)); close_osd(osd); } @@ -2082,7 +2586,8 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg) __finish_request(req); if (req->r_linger) { WARN_ON(req->r_unsafe_callback); - __register_linger_request(osd, req); + dout("req %p tid %llu cb (locked)\n", req, req->r_tid); + __complete_request(req); } } @@ -2093,7 +2598,7 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg) if (already_acked && req->r_unsafe_callback) { dout("req %p tid %llu safe-cb\n", req, req->r_tid); req->r_unsafe_callback(req, false); - } else { + } else if (!req->r_linger) { dout("req %p tid %llu cb\n", req, req->r_tid); __complete_request(req); } @@ -2145,6 +2650,26 @@ static bool pool_cleared_full(struct ceph_osd_client *osdc, s64 pool_id) return pi->was_full && !__pool_full(pi); } +static enum calc_target_result +recalc_linger_target(struct ceph_osd_linger_request *lreq) +{ + struct ceph_osd_client *osdc = lreq->osdc; + enum calc_target_result ct_res; + + ct_res = calc_target(osdc, &lreq->t, &lreq->last_force_resend, true); + if (ct_res == CALC_TARGET_NEED_RESEND) { + struct ceph_osd *osd; + + osd = lookup_create_osd(osdc, lreq->t.osd, true); + if (osd != lreq->osd) { + unlink_linger(lreq->osd, lreq); + link_linger(osd, lreq); + } + } + + return ct_res; +} + /* * Requeue requests whose mapping to an OSD has changed. */ @@ -2159,6 +2684,39 @@ static void scan_requests(struct ceph_osd *osd, struct rb_node *n; bool force_resend_writes; + for (n = rb_first(&osd->o_linger_requests); n; ) { + struct ceph_osd_linger_request *lreq = + rb_entry(n, struct ceph_osd_linger_request, node); + enum calc_target_result ct_res; + + n = rb_next(n); /* recalc_linger_target() */ + + dout("%s lreq %p linger_id %llu\n", __func__, lreq, + lreq->linger_id); + ct_res = recalc_linger_target(lreq); + switch (ct_res) { + case CALC_TARGET_NO_ACTION: + force_resend_writes = cleared_full || + (check_pool_cleared_full && + pool_cleared_full(osdc, lreq->t.base_oloc.pool)); + if (!force_resend && !force_resend_writes) + break; + + /* fall through */ + case CALC_TARGET_NEED_RESEND: + /* + * scan_requests() for the previous epoch(s) + * may have already added it to the list, since + * it's not unlinked here. + */ + if (list_empty(&lreq->scan_item)) + list_add_tail(&lreq->scan_item, need_resend_linger); + break; + case CALC_TARGET_POOL_DNE: + break; + } + } + for (n = rb_first(&osd->o_requests); n; ) { struct ceph_osd_request *req = rb_entry(n, struct ceph_osd_request, r_node); @@ -2263,6 +2821,7 @@ static void kick_requests(struct ceph_osd_client *osdc, struct rb_root *need_resend, struct list_head *need_resend_linger) { + struct ceph_osd_linger_request *lreq, *nlreq; struct rb_node *n; for (n = rb_first(need_resend); n; ) { @@ -2280,8 +2839,17 @@ static void kick_requests(struct ceph_osd_client *osdc, if (!req->r_linger) { if (!osd_homeless(osd) && !req->r_t.paused) send_request(req); + } else { + cancel_linger_request(req); } } + + list_for_each_entry_safe(lreq, nlreq, need_resend_linger, scan_item) { + if (!osd_homeless(lreq->osd)) + send_linger(lreq); + + list_del_init(&lreq->scan_item); + } } /* @@ -2406,15 +2974,25 @@ static void kick_osd_requests(struct ceph_osd *osd) { struct rb_node *n; - for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) { + for (n = rb_first(&osd->o_requests); n; ) { struct ceph_osd_request *req = rb_entry(n, struct ceph_osd_request, r_node); + n = rb_next(n); /* cancel_linger_request() */ + if (!req->r_linger) { if (!req->r_t.paused) send_request(req); + } else { + cancel_linger_request(req); } } + for (n = rb_first(&osd->o_linger_requests); n; n = rb_next(n)) { + struct ceph_osd_linger_request *lreq = + rb_entry(n, struct ceph_osd_linger_request, node); + + send_linger(lreq); + } } /* @@ -2441,193 +3019,77 @@ out_unlock: up_write(&osdc->lock); } -/* - * watch/notify callback event infrastructure - * - * These callbacks are used both for watch and notify operations. - */ -static void __release_event(struct kref *kref) -{ - struct ceph_osd_event *event = - container_of(kref, struct ceph_osd_event, kref); - - dout("__release_event %p\n", event); - kfree(event); -} - -static void get_event(struct ceph_osd_event *event) -{ - kref_get(&event->kref); -} - -void ceph_osdc_put_event(struct ceph_osd_event *event) -{ - kref_put(&event->kref, __release_event); -} -EXPORT_SYMBOL(ceph_osdc_put_event); - -static void __insert_event(struct ceph_osd_client *osdc, - struct ceph_osd_event *new) -{ - struct rb_node **p = &osdc->event_tree.rb_node; - struct rb_node *parent = NULL; - struct ceph_osd_event *event = NULL; - - while (*p) { - parent = *p; - event = rb_entry(parent, struct ceph_osd_event, node); - if (new->cookie < event->cookie) - p = &(*p)->rb_left; - else if (new->cookie > event->cookie) - p = &(*p)->rb_right; - else - BUG(); - } - - rb_link_node(&new->node, parent, p); - rb_insert_color(&new->node, &osdc->event_tree); -} - -static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc, - u64 cookie) -{ - struct rb_node **p = &osdc->event_tree.rb_node; - struct rb_node *parent = NULL; - struct ceph_osd_event *event = NULL; - - while (*p) { - parent = *p; - event = rb_entry(parent, struct ceph_osd_event, node); - if (cookie < event->cookie) - p = &(*p)->rb_left; - else if (cookie > event->cookie) - p = &(*p)->rb_right; - else - return event; - } - return NULL; -} - -static void __remove_event(struct ceph_osd_event *event) -{ - struct ceph_osd_client *osdc = event->osdc; - - if (!RB_EMPTY_NODE(&event->node)) { - dout("__remove_event removed %p\n", event); - rb_erase(&event->node, &osdc->event_tree); - ceph_osdc_put_event(event); - } else { - dout("__remove_event didn't remove %p\n", event); - } -} - -int ceph_osdc_create_event(struct ceph_osd_client *osdc, - void (*event_cb)(u64, u64, u8, void *), - void *data, struct ceph_osd_event **pevent) -{ - struct ceph_osd_event *event; - - event = kmalloc(sizeof(*event), GFP_NOIO); - if (!event) - return -ENOMEM; - - dout("create_event %p\n", event); - event->cb = event_cb; - event->one_shot = 0; - event->data = data; - event->osdc = osdc; - INIT_LIST_HEAD(&event->osd_node); - RB_CLEAR_NODE(&event->node); - kref_init(&event->kref); /* one ref for us */ - kref_get(&event->kref); /* one ref for the caller */ - - spin_lock(&osdc->event_lock); - event->cookie = ++osdc->event_count; - __insert_event(osdc, event); - spin_unlock(&osdc->event_lock); - - *pevent = event; - return 0; -} -EXPORT_SYMBOL(ceph_osdc_create_event); - -void ceph_osdc_cancel_event(struct ceph_osd_event *event) -{ - struct ceph_osd_client *osdc = event->osdc; - - dout("cancel_event %p\n", event); - spin_lock(&osdc->event_lock); - __remove_event(event); - spin_unlock(&osdc->event_lock); - ceph_osdc_put_event(event); /* caller's */ -} -EXPORT_SYMBOL(ceph_osdc_cancel_event); - - -static void do_event_work(struct work_struct *work) -{ - struct ceph_osd_event_work *event_work = - container_of(work, struct ceph_osd_event_work, work); - struct ceph_osd_event *event = event_work->event; - u64 ver = event_work->ver; - u64 notify_id = event_work->notify_id; - u8 opcode = event_work->opcode; - - dout("do_event_work completing %p\n", event); - event->cb(ver, notify_id, opcode, event->data); - dout("do_event_work completed %p\n", event); - ceph_osdc_put_event(event); - kfree(event_work); -} - - /* * Process osd watch notifications */ static void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg) { - void *p, *end; - u8 proto_ver; - u64 cookie, ver, notify_id; - u8 opcode; - struct ceph_osd_event *event; - struct ceph_osd_event_work *event_work; - - p = msg->front.iov_base; - end = p + msg->front.iov_len; + void *p = msg->front.iov_base; + void *const end = p + msg->front.iov_len; + struct ceph_osd_linger_request *lreq; + struct linger_work *lwork; + u8 proto_ver, opcode; + u64 cookie, notify_id; + u64 notifier_id = 0; + void *payload = NULL; + u32 payload_len = 0; ceph_decode_8_safe(&p, end, proto_ver, bad); ceph_decode_8_safe(&p, end, opcode, bad); ceph_decode_64_safe(&p, end, cookie, bad); - ceph_decode_64_safe(&p, end, ver, bad); + p += 8; /* skip ver */ ceph_decode_64_safe(&p, end, notify_id, bad); - spin_lock(&osdc->event_lock); - event = __find_event(osdc, cookie); - if (event) { - BUG_ON(event->one_shot); - get_event(event); - } - spin_unlock(&osdc->event_lock); - dout("handle_watch_notify cookie %lld ver %lld event %p\n", - cookie, ver, event); - if (event) { - event_work = kmalloc(sizeof(*event_work), GFP_NOIO); - if (!event_work) { - pr_err("couldn't allocate event_work\n"); - ceph_osdc_put_event(event); - return; + if (proto_ver >= 1) { + ceph_decode_32_safe(&p, end, payload_len, bad); + ceph_decode_need(&p, end, payload_len, bad); + payload = p; + p += payload_len; + } + + if (le16_to_cpu(msg->hdr.version) >= 2) + p += 4; /* skip return_code */ + + if (le16_to_cpu(msg->hdr.version) >= 3) + ceph_decode_64_safe(&p, end, notifier_id, bad); + + down_read(&osdc->lock); + lreq = lookup_linger_osdc(&osdc->linger_requests, cookie); + if (!lreq) { + dout("%s opcode %d cookie %llu dne\n", __func__, opcode, + cookie); + goto out_unlock_osdc; + } + + mutex_lock(&lreq->lock); + dout("%s opcode %d cookie %llu lreq %p\n", __func__, opcode, cookie, + lreq); + if (opcode == CEPH_WATCH_EVENT_DISCONNECT) { + if (!lreq->last_error) { + lreq->last_error = -ENOTCONN; + queue_watch_error(lreq); + } + } else { + /* CEPH_WATCH_EVENT_NOTIFY */ + lwork = lwork_alloc(lreq, do_watch_notify); + if (!lwork) { + pr_err("failed to allocate notify-lwork\n"); + goto out_unlock_lreq; } - INIT_WORK(&event_work->work, do_event_work); - event_work->event = event; - event_work->ver = ver; - event_work->notify_id = notify_id; - event_work->opcode = opcode; - queue_work(osdc->notify_wq, &event_work->work); + lwork->notify.notify_id = notify_id; + lwork->notify.notifier_id = notifier_id; + lwork->notify.payload = payload; + lwork->notify.payload_len = payload_len; + lwork->notify.msg = ceph_msg_get(msg); + lwork_queue(lwork); } +out_unlock_lreq: + mutex_unlock(&lreq->lock); +out_unlock_osdc: + up_read(&osdc->lock); return; bad: @@ -2659,8 +3121,6 @@ void ceph_osdc_cancel_request(struct ceph_osd_request *req) struct ceph_osd_client *osdc = req->r_osdc; down_write(&osdc->lock); - if (req->r_linger) - __unregister_linger_request(osdc, req); if (req->r_osd) cancel_request(req); up_write(&osdc->lock); @@ -2743,6 +3203,198 @@ again: } EXPORT_SYMBOL(ceph_osdc_sync); +static struct ceph_osd_request * +alloc_linger_request(struct ceph_osd_linger_request *lreq) +{ + struct ceph_osd_request *req; + + req = ceph_osdc_alloc_request(lreq->osdc, NULL, 1, false, GFP_NOIO); + if (!req) + return NULL; + + ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid); + ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc); + + if (ceph_osdc_alloc_messages(req, GFP_NOIO)) { + ceph_osdc_put_request(req); + return NULL; + } + + return req; +} + +/* + * Returns a handle, caller owns a ref. + */ +struct ceph_osd_linger_request * +ceph_osdc_watch(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + rados_watchcb2_t wcb, + rados_watcherrcb_t errcb, + void *data) +{ + struct ceph_osd_linger_request *lreq; + int ret; + + lreq = linger_alloc(osdc); + if (!lreq) + return ERR_PTR(-ENOMEM); + + lreq->wcb = wcb; + lreq->errcb = errcb; + lreq->data = data; + + ceph_oid_copy(&lreq->t.base_oid, oid); + ceph_oloc_copy(&lreq->t.base_oloc, oloc); + lreq->t.flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK; + lreq->mtime = CURRENT_TIME; + + lreq->reg_req = alloc_linger_request(lreq); + if (!lreq->reg_req) { + ret = -ENOMEM; + goto err_put_lreq; + } + + lreq->ping_req = alloc_linger_request(lreq); + if (!lreq->ping_req) { + ret = -ENOMEM; + goto err_put_lreq; + } + + down_write(&osdc->lock); + linger_register(lreq); /* before osd_req_op_* */ + osd_req_op_watch_init(lreq->reg_req, 0, lreq->linger_id, + CEPH_OSD_WATCH_OP_WATCH); + osd_req_op_watch_init(lreq->ping_req, 0, lreq->linger_id, + CEPH_OSD_WATCH_OP_PING); + linger_submit(lreq); + up_write(&osdc->lock); + + ret = linger_reg_commit_wait(lreq); + if (ret) { + linger_cancel(lreq); + goto err_put_lreq; + } + + return lreq; + +err_put_lreq: + linger_put(lreq); + return ERR_PTR(ret); +} +EXPORT_SYMBOL(ceph_osdc_watch); + +/* + * Releases a ref. + * + * Times out after mount_timeout to preserve rbd unmap behaviour + * introduced in 2894e1d76974 ("rbd: timeout watch teardown on unmap + * with mount_timeout"). + */ +int ceph_osdc_unwatch(struct ceph_osd_client *osdc, + struct ceph_osd_linger_request *lreq) +{ + struct ceph_options *opts = osdc->client->options; + struct ceph_osd_request *req; + int ret; + + req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO); + if (!req) + return -ENOMEM; + + ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid); + ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc); + req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK; + req->r_mtime = CURRENT_TIME; + osd_req_op_watch_init(req, 0, lreq->linger_id, + CEPH_OSD_WATCH_OP_UNWATCH); + + ret = ceph_osdc_alloc_messages(req, GFP_NOIO); + if (ret) + goto out_put_req; + + ceph_osdc_start_request(osdc, req, false); + linger_cancel(lreq); + linger_put(lreq); + ret = wait_request_timeout(req, opts->mount_timeout); + +out_put_req: + ceph_osdc_put_request(req); + return ret; +} +EXPORT_SYMBOL(ceph_osdc_unwatch); + +static int osd_req_op_notify_ack_init(struct ceph_osd_request *req, int which, + u64 notify_id, u64 cookie, void *payload, + size_t payload_len) +{ + struct ceph_osd_req_op *op; + struct ceph_pagelist *pl; + int ret; + + op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0); + + pl = kmalloc(sizeof(*pl), GFP_NOIO); + if (!pl) + return -ENOMEM; + + ceph_pagelist_init(pl); + ret = ceph_pagelist_encode_64(pl, notify_id); + ret |= ceph_pagelist_encode_64(pl, cookie); + if (payload) { + ret |= ceph_pagelist_encode_32(pl, payload_len); + ret |= ceph_pagelist_append(pl, payload, payload_len); + } else { + ret |= ceph_pagelist_encode_32(pl, 0); + } + if (ret) { + ceph_pagelist_release(pl); + return -ENOMEM; + } + + ceph_osd_data_pagelist_init(&op->notify_ack.request_data, pl); + op->indata_len = pl->length; + return 0; +} + +int ceph_osdc_notify_ack(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + u64 notify_id, + u64 cookie, + void *payload, + size_t payload_len) +{ + struct ceph_osd_request *req; + int ret; + + req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO); + if (!req) + return -ENOMEM; + + ceph_oid_copy(&req->r_base_oid, oid); + ceph_oloc_copy(&req->r_base_oloc, oloc); + req->r_flags = CEPH_OSD_FLAG_READ; + + ret = ceph_osdc_alloc_messages(req, GFP_NOIO); + if (ret) + goto out_put_req; + + ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload, + payload_len); + if (ret) + goto out_put_req; + + ceph_osdc_start_request(osdc, req, false); + ret = ceph_osdc_wait_request(osdc, req); + +out_put_req: + ceph_osdc_put_request(req); + return ret; +} +EXPORT_SYMBOL(ceph_osdc_notify_ack); + /* * Call all pending notify callbacks - for use after a watch is * unregistered, to make sure no more callbacks for it will be invoked @@ -2767,15 +3419,12 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) osdc->osds = RB_ROOT; INIT_LIST_HEAD(&osdc->osd_lru); spin_lock_init(&osdc->osd_lru_lock); - INIT_LIST_HEAD(&osdc->req_linger); osd_init(&osdc->homeless_osd); osdc->homeless_osd.o_osdc = osdc; osdc->homeless_osd.o_osd = CEPH_HOMELESS_OSD; + osdc->linger_requests = RB_ROOT; INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout); INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout); - spin_lock_init(&osdc->event_lock); - osdc->event_tree = RB_ROOT; - osdc->event_count = 0; err = -ENOMEM; osdc->osdmap = ceph_osdmap_alloc(); @@ -2838,6 +3487,7 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc) osd_cleanup(&osdc->homeless_osd); WARN_ON(!list_empty(&osdc->osd_lru)); + WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_requests)); WARN_ON(atomic_read(&osdc->num_requests)); WARN_ON(atomic_read(&osdc->num_homeless)); -- 2.20.1