// SPDX-License-Identifier: GPL-2.0-or-later
/*
 * RDMA Transport Layer
 *
 * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
 * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
 * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
 */

#undef pr_fmt
#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt

#include <linux/module.h>
#include <linux/mempool.h>

#include "rtrs-srv.h"
#include "rtrs-log.h"
#include <rdma/ib_cm.h>
#include <rdma/ib_verbs.h>

MODULE_DESCRIPTION("RDMA Transport Server");
MODULE_LICENSE("GPL");

/* Must be power of 2, see mask from mr->page_size in ib_sg_to_pages() */
#define DEFAULT_MAX_CHUNK_SIZE (128 << 10)
#define DEFAULT_SESS_QUEUE_DEPTH 512
#define MAX_HDR_SIZE PAGE_SIZE

/* We guarantee to serve at least 10 paths */
#define CHUNK_POOL_SZ 10

static struct rtrs_rdma_dev_pd dev_pd;
static mempool_t *chunk_pool;
struct class *rtrs_dev_class;
static struct rtrs_srv_ib_ctx ib_ctx;

static int __read_mostly max_chunk_size = DEFAULT_MAX_CHUNK_SIZE;
static int __read_mostly sess_queue_depth = DEFAULT_SESS_QUEUE_DEPTH;

static bool always_invalidate = true;
module_param(always_invalidate, bool, 0444);
MODULE_PARM_DESC(always_invalidate,
		 "Invalidate memory registration for contiguous memory regions before accessing.");

module_param_named(max_chunk_size, max_chunk_size, int, 0444);
MODULE_PARM_DESC(max_chunk_size,
		 "Max size for each IO request, in bytes (default: "
		 __stringify(DEFAULT_MAX_CHUNK_SIZE) " = 128KB)");

module_param_named(sess_queue_depth, sess_queue_depth, int, 0444);
MODULE_PARM_DESC(sess_queue_depth,
		 "Number of buffers for pending I/O requests to allocate per session. Maximum: "
		 __stringify(MAX_SESS_QUEUE_DEPTH) " (default: "
		 __stringify(DEFAULT_SESS_QUEUE_DEPTH) ")");

static cpumask_t cq_affinity_mask = { CPU_BITS_ALL };

static struct workqueue_struct *rtrs_wq;

static inline struct rtrs_srv_con *to_srv_con(struct rtrs_con *c)
{
	return container_of(c, struct rtrs_srv_con, c);
}

static inline struct rtrs_srv_sess *to_srv_sess(struct rtrs_sess *s)
{
	return container_of(s, struct rtrs_srv_sess, s);
}

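/*
 * Server side session state machine, as enforced by
 * rtrs_srv_change_state() below:
 *
 *   RTRS_SRV_CONNECTING -> RTRS_SRV_CONNECTED
 *   RTRS_SRV_CONNECTING -> RTRS_SRV_CLOSING
 *   RTRS_SRV_CONNECTED  -> RTRS_SRV_CLOSING
 *   RTRS_SRV_CLOSING    -> RTRS_SRV_CLOSED
 *
 * Any other transition is rejected and the state is left unchanged.
 */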
static bool rtrs_srv_change_state(struct rtrs_srv_sess *sess,
				  enum rtrs_srv_state new_state)
{
	enum rtrs_srv_state old_state;
	bool changed = false;

	spin_lock_irq(&sess->state_lock);
	old_state = sess->state;
	switch (new_state) {
	case RTRS_SRV_CONNECTED:
		if (old_state == RTRS_SRV_CONNECTING)
			changed = true;
		break;
	case RTRS_SRV_CLOSING:
		if (old_state == RTRS_SRV_CONNECTING ||
		    old_state == RTRS_SRV_CONNECTED)
			changed = true;
		break;
	case RTRS_SRV_CLOSED:
		if (old_state == RTRS_SRV_CLOSING)
			changed = true;
		break;
	default:
		break;
	}
	if (changed)
		sess->state = new_state;
	spin_unlock_irq(&sess->state_lock);

	return changed;
}

static void free_id(struct rtrs_srv_op *id)
{
	if (!id)
		return;
	kfree(id);
}

static void rtrs_srv_free_ops_ids(struct rtrs_srv_sess *sess)
{
	struct rtrs_srv *srv = sess->srv;
	int i;

	if (sess->ops_ids) {
		for (i = 0; i < srv->queue_depth; i++)
			free_id(sess->ops_ids[i]);
		kfree(sess->ops_ids);
		sess->ops_ids = NULL;
	}
}

static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc);

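/*
 * A single completion entry shared by all data-path sends and receives;
 * rtrs_srv_rdma_done() demultiplexes on wc->opcode and the immediate
 * payload rather than on per-WR state.
 */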
static struct ib_cqe io_comp_cqe = {
	.done = rtrs_srv_rdma_done
};

static inline void rtrs_srv_inflight_ref_release(struct percpu_ref *ref)
{
	struct rtrs_srv_sess *sess = container_of(ref, struct rtrs_srv_sess, ids_inflight_ref);

	percpu_ref_exit(&sess->ids_inflight_ref);
	complete(&sess->complete_done);
}

static int rtrs_srv_alloc_ops_ids(struct rtrs_srv_sess *sess)
{
	struct rtrs_srv *srv = sess->srv;
	struct rtrs_srv_op *id;
	int i, ret;

	sess->ops_ids = kcalloc(srv->queue_depth, sizeof(*sess->ops_ids),
				GFP_KERNEL);
	if (!sess->ops_ids)
		goto err;

	for (i = 0; i < srv->queue_depth; ++i) {
		id = kzalloc(sizeof(*id), GFP_KERNEL);
		if (!id)
			goto err;

		sess->ops_ids[i] = id;
	}

	ret = percpu_ref_init(&sess->ids_inflight_ref,
			      rtrs_srv_inflight_ref_release, 0, GFP_KERNEL);
	if (ret) {
		pr_err("Percpu reference init failed\n");
		goto err;
	}
	init_completion(&sess->complete_done);

	return 0;

err:
	rtrs_srv_free_ops_ids(sess);
	return -ENOMEM;
}

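/*
 * Every in-flight operation holds a reference on ids_inflight_ref.  On
 * session close the ref is killed (see rtrs_srv_close_work()), and once
 * the last reference is dropped rtrs_srv_inflight_ref_release() fires
 * complete_done, letting the close path wait for all outstanding I/O.
 */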
static inline void rtrs_srv_get_ops_ids(struct rtrs_srv_sess *sess)
{
	percpu_ref_get(&sess->ids_inflight_ref);
}

static inline void rtrs_srv_put_ops_ids(struct rtrs_srv_sess *sess)
{
	percpu_ref_put(&sess->ids_inflight_ref);
}

static void rtrs_srv_reg_mr_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context);
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_srv_sess *sess = to_srv_sess(s);

	if (wc->status != IB_WC_SUCCESS) {
		rtrs_err(s, "REG MR failed: %s\n",
			 ib_wc_status_msg(wc->status));
		close_sess(sess);
		return;
	}
}

static struct ib_cqe local_reg_cqe = {
	.done = rtrs_srv_reg_mr_done
};

static int rdma_write_sg(struct rtrs_srv_op *id)
{
	struct rtrs_sess *s = id->con->c.sess;
	struct rtrs_srv_sess *sess = to_srv_sess(s);
	dma_addr_t dma_addr = sess->dma_addr[id->msg_id];
	struct rtrs_srv_mr *srv_mr;
	struct ib_send_wr inv_wr;
	struct ib_rdma_wr imm_wr;
	struct ib_rdma_wr *wr = NULL;
	enum ib_send_flags flags;
	size_t sg_cnt;
	int err, offset;
	bool need_inval;
	u32 rkey = 0;
	struct ib_reg_wr rwr;
	struct ib_sge *plist;
	struct ib_sge list;

	sg_cnt = le16_to_cpu(id->rd_msg->sg_cnt);
	need_inval = le16_to_cpu(id->rd_msg->flags) & RTRS_MSG_NEED_INVAL_F;
	if (sg_cnt != 1)
		return -EINVAL;

	offset = 0;

	wr = &id->tx_wr;
	plist = &id->tx_sg;
	plist->addr = dma_addr + offset;
	plist->length = le32_to_cpu(id->rd_msg->desc[0].len);

	/*
	 * WR will fail with length error
	 * if this is 0
	 */
	if (plist->length == 0) {
		rtrs_err(s, "Invalid RDMA-Write sg list length 0\n");
		return -EINVAL;
	}

	plist->lkey = sess->s.dev->ib_pd->local_dma_lkey;
	offset += plist->length;

	wr->wr.sg_list = plist;
	wr->wr.num_sge = 1;
	wr->remote_addr = le64_to_cpu(id->rd_msg->desc[0].addr);
	wr->rkey = le32_to_cpu(id->rd_msg->desc[0].key);
	if (rkey == 0)
		rkey = wr->rkey;
	else
		/* Only one key is actually used */
		WARN_ON_ONCE(rkey != wr->rkey);

	wr->wr.opcode = IB_WR_RDMA_WRITE;
	wr->wr.wr_cqe = &io_comp_cqe;
	wr->wr.ex.imm_data = 0;
	wr->wr.send_flags = 0;

	if (need_inval && always_invalidate) {
		wr->wr.next = &rwr.wr;
		rwr.wr.next = &inv_wr;
		inv_wr.next = &imm_wr.wr;
	} else if (always_invalidate) {
		wr->wr.next = &rwr.wr;
		rwr.wr.next = &imm_wr.wr;
	} else if (need_inval) {
		wr->wr.next = &inv_wr;
		inv_wr.next = &imm_wr.wr;
	} else {
		wr->wr.next = &imm_wr.wr;
	}
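	/*
	 * Resulting WR chains, depending on the checks above:
	 *
	 *   need_inval && always_invalidate: write -> reg -> inv -> imm
	 *   always_invalidate only:          write -> reg -> imm
	 *   need_inval only:                 write -> inv -> imm
	 *   neither:                         write -> imm
	 *
	 * The RDMA write carries the payload, while the final WR with
	 * immediate data notifies the client which buffer completed.
	 */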
	/*
	 * From time to time we have to post signaled sends,
	 * or send queue will fill up and only QP reset can help.
	 */
	flags = (atomic_inc_return(&id->con->c.wr_cnt) % s->signal_interval) ?
		0 : IB_SEND_SIGNALED;

	if (need_inval) {
		inv_wr.sg_list = NULL;
		inv_wr.num_sge = 0;
		inv_wr.opcode = IB_WR_SEND_WITH_INV;
		inv_wr.wr_cqe = &io_comp_cqe;
		inv_wr.send_flags = 0;
		inv_wr.ex.invalidate_rkey = rkey;
	}

	imm_wr.wr.next = NULL;
	if (always_invalidate) {
		struct rtrs_msg_rkey_rsp *msg;

		srv_mr = &sess->mrs[id->msg_id];
		rwr.wr.opcode = IB_WR_REG_MR;
		rwr.wr.wr_cqe = &local_reg_cqe;
		rwr.wr.num_sge = 0;
		rwr.mr = srv_mr->mr;
		rwr.wr.send_flags = 0;
		rwr.key = srv_mr->mr->rkey;
		rwr.access = (IB_ACCESS_LOCAL_WRITE |
			      IB_ACCESS_REMOTE_WRITE);
		msg = srv_mr->iu->buf;
		msg->buf_id = cpu_to_le16(id->msg_id);
		msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP);
		msg->rkey = cpu_to_le32(srv_mr->mr->rkey);

		list.addr = srv_mr->iu->dma_addr;
		list.length = sizeof(*msg);
		list.lkey = sess->s.dev->ib_pd->local_dma_lkey;
		imm_wr.wr.sg_list = &list;
		imm_wr.wr.num_sge = 1;
		imm_wr.wr.opcode = IB_WR_SEND_WITH_IMM;
		ib_dma_sync_single_for_device(sess->s.dev->ib_dev,
					      srv_mr->iu->dma_addr,
					      srv_mr->iu->size, DMA_TO_DEVICE);
	} else {
		imm_wr.wr.sg_list = NULL;
		imm_wr.wr.num_sge = 0;
		imm_wr.wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM;
	}
	imm_wr.wr.send_flags = flags;
	imm_wr.wr.ex.imm_data = cpu_to_be32(rtrs_to_io_rsp_imm(id->msg_id,
								0, need_inval));

	imm_wr.wr.wr_cqe = &io_comp_cqe;
	ib_dma_sync_single_for_device(sess->s.dev->ib_dev, dma_addr,
				      offset, DMA_BIDIRECTIONAL);

	err = ib_post_send(id->con->c.qp, &id->tx_wr.wr, NULL);
	if (err)
		rtrs_err(s,
			 "Posting RDMA-Write-Request to QP failed, err: %d\n",
			 err);

	return err;
}

/**
 * send_io_resp_imm() - respond to client with empty IMM on failed READ/WRITE
 *                      requests or on successful WRITE request.
 * @con: the connection to send back result
 * @id: the id associated with the IO
 * @errno: the error number of the IO.
 *
 * Return 0 on success, errno otherwise.
 */
static int send_io_resp_imm(struct rtrs_srv_con *con, struct rtrs_srv_op *id,
			    int errno)
{
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_srv_sess *sess = to_srv_sess(s);
	struct ib_send_wr inv_wr, *wr = NULL;
	struct ib_rdma_wr imm_wr;
	struct ib_reg_wr rwr;
	struct rtrs_srv_mr *srv_mr;
	bool need_inval = false;
	enum ib_send_flags flags;
	u32 imm;
	int err;

	if (id->dir == READ) {
		struct rtrs_msg_rdma_read *rd_msg = id->rd_msg;
		size_t sg_cnt;

		need_inval = le16_to_cpu(rd_msg->flags) &
				RTRS_MSG_NEED_INVAL_F;
		sg_cnt = le16_to_cpu(rd_msg->sg_cnt);

		if (need_inval) {
			if (sg_cnt) {
				inv_wr.wr_cqe = &io_comp_cqe;
				inv_wr.sg_list = NULL;
				inv_wr.num_sge = 0;
				inv_wr.opcode = IB_WR_SEND_WITH_INV;
				inv_wr.send_flags = 0;
				/* Only one key is actually used */
				inv_wr.ex.invalidate_rkey =
					le32_to_cpu(rd_msg->desc[0].key);
			} else {
				WARN_ON_ONCE(1);
				need_inval = false;
			}
		}
	}

	if (need_inval && always_invalidate) {
		wr = &inv_wr;
		inv_wr.next = &rwr.wr;
		rwr.wr.next = &imm_wr.wr;
	} else if (always_invalidate) {
		wr = &rwr.wr;
		rwr.wr.next = &imm_wr.wr;
	} else if (need_inval) {
		wr = &inv_wr;
		inv_wr.next = &imm_wr.wr;
	} else {
		wr = &imm_wr.wr;
	}
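	/*
	 * Same chaining scheme as in rdma_write_sg(), minus the RDMA
	 * write itself: an optional SEND_WITH_INV, an optional REG_MR
	 * (when always_invalidate re-registers the chunk MR), and the
	 * final WR with immediate data that reports the result.
	 */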
	/*
	 * From time to time we have to post signaled sends,
	 * or send queue will fill up and only QP reset can help.
	 */
	flags = (atomic_inc_return(&con->c.wr_cnt) % s->signal_interval) ?
		0 : IB_SEND_SIGNALED;
	imm = rtrs_to_io_rsp_imm(id->msg_id, errno, need_inval);
	imm_wr.wr.next = NULL;
	if (always_invalidate) {
		struct ib_sge list;
		struct rtrs_msg_rkey_rsp *msg;

		srv_mr = &sess->mrs[id->msg_id];
		rwr.wr.next = &imm_wr.wr;
		rwr.wr.opcode = IB_WR_REG_MR;
		rwr.wr.wr_cqe = &local_reg_cqe;
		rwr.wr.num_sge = 0;
		rwr.wr.send_flags = 0;
		rwr.mr = srv_mr->mr;
		rwr.key = srv_mr->mr->rkey;
		rwr.access = (IB_ACCESS_LOCAL_WRITE |
			      IB_ACCESS_REMOTE_WRITE);
		msg = srv_mr->iu->buf;
		msg->buf_id = cpu_to_le16(id->msg_id);
		msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP);
		msg->rkey = cpu_to_le32(srv_mr->mr->rkey);

		list.addr = srv_mr->iu->dma_addr;
		list.length = sizeof(*msg);
		list.lkey = sess->s.dev->ib_pd->local_dma_lkey;
		imm_wr.wr.sg_list = &list;
		imm_wr.wr.num_sge = 1;
		imm_wr.wr.opcode = IB_WR_SEND_WITH_IMM;
		ib_dma_sync_single_for_device(sess->s.dev->ib_dev,
					      srv_mr->iu->dma_addr,
					      srv_mr->iu->size, DMA_TO_DEVICE);
	} else {
		imm_wr.wr.sg_list = NULL;
		imm_wr.wr.num_sge = 0;
		imm_wr.wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM;
	}
	imm_wr.wr.send_flags = flags;
	imm_wr.wr.wr_cqe = &io_comp_cqe;

	imm_wr.wr.ex.imm_data = cpu_to_be32(imm);

	err = ib_post_send(id->con->c.qp, wr, NULL);
	if (err)
		rtrs_err_rl(s, "Posting RDMA-Reply to QP failed, err: %d\n",
			    err);

	return err;
}

void close_sess(struct rtrs_srv_sess *sess)
{
	if (rtrs_srv_change_state(sess, RTRS_SRV_CLOSING))
		queue_work(rtrs_wq, &sess->close_work);
	WARN_ON(sess->state != RTRS_SRV_CLOSING);
}

static inline const char *rtrs_srv_state_str(enum rtrs_srv_state state)
{
	switch (state) {
	case RTRS_SRV_CONNECTING:
		return "RTRS_SRV_CONNECTING";
	case RTRS_SRV_CONNECTED:
		return "RTRS_SRV_CONNECTED";
	case RTRS_SRV_CLOSING:
		return "RTRS_SRV_CLOSING";
	case RTRS_SRV_CLOSED:
		return "RTRS_SRV_CLOSED";
	default:
		return "UNKNOWN";
	}
}

/**
 * rtrs_srv_resp_rdma() - Finish an RDMA request
 *
 * @id: Internal RTRS operation identifier
 * @status: Response Code sent to the other side for this operation.
 *          0 = success, <0 error
 * Context: any
 *
 * Finish a RDMA operation. A message is sent to the client and the
 * corresponding memory areas will be released.
 */
bool rtrs_srv_resp_rdma(struct rtrs_srv_op *id, int status)
{
	struct rtrs_srv_sess *sess;
	struct rtrs_srv_con *con;
	struct rtrs_sess *s;
	int err;

	if (WARN_ON(!id))
		return true;

	con = id->con;
	s = con->c.sess;
	sess = to_srv_sess(s);

	id->status = status;

	if (sess->state != RTRS_SRV_CONNECTED) {
		rtrs_err_rl(s,
			    "Sending I/O response failed, session %s is disconnected, sess state %s\n",
			    kobject_name(&sess->kobj),
			    rtrs_srv_state_str(sess->state));
		goto out;
	}
	if (always_invalidate) {
		struct rtrs_srv_mr *mr = &sess->mrs[id->msg_id];

		ib_update_fast_reg_key(mr->mr, ib_inc_rkey(mr->mr->rkey));
	}
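	/*
	 * Reserve a send queue slot.  If the queue is exhausted, park the
	 * response on the per-connection wait list; it is retried from
	 * rtrs_rdma_process_wr_wait_list() once a send completion returns
	 * credits to sq_wr_avail.
	 */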
	if (atomic_sub_return(1, &con->c.sq_wr_avail) < 0) {
		rtrs_err(s, "IB send queue full: sess=%s cid=%d\n",
			 kobject_name(&sess->kobj),
			 con->c.cid);
		atomic_add(1, &con->c.sq_wr_avail);
		spin_lock(&con->rsp_wr_wait_lock);
		list_add_tail(&id->wait_list, &con->rsp_wr_wait_list);
		spin_unlock(&con->rsp_wr_wait_lock);
		return false;
	}

	if (status || id->dir == WRITE || !id->rd_msg->sg_cnt)
		err = send_io_resp_imm(con, id, status);
	else
		err = rdma_write_sg(id);

	if (err) {
		rtrs_err_rl(s, "IO response failed: %d: sess=%s\n", err,
			    kobject_name(&sess->kobj));
		close_sess(sess);
	}
out:
	rtrs_srv_put_ops_ids(sess);
	return true;
}
EXPORT_SYMBOL(rtrs_srv_resp_rdma);

/**
 * rtrs_srv_set_sess_priv() - Set private pointer in rtrs_srv.
 * @srv: Session pointer
 * @priv: The private pointer that is associated with the session.
 */
void rtrs_srv_set_sess_priv(struct rtrs_srv *srv, void *priv)
{
	srv->priv = priv;
}
EXPORT_SYMBOL(rtrs_srv_set_sess_priv);

static void unmap_cont_bufs(struct rtrs_srv_sess *sess)
{
	int i;

	for (i = 0; i < sess->mrs_num; i++) {
		struct rtrs_srv_mr *srv_mr;

		srv_mr = &sess->mrs[i];
		rtrs_iu_free(srv_mr->iu, sess->s.dev->ib_dev, 1);
		ib_dereg_mr(srv_mr->mr);
		ib_dma_unmap_sg(sess->s.dev->ib_dev, srv_mr->sgt.sgl,
				srv_mr->sgt.nents, DMA_BIDIRECTIONAL);
		sg_free_table(&srv_mr->sgt);
	}
	kfree(sess->mrs);
}

static int map_cont_bufs(struct rtrs_srv_sess *sess)
{
	struct rtrs_srv *srv = sess->srv;
	struct rtrs_sess *ss = &sess->s;
	int i, mri, err, mrs_num;
	unsigned int chunk_bits;
	int chunks_per_mr = 1;

	/*
	 * Here we map queue_depth chunks to MRs.  First we have to figure
	 * out how many chunks we can map per MR.
	 */
	if (always_invalidate) {
		/*
		 * In order to do invalidation for each chunk of memory, we
		 * need more memory regions.
		 */
		mrs_num = srv->queue_depth;
	} else {
		chunks_per_mr =
			sess->s.dev->ib_dev->attrs.max_fast_reg_page_list_len;
		mrs_num = DIV_ROUND_UP(srv->queue_depth, chunks_per_mr);
		chunks_per_mr = DIV_ROUND_UP(srv->queue_depth, mrs_num);
	}
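	/*
	 * Worked example for the !always_invalidate case: with
	 * queue_depth = 512 and max_fast_reg_page_list_len = 256 we get
	 * mrs_num = DIV_ROUND_UP(512, 256) = 2 and chunks_per_mr =
	 * DIV_ROUND_UP(512, 2) = 256, i.e. two MRs covering 256 chunks
	 * each.  The second DIV_ROUND_UP spreads the chunks evenly over
	 * the MRs instead of leaving the last MR nearly empty.
	 */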

	sess->mrs = kcalloc(mrs_num, sizeof(*sess->mrs), GFP_KERNEL);
	if (!sess->mrs)
		return -ENOMEM;

	sess->mrs_num = mrs_num;

	for (mri = 0; mri < mrs_num; mri++) {
		struct rtrs_srv_mr *srv_mr = &sess->mrs[mri];
		struct sg_table *sgt = &srv_mr->sgt;
		struct scatterlist *s;
		struct ib_mr *mr;
		int nr, chunks;

		chunks = chunks_per_mr * mri;
		if (!always_invalidate)
			chunks_per_mr = min_t(int, chunks_per_mr,
					      srv->queue_depth - chunks);

		err = sg_alloc_table(sgt, chunks_per_mr, GFP_KERNEL);
		if (err)
			goto err;

		for_each_sg(sgt->sgl, s, chunks_per_mr, i)
			sg_set_page(s, srv->chunks[chunks + i],
				    max_chunk_size, 0);

		nr = ib_dma_map_sg(sess->s.dev->ib_dev, sgt->sgl,
				   sgt->nents, DMA_BIDIRECTIONAL);
		if (nr < sgt->nents) {
			err = nr < 0 ? nr : -EINVAL;
			goto free_sg;
		}
		mr = ib_alloc_mr(sess->s.dev->ib_pd, IB_MR_TYPE_MEM_REG,
				 sgt->nents);
		if (IS_ERR(mr)) {
			err = PTR_ERR(mr);
			goto unmap_sg;
		}
		nr = ib_map_mr_sg(mr, sgt->sgl, sgt->nents,
				  NULL, max_chunk_size);
		if (nr < 0 || nr < sgt->nents) {
			err = nr < 0 ? nr : -EINVAL;
			goto dereg_mr;
		}

		if (always_invalidate) {
			srv_mr->iu = rtrs_iu_alloc(1,
					sizeof(struct rtrs_msg_rkey_rsp),
					GFP_KERNEL, sess->s.dev->ib_dev,
					DMA_TO_DEVICE, rtrs_srv_rdma_done);
			if (!srv_mr->iu) {
				err = -ENOMEM;
				rtrs_err(ss, "rtrs_iu_alloc(), err: %d\n", err);
				goto dereg_mr;
			}
		}
		/* Eventually dma addr for each chunk can be cached */
		for_each_sg(sgt->sgl, s, sgt->orig_nents, i)
			sess->dma_addr[chunks + i] = sg_dma_address(s);

		ib_update_fast_reg_key(mr, ib_inc_rkey(mr->rkey));
		srv_mr->mr = mr;

		continue;
err:
		while (mri--) {
			srv_mr = &sess->mrs[mri];
			sgt = &srv_mr->sgt;
			mr = srv_mr->mr;
			rtrs_iu_free(srv_mr->iu, sess->s.dev->ib_dev, 1);
dereg_mr:
			ib_dereg_mr(mr);
unmap_sg:
			ib_dma_unmap_sg(sess->s.dev->ib_dev, sgt->sgl,
					sgt->nents, DMA_BIDIRECTIONAL);
free_sg:
			sg_free_table(sgt);
		}
		kfree(sess->mrs);

		return err;
	}

	chunk_bits = ilog2(srv->queue_depth - 1) + 1;
	sess->mem_bits = (MAX_IMM_PAYL_BITS - chunk_bits);
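	/*
	 * Example: with queue_depth = 512, chunk_bits = ilog2(511) + 1 = 9,
	 * so the immediate payload of an I/O request is split into the top
	 * chunk-id bits and mem_bits = MAX_IMM_PAYL_BITS - 9 low bits for
	 * the offset within the chunk (MAX_IMM_PAYL_BITS comes from the
	 * rtrs protocol headers).  rtrs_srv_rdma_done() undoes this split.
	 */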

	return 0;
}

static void rtrs_srv_hb_err_handler(struct rtrs_con *c)
{
	close_sess(to_srv_sess(c->sess));
}

static void rtrs_srv_init_hb(struct rtrs_srv_sess *sess)
{
	rtrs_init_hb(&sess->s, &io_comp_cqe,
		     RTRS_HB_INTERVAL_MS,
		     RTRS_HB_MISSED_MAX,
		     rtrs_srv_hb_err_handler,
		     rtrs_wq);
}

static void rtrs_srv_start_hb(struct rtrs_srv_sess *sess)
{
	rtrs_start_hb(&sess->s);
}

static void rtrs_srv_stop_hb(struct rtrs_srv_sess *sess)
{
	rtrs_stop_hb(&sess->s);
}

static void rtrs_srv_info_rsp_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context);
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_srv_sess *sess = to_srv_sess(s);
	struct rtrs_iu *iu;

	iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
	rtrs_iu_free(iu, sess->s.dev->ib_dev, 1);

	if (wc->status != IB_WC_SUCCESS) {
		rtrs_err(s, "Sess info response send failed: %s\n",
			 ib_wc_status_msg(wc->status));
		close_sess(sess);
		return;
	}
	WARN_ON(wc->opcode != IB_WC_SEND);
}

static void rtrs_srv_sess_up(struct rtrs_srv_sess *sess)
{
	struct rtrs_srv *srv = sess->srv;
	struct rtrs_srv_ctx *ctx = srv->ctx;
	int up;

	mutex_lock(&srv->paths_ev_mutex);
	up = ++srv->paths_up;
	if (up == 1)
		ctx->ops.link_ev(srv, RTRS_SRV_LINK_EV_CONNECTED, NULL);
	mutex_unlock(&srv->paths_ev_mutex);

	/* Mark session as established */
	sess->established = true;
}

static void rtrs_srv_sess_down(struct rtrs_srv_sess *sess)
{
	struct rtrs_srv *srv = sess->srv;
	struct rtrs_srv_ctx *ctx = srv->ctx;

	if (!sess->established)
		return;

	sess->established = false;
	mutex_lock(&srv->paths_ev_mutex);
	WARN_ON(!srv->paths_up);
	if (--srv->paths_up == 0)
		ctx->ops.link_ev(srv, RTRS_SRV_LINK_EV_DISCONNECTED, srv->priv);
	mutex_unlock(&srv->paths_ev_mutex);
}

static bool exist_sessname(struct rtrs_srv_ctx *ctx,
			   const char *sessname, const uuid_t *path_uuid)
{
	struct rtrs_srv *srv;
	struct rtrs_srv_sess *sess;
	bool found = false;

	mutex_lock(&ctx->srv_mutex);
	list_for_each_entry(srv, &ctx->srv_list, ctx_list) {
		mutex_lock(&srv->paths_mutex);

		/*
		 * Skip the case when a client with the same uuid and the
		 * same sessname tries to add another path.
		 */
		if (uuid_equal(&srv->paths_uuid, path_uuid)) {
			mutex_unlock(&srv->paths_mutex);
			continue;
		}

		list_for_each_entry(sess, &srv->paths_list, s.entry) {
			if (strlen(sess->s.sessname) == strlen(sessname) &&
			    !strcmp(sess->s.sessname, sessname)) {
				found = true;
				break;
			}
		}
		mutex_unlock(&srv->paths_mutex);
		if (found)
			break;
	}
	mutex_unlock(&ctx->srv_mutex);
	return found;
}

static int post_recv_sess(struct rtrs_srv_sess *sess);
static int rtrs_rdma_do_reject(struct rdma_cm_id *cm_id, int errno);

static int process_info_req(struct rtrs_srv_con *con,
			    struct rtrs_msg_info_req *msg)
{
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_srv_sess *sess = to_srv_sess(s);
	struct ib_send_wr *reg_wr = NULL;
	struct rtrs_msg_info_rsp *rsp;
	struct rtrs_iu *tx_iu;
	struct ib_reg_wr *rwr;
	int mri, err;
	size_t tx_sz;

	err = post_recv_sess(sess);
	if (err) {
		rtrs_err(s, "post_recv_sess(), err: %d\n", err);
		return err;
	}

	if (exist_sessname(sess->srv->ctx,
			   msg->sessname, &sess->srv->paths_uuid)) {
		rtrs_err(s, "sessname is duplicated: %s\n", msg->sessname);
		return -EPERM;
	}
	strscpy(sess->s.sessname, msg->sessname, sizeof(sess->s.sessname));

	rwr = kcalloc(sess->mrs_num, sizeof(*rwr), GFP_KERNEL);
	if (!rwr)
		return -ENOMEM;

	tx_sz = sizeof(*rsp);
	tx_sz += sizeof(rsp->desc[0]) * sess->mrs_num;
	tx_iu = rtrs_iu_alloc(1, tx_sz, GFP_KERNEL, sess->s.dev->ib_dev,
			      DMA_TO_DEVICE, rtrs_srv_info_rsp_done);
	if (!tx_iu) {
		err = -ENOMEM;
		goto rwr_free;
	}

	rsp = tx_iu->buf;
	rsp->type = cpu_to_le16(RTRS_MSG_INFO_RSP);
	rsp->sg_cnt = cpu_to_le16(sess->mrs_num);

	for (mri = 0; mri < sess->mrs_num; mri++) {
		struct ib_mr *mr = sess->mrs[mri].mr;

		rsp->desc[mri].addr = cpu_to_le64(mr->iova);
		rsp->desc[mri].key = cpu_to_le32(mr->rkey);
		rsp->desc[mri].len = cpu_to_le32(mr->length);

		/*
		 * Fill in reg MR request and chain them *backwards*:
		 * each WR points at the previous one, so after the loop
		 * reg_wr is the tail of the chain and rwr[0] terminates it.
		 */
		rwr[mri].wr.next = mri ? &rwr[mri - 1].wr : NULL;
		rwr[mri].wr.opcode = IB_WR_REG_MR;
		rwr[mri].wr.wr_cqe = &local_reg_cqe;
		rwr[mri].wr.num_sge = 0;
		rwr[mri].wr.send_flags = 0;
		rwr[mri].mr = mr;
		rwr[mri].key = mr->rkey;
		rwr[mri].access = (IB_ACCESS_LOCAL_WRITE |
				   IB_ACCESS_REMOTE_WRITE);
		reg_wr = &rwr[mri].wr;
	}

	err = rtrs_srv_create_sess_files(sess);
	if (err)
		goto iu_free;
	kobject_get(&sess->kobj);
	get_device(&sess->srv->dev);
	rtrs_srv_change_state(sess, RTRS_SRV_CONNECTED);
	rtrs_srv_start_hb(sess);

	/*
	 * We do not account for the number of established connections at
	 * the current moment, we rely on the client, which should send the
	 * info request when all connections are successfully established.
	 * Thus, simply notify the listener with a proper event if we are
	 * the first path.
	 */
	rtrs_srv_sess_up(sess);

	ib_dma_sync_single_for_device(sess->s.dev->ib_dev, tx_iu->dma_addr,
				      tx_iu->size, DMA_TO_DEVICE);

	/* Send info response */
	err = rtrs_iu_post_send(&con->c, tx_iu, tx_sz, reg_wr);
	if (err) {
		rtrs_err(s, "rtrs_iu_post_send(), err: %d\n", err);
iu_free:
		rtrs_iu_free(tx_iu, sess->s.dev->ib_dev, 1);
	}
rwr_free:
	kfree(rwr);

	return err;
}

static void rtrs_srv_info_req_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context);
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_srv_sess *sess = to_srv_sess(s);
	struct rtrs_msg_info_req *msg;
	struct rtrs_iu *iu;
	int err;

	WARN_ON(con->c.cid);

	iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
	if (wc->status != IB_WC_SUCCESS) {
		rtrs_err(s, "Sess info request receive failed: %s\n",
			 ib_wc_status_msg(wc->status));
		goto close;
	}
	WARN_ON(wc->opcode != IB_WC_RECV);

	if (wc->byte_len < sizeof(*msg)) {
		rtrs_err(s, "Sess info request is malformed: size %d\n",
			 wc->byte_len);
		goto close;
	}
	ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, iu->dma_addr,
				   iu->size, DMA_FROM_DEVICE);
	msg = iu->buf;
	if (le16_to_cpu(msg->type) != RTRS_MSG_INFO_REQ) {
		rtrs_err(s, "Sess info request is malformed: type %d\n",
			 le16_to_cpu(msg->type));
		goto close;
	}
	err = process_info_req(con, msg);
	if (err)
		goto close;

out:
	rtrs_iu_free(iu, sess->s.dev->ib_dev, 1);
	return;
close:
	close_sess(sess);
	goto out;
}

static int post_recv_info_req(struct rtrs_srv_con *con)
{
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_srv_sess *sess = to_srv_sess(s);
	struct rtrs_iu *rx_iu;
	int err;

	rx_iu = rtrs_iu_alloc(1, sizeof(struct rtrs_msg_info_req),
			      GFP_KERNEL, sess->s.dev->ib_dev,
			      DMA_FROM_DEVICE, rtrs_srv_info_req_done);
	if (!rx_iu)
		return -ENOMEM;
	/* Prepare for getting info response */
	err = rtrs_iu_post_recv(&con->c, rx_iu);
	if (err) {
		rtrs_err(s, "rtrs_iu_post_recv(), err: %d\n", err);
		rtrs_iu_free(rx_iu, sess->s.dev->ib_dev, 1);
		return err;
	}

	return 0;
}

static int post_recv_io(struct rtrs_srv_con *con, size_t q_size)
{
	int i, err;

	for (i = 0; i < q_size; i++) {
		err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
		if (err)
			return err;
	}

	return 0;
}

static int post_recv_sess(struct rtrs_srv_sess *sess)
{
	struct rtrs_srv *srv = sess->srv;
	struct rtrs_sess *s = &sess->s;
	size_t q_size;
	int err, cid;

	for (cid = 0; cid < sess->s.con_num; cid++) {
		if (cid == 0)
			q_size = SERVICE_CON_QUEUE_DEPTH;
		else
			q_size = srv->queue_depth;

		err = post_recv_io(to_srv_con(sess->s.con[cid]), q_size);
		if (err) {
			rtrs_err(s, "post_recv_io(), err: %d\n", err);
			return err;
		}
	}

	return 0;
}

static void process_read(struct rtrs_srv_con *con,
			 struct rtrs_msg_rdma_read *msg,
			 u32 buf_id, u32 off)
{
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_srv_sess *sess = to_srv_sess(s);
	struct rtrs_srv *srv = sess->srv;
	struct rtrs_srv_ctx *ctx = srv->ctx;
	struct rtrs_srv_op *id;

	size_t usr_len, data_len;
	void *data;
	int ret;

	if (sess->state != RTRS_SRV_CONNECTED) {
		rtrs_err_rl(s,
			    "Processing read request failed, session is disconnected, sess state %s\n",
			    rtrs_srv_state_str(sess->state));
		return;
	}
	if (msg->sg_cnt != 1 && msg->sg_cnt != 0) {
		rtrs_err_rl(s,
			    "Processing read request failed, invalid message\n");
		return;
	}
	rtrs_srv_get_ops_ids(sess);
	rtrs_srv_update_rdma_stats(sess->stats, off, READ);
	id = sess->ops_ids[buf_id];
	id->con = con;
	id->dir = READ;
	id->msg_id = buf_id;
	id->rd_msg = msg;
	usr_len = le16_to_cpu(msg->usr_len);
	data_len = off - usr_len;
	data = page_address(srv->chunks[buf_id]);
	ret = ctx->ops.rdma_ev(srv->priv, id, READ, data, data_len,
			       data + data_len, usr_len);

	if (ret) {
		rtrs_err_rl(s,
			    "Processing read request failed, user module cb reported for msg_id %d, err: %d\n",
			    buf_id, ret);
		goto send_err_msg;
	}

	return;

send_err_msg:
	ret = send_io_resp_imm(con, id, ret);
	if (ret < 0) {
		rtrs_err_rl(s,
			    "Sending err msg for failed RDMA-Write-Req failed, msg_id %d, err: %d\n",
			    buf_id, ret);
		close_sess(sess);
	}
	rtrs_srv_put_ops_ids(sess);
}

static void process_write(struct rtrs_srv_con *con,
			  struct rtrs_msg_rdma_write *req,
			  u32 buf_id, u32 off)
{
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_srv_sess *sess = to_srv_sess(s);
	struct rtrs_srv *srv = sess->srv;
	struct rtrs_srv_ctx *ctx = srv->ctx;
	struct rtrs_srv_op *id;

	size_t data_len, usr_len;
	void *data;
	int ret;

	if (sess->state != RTRS_SRV_CONNECTED) {
		rtrs_err_rl(s,
			    "Processing write request failed, session is disconnected, sess state %s\n",
			    rtrs_srv_state_str(sess->state));
		return;
	}
	rtrs_srv_get_ops_ids(sess);
	rtrs_srv_update_rdma_stats(sess->stats, off, WRITE);
	id = sess->ops_ids[buf_id];
	id->con = con;
	id->dir = WRITE;
	id->msg_id = buf_id;

	usr_len = le16_to_cpu(req->usr_len);
	data_len = off - usr_len;
	data = page_address(srv->chunks[buf_id]);
	ret = ctx->ops.rdma_ev(srv->priv, id, WRITE, data, data_len,
			       data + data_len, usr_len);
	if (ret) {
		rtrs_err_rl(s,
			    "Processing write request failed, user module callback reports err: %d\n",
			    ret);
		goto send_err_msg;
	}

	return;

send_err_msg:
	ret = send_io_resp_imm(con, id, ret);
	if (ret < 0) {
		rtrs_err_rl(s,
			    "Processing write request failed, sending I/O response failed, msg_id %d, err: %d\n",
			    buf_id, ret);
		close_sess(sess);
	}
	rtrs_srv_put_ops_ids(sess);
}

static void process_io_req(struct rtrs_srv_con *con, void *msg,
			   u32 id, u32 off)
{
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_srv_sess *sess = to_srv_sess(s);
	struct rtrs_msg_rdma_hdr *hdr;
	unsigned int type;

	ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, sess->dma_addr[id],
				   max_chunk_size, DMA_BIDIRECTIONAL);
	hdr = msg;
	type = le16_to_cpu(hdr->type);

	switch (type) {
	case RTRS_MSG_WRITE:
		process_write(con, msg, id, off);
		break;
	case RTRS_MSG_READ:
		process_read(con, msg, id, off);
		break;
	default:
		rtrs_err(s,
			 "Processing I/O request failed, unknown message type received: 0x%02x\n",
			 type);
		goto err;
	}

	return;

err:
	close_sess(sess);
}

static void rtrs_srv_inv_rkey_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_srv_mr *mr =
		container_of(wc->wr_cqe, typeof(*mr), inv_cqe);
	struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context);
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_srv_sess *sess = to_srv_sess(s);
	struct rtrs_srv *srv = sess->srv;
	u32 msg_id, off;
	void *data;

	if (wc->status != IB_WC_SUCCESS) {
		rtrs_err(s, "Failed IB_WR_LOCAL_INV: %s\n",
			 ib_wc_status_msg(wc->status));
		close_sess(sess);
	}
	msg_id = mr->msg_id;
	off = mr->msg_off;
	data = page_address(srv->chunks[msg_id]) + off;
	process_io_req(con, data, msg_id, off);
}

static int rtrs_srv_inv_rkey(struct rtrs_srv_con *con,
			     struct rtrs_srv_mr *mr)
{
	struct ib_send_wr wr = {
		.opcode = IB_WR_LOCAL_INV,
		.wr_cqe = &mr->inv_cqe,
		.send_flags = IB_SEND_SIGNALED,
		.ex.invalidate_rkey = mr->mr->rkey,
	};
	mr->inv_cqe.done = rtrs_srv_inv_rkey_done;

	return ib_post_send(con->c.qp, &wr, NULL);
}

static void rtrs_rdma_process_wr_wait_list(struct rtrs_srv_con *con)
{
	spin_lock(&con->rsp_wr_wait_lock);
	while (!list_empty(&con->rsp_wr_wait_list)) {
		struct rtrs_srv_op *id;
		int ret;

		id = list_entry(con->rsp_wr_wait_list.next,
				struct rtrs_srv_op, wait_list);
		list_del(&id->wait_list);

		spin_unlock(&con->rsp_wr_wait_lock);
		ret = rtrs_srv_resp_rdma(id, id->status);
		spin_lock(&con->rsp_wr_wait_lock);

		if (!ret) {
			list_add(&id->wait_list, &con->rsp_wr_wait_list);
			break;
		}
	}
	spin_unlock(&con->rsp_wr_wait_lock);
}

static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context);
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_srv_sess *sess = to_srv_sess(s);
	struct rtrs_srv *srv = sess->srv;
	u32 imm_type, imm_payload;
	int err;

	if (wc->status != IB_WC_SUCCESS) {
		if (wc->status != IB_WC_WR_FLUSH_ERR) {
			rtrs_err(s,
				 "%s (wr_cqe: %p, type: %d, vendor_err: 0x%x, len: %u)\n",
				 ib_wc_status_msg(wc->status), wc->wr_cqe,
				 wc->opcode, wc->vendor_err, wc->byte_len);
			close_sess(sess);
		}
		return;
	}

	switch (wc->opcode) {
	case IB_WC_RECV_RDMA_WITH_IMM:
		/*
		 * post_recv() RDMA write completions of IO reqs (read/write)
		 * and hb
		 */
		if (WARN_ON(wc->wr_cqe != &io_comp_cqe))
			return;
		err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
		if (err) {
			rtrs_err(s, "rtrs_post_recv(), err: %d\n", err);
			close_sess(sess);
			break;
		}
		rtrs_from_imm(be32_to_cpu(wc->ex.imm_data),
			      &imm_type, &imm_payload);
		if (imm_type == RTRS_IO_REQ_IMM) {
			u32 msg_id, off;
			void *data;

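			/*
			 * The payload layout mirrors sess->mem_bits as set
			 * up in map_cont_bufs(): the high bits select the
			 * chunk, the low bits carry the offset inside it.
			 */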
			msg_id = imm_payload >> sess->mem_bits;
			off = imm_payload & ((1 << sess->mem_bits) - 1);
			if (msg_id >= srv->queue_depth || off >= max_chunk_size) {
				rtrs_err(s, "Wrong msg_id %u, off %u\n",
					 msg_id, off);
				close_sess(sess);
				return;
			}
			if (always_invalidate) {
				struct rtrs_srv_mr *mr = &sess->mrs[msg_id];

				mr->msg_off = off;
				mr->msg_id = msg_id;
				err = rtrs_srv_inv_rkey(con, mr);
				if (err) {
					rtrs_err(s, "rtrs_srv_inv_rkey(), err: %d\n",
						 err);
					close_sess(sess);
					break;
				}
			} else {
				data = page_address(srv->chunks[msg_id]) + off;
				process_io_req(con, data, msg_id, off);
			}
		} else if (imm_type == RTRS_HB_MSG_IMM) {
			WARN_ON(con->c.cid);
			rtrs_send_hb_ack(&sess->s);
		} else if (imm_type == RTRS_HB_ACK_IMM) {
			WARN_ON(con->c.cid);
			sess->s.hb_missed_cnt = 0;
		} else {
			rtrs_wrn(s, "Unknown IMM type %u\n", imm_type);
		}
		break;
	case IB_WC_RDMA_WRITE:
	case IB_WC_SEND:
		/*
		 * post_send() RDMA write completions of IO reqs (read/write)
		 * and hb.
		 */
		atomic_add(s->signal_interval, &con->c.sq_wr_avail);

		if (!list_empty_careful(&con->rsp_wr_wait_list))
			rtrs_rdma_process_wr_wait_list(con);

		break;
	default:
		rtrs_wrn(s, "Unexpected WC type: %d\n", wc->opcode);
		return;
	}
}

/**
 * rtrs_srv_get_sess_name() - Get rtrs_srv peer hostname.
 * @srv: Session
 * @sessname: Sessname buffer
 * @len: Length of sessname buffer
 */
int rtrs_srv_get_sess_name(struct rtrs_srv *srv, char *sessname, size_t len)
{
	struct rtrs_srv_sess *sess;
	int err = -ENOTCONN;

	mutex_lock(&srv->paths_mutex);
	list_for_each_entry(sess, &srv->paths_list, s.entry) {
		if (sess->state != RTRS_SRV_CONNECTED)
			continue;
		strscpy(sessname, sess->s.sessname,
			min_t(size_t, sizeof(sess->s.sessname), len));
		err = 0;
		break;
	}
	mutex_unlock(&srv->paths_mutex);

	return err;
}
EXPORT_SYMBOL(rtrs_srv_get_sess_name);

/**
 * rtrs_srv_get_queue_depth() - Get rtrs_srv qdepth.
 * @srv: Session
 */
int rtrs_srv_get_queue_depth(struct rtrs_srv *srv)
{
	return srv->queue_depth;
}
EXPORT_SYMBOL(rtrs_srv_get_queue_depth);

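/*
 * CQ vectors are handed out round-robin across the CPUs in
 * cq_affinity_mask, wrapping when we run past the last CPU or the
 * device's number of completion vectors, so that completion work of
 * different connections is spread over the available cores.
 */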
static int find_next_bit_ring(struct rtrs_srv_sess *sess)
{
	struct ib_device *ib_dev = sess->s.dev->ib_dev;
	int v;

	v = cpumask_next(sess->cur_cq_vector, &cq_affinity_mask);
	if (v >= nr_cpu_ids || v >= ib_dev->num_comp_vectors)
		v = cpumask_first(&cq_affinity_mask);
	return v;
}

static int rtrs_srv_get_next_cq_vector(struct rtrs_srv_sess *sess)
{
	sess->cur_cq_vector = find_next_bit_ring(sess);

	return sess->cur_cq_vector;
}

static void rtrs_srv_dev_release(struct device *dev)
{
	struct rtrs_srv *srv = container_of(dev, struct rtrs_srv, dev);

	kfree(srv);
}

static void free_srv(struct rtrs_srv *srv)
{
	int i;

	WARN_ON(refcount_read(&srv->refcount));
	for (i = 0; i < srv->queue_depth; i++)
		mempool_free(srv->chunks[i], chunk_pool);
	kfree(srv->chunks);
	mutex_destroy(&srv->paths_mutex);
	mutex_destroy(&srv->paths_ev_mutex);
	/* last put to release the srv structure */
	put_device(&srv->dev);
}

static struct rtrs_srv *get_or_create_srv(struct rtrs_srv_ctx *ctx,
					  const uuid_t *paths_uuid,
					  bool first_conn)
{
	struct rtrs_srv *srv;
	int i;

	mutex_lock(&ctx->srv_mutex);
	list_for_each_entry(srv, &ctx->srv_list, ctx_list) {
		if (uuid_equal(&srv->paths_uuid, paths_uuid) &&
		    refcount_inc_not_zero(&srv->refcount)) {
			mutex_unlock(&ctx->srv_mutex);
			return srv;
		}
	}
	mutex_unlock(&ctx->srv_mutex);
	/*
	 * If this request is not the first connection request from the
	 * client for this session then fail and return error.
	 */
	if (!first_conn) {
		pr_err_ratelimited("Error: Not the first connection request for this session\n");
		return ERR_PTR(-ENXIO);
	}

	/* need to allocate a new srv */
	srv = kzalloc(sizeof(*srv), GFP_KERNEL);
	if (!srv)
		return ERR_PTR(-ENOMEM);

	INIT_LIST_HEAD(&srv->paths_list);
	mutex_init(&srv->paths_mutex);
	mutex_init(&srv->paths_ev_mutex);
	uuid_copy(&srv->paths_uuid, paths_uuid);
	srv->queue_depth = sess_queue_depth;
	srv->ctx = ctx;
	device_initialize(&srv->dev);
	srv->dev.release = rtrs_srv_dev_release;

	srv->chunks = kcalloc(srv->queue_depth, sizeof(*srv->chunks),
			      GFP_KERNEL);
	if (!srv->chunks)
		goto err_free_srv;

	for (i = 0; i < srv->queue_depth; i++) {
		srv->chunks[i] = mempool_alloc(chunk_pool, GFP_KERNEL);
		if (!srv->chunks[i])
			goto err_free_chunks;
	}
	refcount_set(&srv->refcount, 1);
	mutex_lock(&ctx->srv_mutex);
	list_add(&srv->ctx_list, &ctx->srv_list);
	mutex_unlock(&ctx->srv_mutex);

	return srv;

err_free_chunks:
	while (i--)
		mempool_free(srv->chunks[i], chunk_pool);
	kfree(srv->chunks);

err_free_srv:
	kfree(srv);
	return ERR_PTR(-ENOMEM);
}

static void put_srv(struct rtrs_srv *srv)
{
	if (refcount_dec_and_test(&srv->refcount)) {
		struct rtrs_srv_ctx *ctx = srv->ctx;

		WARN_ON(srv->dev.kobj.state_in_sysfs);

		mutex_lock(&ctx->srv_mutex);
		list_del(&srv->ctx_list);
		mutex_unlock(&ctx->srv_mutex);
		free_srv(srv);
	}
}

static void __add_path_to_srv(struct rtrs_srv *srv,
			      struct rtrs_srv_sess *sess)
{
	list_add_tail(&sess->s.entry, &srv->paths_list);
	srv->paths_num++;
	WARN_ON(srv->paths_num >= MAX_PATHS_NUM);
}

static void del_path_from_srv(struct rtrs_srv_sess *sess)
{
	struct rtrs_srv *srv = sess->srv;

	if (WARN_ON(!srv))
		return;

	mutex_lock(&srv->paths_mutex);
	list_del(&sess->s.entry);
	WARN_ON(!srv->paths_num);
	srv->paths_num--;
	mutex_unlock(&srv->paths_mutex);
}

/* Return 0 if the addresses are the same, non-zero otherwise */
static int sockaddr_cmp(const struct sockaddr *a, const struct sockaddr *b)
{
	switch (a->sa_family) {
	case AF_IB:
		return memcmp(&((struct sockaddr_ib *)a)->sib_addr,
			      &((struct sockaddr_ib *)b)->sib_addr,
			      sizeof(struct ib_addr)) &&
			(b->sa_family == AF_IB);
	case AF_INET:
		return memcmp(&((struct sockaddr_in *)a)->sin_addr,
			      &((struct sockaddr_in *)b)->sin_addr,
			      sizeof(struct in_addr)) &&
			(b->sa_family == AF_INET);
	case AF_INET6:
		return memcmp(&((struct sockaddr_in6 *)a)->sin6_addr,
			      &((struct sockaddr_in6 *)b)->sin6_addr,
			      sizeof(struct in6_addr)) &&
			(b->sa_family == AF_INET6);
	default:
		return -ENOENT;
	}
}

static bool __is_path_w_addr_exists(struct rtrs_srv *srv,
				    struct rdma_addr *addr)
{
	struct rtrs_srv_sess *sess;

	list_for_each_entry(sess, &srv->paths_list, s.entry)
		if (!sockaddr_cmp((struct sockaddr *)&sess->s.dst_addr,
				  (struct sockaddr *)&addr->dst_addr) &&
		    !sockaddr_cmp((struct sockaddr *)&sess->s.src_addr,
				  (struct sockaddr *)&addr->src_addr))
			return true;

	return false;
}

static void free_sess(struct rtrs_srv_sess *sess)
{
	if (sess->kobj.state_in_sysfs) {
		kobject_del(&sess->kobj);
		kobject_put(&sess->kobj);
	} else {
		kfree(sess->stats);
		kfree(sess);
	}
}

static void rtrs_srv_close_work(struct work_struct *work)
{
	struct rtrs_srv_sess *sess;
	struct rtrs_srv_con *con;
	int i;

	sess = container_of(work, typeof(*sess), close_work);

	rtrs_srv_destroy_sess_files(sess);
	rtrs_srv_stop_hb(sess);

	for (i = 0; i < sess->s.con_num; i++) {
		if (!sess->s.con[i])
			continue;
		con = to_srv_con(sess->s.con[i]);
		rdma_disconnect(con->c.cm_id);
		ib_drain_qp(con->c.qp);
	}

	/*
	 * Degrade ref count to the usual model with a single shared
	 * atomic_t counter
	 */
	percpu_ref_kill(&sess->ids_inflight_ref);

	/* Wait for all in-flight requests to complete */
	wait_for_completion(&sess->complete_done);

	/* Notify upper layer if we are the last path */
	rtrs_srv_sess_down(sess);

	unmap_cont_bufs(sess);
	rtrs_srv_free_ops_ids(sess);

	for (i = 0; i < sess->s.con_num; i++) {
		if (!sess->s.con[i])
			continue;
		con = to_srv_con(sess->s.con[i]);
		rtrs_cq_qp_destroy(&con->c);
		rdma_destroy_id(con->c.cm_id);
		kfree(con);
	}
	rtrs_ib_dev_put(sess->s.dev);

	del_path_from_srv(sess);
	put_srv(sess->srv);
	sess->srv = NULL;
	rtrs_srv_change_state(sess, RTRS_SRV_CLOSED);

	kfree(sess->dma_addr);
	kfree(sess->s.con);
	free_sess(sess);
}

static int rtrs_rdma_do_accept(struct rtrs_srv_sess *sess,
			       struct rdma_cm_id *cm_id)
{
	struct rtrs_srv *srv = sess->srv;
	struct rtrs_msg_conn_rsp msg;
	struct rdma_conn_param param;
	int err;

	param = (struct rdma_conn_param) {
		.rnr_retry_count = 7,
		.private_data = &msg,
		.private_data_len = sizeof(msg),
	};

	msg = (struct rtrs_msg_conn_rsp) {
		.magic = cpu_to_le16(RTRS_MAGIC),
		.version = cpu_to_le16(RTRS_PROTO_VER),
		.queue_depth = cpu_to_le16(srv->queue_depth),
		.max_io_size = cpu_to_le32(max_chunk_size - MAX_HDR_SIZE),
		.max_hdr_size = cpu_to_le32(MAX_HDR_SIZE),
	};

	if (always_invalidate)
		msg.flags = cpu_to_le32(RTRS_MSG_NEW_RKEY_F);

	err = rdma_accept(cm_id, &param);
	if (err)
		pr_err("rdma_accept(), err: %d\n", err);

	return err;
}

static int rtrs_rdma_do_reject(struct rdma_cm_id *cm_id, int errno)
{
	struct rtrs_msg_conn_rsp msg;
	int err;

	msg = (struct rtrs_msg_conn_rsp) {
		.magic = cpu_to_le16(RTRS_MAGIC),
		.version = cpu_to_le16(RTRS_PROTO_VER),
		.errno = cpu_to_le16(errno),
	};

	err = rdma_reject(cm_id, &msg, sizeof(msg), IB_CM_REJ_CONSUMER_DEFINED);
	if (err)
		pr_err("rdma_reject(), err: %d\n", err);

	/* Bounce errno back */
	return errno;
}

static struct rtrs_srv_sess *
__find_sess(struct rtrs_srv *srv, const uuid_t *sess_uuid)
{
	struct rtrs_srv_sess *sess;

	list_for_each_entry(sess, &srv->paths_list, s.entry) {
		if (uuid_equal(&sess->s.uuid, sess_uuid))
			return sess;
	}

	return NULL;
}

static int create_con(struct rtrs_srv_sess *sess,
		      struct rdma_cm_id *cm_id,
		      unsigned int cid)
{
	struct rtrs_srv *srv = sess->srv;
	struct rtrs_sess *s = &sess->s;
	struct rtrs_srv_con *con;

	u32 cq_num, max_send_wr, max_recv_wr, wr_limit;
	int err, cq_vector;

	con = kzalloc(sizeof(*con), GFP_KERNEL);
	if (!con) {
		err = -ENOMEM;
		goto err;
	}

	spin_lock_init(&con->rsp_wr_wait_lock);
	INIT_LIST_HEAD(&con->rsp_wr_wait_list);
	con->c.cm_id = cm_id;
	con->c.sess = &sess->s;
	con->c.cid = cid;
	atomic_set(&con->c.wr_cnt, 1);
	wr_limit = sess->s.dev->ib_dev->attrs.max_qp_wr;

	if (con->c.cid == 0) {
		/*
		 * All receive and all send (each requiring invalidate)
		 * + 2 for drain and heartbeat
		 */
		max_send_wr = min_t(int, wr_limit,
				    SERVICE_CON_QUEUE_DEPTH * 2 + 2);
		max_recv_wr = max_send_wr;
		s->signal_interval = min_not_zero(srv->queue_depth,
						  (size_t)SERVICE_CON_QUEUE_DEPTH);
	} else {
		/* when always_invalidate is enabled, we need linv+rinv+mr+imm */
		if (always_invalidate)
			max_send_wr =
				min_t(int, wr_limit,
				      srv->queue_depth * (1 + 4) + 1);
		else
			max_send_wr =
				min_t(int, wr_limit,
				      srv->queue_depth * (1 + 2) + 1);

		max_recv_wr = srv->queue_depth + 1;
		/*
		 * This is enough even if we have all receive requests
		 * posted and all write requests posted, each read request
		 * requires an invalidate request + drain, and the qp gets
		 * into error state.
		 */
	}
	cq_num = max_send_wr + max_recv_wr;
	atomic_set(&con->c.sq_wr_avail, max_send_wr);
	cq_vector = rtrs_srv_get_next_cq_vector(sess);

	/* TODO: SOFTIRQ can be faster, but be careful with softirq context */
	err = rtrs_cq_qp_create(&sess->s, &con->c, 1, cq_vector, cq_num,
				max_send_wr, max_recv_wr,
				IB_POLL_WORKQUEUE);
	if (err) {
		rtrs_err(s, "rtrs_cq_qp_create(), err: %d\n", err);
		goto free_con;
	}
	if (con->c.cid == 0) {
		err = post_recv_info_req(con);
		if (err)
			goto free_cqqp;
	}
	WARN_ON(sess->s.con[cid]);
	sess->s.con[cid] = &con->c;

	/*
	 * Change context from server to current connection.  The other
	 * way is to use cm_id->qp->qp_context, which does not work on OFED.
	 */
	cm_id->context = &con->c;

	return 0;

free_cqqp:
	rtrs_cq_qp_destroy(&con->c);
free_con:
	kfree(con);

err:
	return err;
}

static struct rtrs_srv_sess *__alloc_sess(struct rtrs_srv *srv,
					  struct rdma_cm_id *cm_id,
					  unsigned int con_num,
					  unsigned int recon_cnt,
					  const uuid_t *uuid)
{
	struct rtrs_srv_sess *sess;
	int err = -ENOMEM;
	char str[NAME_MAX];
	struct rtrs_addr path;

	if (srv->paths_num >= MAX_PATHS_NUM) {
		err = -ECONNRESET;
		goto err;
	}
	if (__is_path_w_addr_exists(srv, &cm_id->route.addr)) {
		err = -EEXIST;
		pr_err("Path with same addr exists\n");
		goto err;
	}
	sess = kzalloc(sizeof(*sess), GFP_KERNEL);
	if (!sess)
		goto err;

	sess->stats = kzalloc(sizeof(*sess->stats), GFP_KERNEL);
	if (!sess->stats)
		goto err_free_sess;

	sess->stats->sess = sess;

	sess->dma_addr = kcalloc(srv->queue_depth, sizeof(*sess->dma_addr),
				 GFP_KERNEL);
	if (!sess->dma_addr)
		goto err_free_stats;

	sess->s.con = kcalloc(con_num, sizeof(*sess->s.con), GFP_KERNEL);
	if (!sess->s.con)
		goto err_free_dma_addr;

	sess->state = RTRS_SRV_CONNECTING;
	sess->srv = srv;
	sess->cur_cq_vector = -1;
	sess->s.dst_addr = cm_id->route.addr.dst_addr;
	sess->s.src_addr = cm_id->route.addr.src_addr;

	/* temporary until receiving session-name from client */
	path.src = &sess->s.src_addr;
	path.dst = &sess->s.dst_addr;
	rtrs_addr_to_str(&path, str, sizeof(str));
	strscpy(sess->s.sessname, str, sizeof(sess->s.sessname));

	sess->s.con_num = con_num;
	sess->s.recon_cnt = recon_cnt;
	uuid_copy(&sess->s.uuid, uuid);
	spin_lock_init(&sess->state_lock);
	INIT_WORK(&sess->close_work, rtrs_srv_close_work);
	rtrs_srv_init_hb(sess);

	sess->s.dev = rtrs_ib_dev_find_or_add(cm_id->device, &dev_pd);
	if (!sess->s.dev) {
		err = -ENOMEM;
		goto err_free_con;
	}
	err = map_cont_bufs(sess);
	if (err)
		goto err_put_dev;

	err = rtrs_srv_alloc_ops_ids(sess);
	if (err)
		goto err_unmap_bufs;

	__add_path_to_srv(srv, sess);

	return sess;

err_unmap_bufs:
	unmap_cont_bufs(sess);
err_put_dev:
	rtrs_ib_dev_put(sess->s.dev);
err_free_con:
	kfree(sess->s.con);
err_free_dma_addr:
	kfree(sess->dma_addr);
err_free_stats:
	kfree(sess->stats);
err_free_sess:
	kfree(sess);
err:
	return ERR_PTR(err);
}

static int rtrs_rdma_connect(struct rdma_cm_id *cm_id,
			     const struct rtrs_msg_conn_req *msg,
			     size_t len)
{
	struct rtrs_srv_ctx *ctx = cm_id->context;
	struct rtrs_srv_sess *sess;
	struct rtrs_srv *srv;

	u16 version, con_num, cid;
	u16 recon_cnt;
	int err = -ECONNRESET;

	if (len < sizeof(*msg)) {
		pr_err("Invalid RTRS connection request\n");
		goto reject_w_err;
	}
	if (le16_to_cpu(msg->magic) != RTRS_MAGIC) {
		pr_err("Invalid RTRS magic\n");
		goto reject_w_err;
	}
	version = le16_to_cpu(msg->version);
	if (version >> 8 != RTRS_PROTO_VER_MAJOR) {
		pr_err("Unsupported major RTRS version: %d, expected %d\n",
		       version >> 8, RTRS_PROTO_VER_MAJOR);
		goto reject_w_err;
	}
	con_num = le16_to_cpu(msg->cid_num);
	if (con_num > 4096) {
		/* Sanity check */
		pr_err("Too many connections requested: %d\n", con_num);
		goto reject_w_err;
	}
	cid = le16_to_cpu(msg->cid);
	if (cid >= con_num) {
		/* Sanity check */
		pr_err("Incorrect cid: %d >= %d\n", cid, con_num);
		goto reject_w_err;
	}
	recon_cnt = le16_to_cpu(msg->recon_cnt);
	srv = get_or_create_srv(ctx, &msg->paths_uuid, msg->first_conn);
	if (IS_ERR(srv)) {
		err = PTR_ERR(srv);
		pr_err("get_or_create_srv(), error %d\n", err);
		goto reject_w_err;
	}
	mutex_lock(&srv->paths_mutex);
	sess = __find_sess(srv, &msg->sess_uuid);
	if (sess) {
		struct rtrs_sess *s = &sess->s;

		/* Session already holds a reference */
		put_srv(srv);

		if (sess->state != RTRS_SRV_CONNECTING) {
			rtrs_err(s, "Session in wrong state: %s\n",
				 rtrs_srv_state_str(sess->state));
			mutex_unlock(&srv->paths_mutex);
			goto reject_w_err;
		}
		/*
		 * Sanity checks
		 */
		if (con_num != s->con_num || cid >= s->con_num) {
			rtrs_err(s, "Incorrect request: %d, %d\n",
				 cid, con_num);
			mutex_unlock(&srv->paths_mutex);
			goto reject_w_err;
		}
		if (s->con[cid]) {
			rtrs_err(s, "Connection already exists: %d\n",
				 cid);
			mutex_unlock(&srv->paths_mutex);
			goto reject_w_err;
		}
	} else {
		sess = __alloc_sess(srv, cm_id, con_num, recon_cnt,
				    &msg->sess_uuid);
		if (IS_ERR(sess)) {
			mutex_unlock(&srv->paths_mutex);
			put_srv(srv);
			err = PTR_ERR(sess);
			pr_err("RTRS server session allocation failed: %d\n", err);
			goto reject_w_err;
		}
	}
	err = create_con(sess, cm_id, cid);
	if (err) {
		rtrs_err((&sess->s), "create_con(), error %d\n", err);
		rtrs_rdma_do_reject(cm_id, err);
		/*
		 * Since the session has other connections we follow the
		 * normal way through the workqueue, but still return an
		 * error to tell cma.c to call rdma_destroy_id() for the
		 * current connection.
		 */
		goto close_and_return_err;
	}
	err = rtrs_rdma_do_accept(sess, cm_id);
	if (err) {
		rtrs_err((&sess->s), "rtrs_rdma_do_accept(), error %d\n", err);
		rtrs_rdma_do_reject(cm_id, err);
		/*
		 * Since the current connection was successfully added to
		 * the session we follow the normal way through the
		 * workqueue to close the session, thus return 0 to tell
		 * cma.c we call rdma_destroy_id() ourselves.
		 */
		err = 0;
		goto close_and_return_err;
	}
	mutex_unlock(&srv->paths_mutex);

	return 0;

reject_w_err:
	return rtrs_rdma_do_reject(cm_id, err);

close_and_return_err:
	mutex_unlock(&srv->paths_mutex);
	close_sess(sess);

	return err;
}
1930
static int rtrs_srv_rdma_cm_handler(struct rdma_cm_id *cm_id,
				    struct rdma_cm_event *ev)
{
	struct rtrs_srv_sess *sess = NULL;
	struct rtrs_sess *s = NULL;

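	/*
	 * For a CONNECT_REQUEST the cm_id is a fresh child of the
	 * listening id and carries no connection yet, so only resolve
	 * the session for all other events, whose cm_id->context points
	 * at the per-connection rtrs_con.
	 */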
	if (ev->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
		struct rtrs_con *c = cm_id->context;

		s = c->sess;
		sess = to_srv_sess(s);
	}

	switch (ev->event) {
	case RDMA_CM_EVENT_CONNECT_REQUEST:
		/*
		 * In case of error cma.c will destroy cm_id,
		 * see cma_process_remove()
		 */
		return rtrs_rdma_connect(cm_id, ev->param.conn.private_data,
					 ev->param.conn.private_data_len);
	case RDMA_CM_EVENT_ESTABLISHED:
		/* Nothing here */
		break;
	case RDMA_CM_EVENT_REJECTED:
	case RDMA_CM_EVENT_CONNECT_ERROR:
	case RDMA_CM_EVENT_UNREACHABLE:
		rtrs_err(s, "CM error (CM event: %s, err: %d)\n",
			 rdma_event_msg(ev->event), ev->status);
		fallthrough;
	case RDMA_CM_EVENT_DISCONNECTED:
	case RDMA_CM_EVENT_ADDR_CHANGE:
	case RDMA_CM_EVENT_TIMEWAIT_EXIT:
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		close_sess(sess);
		break;
	default:
		pr_err("Ignoring unexpected CM event %s, err %d\n",
		       rdma_event_msg(ev->event), ev->status);
		break;
	}

	return 0;
}

static struct rdma_cm_id *rtrs_srv_cm_init(struct rtrs_srv_ctx *ctx,
					   struct sockaddr *addr,
					   enum rdma_ucm_port_space ps)
{
	struct rdma_cm_id *cm_id;
	int ret;

	cm_id = rdma_create_id(&init_net, rtrs_srv_rdma_cm_handler,
			       ctx, ps, IB_QPT_RC);
	if (IS_ERR(cm_id)) {
		ret = PTR_ERR(cm_id);
		pr_err("Creating id for RDMA connection failed, err: %d\n",
		       ret);
		goto err_out;
	}
	ret = rdma_bind_addr(cm_id, addr);
	if (ret) {
		pr_err("Binding RDMA address failed, err: %d\n", ret);
		goto err_cm;
	}
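	/* 64 is the connect-request backlog, as for a TCP listen() */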
	ret = rdma_listen(cm_id, 64);
	if (ret) {
		pr_err("Listening on RDMA connection failed, err: %d\n",
		       ret);
		goto err_cm;
	}

	return cm_id;

err_cm:
	rdma_destroy_id(cm_id);
err_out:

	return ERR_PTR(ret);
}

static int rtrs_srv_rdma_init(struct rtrs_srv_ctx *ctx, u16 port)
{
	struct sockaddr_in6 sin = {
		.sin6_family = AF_INET6,
		.sin6_addr = IN6ADDR_ANY_INIT,
		.sin6_port = htons(port),
	};
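	/*
	 * For AF_IB the service ID is the IB IP port space prefix ORed
	 * with the port number; the all-ones sib_sid_mask asks the CM
	 * for an exact service-ID match.
	 */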
	struct sockaddr_ib sib = {
		.sib_family = AF_IB,
		.sib_sid = cpu_to_be64(RDMA_IB_IP_PS_IB | port),
		.sib_sid_mask = cpu_to_be64(0xffffffffffffffffULL),
		.sib_pkey = cpu_to_be16(0xffff),
	};
	struct rdma_cm_id *cm_ip, *cm_ib;
	int ret;

	/*
	 * We accept both IPoIB and IB connections, so we need to keep
	 * two CM IDs, one for each socket type and port space.
	 * If the CM initialization of one of the IDs fails, we abort
	 * everything.
	 */
	cm_ip = rtrs_srv_cm_init(ctx, (struct sockaddr *)&sin, RDMA_PS_TCP);
	if (IS_ERR(cm_ip))
		return PTR_ERR(cm_ip);

	cm_ib = rtrs_srv_cm_init(ctx, (struct sockaddr *)&sib, RDMA_PS_IB);
	if (IS_ERR(cm_ib)) {
		ret = PTR_ERR(cm_ib);
		goto free_cm_ip;
	}

	ctx->cm_id_ip = cm_ip;
	ctx->cm_id_ib = cm_ib;

	return 0;

free_cm_ip:
	rdma_destroy_id(cm_ip);

	return ret;
}

static struct rtrs_srv_ctx *alloc_srv_ctx(struct rtrs_srv_ops *ops)
{
	struct rtrs_srv_ctx *ctx;

	ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
	if (!ctx)
		return NULL;

	ctx->ops = *ops;
	mutex_init(&ctx->srv_mutex);
	INIT_LIST_HEAD(&ctx->srv_list);

	return ctx;
}

static void free_srv_ctx(struct rtrs_srv_ctx *ctx)
{
	WARN_ON(!list_empty(&ctx->srv_list));
	mutex_destroy(&ctx->srv_mutex);
	kfree(ctx);
}

static int rtrs_srv_add_one(struct ib_device *device)
{
	struct rtrs_srv_ctx *ctx;
	int ret = 0;

	mutex_lock(&ib_ctx.ib_dev_mutex);
	if (ib_ctx.ib_dev_count)
		goto out;

	/*
	 * Since our CM IDs are NOT bound to any ib device we will create
	 * them only once, for the first device that is added.
	 */
	ctx = ib_ctx.srv_ctx;
	ret = rtrs_srv_rdma_init(ctx, ib_ctx.port);
	if (ret) {
		/*
		 * If we fail here, the IB core ignores the error code and
		 * makes no further calls to our ops for this device.
		 */
		pr_err("Failed to initialize RDMA connection\n");
		goto err_out;
	}

out:
	/*
	 * Keep track of the number of ib devices added.
	 */
	ib_ctx.ib_dev_count++;

err_out:
	mutex_unlock(&ib_ctx.ib_dev_mutex);
	return ret;
}

static void rtrs_srv_remove_one(struct ib_device *device, void *client_data)
{
	struct rtrs_srv_ctx *ctx;

	mutex_lock(&ib_ctx.ib_dev_mutex);
	ib_ctx.ib_dev_count--;

	if (ib_ctx.ib_dev_count)
		goto out;

	/*
	 * Since our CM IDs are NOT bound to any ib device we will remove
	 * them only once, when the last device is removed.
	 */
	ctx = ib_ctx.srv_ctx;
	rdma_destroy_id(ctx->cm_id_ip);
	rdma_destroy_id(ctx->cm_id_ib);

out:
	mutex_unlock(&ib_ctx.ib_dev_mutex);
}

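/*
 * Registered with ib_register_client() from rtrs_srv_open(): the IB core
 * then invokes .add for each existing and hot-plugged device and .remove
 * when a device goes away or the client is unregistered.
 */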
static struct ib_client rtrs_srv_client = {
	.name	= "rtrs_server",
	.add	= rtrs_srv_add_one,
	.remove	= rtrs_srv_remove_one
};

/**
 * rtrs_srv_open() - open RTRS server context
 * @ops: callback functions
 * @port: port to listen on
 *
 * Creates server context with specified callbacks.
 *
 * Return: a valid pointer on success, otherwise a PTR_ERR() value.
 */
struct rtrs_srv_ctx *rtrs_srv_open(struct rtrs_srv_ops *ops, u16 port)
{
	struct rtrs_srv_ctx *ctx;
	int err;

	ctx = alloc_srv_ctx(ops);
	if (!ctx)
		return ERR_PTR(-ENOMEM);

	mutex_init(&ib_ctx.ib_dev_mutex);
	ib_ctx.srv_ctx = ctx;
	ib_ctx.port = port;

	err = ib_register_client(&rtrs_srv_client);
	if (err) {
		free_srv_ctx(ctx);
		return ERR_PTR(err);
	}

	return ctx;
}
EXPORT_SYMBOL(rtrs_srv_open);
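/*
 * A minimal usage sketch for a ULP, assuming hypothetical handler names
 * (my_rdma_ev, my_link_ev) wired into the rtrs_srv_ops callbacks:
 *
 *	static struct rtrs_srv_ops ops = {
 *		.rdma_ev = my_rdma_ev,
 *		.link_ev = my_link_ev,
 *	};
 *
 *	ctx = rtrs_srv_open(&ops, port);
 *	if (IS_ERR(ctx))
 *		return PTR_ERR(ctx);
 *	...
 *	rtrs_srv_close(ctx);
 */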

static void close_sessions(struct rtrs_srv *srv)
{
	struct rtrs_srv_sess *sess;

	mutex_lock(&srv->paths_mutex);
	list_for_each_entry(sess, &srv->paths_list, s.entry)
		close_sess(sess);
	mutex_unlock(&srv->paths_mutex);
}

static void close_ctx(struct rtrs_srv_ctx *ctx)
{
	struct rtrs_srv *srv;

	mutex_lock(&ctx->srv_mutex);
	list_for_each_entry(srv, &ctx->srv_list, ctx_list)
		close_sessions(srv);
	mutex_unlock(&ctx->srv_mutex);
	flush_workqueue(rtrs_wq);
}

/**
 * rtrs_srv_close() - close RTRS server context
 * @ctx: pointer to server context
 *
 * Closes RTRS server context with all client sessions.
 */
void rtrs_srv_close(struct rtrs_srv_ctx *ctx)
{
	ib_unregister_client(&rtrs_srv_client);
	mutex_destroy(&ib_ctx.ib_dev_mutex);
	close_ctx(ctx);
	free_srv_ctx(ctx);
}
EXPORT_SYMBOL(rtrs_srv_close);

static int check_module_params(void)
{
	if (sess_queue_depth < 1 || sess_queue_depth > MAX_SESS_QUEUE_DEPTH) {
		pr_err("Invalid sess_queue_depth value %d, has to be >= %d, <= %d.\n",
		       sess_queue_depth, 1, MAX_SESS_QUEUE_DEPTH);
		return -EINVAL;
	}
	if (max_chunk_size < MIN_CHUNK_SIZE || !is_power_of_2(max_chunk_size)) {
		pr_err("Invalid max_chunk_size value %d, has to be >= %d and should be power of two.\n",
		       max_chunk_size, MIN_CHUNK_SIZE);
		return -EINVAL;
	}

	/*
	 * Check if IB immediate data size is enough to hold the mem_id and
	 * the offset inside the memory chunk.
	 */
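	/*
	 * With the defaults this is ilog2(512 - 1) + 1 = 9 bits for the
	 * buffer id plus ilog2(128K - 1) + 1 = 17 bits for the chunk
	 * offset, i.e. 26 bits of immediate payload.
	 */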
	if ((ilog2(sess_queue_depth - 1) + 1) +
	    (ilog2(max_chunk_size - 1) + 1) > MAX_IMM_PAYL_BITS) {
		pr_err("RDMA immediate size (%db) not enough to encode %d buffers of size %dB. Reduce 'sess_queue_depth' or 'max_chunk_size' parameters.\n",
		       MAX_IMM_PAYL_BITS, sess_queue_depth, max_chunk_size);
		return -EINVAL;
	}

	return 0;
}

static int __init rtrs_server_init(void)
{
	int err;

	pr_info("Loading module %s, proto %s: (max_chunk_size: %d (pure IO %ld, headers %ld), sess_queue_depth: %d, always_invalidate: %d)\n",
		KBUILD_MODNAME, RTRS_PROTO_VER_STRING,
		max_chunk_size, max_chunk_size - MAX_HDR_SIZE, MAX_HDR_SIZE,
		sess_queue_depth, always_invalidate);

	rtrs_rdma_dev_pd_init(0, &dev_pd);

	err = check_module_params();
	if (err) {
		pr_err("Failed to load module, invalid module parameters, err: %d\n",
		       err);
		return err;
	}
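	/*
	 * Pre-allocate enough max_chunk_size buffers to guarantee forward
	 * progress for at least CHUNK_POOL_SZ (10) sessions at the
	 * configured queue depth.
	 */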
	chunk_pool = mempool_create_page_pool(sess_queue_depth * CHUNK_POOL_SZ,
					      get_order(max_chunk_size));
	if (!chunk_pool)
		return -ENOMEM;
	rtrs_dev_class = class_create(THIS_MODULE, "rtrs-server");
	if (IS_ERR(rtrs_dev_class)) {
		err = PTR_ERR(rtrs_dev_class);
		goto out_chunk_pool;
	}
	rtrs_wq = alloc_workqueue("rtrs_server_wq", 0, 0);
	if (!rtrs_wq) {
		err = -ENOMEM;
		goto out_dev_class;
	}

	return 0;

out_dev_class:
	class_destroy(rtrs_dev_class);
out_chunk_pool:
	mempool_destroy(chunk_pool);

	return err;
}

static void __exit rtrs_server_exit(void)
{
	destroy_workqueue(rtrs_wq);
	class_destroy(rtrs_dev_class);
	mempool_destroy(chunk_pool);
	rtrs_rdma_dev_pd_deinit(&dev_pd);
}

module_init(rtrs_server_init);
module_exit(rtrs_server_exit);