Lines Matching refs:con

110 static void con_out_kvec_reset(struct ceph_connection *con)  in con_out_kvec_reset()  argument
112 BUG_ON(con->v1.out_skip); in con_out_kvec_reset()
114 con->v1.out_kvec_left = 0; in con_out_kvec_reset()
115 con->v1.out_kvec_bytes = 0; in con_out_kvec_reset()
116 con->v1.out_kvec_cur = &con->v1.out_kvec[0]; in con_out_kvec_reset()
119 static void con_out_kvec_add(struct ceph_connection *con, in con_out_kvec_add() argument
122 int index = con->v1.out_kvec_left; in con_out_kvec_add()
124 BUG_ON(con->v1.out_skip); in con_out_kvec_add()
125 BUG_ON(index >= ARRAY_SIZE(con->v1.out_kvec)); in con_out_kvec_add()
127 con->v1.out_kvec[index].iov_len = size; in con_out_kvec_add()
128 con->v1.out_kvec[index].iov_base = data; in con_out_kvec_add()
129 con->v1.out_kvec_left++; in con_out_kvec_add()
130 con->v1.out_kvec_bytes += size; in con_out_kvec_add()
138 static int con_out_kvec_skip(struct ceph_connection *con) in con_out_kvec_skip() argument
142 if (con->v1.out_kvec_bytes > 0) { in con_out_kvec_skip()
143 skip = con->v1.out_kvec_cur[con->v1.out_kvec_left - 1].iov_len; in con_out_kvec_skip()
144 BUG_ON(con->v1.out_kvec_bytes < skip); in con_out_kvec_skip()
145 BUG_ON(!con->v1.out_kvec_left); in con_out_kvec_skip()
146 con->v1.out_kvec_bytes -= skip; in con_out_kvec_skip()
147 con->v1.out_kvec_left--; in con_out_kvec_skip()
153 static size_t sizeof_footer(struct ceph_connection *con) in sizeof_footer() argument
155 return (con->peer_features & CEPH_FEATURE_MSG_AUTH) ? in sizeof_footer()
171 static void prepare_write_message_footer(struct ceph_connection *con) in prepare_write_message_footer() argument
173 struct ceph_msg *m = con->out_msg; in prepare_write_message_footer()
177 dout("prepare_write_message_footer %p\n", con); in prepare_write_message_footer()
178 con_out_kvec_add(con, sizeof_footer(con), &m->footer); in prepare_write_message_footer()
179 if (con->peer_features & CEPH_FEATURE_MSG_AUTH) { in prepare_write_message_footer()
180 if (con->ops->sign_message) in prepare_write_message_footer()
181 con->ops->sign_message(m); in prepare_write_message_footer()
187 con->v1.out_more = m->more_to_follow; in prepare_write_message_footer()
188 con->v1.out_msg_done = true; in prepare_write_message_footer()
194 static void prepare_write_message(struct ceph_connection *con) in prepare_write_message() argument
199 con_out_kvec_reset(con); in prepare_write_message()
200 con->v1.out_msg_done = false; in prepare_write_message()
204 if (con->in_seq > con->in_seq_acked) { in prepare_write_message()
205 con->in_seq_acked = con->in_seq; in prepare_write_message()
206 con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); in prepare_write_message()
207 con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked); in prepare_write_message()
208 con_out_kvec_add(con, sizeof(con->v1.out_temp_ack), in prepare_write_message()
209 &con->v1.out_temp_ack); in prepare_write_message()
212 ceph_con_get_out_msg(con); in prepare_write_message()
213 m = con->out_msg; in prepare_write_message()
216 m, con->out_seq, le16_to_cpu(m->hdr.type), in prepare_write_message()
223 con_out_kvec_add(con, sizeof (tag_msg), &tag_msg); in prepare_write_message()
224 con_out_kvec_add(con, sizeof(con->v1.out_hdr), &con->v1.out_hdr); in prepare_write_message()
225 con_out_kvec_add(con, m->front.iov_len, m->front.iov_base); in prepare_write_message()
228 con_out_kvec_add(con, m->middle->vec.iov_len, in prepare_write_message()
233 con->out_msg->hdr.crc = cpu_to_le32(crc); in prepare_write_message()
234 memcpy(&con->v1.out_hdr, &con->out_msg->hdr, sizeof(con->v1.out_hdr)); in prepare_write_message()
238 con->out_msg->footer.front_crc = cpu_to_le32(crc); in prepare_write_message()
242 con->out_msg->footer.middle_crc = cpu_to_le32(crc); in prepare_write_message()
244 con->out_msg->footer.middle_crc = 0; in prepare_write_message()
246 le32_to_cpu(con->out_msg->footer.front_crc), in prepare_write_message()
247 le32_to_cpu(con->out_msg->footer.middle_crc)); in prepare_write_message()
248 con->out_msg->footer.flags = 0; in prepare_write_message()
251 con->out_msg->footer.data_crc = 0; in prepare_write_message()
253 prepare_message_data(con->out_msg, m->data_length); in prepare_write_message()
254 con->v1.out_more = 1; /* data + footer will follow */ in prepare_write_message()
257 prepare_write_message_footer(con); in prepare_write_message()
260 ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); in prepare_write_message()
266 static void prepare_write_ack(struct ceph_connection *con) in prepare_write_ack() argument
268 dout("prepare_write_ack %p %llu -> %llu\n", con, in prepare_write_ack()
269 con->in_seq_acked, con->in_seq); in prepare_write_ack()
270 con->in_seq_acked = con->in_seq; in prepare_write_ack()
272 con_out_kvec_reset(con); in prepare_write_ack()
274 con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); in prepare_write_ack()
276 con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked); in prepare_write_ack()
277 con_out_kvec_add(con, sizeof(con->v1.out_temp_ack), in prepare_write_ack()
278 &con->v1.out_temp_ack); in prepare_write_ack()
280 con->v1.out_more = 1; /* more will follow.. eventually.. */ in prepare_write_ack()
281 ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); in prepare_write_ack()
287 static void prepare_write_seq(struct ceph_connection *con) in prepare_write_seq() argument
289 dout("prepare_write_seq %p %llu -> %llu\n", con, in prepare_write_seq()
290 con->in_seq_acked, con->in_seq); in prepare_write_seq()
291 con->in_seq_acked = con->in_seq; in prepare_write_seq()
293 con_out_kvec_reset(con); in prepare_write_seq()
295 con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked); in prepare_write_seq()
296 con_out_kvec_add(con, sizeof(con->v1.out_temp_ack), in prepare_write_seq()
297 &con->v1.out_temp_ack); in prepare_write_seq()
299 ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); in prepare_write_seq()
305 static void prepare_write_keepalive(struct ceph_connection *con) in prepare_write_keepalive() argument
307 dout("prepare_write_keepalive %p\n", con); in prepare_write_keepalive()
308 con_out_kvec_reset(con); in prepare_write_keepalive()
309 if (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2) { in prepare_write_keepalive()
313 con_out_kvec_add(con, sizeof(tag_keepalive2), &tag_keepalive2); in prepare_write_keepalive()
314 ceph_encode_timespec64(&con->v1.out_temp_keepalive2, &now); in prepare_write_keepalive()
315 con_out_kvec_add(con, sizeof(con->v1.out_temp_keepalive2), in prepare_write_keepalive()
316 &con->v1.out_temp_keepalive2); in prepare_write_keepalive()
318 con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive); in prepare_write_keepalive()
320 ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); in prepare_write_keepalive()
327 static int get_connect_authorizer(struct ceph_connection *con) in get_connect_authorizer() argument
332 if (!con->ops->get_authorizer) { in get_connect_authorizer()
333 con->v1.auth = NULL; in get_connect_authorizer()
334 con->v1.out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN; in get_connect_authorizer()
335 con->v1.out_connect.authorizer_len = 0; in get_connect_authorizer()
339 auth = con->ops->get_authorizer(con, &auth_proto, con->v1.auth_retry); in get_connect_authorizer()
343 con->v1.auth = auth; in get_connect_authorizer()
344 con->v1.out_connect.authorizer_protocol = cpu_to_le32(auth_proto); in get_connect_authorizer()
345 con->v1.out_connect.authorizer_len = in get_connect_authorizer()
353 static void prepare_write_banner(struct ceph_connection *con) in prepare_write_banner() argument
355 con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER); in prepare_write_banner()
356 con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr), in prepare_write_banner()
357 &con->msgr->my_enc_addr); in prepare_write_banner()
359 con->v1.out_more = 0; in prepare_write_banner()
360 ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); in prepare_write_banner()
363 static void __prepare_write_connect(struct ceph_connection *con) in __prepare_write_connect() argument
365 con_out_kvec_add(con, sizeof(con->v1.out_connect), in __prepare_write_connect()
366 &con->v1.out_connect); in __prepare_write_connect()
367 if (con->v1.auth) in __prepare_write_connect()
368 con_out_kvec_add(con, con->v1.auth->authorizer_buf_len, in __prepare_write_connect()
369 con->v1.auth->authorizer_buf); in __prepare_write_connect()
371 con->v1.out_more = 0; in __prepare_write_connect()
372 ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); in __prepare_write_connect()
375 static int prepare_write_connect(struct ceph_connection *con) in prepare_write_connect() argument
377 unsigned int global_seq = ceph_get_global_seq(con->msgr, 0); in prepare_write_connect()
381 switch (con->peer_name.type) { in prepare_write_connect()
395 dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con, in prepare_write_connect()
396 con->v1.connect_seq, global_seq, proto); in prepare_write_connect()
398 con->v1.out_connect.features = in prepare_write_connect()
399 cpu_to_le64(from_msgr(con->msgr)->supported_features); in prepare_write_connect()
400 con->v1.out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT); in prepare_write_connect()
401 con->v1.out_connect.connect_seq = cpu_to_le32(con->v1.connect_seq); in prepare_write_connect()
402 con->v1.out_connect.global_seq = cpu_to_le32(global_seq); in prepare_write_connect()
403 con->v1.out_connect.protocol_version = cpu_to_le32(proto); in prepare_write_connect()
404 con->v1.out_connect.flags = 0; in prepare_write_connect()
406 ret = get_connect_authorizer(con); in prepare_write_connect()
410 __prepare_write_connect(con); in prepare_write_connect()
420 static int write_partial_kvec(struct ceph_connection *con) in write_partial_kvec() argument
424 dout("write_partial_kvec %p %d left\n", con, con->v1.out_kvec_bytes); in write_partial_kvec()
425 while (con->v1.out_kvec_bytes > 0) { in write_partial_kvec()
426 ret = ceph_tcp_sendmsg(con->sock, con->v1.out_kvec_cur, in write_partial_kvec()
427 con->v1.out_kvec_left, in write_partial_kvec()
428 con->v1.out_kvec_bytes, in write_partial_kvec()
429 con->v1.out_more); in write_partial_kvec()
432 con->v1.out_kvec_bytes -= ret; in write_partial_kvec()
433 if (!con->v1.out_kvec_bytes) in write_partial_kvec()
437 while (ret >= con->v1.out_kvec_cur->iov_len) { in write_partial_kvec()
438 BUG_ON(!con->v1.out_kvec_left); in write_partial_kvec()
439 ret -= con->v1.out_kvec_cur->iov_len; in write_partial_kvec()
440 con->v1.out_kvec_cur++; in write_partial_kvec()
441 con->v1.out_kvec_left--; in write_partial_kvec()
445 con->v1.out_kvec_cur->iov_len -= ret; in write_partial_kvec()
446 con->v1.out_kvec_cur->iov_base += ret; in write_partial_kvec()
449 con->v1.out_kvec_left = 0; in write_partial_kvec()
452 dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con, in write_partial_kvec()
453 con->v1.out_kvec_bytes, con->v1.out_kvec_left, ret); in write_partial_kvec()
464 static int write_partial_message_data(struct ceph_connection *con) in write_partial_message_data() argument
466 struct ceph_msg *msg = con->out_msg; in write_partial_message_data()
468 bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); in write_partial_message_data()
471 dout("%s %p msg %p\n", __func__, con, msg); in write_partial_message_data()
497 ret = ceph_tcp_sendpage(con->sock, page, page_offset, length, in write_partial_message_data()
510 dout("%s %p msg %p done\n", __func__, con, msg); in write_partial_message_data()
517 con_out_kvec_reset(con); in write_partial_message_data()
518 prepare_write_message_footer(con); in write_partial_message_data()
526 static int write_partial_skip(struct ceph_connection *con) in write_partial_skip() argument
530 dout("%s %p %d left\n", __func__, con, con->v1.out_skip); in write_partial_skip()
531 while (con->v1.out_skip > 0) { in write_partial_skip()
532 size_t size = min(con->v1.out_skip, (int)PAGE_SIZE); in write_partial_skip()
534 ret = ceph_tcp_sendpage(con->sock, ceph_zero_page, 0, size, in write_partial_skip()
538 con->v1.out_skip -= ret; in write_partial_skip()
548 static void prepare_read_banner(struct ceph_connection *con) in prepare_read_banner() argument
550 dout("prepare_read_banner %p\n", con); in prepare_read_banner()
551 con->v1.in_base_pos = 0; in prepare_read_banner()
554 static void prepare_read_connect(struct ceph_connection *con) in prepare_read_connect() argument
556 dout("prepare_read_connect %p\n", con); in prepare_read_connect()
557 con->v1.in_base_pos = 0; in prepare_read_connect()
560 static void prepare_read_ack(struct ceph_connection *con) in prepare_read_ack() argument
562 dout("prepare_read_ack %p\n", con); in prepare_read_ack()
563 con->v1.in_base_pos = 0; in prepare_read_ack()
566 static void prepare_read_seq(struct ceph_connection *con) in prepare_read_seq() argument
568 dout("prepare_read_seq %p\n", con); in prepare_read_seq()
569 con->v1.in_base_pos = 0; in prepare_read_seq()
570 con->v1.in_tag = CEPH_MSGR_TAG_SEQ; in prepare_read_seq()
573 static void prepare_read_tag(struct ceph_connection *con) in prepare_read_tag() argument
575 dout("prepare_read_tag %p\n", con); in prepare_read_tag()
576 con->v1.in_base_pos = 0; in prepare_read_tag()
577 con->v1.in_tag = CEPH_MSGR_TAG_READY; in prepare_read_tag()
580 static void prepare_read_keepalive_ack(struct ceph_connection *con) in prepare_read_keepalive_ack() argument
582 dout("prepare_read_keepalive_ack %p\n", con); in prepare_read_keepalive_ack()
583 con->v1.in_base_pos = 0; in prepare_read_keepalive_ack()
589 static int prepare_read_message(struct ceph_connection *con) in prepare_read_message() argument
591 dout("prepare_read_message %p\n", con); in prepare_read_message()
592 BUG_ON(con->in_msg != NULL); in prepare_read_message()
593 con->v1.in_base_pos = 0; in prepare_read_message()
594 con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0; in prepare_read_message()
598 static int read_partial(struct ceph_connection *con, in read_partial() argument
601 while (con->v1.in_base_pos < end) { in read_partial()
602 int left = end - con->v1.in_base_pos; in read_partial()
604 int ret = ceph_tcp_recvmsg(con->sock, object + have, left); in read_partial()
607 con->v1.in_base_pos += ret; in read_partial()
615 static int read_partial_banner(struct ceph_connection *con) in read_partial_banner() argument
621 dout("read_partial_banner %p at %d\n", con, con->v1.in_base_pos); in read_partial_banner()
626 ret = read_partial(con, end, size, con->v1.in_banner); in read_partial_banner()
630 size = sizeof(con->v1.actual_peer_addr); in read_partial_banner()
632 ret = read_partial(con, end, size, &con->v1.actual_peer_addr); in read_partial_banner()
635 ceph_decode_banner_addr(&con->v1.actual_peer_addr); in read_partial_banner()
637 size = sizeof(con->v1.peer_addr_for_me); in read_partial_banner()
639 ret = read_partial(con, end, size, &con->v1.peer_addr_for_me); in read_partial_banner()
642 ceph_decode_banner_addr(&con->v1.peer_addr_for_me); in read_partial_banner()
648 static int read_partial_connect(struct ceph_connection *con) in read_partial_connect() argument
654 dout("read_partial_connect %p at %d\n", con, con->v1.in_base_pos); in read_partial_connect()
656 size = sizeof(con->v1.in_reply); in read_partial_connect()
658 ret = read_partial(con, end, size, &con->v1.in_reply); in read_partial_connect()
662 if (con->v1.auth) { in read_partial_connect()
663 size = le32_to_cpu(con->v1.in_reply.authorizer_len); in read_partial_connect()
664 if (size > con->v1.auth->authorizer_reply_buf_len) { in read_partial_connect()
666 con->v1.auth->authorizer_reply_buf_len); in read_partial_connect()
672 ret = read_partial(con, end, size, in read_partial_connect()
673 con->v1.auth->authorizer_reply_buf); in read_partial_connect()
679 con, con->v1.in_reply.tag, in read_partial_connect()
680 le32_to_cpu(con->v1.in_reply.connect_seq), in read_partial_connect()
681 le32_to_cpu(con->v1.in_reply.global_seq)); in read_partial_connect()
689 static int verify_hello(struct ceph_connection *con) in verify_hello() argument
691 if (memcmp(con->v1.in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) { in verify_hello()
693 ceph_pr_addr(&con->peer_addr)); in verify_hello()
694 con->error_msg = "protocol error, bad banner"; in verify_hello()
700 static int process_banner(struct ceph_connection *con) in process_banner() argument
702 struct ceph_entity_addr *my_addr = &con->msgr->inst.addr; in process_banner()
704 dout("process_banner on %p\n", con); in process_banner()
706 if (verify_hello(con) < 0) in process_banner()
714 if (memcmp(&con->peer_addr, &con->v1.actual_peer_addr, in process_banner()
715 sizeof(con->peer_addr)) != 0 && in process_banner()
716 !(ceph_addr_is_blank(&con->v1.actual_peer_addr) && in process_banner()
717 con->v1.actual_peer_addr.nonce == con->peer_addr.nonce)) { in process_banner()
719 ceph_pr_addr(&con->peer_addr), in process_banner()
720 le32_to_cpu(con->peer_addr.nonce), in process_banner()
721 ceph_pr_addr(&con->v1.actual_peer_addr), in process_banner()
722 le32_to_cpu(con->v1.actual_peer_addr.nonce)); in process_banner()
723 con->error_msg = "wrong peer at address"; in process_banner()
732 &con->v1.peer_addr_for_me.in_addr, in process_banner()
733 sizeof(con->v1.peer_addr_for_me.in_addr)); in process_banner()
735 ceph_encode_my_addr(con->msgr); in process_banner()
743 static int process_connect(struct ceph_connection *con) in process_connect() argument
745 u64 sup_feat = from_msgr(con->msgr)->supported_features; in process_connect()
746 u64 req_feat = from_msgr(con->msgr)->required_features; in process_connect()
747 u64 server_feat = le64_to_cpu(con->v1.in_reply.features); in process_connect()
750 dout("process_connect on %p tag %d\n", con, con->v1.in_tag); in process_connect()
752 if (con->v1.auth) { in process_connect()
753 int len = le32_to_cpu(con->v1.in_reply.authorizer_len); in process_connect()
762 if (con->v1.in_reply.tag == in process_connect()
764 ret = con->ops->add_authorizer_challenge( in process_connect()
765 con, con->v1.auth->authorizer_reply_buf, len); in process_connect()
769 con_out_kvec_reset(con); in process_connect()
770 __prepare_write_connect(con); in process_connect()
771 prepare_read_connect(con); in process_connect()
776 ret = con->ops->verify_authorizer_reply(con); in process_connect()
778 con->error_msg = "bad authorize reply"; in process_connect()
784 switch (con->v1.in_reply.tag) { in process_connect()
788 ENTITY_NAME(con->peer_name), in process_connect()
789 ceph_pr_addr(&con->peer_addr), in process_connect()
791 con->error_msg = "missing required protocol features"; in process_connect()
797 ENTITY_NAME(con->peer_name), in process_connect()
798 ceph_pr_addr(&con->peer_addr), in process_connect()
799 le32_to_cpu(con->v1.out_connect.protocol_version), in process_connect()
800 le32_to_cpu(con->v1.in_reply.protocol_version)); in process_connect()
801 con->error_msg = "protocol version mismatch"; in process_connect()
805 con->v1.auth_retry++; in process_connect()
806 dout("process_connect %p got BADAUTHORIZER attempt %d\n", con, in process_connect()
807 con->v1.auth_retry); in process_connect()
808 if (con->v1.auth_retry == 2) { in process_connect()
809 con->error_msg = "connect authorization failure"; in process_connect()
812 con_out_kvec_reset(con); in process_connect()
813 ret = prepare_write_connect(con); in process_connect()
816 prepare_read_connect(con); in process_connect()
828 le32_to_cpu(con->v1.in_reply.connect_seq)); in process_connect()
830 ENTITY_NAME(con->peer_name), in process_connect()
831 ceph_pr_addr(&con->peer_addr)); in process_connect()
832 ceph_con_reset_session(con); in process_connect()
833 con_out_kvec_reset(con); in process_connect()
834 ret = prepare_write_connect(con); in process_connect()
837 prepare_read_connect(con); in process_connect()
840 mutex_unlock(&con->mutex); in process_connect()
841 if (con->ops->peer_reset) in process_connect()
842 con->ops->peer_reset(con); in process_connect()
843 mutex_lock(&con->mutex); in process_connect()
844 if (con->state != CEPH_CON_S_V1_CONNECT_MSG) in process_connect()
854 le32_to_cpu(con->v1.out_connect.connect_seq), in process_connect()
855 le32_to_cpu(con->v1.in_reply.connect_seq)); in process_connect()
856 con->v1.connect_seq = le32_to_cpu(con->v1.in_reply.connect_seq); in process_connect()
857 con_out_kvec_reset(con); in process_connect()
858 ret = prepare_write_connect(con); in process_connect()
861 prepare_read_connect(con); in process_connect()
870 con->v1.peer_global_seq, in process_connect()
871 le32_to_cpu(con->v1.in_reply.global_seq)); in process_connect()
872 ceph_get_global_seq(con->msgr, in process_connect()
873 le32_to_cpu(con->v1.in_reply.global_seq)); in process_connect()
874 con_out_kvec_reset(con); in process_connect()
875 ret = prepare_write_connect(con); in process_connect()
878 prepare_read_connect(con); in process_connect()
886 ENTITY_NAME(con->peer_name), in process_connect()
887 ceph_pr_addr(&con->peer_addr), in process_connect()
889 con->error_msg = "missing required protocol features"; in process_connect()
893 WARN_ON(con->state != CEPH_CON_S_V1_CONNECT_MSG); in process_connect()
894 con->state = CEPH_CON_S_OPEN; in process_connect()
895 con->v1.auth_retry = 0; /* we authenticated; clear flag */ in process_connect()
896 con->v1.peer_global_seq = in process_connect()
897 le32_to_cpu(con->v1.in_reply.global_seq); in process_connect()
898 con->v1.connect_seq++; in process_connect()
899 con->peer_features = server_feat; in process_connect()
901 con->v1.peer_global_seq, in process_connect()
902 le32_to_cpu(con->v1.in_reply.connect_seq), in process_connect()
903 con->v1.connect_seq); in process_connect()
904 WARN_ON(con->v1.connect_seq != in process_connect()
905 le32_to_cpu(con->v1.in_reply.connect_seq)); in process_connect()
907 if (con->v1.in_reply.flags & CEPH_MSG_CONNECT_LOSSY) in process_connect()
908 ceph_con_flag_set(con, CEPH_CON_F_LOSSYTX); in process_connect()
910 con->delay = 0; /* reset backoff memory */ in process_connect()
912 if (con->v1.in_reply.tag == CEPH_MSGR_TAG_SEQ) { in process_connect()
913 prepare_write_seq(con); in process_connect()
914 prepare_read_seq(con); in process_connect()
916 prepare_read_tag(con); in process_connect()
927 con->error_msg = "protocol error, got WAIT as client"; in process_connect()
931 con->error_msg = "protocol error, garbage tag during connect"; in process_connect()
940 static int read_partial_ack(struct ceph_connection *con) in read_partial_ack() argument
942 int size = sizeof(con->v1.in_temp_ack); in read_partial_ack()
945 return read_partial(con, end, size, &con->v1.in_temp_ack); in read_partial_ack()
951 static void process_ack(struct ceph_connection *con) in process_ack() argument
953 u64 ack = le64_to_cpu(con->v1.in_temp_ack); in process_ack()
955 if (con->v1.in_tag == CEPH_MSGR_TAG_ACK) in process_ack()
956 ceph_con_discard_sent(con, ack); in process_ack()
958 ceph_con_discard_requeued(con, ack); in process_ack()
960 prepare_read_tag(con); in process_ack()
963 static int read_partial_message_chunk(struct ceph_connection *con, in read_partial_message_chunk() argument
974 ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base + in read_partial_message_chunk()
986 static inline int read_partial_message_section(struct ceph_connection *con, in read_partial_message_section() argument
991 return read_partial_message_chunk(con, section, sec_len, crc); in read_partial_message_section()
994 static int read_sparse_msg_extent(struct ceph_connection *con, u32 *crc) in read_sparse_msg_extent() argument
996 struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor; in read_sparse_msg_extent()
997 bool do_bounce = ceph_test_opt(from_msgr(con->msgr), RXBOUNCE); in read_sparse_msg_extent()
999 if (do_bounce && unlikely(!con->bounce_page)) { in read_sparse_msg_extent()
1000 con->bounce_page = alloc_page(GFP_NOIO); in read_sparse_msg_extent()
1001 if (!con->bounce_page) { in read_sparse_msg_extent()
1013 rpage = do_bounce ? con->bounce_page : page; in read_sparse_msg_extent()
1017 ret = ceph_tcp_recvpage(con->sock, rpage, (int)off, len); in read_sparse_msg_extent()
1029 static int read_sparse_msg_data(struct ceph_connection *con) in read_sparse_msg_data() argument
1031 struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor; in read_sparse_msg_data()
1032 bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); in read_sparse_msg_data()
1037 crc = con->in_data_crc; in read_sparse_msg_data()
1040 if (con->v1.in_sr_kvec.iov_base) in read_sparse_msg_data()
1041 ret = read_partial_message_chunk(con, in read_sparse_msg_data()
1042 &con->v1.in_sr_kvec, in read_sparse_msg_data()
1043 con->v1.in_sr_len, in read_sparse_msg_data()
1046 ret = read_sparse_msg_extent(con, &crc); in read_sparse_msg_data()
1050 con->in_data_crc = crc; in read_sparse_msg_data()
1054 memset(&con->v1.in_sr_kvec, 0, sizeof(con->v1.in_sr_kvec)); in read_sparse_msg_data()
1055 ret = con->ops->sparse_read(con, cursor, in read_sparse_msg_data()
1056 (char **)&con->v1.in_sr_kvec.iov_base); in read_sparse_msg_data()
1057 con->v1.in_sr_len = ret; in read_sparse_msg_data()
1061 con->in_data_crc = crc; in read_sparse_msg_data()
1066 static int read_partial_msg_data(struct ceph_connection *con) in read_partial_msg_data() argument
1068 struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor; in read_partial_msg_data()
1069 bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); in read_partial_msg_data()
1077 crc = con->in_data_crc; in read_partial_msg_data()
1085 ret = ceph_tcp_recvpage(con->sock, page, page_offset, length); in read_partial_msg_data()
1088 con->in_data_crc = crc; in read_partial_msg_data()
1098 con->in_data_crc = crc; in read_partial_msg_data()
1103 static int read_partial_msg_data_bounce(struct ceph_connection *con) in read_partial_msg_data_bounce() argument
1105 struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor; in read_partial_msg_data_bounce()
1111 if (unlikely(!con->bounce_page)) { in read_partial_msg_data_bounce()
1112 con->bounce_page = alloc_page(GFP_NOIO); in read_partial_msg_data_bounce()
1113 if (!con->bounce_page) { in read_partial_msg_data_bounce()
1119 crc = con->in_data_crc; in read_partial_msg_data_bounce()
1127 ret = ceph_tcp_recvpage(con->sock, con->bounce_page, 0, len); in read_partial_msg_data_bounce()
1129 con->in_data_crc = crc; in read_partial_msg_data_bounce()
1133 crc = crc32c(crc, page_address(con->bounce_page), ret); in read_partial_msg_data_bounce()
1134 memcpy_to_page(page, off, page_address(con->bounce_page), ret); in read_partial_msg_data_bounce()
1138 con->in_data_crc = crc; in read_partial_msg_data_bounce()
1146 static int read_partial_message(struct ceph_connection *con) in read_partial_message() argument
1148 struct ceph_msg *m = con->in_msg; in read_partial_message()
1153 bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); in read_partial_message()
1154 bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH); in read_partial_message()
1158 dout("read_partial_message con %p msg %p\n", con, m); in read_partial_message()
1161 size = sizeof(con->v1.in_hdr); in read_partial_message()
1163 ret = read_partial(con, end, size, &con->v1.in_hdr); in read_partial_message()
1167 crc = crc32c(0, &con->v1.in_hdr, offsetof(struct ceph_msg_header, crc)); in read_partial_message()
1168 if (cpu_to_le32(crc) != con->v1.in_hdr.crc) { in read_partial_message()
1170 crc, con->v1.in_hdr.crc); in read_partial_message()
1174 front_len = le32_to_cpu(con->v1.in_hdr.front_len); in read_partial_message()
1177 middle_len = le32_to_cpu(con->v1.in_hdr.middle_len); in read_partial_message()
1180 data_len = le32_to_cpu(con->v1.in_hdr.data_len); in read_partial_message()
1185 seq = le64_to_cpu(con->v1.in_hdr.seq); in read_partial_message()
1186 if ((s64)seq - (s64)con->in_seq < 1) { in read_partial_message()
1188 ENTITY_NAME(con->peer_name), in read_partial_message()
1189 ceph_pr_addr(&con->peer_addr), in read_partial_message()
1190 seq, con->in_seq + 1); in read_partial_message()
1191 con->v1.in_base_pos = -front_len - middle_len - data_len - in read_partial_message()
1192 sizeof_footer(con); in read_partial_message()
1193 con->v1.in_tag = CEPH_MSGR_TAG_READY; in read_partial_message()
1195 } else if ((s64)seq - (s64)con->in_seq > 1) { in read_partial_message()
1197 seq, con->in_seq + 1); in read_partial_message()
1198 con->error_msg = "bad message sequence # for incoming message"; in read_partial_message()
1203 if (!con->in_msg) { in read_partial_message()
1206 dout("got hdr type %d front %d data %d\n", con->v1.in_hdr.type, in read_partial_message()
1208 ret = ceph_con_in_msg_alloc(con, &con->v1.in_hdr, &skip); in read_partial_message()
1212 BUG_ON((!con->in_msg) ^ skip); in read_partial_message()
1216 con->v1.in_base_pos = -front_len - middle_len - in read_partial_message()
1217 data_len - sizeof_footer(con); in read_partial_message()
1218 con->v1.in_tag = CEPH_MSGR_TAG_READY; in read_partial_message()
1219 con->in_seq++; in read_partial_message()
1223 BUG_ON(!con->in_msg); in read_partial_message()
1224 BUG_ON(con->in_msg->con != con); in read_partial_message()
1225 m = con->in_msg; in read_partial_message()
1233 prepare_message_data(con->in_msg, data_len); in read_partial_message()
1237 ret = read_partial_message_section(con, &m->front, front_len, in read_partial_message()
1238 &con->in_front_crc); in read_partial_message()
1244 ret = read_partial_message_section(con, &m->middle->vec, in read_partial_message()
1246 &con->in_middle_crc); in read_partial_message()
1257 ret = read_sparse_msg_data(con); in read_partial_message()
1258 else if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE)) in read_partial_message()
1259 ret = read_partial_msg_data_bounce(con); in read_partial_message()
1261 ret = read_partial_msg_data(con); in read_partial_message()
1267 size = sizeof_footer(con); in read_partial_message()
1269 ret = read_partial(con, end, size, &m->footer); in read_partial_message()
1283 if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) { in read_partial_message()
1285 m, con->in_front_crc, m->footer.front_crc); in read_partial_message()
1288 if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) { in read_partial_message()
1290 m, con->in_middle_crc, m->footer.middle_crc); in read_partial_message()
1295 con->in_data_crc != le32_to_cpu(m->footer.data_crc)) { in read_partial_message()
1297 con->in_data_crc, le32_to_cpu(m->footer.data_crc)); in read_partial_message()
1301 if (need_sign && con->ops->check_message_signature && in read_partial_message()
1302 con->ops->check_message_signature(m)) { in read_partial_message()
1310 static int read_keepalive_ack(struct ceph_connection *con) in read_keepalive_ack() argument
1314 int ret = read_partial(con, size, size, &ceph_ts); in read_keepalive_ack()
1317 ceph_decode_timespec64(&con->last_keepalive_ack, &ceph_ts); in read_keepalive_ack()
1318 prepare_read_tag(con); in read_keepalive_ack()
1325 int ceph_con_v1_try_read(struct ceph_connection *con) in ceph_con_v1_try_read() argument
1330 dout("try_read start %p state %d\n", con, con->state); in ceph_con_v1_try_read()
1331 if (con->state != CEPH_CON_S_V1_BANNER && in ceph_con_v1_try_read()
1332 con->state != CEPH_CON_S_V1_CONNECT_MSG && in ceph_con_v1_try_read()
1333 con->state != CEPH_CON_S_OPEN) in ceph_con_v1_try_read()
1336 BUG_ON(!con->sock); in ceph_con_v1_try_read()
1338 dout("try_read tag %d in_base_pos %d\n", con->v1.in_tag, in ceph_con_v1_try_read()
1339 con->v1.in_base_pos); in ceph_con_v1_try_read()
1341 if (con->state == CEPH_CON_S_V1_BANNER) { in ceph_con_v1_try_read()
1342 ret = read_partial_banner(con); in ceph_con_v1_try_read()
1345 ret = process_banner(con); in ceph_con_v1_try_read()
1349 con->state = CEPH_CON_S_V1_CONNECT_MSG; in ceph_con_v1_try_read()
1356 ret = prepare_write_connect(con); in ceph_con_v1_try_read()
1359 prepare_read_connect(con); in ceph_con_v1_try_read()
1365 if (con->state == CEPH_CON_S_V1_CONNECT_MSG) { in ceph_con_v1_try_read()
1366 ret = read_partial_connect(con); in ceph_con_v1_try_read()
1369 ret = process_connect(con); in ceph_con_v1_try_read()
1375 WARN_ON(con->state != CEPH_CON_S_OPEN); in ceph_con_v1_try_read()
1377 if (con->v1.in_base_pos < 0) { in ceph_con_v1_try_read()
1381 ret = ceph_tcp_recvmsg(con->sock, NULL, -con->v1.in_base_pos); in ceph_con_v1_try_read()
1384 dout("skipped %d / %d bytes\n", ret, -con->v1.in_base_pos); in ceph_con_v1_try_read()
1385 con->v1.in_base_pos += ret; in ceph_con_v1_try_read()
1386 if (con->v1.in_base_pos) in ceph_con_v1_try_read()
1389 if (con->v1.in_tag == CEPH_MSGR_TAG_READY) { in ceph_con_v1_try_read()
1393 ret = ceph_tcp_recvmsg(con->sock, &con->v1.in_tag, 1); in ceph_con_v1_try_read()
1396 dout("try_read got tag %d\n", con->v1.in_tag); in ceph_con_v1_try_read()
1397 switch (con->v1.in_tag) { in ceph_con_v1_try_read()
1399 prepare_read_message(con); in ceph_con_v1_try_read()
1402 prepare_read_ack(con); in ceph_con_v1_try_read()
1405 prepare_read_keepalive_ack(con); in ceph_con_v1_try_read()
1408 ceph_con_close_socket(con); in ceph_con_v1_try_read()
1409 con->state = CEPH_CON_S_CLOSED; in ceph_con_v1_try_read()
1415 if (con->v1.in_tag == CEPH_MSGR_TAG_MSG) { in ceph_con_v1_try_read()
1416 ret = read_partial_message(con); in ceph_con_v1_try_read()
1420 con->error_msg = "bad crc/signature"; in ceph_con_v1_try_read()
1426 con->error_msg = "io error"; in ceph_con_v1_try_read()
1431 if (con->v1.in_tag == CEPH_MSGR_TAG_READY) in ceph_con_v1_try_read()
1433 ceph_con_process_message(con); in ceph_con_v1_try_read()
1434 if (con->state == CEPH_CON_S_OPEN) in ceph_con_v1_try_read()
1435 prepare_read_tag(con); in ceph_con_v1_try_read()
1438 if (con->v1.in_tag == CEPH_MSGR_TAG_ACK || in ceph_con_v1_try_read()
1439 con->v1.in_tag == CEPH_MSGR_TAG_SEQ) { in ceph_con_v1_try_read()
1444 ret = read_partial_ack(con); in ceph_con_v1_try_read()
1447 process_ack(con); in ceph_con_v1_try_read()
1450 if (con->v1.in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) { in ceph_con_v1_try_read()
1451 ret = read_keepalive_ack(con); in ceph_con_v1_try_read()
1458 dout("try_read done on %p ret %d\n", con, ret); in ceph_con_v1_try_read()
1462 pr_err("try_read bad tag %d\n", con->v1.in_tag); in ceph_con_v1_try_read()
1463 con->error_msg = "protocol error, garbage tag"; in ceph_con_v1_try_read()
1472 int ceph_con_v1_try_write(struct ceph_connection *con) in ceph_con_v1_try_write() argument
1476 dout("try_write start %p state %d\n", con, con->state); in ceph_con_v1_try_write()
1477 if (con->state != CEPH_CON_S_PREOPEN && in ceph_con_v1_try_write()
1478 con->state != CEPH_CON_S_V1_BANNER && in ceph_con_v1_try_write()
1479 con->state != CEPH_CON_S_V1_CONNECT_MSG && in ceph_con_v1_try_write()
1480 con->state != CEPH_CON_S_OPEN) in ceph_con_v1_try_write()
1484 if (con->state == CEPH_CON_S_PREOPEN) { in ceph_con_v1_try_write()
1485 BUG_ON(con->sock); in ceph_con_v1_try_write()
1486 con->state = CEPH_CON_S_V1_BANNER; in ceph_con_v1_try_write()
1488 con_out_kvec_reset(con); in ceph_con_v1_try_write()
1489 prepare_write_banner(con); in ceph_con_v1_try_write()
1490 prepare_read_banner(con); in ceph_con_v1_try_write()
1492 BUG_ON(con->in_msg); in ceph_con_v1_try_write()
1493 con->v1.in_tag = CEPH_MSGR_TAG_READY; in ceph_con_v1_try_write()
1495 con, con->state); in ceph_con_v1_try_write()
1496 ret = ceph_tcp_connect(con); in ceph_con_v1_try_write()
1498 con->error_msg = "connect error"; in ceph_con_v1_try_write()
1504 dout("try_write out_kvec_bytes %d\n", con->v1.out_kvec_bytes); in ceph_con_v1_try_write()
1505 BUG_ON(!con->sock); in ceph_con_v1_try_write()
1508 if (con->v1.out_kvec_left) { in ceph_con_v1_try_write()
1509 ret = write_partial_kvec(con); in ceph_con_v1_try_write()
1513 if (con->v1.out_skip) { in ceph_con_v1_try_write()
1514 ret = write_partial_skip(con); in ceph_con_v1_try_write()
1520 if (con->out_msg) { in ceph_con_v1_try_write()
1521 if (con->v1.out_msg_done) { in ceph_con_v1_try_write()
1522 ceph_msg_put(con->out_msg); in ceph_con_v1_try_write()
1523 con->out_msg = NULL; /* we're done with this one */ in ceph_con_v1_try_write()
1527 ret = write_partial_message_data(con); in ceph_con_v1_try_write()
1540 if (con->state == CEPH_CON_S_OPEN) { in ceph_con_v1_try_write()
1541 if (ceph_con_flag_test_and_clear(con, in ceph_con_v1_try_write()
1543 prepare_write_keepalive(con); in ceph_con_v1_try_write()
1547 if (!list_empty(&con->out_queue)) { in ceph_con_v1_try_write()
1548 prepare_write_message(con); in ceph_con_v1_try_write()
1551 if (con->in_seq > con->in_seq_acked) { in ceph_con_v1_try_write()
1552 prepare_write_ack(con); in ceph_con_v1_try_write()
1558 ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING); in ceph_con_v1_try_write()
1562 dout("try_write done on %p ret %d\n", con, ret); in ceph_con_v1_try_write()
1566 void ceph_con_v1_revoke(struct ceph_connection *con) in ceph_con_v1_revoke() argument
1568 struct ceph_msg *msg = con->out_msg; in ceph_con_v1_revoke()
1570 WARN_ON(con->v1.out_skip); in ceph_con_v1_revoke()
1572 if (con->v1.out_msg_done) { in ceph_con_v1_revoke()
1573 con->v1.out_skip += con_out_kvec_skip(con); in ceph_con_v1_revoke()
1576 con->v1.out_skip += sizeof_footer(con); in ceph_con_v1_revoke()
1580 con->v1.out_skip += msg->cursor.total_resid; in ceph_con_v1_revoke()
1582 con->v1.out_skip += con_out_kvec_skip(con); in ceph_con_v1_revoke()
1583 con->v1.out_skip += con_out_kvec_skip(con); in ceph_con_v1_revoke()
1585 dout("%s con %p out_kvec_bytes %d out_skip %d\n", __func__, con, in ceph_con_v1_revoke()
1586 con->v1.out_kvec_bytes, con->v1.out_skip); in ceph_con_v1_revoke()
1589 void ceph_con_v1_revoke_incoming(struct ceph_connection *con) in ceph_con_v1_revoke_incoming() argument
1591 unsigned int front_len = le32_to_cpu(con->v1.in_hdr.front_len); in ceph_con_v1_revoke_incoming()
1592 unsigned int middle_len = le32_to_cpu(con->v1.in_hdr.middle_len); in ceph_con_v1_revoke_incoming()
1593 unsigned int data_len = le32_to_cpu(con->v1.in_hdr.data_len); in ceph_con_v1_revoke_incoming()
1596 con->v1.in_base_pos = con->v1.in_base_pos - in ceph_con_v1_revoke_incoming()
1603 con->v1.in_tag = CEPH_MSGR_TAG_READY; in ceph_con_v1_revoke_incoming()
1604 con->in_seq++; in ceph_con_v1_revoke_incoming()
1606 dout("%s con %p in_base_pos %d\n", __func__, con, con->v1.in_base_pos); in ceph_con_v1_revoke_incoming()
1609 bool ceph_con_v1_opened(struct ceph_connection *con) in ceph_con_v1_opened() argument
1611 return con->v1.connect_seq; in ceph_con_v1_opened()
1614 void ceph_con_v1_reset_session(struct ceph_connection *con) in ceph_con_v1_reset_session() argument
1616 con->v1.connect_seq = 0; in ceph_con_v1_reset_session()
1617 con->v1.peer_global_seq = 0; in ceph_con_v1_reset_session()
1620 void ceph_con_v1_reset_protocol(struct ceph_connection *con) in ceph_con_v1_reset_protocol() argument
1622 con->v1.out_skip = 0; in ceph_con_v1_reset_protocol()