ceph: make page alignment explicit in osd interface
[GitHub/mt8127/android_kernel_alcatel_ttab.git] / net / ceph / osd_client.c
CommitLineData
3d14c5d2 1#include <linux/ceph/ceph_debug.h>
f24e9980 2
3d14c5d2 3#include <linux/module.h>
f24e9980
SW
4#include <linux/err.h>
5#include <linux/highmem.h>
6#include <linux/mm.h>
7#include <linux/pagemap.h>
8#include <linux/slab.h>
9#include <linux/uaccess.h>
68b4476b
YS
10#ifdef CONFIG_BLOCK
11#include <linux/bio.h>
12#endif
f24e9980 13
3d14c5d2
YS
14#include <linux/ceph/libceph.h>
15#include <linux/ceph/osd_client.h>
16#include <linux/ceph/messenger.h>
17#include <linux/ceph/decode.h>
18#include <linux/ceph/auth.h>
19#include <linux/ceph/pagelist.h>
f24e9980 20
c16e7869
SW
21#define OSD_OP_FRONT_LEN 4096
22#define OSD_OPREPLY_FRONT_LEN 512
0d59ab81 23
9e32789f 24static const struct ceph_connection_operations osd_con_ops;
422d2cb8
YS
25static int __kick_requests(struct ceph_osd_client *osdc,
26 struct ceph_osd *kickosd);
f24e9980
SW
27
28static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
29
68b4476b
YS
30static int op_needs_trail(int op)
31{
32 switch (op) {
33 case CEPH_OSD_OP_GETXATTR:
34 case CEPH_OSD_OP_SETXATTR:
35 case CEPH_OSD_OP_CMPXATTR:
36 case CEPH_OSD_OP_CALL:
37 return 1;
38 default:
39 return 0;
40 }
41}
42
43static int op_has_extent(int op)
44{
45 return (op == CEPH_OSD_OP_READ ||
46 op == CEPH_OSD_OP_WRITE);
47}
48
3499e8a5
YS
/*
 * Map a file extent (off~*plen) onto a single object, truncating the
 * extent at the object boundary, and fill in the request's extent op,
 * page count, and page alignment.  *plen is shortened in place if the
 * extent crosses an object boundary; *bno receives the object number.
 */
void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
			  struct ceph_file_layout *layout,
			  u64 snapid,
			  u64 off, u64 *plen, u64 *bno,
			  struct ceph_osd_request *req,
			  struct ceph_osd_req_op *op)
{
	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
	u64 orig_len = *plen;
	u64 objoff, objlen;	/* extent in object */

	reqhead->snapid = cpu_to_le64(snapid);

	/* object extent?  (may shorten *plen to the object boundary) */
	ceph_calc_file_object_mapping(layout, off, plen, bno,
				      &objoff, &objlen);
	if (*plen < orig_len)
		dout(" skipping last %llu, final file extent %llu~%llu\n",
		     orig_len - *plen, off, *plen);

	/* only READ/WRITE carry an extent; other ops ignore it */
	if (op_has_extent(op->op)) {
		op->extent.offset = objoff;
		op->extent.length = objlen;
	}
	req->r_num_pages = calc_pages_for(off, *plen);
	/* natural alignment: offset within the first page */
	req->r_page_alignment = off & ~PAGE_MASK;
	if (op->op == CEPH_OSD_OP_WRITE)
		op->payload_len = *plen;

	dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
	     *bno, objoff, objlen, req->r_num_pages);

}
EXPORT_SYMBOL(ceph_calc_raw_layout);
3499e8a5 83
f24e9980
SW
84/*
85 * Implement client access to distributed object storage cluster.
86 *
87 * All data objects are stored within a cluster/cloud of OSDs, or
88 * "object storage devices." (Note that Ceph OSDs have _nothing_ to
89 * do with the T10 OSD extensions to SCSI.) Ceph OSDs are simply
90 * remote daemons serving up and coordinating consistent and safe
91 * access to storage.
92 *
93 * Cluster membership and the mapping of data objects onto storage devices
94 * are described by the osd map.
95 *
96 * We keep track of pending OSD requests (read, write), resubmit
97 * requests to different OSDs when the cluster topology/data layout
98 * change, or retry the affected requests when the communications
99 * channel with an OSD is reset.
100 */
101
102/*
103 * calculate the mapping of a file extent onto an object, and fill out the
104 * request accordingly. shorten extent as necessary if it crosses an
105 * object boundary.
106 *
107 * fill osd op in request message.
108 */
/*
 * Compute the raw layout for vino/off~*plen (shortening *plen at the
 * object boundary) and fill in the request's object id ("<ino>.<bno>").
 */
static void calc_layout(struct ceph_osd_client *osdc,
			struct ceph_vino vino,
			struct ceph_file_layout *layout,
			u64 off, u64 *plen,
			struct ceph_osd_request *req,
			struct ceph_osd_req_op *op)
{
	u64 bno;	/* object number within the file, from the mapping */

	ceph_calc_raw_layout(osdc, layout, vino.snap, off,
			     plen, &bno, req, op);

	/* object name is "<hex ino>.<8-digit hex object number>" */
	sprintf(req->r_oid, "%llx.%08llx", vino.ino, bno);
	req->r_oid_len = strlen(req->r_oid);
}
124
f24e9980
SW
125/*
126 * requests
127 */
/*
 * kref release callback: tear down an osd request once the last
 * reference is dropped.  Revokes any in-flight reply message, releases
 * owned pages, the bio, the snap context and the trailing pagelist,
 * then frees the request itself (to the mempool it came from, if any).
 */
void ceph_osdc_release_request(struct kref *kref)
{
	struct ceph_osd_request *req = container_of(kref,
						    struct ceph_osd_request,
						    r_kref);

	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply)
		ceph_msg_put(req->r_reply);
	if (req->r_con_filling_msg) {
		/* a connection may still be filling r_reply's data pages;
		 * revoke before the pages go away */
		dout("release_request revoking pages %p from con %p\n",
		     req->r_pages, req->r_con_filling_msg);
		ceph_con_revoke_message(req->r_con_filling_msg,
					req->r_reply);
		ceph_con_put(req->r_con_filling_msg);
	}
	/* only free pages this request owns (r_own_pages) */
	if (req->r_own_pages)
		ceph_release_page_vector(req->r_pages,
					 req->r_num_pages);
#ifdef CONFIG_BLOCK
	if (req->r_bio)
		bio_put(req->r_bio);
#endif
	ceph_put_snap_context(req->r_snapc);
	if (req->r_trail) {
		ceph_pagelist_release(req->r_trail);
		kfree(req->r_trail);
	}
	if (req->r_mempool)
		mempool_free(req, req->r_osdc->req_mempool);
	else
		kfree(req);
}
EXPORT_SYMBOL(ceph_osdc_release_request);
68b4476b
YS
163
164static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
165{
166 int i = 0;
167
168 if (needs_trail)
169 *needs_trail = 0;
170 while (ops[i].op) {
171 if (needs_trail && op_needs_trail(ops[i].op))
172 *needs_trail = 1;
173 i++;
174 }
175
176 return i;
177}
178
3499e8a5
YS
/*
 * Allocate a new osd request along with its request and reply messages
 * and (when any op needs one) a trailing pagelist.
 *
 * Returns NULL on allocation failure; any partially-built request is
 * torn down via ceph_osdc_put_request() on the error paths.
 */
struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
					       int flags,
					       struct ceph_snap_context *snapc,
					       struct ceph_osd_req_op *ops,
					       bool use_mempool,
					       gfp_t gfp_flags,
					       struct page **pages,
					       struct bio *bio)
{
	struct ceph_osd_request *req;
	struct ceph_msg *msg;
	int needs_trail;
	int num_op = get_num_ops(ops, &needs_trail);
	size_t msg_size = sizeof(struct ceph_osd_request_head);

	msg_size += num_op*sizeof(struct ceph_osd_op);

	if (use_mempool) {
		/* mempool allocation cannot fail (it may sleep) */
		req = mempool_alloc(osdc->req_mempool, gfp_flags);
		memset(req, 0, sizeof(*req));
	} else {
		req = kzalloc(sizeof(*req), gfp_flags);
	}
	if (req == NULL)
		return NULL;

	req->r_osdc = osdc;
	req->r_mempool = use_mempool;

	kref_init(&req->r_kref);
	init_completion(&req->r_completion);
	init_completion(&req->r_safe_completion);
	INIT_LIST_HEAD(&req->r_unsafe_item);
	req->r_flags = flags;

	/* a request must be a read, a write, or both */
	WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);

	/* create reply message */
	if (use_mempool)
		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
	else
		msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
				   OSD_OPREPLY_FRONT_LEN, gfp_flags);
	if (!msg) {
		ceph_osdc_put_request(req);
		return NULL;
	}
	req->r_reply = msg;

	/* allocate space for the trailing data */
	if (needs_trail) {
		req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
		if (!req->r_trail) {
			ceph_osdc_put_request(req);
			return NULL;
		}
		ceph_pagelist_init(req->r_trail);
	}
	/* create request message; allow space for oid */
	msg_size += 40;
	if (snapc)
		msg_size += sizeof(u64) * snapc->num_snaps;
	if (use_mempool)
		msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
	else
		msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags);
	if (!msg) {
		ceph_osdc_put_request(req);
		return NULL;
	}

	msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
	memset(msg->front.iov_base, 0, msg->front.iov_len);

	req->r_request = msg;
	req->r_pages = pages;
#ifdef CONFIG_BLOCK
	if (bio) {
		/* hold a ref on the caller's bio for the request's lifetime */
		req->r_bio = bio;
		bio_get(req->r_bio);
	}
#endif

	return req;
}
EXPORT_SYMBOL(ceph_osdc_alloc_request);
3499e8a5 265
68b4476b
YS
/*
 * Encode one op from host-order @src into wire-format (little-endian)
 * @dst.  Ops with variable-length payload (xattr names/values, class
 * call data) append that payload to the request's trailing pagelist.
 *
 * NOTE(review): the switch tests dst->op, which was just converted with
 * cpu_to_le16(), against host-order constants — presumably only correct
 * on little-endian hosts; confirm whether src->op should be used instead.
 */
static void osd_req_encode_op(struct ceph_osd_request *req,
			      struct ceph_osd_op *dst,
			      struct ceph_osd_req_op *src)
{
	dst->op = cpu_to_le16(src->op);

	switch (dst->op) {
	case CEPH_OSD_OP_READ:
	case CEPH_OSD_OP_WRITE:
		dst->extent.offset =
			cpu_to_le64(src->extent.offset);
		dst->extent.length =
			cpu_to_le64(src->extent.length);
		dst->extent.truncate_size =
			cpu_to_le64(src->extent.truncate_size);
		dst->extent.truncate_seq =
			cpu_to_le32(src->extent.truncate_seq);
		break;

	case CEPH_OSD_OP_GETXATTR:
	case CEPH_OSD_OP_SETXATTR:
	case CEPH_OSD_OP_CMPXATTR:
		/* trail must have been allocated (op_needs_trail()) */
		BUG_ON(!req->r_trail);

		dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
		dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
		dst->xattr.cmp_op = src->xattr.cmp_op;
		dst->xattr.cmp_mode = src->xattr.cmp_mode;
		ceph_pagelist_append(req->r_trail, src->xattr.name,
				     src->xattr.name_len);
		ceph_pagelist_append(req->r_trail, src->xattr.val,
				     src->xattr.value_len);
		break;
	case CEPH_OSD_OP_CALL:
		BUG_ON(!req->r_trail);

		dst->cls.class_len = src->cls.class_len;
		dst->cls.method_len = src->cls.method_len;
		dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);

		/* class name, method name, then input data, in order */
		ceph_pagelist_append(req->r_trail, src->cls.class_name,
				     src->cls.class_len);
		ceph_pagelist_append(req->r_trail, src->cls.method_name,
				     src->cls.method_len);
		ceph_pagelist_append(req->r_trail, src->cls.indata,
				     src->cls.indata_len);
		break;
	case CEPH_OSD_OP_ROLLBACK:
		dst->snap.snapid = cpu_to_le64(src->snap.snapid);
		break;
	case CEPH_OSD_OP_STARTSYNC:
		break;
	default:
		pr_err("unrecognized osd opcode %d\n", dst->op);
		WARN_ON(1);
		break;
	}
	dst->payload_len = cpu_to_le32(src->payload_len);
}
325
3499e8a5
YS
326/*
327 * build new request AND message
328 *
329 */
/*
 * build new request AND message
 *
 * Encodes the request head, object id, op vector, and snap list into
 * the already-allocated request message front, and sets the message's
 * data_off/data_len header fields.  The message front layout is:
 * head | ops[] | oid | snaps[].
 */
void ceph_osdc_build_request(struct ceph_osd_request *req,
			     u64 off, u64 *plen,
			     struct ceph_osd_req_op *src_ops,
			     struct ceph_snap_context *snapc,
			     struct timespec *mtime,
			     const char *oid,
			     int oid_len)
{
	struct ceph_msg *msg = req->r_request;
	struct ceph_osd_request_head *head;
	struct ceph_osd_req_op *src_op;
	struct ceph_osd_op *op;
	void *p;
	int num_op = get_num_ops(src_ops, NULL);
	size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
	int flags = req->r_flags;
	u64 data_len = 0;
	int i;

	/* head at the start of the front, ops right after it, then the
	 * variable-length tail (oid + snaps) at p */
	head = msg->front.iov_base;
	op = (void *)(head + 1);
	p = (void *)(op + num_op);

	req->r_snapc = ceph_get_snap_context(snapc);

	head->client_inc = cpu_to_le32(1); /* always, for now. */
	head->flags = cpu_to_le32(flags);
	if (flags & CEPH_OSD_FLAG_WRITE)
		ceph_encode_timespec(&head->mtime, mtime);
	head->num_ops = cpu_to_le16(num_op);


	/* fill in oid */
	head->object_len = cpu_to_le32(oid_len);
	memcpy(p, oid, oid_len);
	p += oid_len;

	/* encode each op (zero-terminated vector) */
	src_op = src_ops;
	while (src_op->op) {
		osd_req_encode_op(req, op, src_op);
		src_op++;
		op++;
	}

	/* trailing pagelist counts toward the message data length */
	if (req->r_trail)
		data_len += req->r_trail->length;

	if (snapc) {
		head->snap_seq = cpu_to_le64(snapc->seq);
		head->num_snaps = cpu_to_le32(snapc->num_snaps);
		for (i = 0; i < snapc->num_snaps; i++) {
			put_unaligned_le64(snapc->snaps[i], p);
			p += sizeof(u64);
		}
	}

	if (flags & CEPH_OSD_FLAG_WRITE) {
		req->r_request->hdr.data_off = cpu_to_le16(off);
		req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len);
	} else if (data_len) {
		req->r_request->hdr.data_off = 0;
		req->r_request->hdr.data_len = cpu_to_le32(data_len);
	}

	/* we must not have written past the allocated front */
	BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
	msg_size = p - msg->front.iov_base;
	msg->front.iov_len = msg_size;
	msg->hdr.front_len = cpu_to_le32(msg_size);
	return;
}
EXPORT_SYMBOL(ceph_osdc_build_request);
3499e8a5
YS
401
402/*
403 * build new request AND message, calculate layout, and adjust file
404 * extent as needed.
405 *
406 * if the file was recently truncated, we include information about its
407 * old and new size so that the object can be updated appropriately. (we
408 * avoid synchronously deleting truncated objects because it's slow.)
409 *
410 * if @do_sync, include a 'startsync' command so that the osd will flush
411 * data quickly.
412 */
413struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
414 struct ceph_file_layout *layout,
415 struct ceph_vino vino,
416 u64 off, u64 *plen,
417 int opcode, int flags,
418 struct ceph_snap_context *snapc,
419 int do_sync,
420 u32 truncate_seq,
421 u64 truncate_size,
422 struct timespec *mtime,
b7495fc2
SW
423 bool use_mempool, int num_reply,
424 int page_align)
3499e8a5 425{
68b4476b
YS
426 struct ceph_osd_req_op ops[3];
427 struct ceph_osd_request *req;
428
429 ops[0].op = opcode;
430 ops[0].extent.truncate_seq = truncate_seq;
431 ops[0].extent.truncate_size = truncate_size;
432 ops[0].payload_len = 0;
433
434 if (do_sync) {
435 ops[1].op = CEPH_OSD_OP_STARTSYNC;
436 ops[1].payload_len = 0;
437 ops[2].op = 0;
438 } else
439 ops[1].op = 0;
440
441 req = ceph_osdc_alloc_request(osdc, flags,
442 snapc, ops,
3499e8a5 443 use_mempool,
68b4476b 444 GFP_NOFS, NULL, NULL);
3499e8a5
YS
445 if (IS_ERR(req))
446 return req;
447
448 /* calculate max write size */
68b4476b 449 calc_layout(osdc, vino, layout, off, plen, req, ops);
3499e8a5
YS
450 req->r_file_layout = *layout; /* keep a copy */
451
b7495fc2
SW
452 /* in case it differs from natural alignment that calc_layout
453 filled in for us */
454 req->r_page_alignment = page_align;
455
68b4476b
YS
456 ceph_osdc_build_request(req, off, plen, ops,
457 snapc,
3499e8a5
YS
458 mtime,
459 req->r_oid, req->r_oid_len);
460
f24e9980
SW
461 return req;
462}
3d14c5d2 463EXPORT_SYMBOL(ceph_osdc_new_request);
f24e9980
SW
464
465/*
466 * We keep osd requests in an rbtree, sorted by ->r_tid.
467 */
468static void __insert_request(struct ceph_osd_client *osdc,
469 struct ceph_osd_request *new)
470{
471 struct rb_node **p = &osdc->requests.rb_node;
472 struct rb_node *parent = NULL;
473 struct ceph_osd_request *req = NULL;
474
475 while (*p) {
476 parent = *p;
477 req = rb_entry(parent, struct ceph_osd_request, r_node);
478 if (new->r_tid < req->r_tid)
479 p = &(*p)->rb_left;
480 else if (new->r_tid > req->r_tid)
481 p = &(*p)->rb_right;
482 else
483 BUG();
484 }
485
486 rb_link_node(&new->r_node, parent, p);
487 rb_insert_color(&new->r_node, &osdc->requests);
488}
489
490static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc,
491 u64 tid)
492{
493 struct ceph_osd_request *req;
494 struct rb_node *n = osdc->requests.rb_node;
495
496 while (n) {
497 req = rb_entry(n, struct ceph_osd_request, r_node);
498 if (tid < req->r_tid)
499 n = n->rb_left;
500 else if (tid > req->r_tid)
501 n = n->rb_right;
502 else
503 return req;
504 }
505 return NULL;
506}
507
508static struct ceph_osd_request *
509__lookup_request_ge(struct ceph_osd_client *osdc,
510 u64 tid)
511{
512 struct ceph_osd_request *req;
513 struct rb_node *n = osdc->requests.rb_node;
514
515 while (n) {
516 req = rb_entry(n, struct ceph_osd_request, r_node);
517 if (tid < req->r_tid) {
518 if (!n->rb_left)
519 return req;
520 n = n->rb_left;
521 } else if (tid > req->r_tid) {
522 n = n->rb_right;
523 } else {
524 return req;
525 }
526 }
527 return NULL;
528}
529
530
531/*
81b024e7 532 * If the osd connection drops, we need to resubmit all requests.
f24e9980
SW
533 */
static void osd_reset(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc;

	/* con may not be bound to an osd yet */
	if (!osd)
		return;
	dout("osd_reset osd%d\n", osd->o_osd);
	osdc = osd->o_osdc;
	/* kick (resubmit) everything queued on this osd */
	down_read(&osdc->map_sem);
	kick_requests(osdc, osd);
	up_read(&osdc->map_sem);
}
547
548/*
549 * Track open sessions with osds.
550 */
/*
 * Allocate and initialize a ceph_osd session (refcount 1, connection
 * initialized but not yet opened).  Returns NULL on allocation failure.
 */
static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
{
	struct ceph_osd *osd;

	osd = kzalloc(sizeof(*osd), GFP_NOFS);
	if (!osd)
		return NULL;

	atomic_set(&osd->o_ref, 1);
	osd->o_osdc = osdc;
	INIT_LIST_HEAD(&osd->o_requests);
	INIT_LIST_HEAD(&osd->o_osd_lru);
	osd->o_incarnation = 1;

	ceph_con_init(osdc->client->msgr, &osd->o_con);
	osd->o_con.private = osd;
	osd->o_con.ops = &osd_con_ops;
	osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;

	INIT_LIST_HEAD(&osd->o_keepalive_item);
	return osd;
}
573
574static struct ceph_osd *get_osd(struct ceph_osd *osd)
575{
576 if (atomic_inc_not_zero(&osd->o_ref)) {
577 dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
578 atomic_read(&osd->o_ref));
579 return osd;
580 } else {
581 dout("get_osd %p FAIL\n", osd);
582 return NULL;
583 }
584}
585
/*
 * Drop a reference on @osd; on the last put, destroy its authorizer
 * and free it.
 */
static void put_osd(struct ceph_osd *osd)
{
	dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
	     atomic_read(&osd->o_ref) - 1);
	if (atomic_dec_and_test(&osd->o_ref)) {
		struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;

		if (osd->o_authorizer)
			ac->ops->destroy_authorizer(ac, osd->o_authorizer);
		kfree(osd);
	}
}
598
599/*
600 * remove an osd from our map
601 */
static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
	dout("__remove_osd %p\n", osd);
	/* must have no requests still queued on this osd */
	BUG_ON(!list_empty(&osd->o_requests));
	rb_erase(&osd->o_node, &osdc->osds);
	list_del_init(&osd->o_osd_lru);
	ceph_con_close(&osd->o_con);
	put_osd(osd);	/* drop the map's reference */
}
611
f5a2041b
YS
/*
 * Put an idle (requestless) osd on the LRU with an expiry time, so the
 * osds-timeout work can close it if it stays idle.
 */
static void __move_osd_to_lru(struct ceph_osd_client *osdc,
			      struct ceph_osd *osd)
{
	dout("__move_osd_to_lru %p\n", osd);
	BUG_ON(!list_empty(&osd->o_osd_lru));	/* must not already be on lru */
	list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
	osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ;
}
620
621static void __remove_osd_from_lru(struct ceph_osd *osd)
622{
623 dout("__remove_osd_from_lru %p\n", osd);
624 if (!list_empty(&osd->o_osd_lru))
625 list_del_init(&osd->o_osd_lru);
626}
627
/*
 * Close idle osd sessions whose lru_ttl has expired; with @remove_all,
 * close every idle session regardless of age (used on shutdown).
 */
static void remove_old_osds(struct ceph_osd_client *osdc, int remove_all)
{
	struct ceph_osd *osd, *nosd;

	dout("__remove_old_osds %p\n", osdc);
	mutex_lock(&osdc->request_mutex);
	list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
		/* lru is in expiry order; stop at the first unexpired one */
		if (!remove_all && time_before(jiffies, osd->lru_ttl))
			break;
		__remove_osd(osdc, osd);
	}
	mutex_unlock(&osdc->request_mutex);
}
641
f24e9980
SW
642/*
643 * reset osd connect
644 */
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
	struct ceph_osd_request *req;
	int ret = 0;

	dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
	if (list_empty(&osd->o_requests)) {
		/* nothing queued: just drop the session entirely */
		__remove_osd(osdc, osd);
	} else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
			  &osd->o_con.peer_addr,
			  sizeof(osd->o_con.peer_addr)) == 0 &&
		   !ceph_con_opened(&osd->o_con)) {
		dout(" osd addr hasn't changed and connection never opened,"
		     " letting msgr retry");
		/* touch each r_stamp for handle_timeout()'s benefit */
		list_for_each_entry(req, &osd->o_requests, r_osd_item)
			req->r_stamp = jiffies;
		ret = -EAGAIN;
	} else {
		/* reconnect at the (possibly new) address from the osdmap;
		 * bump the incarnation so stale sends are detected */
		ceph_con_close(&osd->o_con);
		ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]);
		osd->o_incarnation++;
	}
	return ret;
}
670
671static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
672{
673 struct rb_node **p = &osdc->osds.rb_node;
674 struct rb_node *parent = NULL;
675 struct ceph_osd *osd = NULL;
676
677 while (*p) {
678 parent = *p;
679 osd = rb_entry(parent, struct ceph_osd, o_node);
680 if (new->o_osd < osd->o_osd)
681 p = &(*p)->rb_left;
682 else if (new->o_osd > osd->o_osd)
683 p = &(*p)->rb_right;
684 else
685 BUG();
686 }
687
688 rb_link_node(&new->o_node, parent, p);
689 rb_insert_color(&new->o_node, &osdc->osds);
690}
691
692static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
693{
694 struct ceph_osd *osd;
695 struct rb_node *n = osdc->osds.rb_node;
696
697 while (n) {
698 osd = rb_entry(n, struct ceph_osd, o_node);
699 if (o < osd->o_osd)
700 n = n->rb_left;
701 else if (o > osd->o_osd)
702 n = n->rb_right;
703 else
704 return osd;
705 }
706 return NULL;
707}
708
422d2cb8
YS
/* Arm the request-timeout work for one keepalive interval from now. */
static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
{
	schedule_delayed_work(&osdc->timeout_work,
			osdc->client->options->osd_keepalive_timeout * HZ);
}
714
/* Disarm the request-timeout work (no requests outstanding). */
static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
{
	cancel_delayed_work(&osdc->timeout_work);
}
f24e9980
SW
719
720/*
721 * Register request, assign tid. If this is the first request, set up
722 * the timeout event.
723 */
static void register_request(struct ceph_osd_client *osdc,
			     struct ceph_osd_request *req)
{
	mutex_lock(&osdc->request_mutex);
	req->r_tid = ++osdc->last_tid;
	req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
	INIT_LIST_HEAD(&req->r_req_lru_item);

	dout("register_request %p tid %lld\n", req, req->r_tid);
	__insert_request(osdc, req);
	ceph_osdc_get_request(req);	/* tree holds a reference */
	osdc->num_requests++;

	if (osdc->num_requests == 1) {
		dout(" first request, scheduling timeout\n");
		__schedule_osd_timeout(osdc);
	}
	mutex_unlock(&osdc->request_mutex);
}
743
744/*
745 * called under osdc->request_mutex
746 */
static void __unregister_request(struct ceph_osd_client *osdc,
				 struct ceph_osd_request *req)
{
	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
	rb_erase(&req->r_node, &osdc->requests);
	osdc->num_requests--;

	if (req->r_osd) {
		/* make sure the original request isn't in flight. */
		ceph_con_revoke(&req->r_osd->o_con, req->r_request);

		list_del_init(&req->r_osd_item);
		/* osd now idle?  park it on the lru for eventual close */
		if (list_empty(&req->r_osd->o_requests))
			__move_osd_to_lru(osdc, req->r_osd);
		req->r_osd = NULL;
	}

	ceph_osdc_put_request(req);	/* drop the tree's reference */

	list_del_init(&req->r_req_lru_item);
	if (osdc->num_requests == 0) {
		dout(" no requests, canceling timeout\n");
		__cancel_osd_timeout(osdc);
	}
}
772
773/*
774 * Cancel a previously queued request message
775 */
static void __cancel_request(struct ceph_osd_request *req)
{
	/* revoke only if it was actually handed to a connection */
	if (req->r_sent && req->r_osd) {
		ceph_con_revoke(&req->r_osd->o_con, req->r_request);
		req->r_sent = 0;	/* mark as needing (re)send */
	}
	list_del_init(&req->r_req_lru_item);
}
784
785/*
786 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
787 * (as needed), and set the request r_osd appropriately. If there is
788 * no up osd, set r_osd to NULL.
789 *
790 * Return 0 if unchanged, 1 if changed, or negative on error.
791 *
792 * Caller should hold map_sem for read and request_mutex.
793 */
static int __map_osds(struct ceph_osd_client *osdc,
		      struct ceph_osd_request *req)
{
	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
	struct ceph_pg pgid;
	int acting[CEPH_PG_MAX_SIZE];
	int o = -1, num = 0;
	int err;

	dout("map_osds %p tid %lld\n", req, req->r_tid);
	/* object -> pg */
	err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
				      &req->r_file_layout, osdc->osdmap);
	if (err)
		return err;
	pgid = reqhead->layout.ol_pgid;
	req->r_pgid = pgid;

	/* pg -> acting osd set; primary is acting[0] */
	err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting);
	if (err > 0) {
		o = acting[0];
		num = err;
	}

	/* same primary, same full acting set, and already sent to the
	 * current incarnation (or mapped to "no osd" both before and
	 * after)?  nothing to do. */
	if ((req->r_osd && req->r_osd->o_osd == o &&
	     req->r_sent >= req->r_osd->o_incarnation &&
	     req->r_num_pg_osds == num &&
	     memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
	    (req->r_osd == NULL && o == -1))
		return 0;  /* no change */

	dout("map_osds tid %llu pgid %d.%x osd%d (was osd%d)\n",
	     req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
	     req->r_osd ? req->r_osd->o_osd : -1);

	/* record full pg acting set */
	memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num);
	req->r_num_pg_osds = num;

	/* detach from the old osd session */
	if (req->r_osd) {
		__cancel_request(req);
		list_del_init(&req->r_osd_item);
		req->r_osd = NULL;
	}

	/* attach to the new primary, creating a session if needed */
	req->r_osd = __lookup_osd(osdc, o);
	if (!req->r_osd && o >= 0) {
		err = -ENOMEM;
		req->r_osd = create_osd(osdc);
		if (!req->r_osd)
			goto out;

		dout("map_osds osd %p is osd%d\n", req->r_osd, o);
		req->r_osd->o_osd = o;
		req->r_osd->o_con.peer_name.num = cpu_to_le64(o);
		__insert_osd(osdc, req->r_osd);

		ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]);
	}

	if (req->r_osd) {
		__remove_osd_from_lru(req->r_osd);
		list_add(&req->r_osd_item, &req->r_osd->o_requests);
	}
	err = 1;   /* osd or pg changed */

out:
	return err;
}
862
863/*
864 * caller should hold map_sem (for read) and request_mutex
865 */
/*
 * caller should hold map_sem (for read) and request_mutex
 */
static int __send_request(struct ceph_osd_client *osdc,
			  struct ceph_osd_request *req)
{
	struct ceph_osd_request_head *reqhead;
	int err;

	/* (re)map to the current primary osd */
	err = __map_osds(osdc, req);
	if (err < 0)
		return err;
	if (req->r_osd == NULL) {
		/* pg currently has no up osd; wait for a newer map */
		dout("send_request %p no up osds in pg\n", req);
		ceph_monc_request_next_osdmap(&osdc->client->monc);
		return 0;
	}

	dout("send_request %p tid %llu to osd%d flags %d\n",
	     req, req->r_tid, req->r_osd->o_osd, req->r_flags);

	/* stamp the head with the epoch/flags current at send time */
	reqhead = req->r_request->front.iov_base;
	reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
	reqhead->flags |= cpu_to_le32(req->r_flags);  /* e.g., RETRY */
	reqhead->reassert_version = req->r_reassert_version;

	req->r_stamp = jiffies;
	list_move_tail(&req->r_req_lru_item, &osdc->req_lru);

	ceph_msg_get(req->r_request); /* send consumes a ref */
	ceph_con_send(&req->r_osd->o_con, req->r_request);
	req->r_sent = req->r_osd->o_incarnation;
	return 0;
}
897
898/*
899 * Timeout callback, called every N seconds when 1 or more osd
900 * requests has been active for more than N seconds. When this
901 * happens, we ping all OSDs with requests who have timed out to
902 * ensure any communications channel reset is detected. Reset the
903 * request timeouts another N seconds in the future as we go.
904 * Reschedule the timeout event another N seconds in future (unless
905 * there are no open requests).
906 */
static void handle_timeout(struct work_struct *work)
{
	struct ceph_osd_client *osdc =
		container_of(work, struct ceph_osd_client, timeout_work.work);
	struct ceph_osd_request *req, *last_req = NULL;
	struct ceph_osd *osd;
	unsigned long timeout = osdc->client->options->osd_timeout * HZ;
	unsigned long keepalive =
		osdc->client->options->osd_keepalive_timeout * HZ;
	unsigned long last_stamp = 0;
	struct rb_node *p;
	struct list_head slow_osds;

	dout("timeout\n");
	down_read(&osdc->map_sem);

	ceph_monc_request_next_osdmap(&osdc->client->monc);

	mutex_lock(&osdc->request_mutex);
	/* retry any request whose previous send attempt failed */
	for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
		req = rb_entry(p, struct ceph_osd_request, r_node);

		if (req->r_resend) {
			int err;

			dout("osdc resending prev failed %lld\n", req->r_tid);
			err = __send_request(osdc, req);
			if (err)
				dout("osdc failed again on %lld\n", req->r_tid);
			else
				req->r_resend = false;
			continue;
		}
	}

	/*
	 * reset osds that appear to be _really_ unresponsive.  this
	 * is a failsafe measure.. we really shouldn't be getting to
	 * this point if the system is working properly.  the monitors
	 * should mark the osd as failed and we should find out about
	 * it from an updated osd map.
	 */
	while (timeout && !list_empty(&osdc->req_lru)) {
		/* req_lru is ordered by r_stamp (oldest first) */
		req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
				 r_req_lru_item);

		if (time_before(jiffies, req->r_stamp + timeout))
			break;

		/* kicking must make progress; seeing the same request
		 * with the same stamp again means we're stuck */
		BUG_ON(req == last_req && req->r_stamp == last_stamp);
		last_req = req;
		last_stamp = req->r_stamp;

		osd = req->r_osd;
		BUG_ON(!osd);
		pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
			   req->r_tid, osd->o_osd);
		__kick_requests(osdc, osd);
	}

	/*
	 * ping osds that are a bit slow.  this ensures that if there
	 * is a break in the TCP connection we will notice, and reopen
	 * a connection with that osd (from the fault callback).
	 */
	INIT_LIST_HEAD(&slow_osds);
	list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
		if (time_before(jiffies, req->r_stamp + keepalive))
			break;

		osd = req->r_osd;
		BUG_ON(!osd);
		dout(" tid %llu is slow, will send keepalive on osd%d\n",
		     req->r_tid, osd->o_osd);
		list_move_tail(&osd->o_keepalive_item, &slow_osds);
	}
	while (!list_empty(&slow_osds)) {
		osd = list_entry(slow_osds.next, struct ceph_osd,
				 o_keepalive_item);
		list_del_init(&osd->o_keepalive_item);
		ceph_con_keepalive(&osd->o_con);
	}

	__schedule_osd_timeout(osdc);
	mutex_unlock(&osdc->request_mutex);

	up_read(&osdc->map_sem);
}
995
f5a2041b
YS
/*
 * Periodic work: close osd sessions that have been idle past their
 * ttl, then re-arm at a quarter of the idle ttl.
 */
static void handle_osds_timeout(struct work_struct *work)
{
	struct ceph_osd_client *osdc =
		container_of(work, struct ceph_osd_client,
			     osds_timeout_work.work);
	unsigned long delay =
		osdc->client->options->osd_idle_ttl * HZ >> 2;

	dout("osds timeout\n");
	down_read(&osdc->map_sem);
	remove_old_osds(osdc, 0);
	up_read(&osdc->map_sem);

	schedule_delayed_work(&osdc->osds_timeout_work,
			      round_jiffies_relative(delay));
}
1012
f24e9980
SW
1013/*
 1014 * handle osd op reply. either call the callback if it is specified,
 1015 * or do the completion to wake up the waiting thread.
 1016 */
350b1c32
SW
 1017static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
 1018 struct ceph_connection *con)
f24e9980
SW
 1019{
 1020 struct ceph_osd_reply_head *rhead = msg->front.iov_base;
 1021 struct ceph_osd_request *req;
 1022 u64 tid;
 1023 int numops, object_len, flags;
0ceed5db 1024 s32 result;
f24e9980 1025
6df058c0 1026 tid = le64_to_cpu(msg->hdr.tid);
 /* sanity-check the front payload against the lengths it declares */
f24e9980
SW
 1027 if (msg->front.iov_len < sizeof(*rhead))
 1028 goto bad;
f24e9980
SW
 1029 numops = le32_to_cpu(rhead->num_ops);
 1030 object_len = le32_to_cpu(rhead->object_len);
0ceed5db 1031 result = le32_to_cpu(rhead->result);
f24e9980
SW
 1032 if (msg->front.iov_len != sizeof(*rhead) + object_len +
 1033 numops * sizeof(struct ceph_osd_op))
 1034 goto bad;
0ceed5db 1035 dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result);
f24e9980
SW
 1036
 1037 /* lookup */
 1038 mutex_lock(&osdc->request_mutex);
 1039 req = __lookup_request(osdc, tid);
 1040 if (req == NULL) {
 1041 dout("handle_reply tid %llu dne\n", tid);
 1042 mutex_unlock(&osdc->request_mutex);
 1043 return;
 1044 }
 /* hold a ref on req for the remainder of this function */
 1045 ceph_osdc_get_request(req);
 1046 flags = le32_to_cpu(rhead->flags);
 1047
350b1c32 1048 /*
0d59ab81 1049 * if this connection filled our message, drop our reference now, to
350b1c32
SW
 1050 * avoid a (safe but slower) revoke later.
 1051 */
0d59ab81 1052 if (req->r_con_filling_msg == con && req->r_reply == msg) {
c16e7869 1053 dout(" dropping con_filling_msg ref %p\n", con);
0d59ab81 1054 req->r_con_filling_msg = NULL;
350b1c32
SW
 1055 ceph_con_put(con);
 1056 }
 1057
 /* first reply for this tid: record result, byte count and version */
f24e9980
SW
 1058 if (!req->r_got_reply) {
 1059 unsigned bytes;
 1060
 1061 req->r_result = le32_to_cpu(rhead->result);
 1062 bytes = le32_to_cpu(msg->hdr.data_len);
 1063 dout("handle_reply result %d bytes %d\n", req->r_result,
 1064 bytes);
 /* a zero result on a read means "success": report bytes transferred */
 1065 if (req->r_result == 0)
 1066 req->r_result = bytes;
 1067
 1068 /* in case this is a write and we need to replay, */
 1069 req->r_reassert_version = rhead->reassert_version;
 1070
 1071 req->r_got_reply = 1;
 1072 } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
 /* already acked and this is not the final on-disk commit: ignore */
 1073 dout("handle_reply tid %llu dup ack\n", tid);
34b43a56 1074 mutex_unlock(&osdc->request_mutex);
f24e9980
SW
 1075 goto done;
 1076 }
 1077
 1078 dout("handle_reply tid %llu flags %d\n", tid, flags);
 1079
 1080 /* either this is a read, or we got the safe response */
0ceed5db
SW
 1081 if (result < 0 ||
 1082 (flags & CEPH_OSD_FLAG_ONDISK) ||
f24e9980
SW
 1083 ((flags & CEPH_OSD_FLAG_WRITE) == 0))
 1084 __unregister_request(osdc, req);
 1085
 1086 mutex_unlock(&osdc->request_mutex);
 1087
 1088 if (req->r_callback)
 1089 req->r_callback(req, msg);
 1090 else
03066f23 1091 complete_all(&req->r_completion);
f24e9980
SW
 1092
 1093 if (flags & CEPH_OSD_FLAG_ONDISK) {
 1094 if (req->r_safe_callback)
 1095 req->r_safe_callback(req, msg);
03066f23 1096 complete_all(&req->r_safe_completion); /* fsync waiter */
f24e9980
SW
 1097 }
 1098
 1099done:
 1100 ceph_osdc_put_request(req);
 1101 return;
 1102
 1103bad:
 1104 pr_err("corrupt osd_op_reply got %d %d expected %d\n",
 1105 (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len),
 1106 (int)sizeof(*rhead));
9ec7cab1 1107 ceph_msg_dump(msg);
f24e9980
SW
 1108}
1109
1110
/*
 * Resend/remap pending requests (all of them, or only those touching
 * @kickosd when it is non-NULL).  Called with request_mutex held (see
 * kick_requests()).  Returns nonzero when the caller should ask the
 * monitors for a newer osdmap.
 */
422d2cb8 1111static int __kick_requests(struct ceph_osd_client *osdc,
f24e9980
SW
 1112 struct ceph_osd *kickosd)
 1113{
 1114 struct ceph_osd_request *req;
 1115 struct rb_node *p, *n;
 1116 int needmap = 0;
 1117 int err;
 1118
 1119 dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1);
153a008b 1120 if (kickosd) {
87b315a5
SW
 1121 err = __reset_osd(osdc, kickosd);
 1122 if (err == -EAGAIN)
 1123 return 1;
153a008b 1124 } else {
 /* reset every osd that is down or whose address changed in the map */
f24e9980
SW
 1125 for (p = rb_first(&osdc->osds); p; p = n) {
 1126 struct ceph_osd *osd =
 1127 rb_entry(p, struct ceph_osd, o_node);
 1128
 /* grab the successor first: __reset_osd() may remove this node */
 1129 n = rb_next(p);
 1130 if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
103e2d3a
SW
 1131 memcmp(&osd->o_con.peer_addr,
 1132 ceph_osd_addr(osdc->osdmap,
 1133 osd->o_osd),
 1134 sizeof(struct ceph_entity_addr)) != 0)
f5a2041b 1135 __reset_osd(osdc, osd);
f24e9980
SW
 1136 }
 1137 }
 1138
 /* walk every registered request and resend the affected ones */
 1139 for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
 1140 req = rb_entry(p, struct ceph_osd_request, r_node);
 1141
 1142 if (req->r_resend) {
 1143 dout(" r_resend set on tid %llu\n", req->r_tid);
266673db 1144 __cancel_request(req);
f24e9980
SW
 1145 goto kick;
 1146 }
266673db
SW
 1147 if (req->r_osd && kickosd == req->r_osd) {
 1148 __cancel_request(req);
f24e9980 1149 goto kick;
266673db 1150 }
f24e9980
SW
 1151
 1152 err = __map_osds(osdc, req);
 1153 if (err == 0)
 1154 continue; /* no change */
 1155 if (err < 0) {
 1156 /*
 1157 * FIXME: really, we should set the request
 1158 * error and fail if this isn't a 'nofail'
 1159 * request, but that's a fair bit more
 1160 * complicated to do. So retry!
 1161 */
 1162 dout(" setting r_resend on %llu\n", req->r_tid);
 1163 req->r_resend = true;
 1164 continue;
 1165 }
 1166 if (req->r_osd == NULL) {
 1167 dout("tid %llu maps to no valid osd\n", req->r_tid);
 1168 needmap++; /* request a newer map */
 1169 continue;
 1170 }
 1171
 1172kick:
c1ea8823 1173 dout("kicking %p tid %llu osd%d\n", req, req->r_tid,
12eadc19 1174 req->r_osd ? req->r_osd->o_osd : -1);
f24e9980
SW
 1175 req->r_flags |= CEPH_OSD_FLAG_RETRY;
 1176 err = __send_request(osdc, req);
 1177 if (err) {
 /* couldn't send now; leave it flagged for the next kick */
 1178 dout(" setting r_resend on %llu\n", req->r_tid);
 1179 req->r_resend = true;
 1180 }
 1181 }
422d2cb8
YS
 1182
 1183 return needmap;
 1184}
1185
1186/*
1187 * Resubmit osd requests whose osd or osd address has changed. Request
1188 * a new osd map if osds are down, or we are otherwise unable to determine
1189 * how to direct a request.
1190 *
1191 * Close connections to down osds.
1192 *
1193 * If @who is specified, resubmit requests for that specific osd.
1194 *
 1195 * Caller should hold map_sem for read; request_mutex must NOT be
 * held, since kick_requests() takes it itself.
1196 */
1197static void kick_requests(struct ceph_osd_client *osdc,
1198 struct ceph_osd *kickosd)
1199{
1200 int needmap;
1201
1202 mutex_lock(&osdc->request_mutex);
1203 needmap = __kick_requests(osdc, kickosd);
f24e9980
SW
1204 mutex_unlock(&osdc->request_mutex);
1205
1206 if (needmap) {
1207 dout("%d requests for down osds, need new map\n", needmap);
1208 ceph_monc_request_next_osdmap(&osdc->client->monc);
1209 }
f24e9980 1210
422d2cb8 1211}
f24e9980
SW
1212/*
1213 * Process updated osd map.
1214 *
1215 * The message contains any number of incremental and full maps, normally
1216 * indicating some sort of topology change in the cluster. Kick requests
1217 * off to different OSDs as needed.
1218 */
1219void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1220{
1221 void *p, *end, *next;
1222 u32 nr_maps, maplen;
1223 u32 epoch;
1224 struct ceph_osdmap *newmap = NULL, *oldmap;
1225 int err;
1226 struct ceph_fsid fsid;
1227
1228 dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
1229 p = msg->front.iov_base;
1230 end = p + msg->front.iov_len;
1231
1232 /* verify fsid */
1233 ceph_decode_need(&p, end, sizeof(fsid), bad);
1234 ceph_decode_copy(&p, &fsid, sizeof(fsid));
0743304d
SW
1235 if (ceph_check_fsid(osdc->client, &fsid) < 0)
1236 return;
f24e9980
SW
1237
1238 down_write(&osdc->map_sem);
1239
1240 /* incremental maps */
1241 ceph_decode_32_safe(&p, end, nr_maps, bad);
1242 dout(" %d inc maps\n", nr_maps);
1243 while (nr_maps > 0) {
1244 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
c89136ea
SW
1245 epoch = ceph_decode_32(&p);
1246 maplen = ceph_decode_32(&p);
f24e9980
SW
1247 ceph_decode_need(&p, end, maplen, bad);
1248 next = p + maplen;
1249 if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) {
1250 dout("applying incremental map %u len %d\n",
1251 epoch, maplen);
1252 newmap = osdmap_apply_incremental(&p, next,
1253 osdc->osdmap,
1254 osdc->client->msgr);
1255 if (IS_ERR(newmap)) {
1256 err = PTR_ERR(newmap);
1257 goto bad;
1258 }
30dc6381 1259 BUG_ON(!newmap);
f24e9980
SW
1260 if (newmap != osdc->osdmap) {
1261 ceph_osdmap_destroy(osdc->osdmap);
1262 osdc->osdmap = newmap;
1263 }
1264 } else {
1265 dout("ignoring incremental map %u len %d\n",
1266 epoch, maplen);
1267 }
1268 p = next;
1269 nr_maps--;
1270 }
1271 if (newmap)
1272 goto done;
1273
1274 /* full maps */
1275 ceph_decode_32_safe(&p, end, nr_maps, bad);
1276 dout(" %d full maps\n", nr_maps);
1277 while (nr_maps) {
1278 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
c89136ea
SW
1279 epoch = ceph_decode_32(&p);
1280 maplen = ceph_decode_32(&p);
f24e9980
SW
1281 ceph_decode_need(&p, end, maplen, bad);
1282 if (nr_maps > 1) {
1283 dout("skipping non-latest full map %u len %d\n",
1284 epoch, maplen);
1285 } else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
1286 dout("skipping full map %u len %d, "
1287 "older than our %u\n", epoch, maplen,
1288 osdc->osdmap->epoch);
1289 } else {
1290 dout("taking full map %u len %d\n", epoch, maplen);
1291 newmap = osdmap_decode(&p, p+maplen);
1292 if (IS_ERR(newmap)) {
1293 err = PTR_ERR(newmap);
1294 goto bad;
1295 }
30dc6381 1296 BUG_ON(!newmap);
f24e9980
SW
1297 oldmap = osdc->osdmap;
1298 osdc->osdmap = newmap;
1299 if (oldmap)
1300 ceph_osdmap_destroy(oldmap);
1301 }
1302 p += maplen;
1303 nr_maps--;
1304 }
1305
1306done:
1307 downgrade_write(&osdc->map_sem);
1308 ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
1309 if (newmap)
1310 kick_requests(osdc, NULL);
1311 up_read(&osdc->map_sem);
03066f23 1312 wake_up_all(&osdc->client->auth_wq);
f24e9980
SW
1313 return;
1314
1315bad:
1316 pr_err("osdc handle_map corrupt msg\n");
9ec7cab1 1317 ceph_msg_dump(msg);
f24e9980
SW
1318 up_write(&osdc->map_sem);
1319 return;
1320}
1321
f24e9980
SW
1322/*
 1323 * Register request, send initial attempt.
 *
 * Returns 0 when the request was sent (or queued for resend when
 * @nofail); otherwise the error from __send_request(), with the request
 * already unregistered again.
 1324 */
 1325int ceph_osdc_start_request(struct ceph_osd_client *osdc,
 1326 struct ceph_osd_request *req,
 1327 bool nofail)
 1328{
c1ea8823 1329 int rc = 0;
f24e9980
SW
 1330
 /* hand the data payload (pages/bio/trail) over to the outgoing msg */
 1331 req->r_request->pages = req->r_pages;
 1332 req->r_request->nr_pages = req->r_num_pages;
68b4476b
YS
 1333#ifdef CONFIG_BLOCK
 1334 req->r_request->bio = req->r_bio;
 1335#endif
 1336 req->r_request->trail = req->r_trail;
f24e9980
SW
 1337
 1338 register_request(osdc, req);
 1339
 1340 down_read(&osdc->map_sem);
 1341 mutex_lock(&osdc->request_mutex);
c1ea8823
SW
 1342 /*
 1343 * a racing kick_requests() may have sent the message for us
 1344 * while we dropped request_mutex above, so only send now if
 1345 * the request still hasn't been touched yet.
 1346 */
 1347 if (req->r_sent == 0) {
 1348 rc = __send_request(osdc, req);
 1349 if (rc) {
 1350 if (nofail) {
 /* caller cannot tolerate failure: keep it queued for a later kick */
 1351 dout("osdc_start_request failed send, "
 1352 " marking %lld\n", req->r_tid);
 1353 req->r_resend = true;
 1354 rc = 0;
 1355 } else {
 1356 __unregister_request(osdc, req);
 1357 }
f24e9980
SW
 1358 }
 1359 }
 1360 mutex_unlock(&osdc->request_mutex);
 1361 up_read(&osdc->map_sem);
 1362 return rc;
 1363}
3d14c5d2 1364EXPORT_SYMBOL(ceph_osdc_start_request);
f24e9980
SW
1365
1366/*
1367 * wait for a request to complete
1368 */
1369int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
1370 struct ceph_osd_request *req)
1371{
1372 int rc;
1373
1374 rc = wait_for_completion_interruptible(&req->r_completion);
1375 if (rc < 0) {
1376 mutex_lock(&osdc->request_mutex);
1377 __cancel_request(req);
529cfcc4 1378 __unregister_request(osdc, req);
f24e9980 1379 mutex_unlock(&osdc->request_mutex);
529cfcc4 1380 dout("wait_request tid %llu canceled/timed out\n", req->r_tid);
f24e9980
SW
1381 return rc;
1382 }
1383
1384 dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result);
1385 return req->r_result;
1386}
3d14c5d2 1387EXPORT_SYMBOL(ceph_osdc_wait_request);
f24e9980
SW
1388
1389/*
 1390 * sync - wait for all in-flight requests to flush. avoid starvation.
 1391 */
 1392void ceph_osdc_sync(struct ceph_osd_client *osdc)
 1393{
 1394 struct ceph_osd_request *req;
 1395 u64 last_tid, next_tid = 0;
 1396
 1397 mutex_lock(&osdc->request_mutex);
 /* only wait for tids issued before we started, to avoid starvation */
 1398 last_tid = osdc->last_tid;
 1399 while (1) {
 1400 req = __lookup_request_ge(osdc, next_tid);
 1401 if (!req)
 1402 break;
 1403 if (req->r_tid > last_tid)
 1404 break;
 1405
 1406 next_tid = req->r_tid + 1;
 /* only writes need to be flushed to disk; skip reads */
 1407 if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)
 1408 continue;
 1409
 /* take a ref so req survives while we sleep without the mutex */
 1410 ceph_osdc_get_request(req);
 1411 mutex_unlock(&osdc->request_mutex);
 1412 dout("sync waiting on tid %llu (last is %llu)\n",
 1413 req->r_tid, last_tid);
 1414 wait_for_completion(&req->r_safe_completion);
 1415 mutex_lock(&osdc->request_mutex);
 1416 ceph_osdc_put_request(req);
 1417 }
 1418 mutex_unlock(&osdc->request_mutex);
 1419 dout("sync done (thru tid %llu)\n", last_tid);
 1420}
3d14c5d2 1421EXPORT_SYMBOL(ceph_osdc_sync);
f24e9980
SW
1422
1423/*
1424 * init, shutdown
1425 */
1426int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
1427{
1428 int err;
1429
1430 dout("init\n");
1431 osdc->client = client;
1432 osdc->osdmap = NULL;
1433 init_rwsem(&osdc->map_sem);
1434 init_completion(&osdc->map_waiters);
1435 osdc->last_requested_map = 0;
1436 mutex_init(&osdc->request_mutex);
f24e9980
SW
1437 osdc->last_tid = 0;
1438 osdc->osds = RB_ROOT;
f5a2041b 1439 INIT_LIST_HEAD(&osdc->osd_lru);
f24e9980 1440 osdc->requests = RB_ROOT;
422d2cb8 1441 INIT_LIST_HEAD(&osdc->req_lru);
f24e9980
SW
1442 osdc->num_requests = 0;
1443 INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
f5a2041b
YS
1444 INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
1445
1446 schedule_delayed_work(&osdc->osds_timeout_work,
3d14c5d2 1447 round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ));
f24e9980 1448
5f44f142 1449 err = -ENOMEM;
f24e9980
SW
1450 osdc->req_mempool = mempool_create_kmalloc_pool(10,
1451 sizeof(struct ceph_osd_request));
1452 if (!osdc->req_mempool)
5f44f142 1453 goto out;
f24e9980 1454
4f48280e
SW
1455 err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true,
1456 "osd_op");
f24e9980 1457 if (err < 0)
5f44f142 1458 goto out_mempool;
c16e7869 1459 err = ceph_msgpool_init(&osdc->msgpool_op_reply,
4f48280e
SW
1460 OSD_OPREPLY_FRONT_LEN, 10, true,
1461 "osd_op_reply");
c16e7869
SW
1462 if (err < 0)
1463 goto out_msgpool;
f24e9980 1464 return 0;
5f44f142 1465
c16e7869
SW
1466out_msgpool:
1467 ceph_msgpool_destroy(&osdc->msgpool_op);
5f44f142
SW
1468out_mempool:
1469 mempool_destroy(osdc->req_mempool);
1470out:
1471 return err;
f24e9980 1472}
3d14c5d2 1473EXPORT_SYMBOL(ceph_osdc_init);
f24e9980
SW
1474
/*
 * Tear down an osd client: stop the delayed works, drop the osdmap and
 * all osd connections, then free the pools set up by ceph_osdc_init().
 */
 1475void ceph_osdc_stop(struct ceph_osd_client *osdc)
 1476{
 1477 cancel_delayed_work_sync(&osdc->timeout_work);
f5a2041b 1478 cancel_delayed_work_sync(&osdc->osds_timeout_work);
f24e9980
SW
 1479 if (osdc->osdmap) {
 1480 ceph_osdmap_destroy(osdc->osdmap);
 1481 osdc->osdmap = NULL;
 1482 }
 /* remove all osds, not just the ones past their idle time */
f5a2041b 1483 remove_old_osds(osdc, 1);
f24e9980
SW
 1484 mempool_destroy(osdc->req_mempool);
 1485 ceph_msgpool_destroy(&osdc->msgpool_op);
c16e7869 1486 ceph_msgpool_destroy(&osdc->msgpool_op_reply);
f24e9980 1487}
3d14c5d2 1488EXPORT_SYMBOL(ceph_osdc_stop);
f24e9980
SW
1489
1490/*
1491 * Read some contiguous pages. If we cross a stripe boundary, shorten
1492 * *plen. Return number of bytes read, or error.
1493 */
1494int ceph_osdc_readpages(struct ceph_osd_client *osdc,
1495 struct ceph_vino vino, struct ceph_file_layout *layout,
1496 u64 off, u64 *plen,
1497 u32 truncate_seq, u64 truncate_size,
b7495fc2 1498 struct page **pages, int num_pages, int page_align)
f24e9980
SW
1499{
1500 struct ceph_osd_request *req;
1501 int rc = 0;
1502
1503 dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
1504 vino.snap, off, *plen);
1505 req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
1506 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
1507 NULL, 0, truncate_seq, truncate_size, NULL,
b7495fc2 1508 false, 1, page_align);
a79832f2
SW
1509 if (!req)
1510 return -ENOMEM;
f24e9980
SW
1511
1512 /* it may be a short read due to an object boundary */
1513 req->r_pages = pages;
f24e9980 1514
b7495fc2
SW
1515 dout("readpages final extent is %llu~%llu (%d pages align %d)\n",
1516 off, *plen, req->r_num_pages, page_align);
f24e9980
SW
1517
1518 rc = ceph_osdc_start_request(osdc, req, false);
1519 if (!rc)
1520 rc = ceph_osdc_wait_request(osdc, req);
1521
1522 ceph_osdc_put_request(req);
1523 dout("readpages result %d\n", rc);
1524 return rc;
1525}
3d14c5d2 1526EXPORT_SYMBOL(ceph_osdc_readpages);
f24e9980
SW
1527
1528/*
1529 * do a synchronous write on N pages
1530 */
1531int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
1532 struct ceph_file_layout *layout,
1533 struct ceph_snap_context *snapc,
1534 u64 off, u64 len,
1535 u32 truncate_seq, u64 truncate_size,
1536 struct timespec *mtime,
1537 struct page **pages, int num_pages,
1538 int flags, int do_sync, bool nofail)
1539{
1540 struct ceph_osd_request *req;
1541 int rc = 0;
b7495fc2 1542 int page_align = off & ~PAGE_MASK;
f24e9980
SW
1543
1544 BUG_ON(vino.snap != CEPH_NOSNAP);
1545 req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
1546 CEPH_OSD_OP_WRITE,
1547 flags | CEPH_OSD_FLAG_ONDISK |
1548 CEPH_OSD_FLAG_WRITE,
1549 snapc, do_sync,
1550 truncate_seq, truncate_size, mtime,
b7495fc2 1551 nofail, 1, page_align);
a79832f2
SW
1552 if (!req)
1553 return -ENOMEM;
f24e9980
SW
1554
1555 /* it may be a short write due to an object boundary */
1556 req->r_pages = pages;
f24e9980
SW
1557 dout("writepages %llu~%llu (%d pages)\n", off, len,
1558 req->r_num_pages);
1559
1560 rc = ceph_osdc_start_request(osdc, req, nofail);
1561 if (!rc)
1562 rc = ceph_osdc_wait_request(osdc, req);
1563
1564 ceph_osdc_put_request(req);
1565 if (rc == 0)
1566 rc = len;
1567 dout("writepages result %d\n", rc);
1568 return rc;
1569}
3d14c5d2 1570EXPORT_SYMBOL(ceph_osdc_writepages);
f24e9980
SW
1571
1572/*
1573 * handle incoming message
1574 */
1575static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
1576{
1577 struct ceph_osd *osd = con->private;
32c895e7 1578 struct ceph_osd_client *osdc;
f24e9980
SW
1579 int type = le16_to_cpu(msg->hdr.type);
1580
1581 if (!osd)
4a32f93d 1582 goto out;
32c895e7 1583 osdc = osd->o_osdc;
f24e9980
SW
1584
1585 switch (type) {
1586 case CEPH_MSG_OSD_MAP:
1587 ceph_osdc_handle_map(osdc, msg);
1588 break;
1589 case CEPH_MSG_OSD_OPREPLY:
350b1c32 1590 handle_reply(osdc, msg, con);
f24e9980
SW
1591 break;
1592
1593 default:
1594 pr_err("received unknown message type %d %s\n", type,
1595 ceph_msg_type_name(type));
1596 }
4a32f93d 1597out:
f24e9980
SW
1598 ceph_msg_put(msg);
1599}
1600
5b3a4db3 1601/*
21b667f6
SW
 1602 * lookup and return message for incoming reply. set up reply message
 1603 * pages.
5b3a4db3
SW
 1604 */
 1605static struct ceph_msg *get_reply(struct ceph_connection *con,
2450418c
YS
 1606 struct ceph_msg_header *hdr,
 1607 int *skip)
f24e9980
SW
 1608{
 1609 struct ceph_osd *osd = con->private;
 1610 struct ceph_osd_client *osdc = osd->o_osdc;
2450418c 1611 struct ceph_msg *m;
0547a9b3 1612 struct ceph_osd_request *req;
5b3a4db3
SW
 1613 int front = le32_to_cpu(hdr->front_len);
 1614 int data_len = le32_to_cpu(hdr->data_len);
0547a9b3 1615 u64 tid;
f24e9980 1616
0547a9b3
YS
 1617 tid = le64_to_cpu(hdr->tid);
 1618 mutex_lock(&osdc->request_mutex);
 1619 req = __lookup_request(osdc, tid);
 1620 if (!req) {
 /* no pending request for this tid: tell the messenger to skip it */
 1621 *skip = 1;
 1622 m = NULL;
c16e7869 1623 pr_info("get_reply unknown tid %llu from osd%d\n", tid,
5b3a4db3 1624 osd->o_osd);
0547a9b3
YS
 1625 goto out;
 1626 }
c16e7869
SW
 1627
 /* another connection was filling r_reply: revoke it from that con */
 1628 if (req->r_con_filling_msg) {
 1629 dout("get_reply revoking msg %p from old con %p\n",
 1630 req->r_reply, req->r_con_filling_msg);
 1631 ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
 1632 ceph_con_put(req->r_con_filling_msg);
6f46cb29 1633 req->r_con_filling_msg = NULL;
0547a9b3
YS
 1634 }
 1635
 /* preallocated front too small: replace r_reply with a bigger msg */
c16e7869
SW
 1636 if (front > req->r_reply->front.iov_len) {
 1637 pr_warning("get_reply front %d > preallocated %d\n",
 1638 front, (int)req->r_reply->front.iov_len);
34d23762 1639 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS);
a79832f2 1640 if (!m)
c16e7869
SW
 1641 goto out;
 1642 ceph_msg_put(req->r_reply);
 1643 req->r_reply = m;
 1644 }
 1645 m = ceph_msg_get(req->r_reply);
 1646
0547a9b3 1647 if (data_len > 0) {
b7495fc2 1648 int want = calc_pages_for(req->r_page_alignment, data_len);
21b667f6
SW
 1649
 /* reply carries more data than we have pages for: drop it */
 1650 if (unlikely(req->r_num_pages < want)) {
 1651 pr_warning("tid %lld reply %d > expected %d pages\n",
 1652 tid, want, m->nr_pages);
0547a9b3
YS
 1653 *skip = 1;
 1654 ceph_msg_put(m);
a79832f2 1655 m = NULL;
21b667f6 1656 goto out;
0547a9b3 1657 }
21b667f6
SW
 1658 m->pages = req->r_pages;
 1659 m->nr_pages = req->r_num_pages;
68b4476b
YS
 1660#ifdef CONFIG_BLOCK
 1661 m->bio = req->r_bio;
 1662#endif
0547a9b3 1663 }
5b3a4db3 1664 *skip = 0;
 /* remember which con is filling this msg; ref dropped in handle_reply() */
c16e7869
SW
 1665 req->r_con_filling_msg = ceph_con_get(con);
 1666 dout("get_reply tid %lld %p\n", tid, m);
0547a9b3
YS
 1667
 1668out:
 1669 mutex_unlock(&osdc->request_mutex);
2450418c 1670 return m;
5b3a4db3
SW
 1671
 1672}
1673
1674static struct ceph_msg *alloc_msg(struct ceph_connection *con,
1675 struct ceph_msg_header *hdr,
1676 int *skip)
1677{
1678 struct ceph_osd *osd = con->private;
1679 int type = le16_to_cpu(hdr->type);
1680 int front = le32_to_cpu(hdr->front_len);
1681
1682 switch (type) {
1683 case CEPH_MSG_OSD_MAP:
34d23762 1684 return ceph_msg_new(type, front, GFP_NOFS);
5b3a4db3
SW
1685 case CEPH_MSG_OSD_OPREPLY:
1686 return get_reply(con, hdr, skip);
1687 default:
1688 pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
1689 osd->o_osd);
1690 *skip = 1;
1691 return NULL;
1692 }
f24e9980
SW
1693}
1694
1695/*
1696 * Wrappers to refcount containing ceph_osd struct
1697 */
1698static struct ceph_connection *get_osd_con(struct ceph_connection *con)
1699{
1700 struct ceph_osd *osd = con->private;
1701 if (get_osd(osd))
1702 return con;
1703 return NULL;
1704}
1705
1706static void put_osd_con(struct ceph_connection *con)
1707{
1708 struct ceph_osd *osd = con->private;
1709 put_osd(osd);
1710}
1711
4e7a5dcd
SW
1712/*
1713 * authentication
1714 */
1715static int get_authorizer(struct ceph_connection *con,
213c99ee
SW
1716 void **buf, int *len, int *proto,
1717 void **reply_buf, int *reply_len, int force_new)
4e7a5dcd
SW
1718{
1719 struct ceph_osd *o = con->private;
1720 struct ceph_osd_client *osdc = o->o_osdc;
1721 struct ceph_auth_client *ac = osdc->client->monc.auth;
1722 int ret = 0;
1723
1724 if (force_new && o->o_authorizer) {
1725 ac->ops->destroy_authorizer(ac, o->o_authorizer);
1726 o->o_authorizer = NULL;
1727 }
1728 if (o->o_authorizer == NULL) {
1729 ret = ac->ops->create_authorizer(
1730 ac, CEPH_ENTITY_TYPE_OSD,
1731 &o->o_authorizer,
1732 &o->o_authorizer_buf,
1733 &o->o_authorizer_buf_len,
1734 &o->o_authorizer_reply_buf,
1735 &o->o_authorizer_reply_buf_len);
1736 if (ret)
213c99ee 1737 return ret;
4e7a5dcd
SW
1738 }
1739
1740 *proto = ac->protocol;
1741 *buf = o->o_authorizer_buf;
1742 *len = o->o_authorizer_buf_len;
1743 *reply_buf = o->o_authorizer_reply_buf;
1744 *reply_len = o->o_authorizer_reply_buf_len;
1745 return 0;
1746}
1747
1748
1749static int verify_authorizer_reply(struct ceph_connection *con, int len)
1750{
1751 struct ceph_osd *o = con->private;
1752 struct ceph_osd_client *osdc = o->o_osdc;
1753 struct ceph_auth_client *ac = osdc->client->monc.auth;
1754
1755 return ac->ops->verify_authorizer_reply(ac, o->o_authorizer, len);
1756}
1757
9bd2e6f8
SW
1758static int invalidate_authorizer(struct ceph_connection *con)
1759{
1760 struct ceph_osd *o = con->private;
1761 struct ceph_osd_client *osdc = o->o_osdc;
1762 struct ceph_auth_client *ac = osdc->client->monc.auth;
1763
1764 if (ac->ops->invalidate_authorizer)
1765 ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
1766
1767 return ceph_monc_validate_auth(&osdc->client->monc);
1768}
4e7a5dcd 1769
/* messenger callbacks used for osd connections */
9e32789f 1770static const struct ceph_connection_operations osd_con_ops = {
f24e9980
SW
 1771 .get = get_osd_con,
 1772 .put = put_osd_con,
 1773 .dispatch = dispatch,
4e7a5dcd
SW
 1774 .get_authorizer = get_authorizer,
 1775 .verify_authorizer_reply = verify_authorizer_reply,
9bd2e6f8 1776 .invalidate_authorizer = invalidate_authorizer,
f24e9980 1777 .alloc_msg = alloc_msg,
81b024e7 1778 .fault = osd_reset,
f24e9980 1779};