Lines Matching refs:con

119 static void con_flag_clear(struct ceph_connection *con, unsigned long con_flag)  in con_flag_clear()  argument
123 clear_bit(con_flag, &con->flags); in con_flag_clear()
126 static void con_flag_set(struct ceph_connection *con, unsigned long con_flag) in con_flag_set() argument
130 set_bit(con_flag, &con->flags); in con_flag_set()
133 static bool con_flag_test(struct ceph_connection *con, unsigned long con_flag) in con_flag_test() argument
137 return test_bit(con_flag, &con->flags); in con_flag_test()
140 static bool con_flag_test_and_clear(struct ceph_connection *con, in con_flag_test_and_clear() argument
145 return test_and_clear_bit(con_flag, &con->flags); in con_flag_test_and_clear()
148 static bool con_flag_test_and_set(struct ceph_connection *con, in con_flag_test_and_set() argument
153 return test_and_set_bit(con_flag, &con->flags); in con_flag_test_and_set()
171 static void queue_con(struct ceph_connection *con);
172 static void cancel_con(struct ceph_connection *con);
174 static void con_fault(struct ceph_connection *con);
312 static void con_sock_state_init(struct ceph_connection *con) in con_sock_state_init() argument
316 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED); in con_sock_state_init()
319 dout("%s con %p sock %d -> %d\n", __func__, con, old_state, in con_sock_state_init()
323 static void con_sock_state_connecting(struct ceph_connection *con) in con_sock_state_connecting() argument
327 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTING); in con_sock_state_connecting()
330 dout("%s con %p sock %d -> %d\n", __func__, con, old_state, in con_sock_state_connecting()
334 static void con_sock_state_connected(struct ceph_connection *con) in con_sock_state_connected() argument
338 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTED); in con_sock_state_connected()
341 dout("%s con %p sock %d -> %d\n", __func__, con, old_state, in con_sock_state_connected()
345 static void con_sock_state_closing(struct ceph_connection *con) in con_sock_state_closing() argument
349 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSING); in con_sock_state_closing()
354 dout("%s con %p sock %d -> %d\n", __func__, con, old_state, in con_sock_state_closing()
358 static void con_sock_state_closed(struct ceph_connection *con) in con_sock_state_closed() argument
362 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED); in con_sock_state_closed()
368 dout("%s con %p sock %d -> %d\n", __func__, con, old_state, in con_sock_state_closed()
379 struct ceph_connection *con = sk->sk_user_data; in ceph_sock_data_ready() local
380 if (atomic_read(&con->msgr->stopping)) { in ceph_sock_data_ready()
386 con, con->state); in ceph_sock_data_ready()
387 queue_con(con); in ceph_sock_data_ready()
394 struct ceph_connection *con = sk->sk_user_data; in ceph_sock_write_space() local
403 if (con_flag_test(con, CON_FLAG_WRITE_PENDING)) { in ceph_sock_write_space()
405 dout("%s %p queueing write work\n", __func__, con); in ceph_sock_write_space()
407 queue_con(con); in ceph_sock_write_space()
410 dout("%s %p nothing to write\n", __func__, con); in ceph_sock_write_space()
417 struct ceph_connection *con = sk->sk_user_data; in ceph_sock_state_change() local
420 con, con->state, sk->sk_state); in ceph_sock_state_change()
428 con_sock_state_closing(con); in ceph_sock_state_change()
429 con_flag_set(con, CON_FLAG_SOCK_CLOSED); in ceph_sock_state_change()
430 queue_con(con); in ceph_sock_state_change()
434 con_sock_state_connected(con); in ceph_sock_state_change()
435 queue_con(con); in ceph_sock_state_change()
446 struct ceph_connection *con) in set_sock_callbacks() argument
449 sk->sk_user_data = con; in set_sock_callbacks()
463 static int ceph_tcp_connect(struct ceph_connection *con) in ceph_tcp_connect() argument
465 struct sockaddr_storage *paddr = &con->peer_addr.in_addr; in ceph_tcp_connect()
470 BUG_ON(con->sock); in ceph_tcp_connect()
474 ret = sock_create_kern(read_pnet(&con->msgr->net), paddr->ss_family, in ceph_tcp_connect()
485 set_sock_callbacks(sock, con); in ceph_tcp_connect()
487 dout("connect %s\n", ceph_pr_addr(&con->peer_addr.in_addr)); in ceph_tcp_connect()
489 con_sock_state_connecting(con); in ceph_tcp_connect()
494 ceph_pr_addr(&con->peer_addr.in_addr), in ceph_tcp_connect()
498 ceph_pr_addr(&con->peer_addr.in_addr), ret); in ceph_tcp_connect()
503 if (ceph_test_opt(from_msgr(con->msgr), TCP_NODELAY)) { in ceph_tcp_connect()
513 con->sock = sock; in ceph_tcp_connect()
621 static int con_close_socket(struct ceph_connection *con) in con_close_socket() argument
625 dout("con_close_socket on %p sock %p\n", con, con->sock); in con_close_socket()
626 if (con->sock) { in con_close_socket()
627 rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR); in con_close_socket()
628 sock_release(con->sock); in con_close_socket()
629 con->sock = NULL; in con_close_socket()
638 con_flag_clear(con, CON_FLAG_SOCK_CLOSED); in con_close_socket()
640 con_sock_state_closed(con); in con_close_socket()
663 static void reset_connection(struct ceph_connection *con) in reset_connection() argument
667 dout("reset_connection %p\n", con); in reset_connection()
668 ceph_msg_remove_list(&con->out_queue); in reset_connection()
669 ceph_msg_remove_list(&con->out_sent); in reset_connection()
671 if (con->in_msg) { in reset_connection()
672 BUG_ON(con->in_msg->con != con); in reset_connection()
673 ceph_msg_put(con->in_msg); in reset_connection()
674 con->in_msg = NULL; in reset_connection()
677 con->connect_seq = 0; in reset_connection()
678 con->out_seq = 0; in reset_connection()
679 if (con->out_msg) { in reset_connection()
680 BUG_ON(con->out_msg->con != con); in reset_connection()
681 ceph_msg_put(con->out_msg); in reset_connection()
682 con->out_msg = NULL; in reset_connection()
684 con->in_seq = 0; in reset_connection()
685 con->in_seq_acked = 0; in reset_connection()
687 con->out_skip = 0; in reset_connection()
693 void ceph_con_close(struct ceph_connection *con) in ceph_con_close() argument
695 mutex_lock(&con->mutex); in ceph_con_close()
696 dout("con_close %p peer %s\n", con, in ceph_con_close()
697 ceph_pr_addr(&con->peer_addr.in_addr)); in ceph_con_close()
698 con->state = CON_STATE_CLOSED; in ceph_con_close()
700 con_flag_clear(con, CON_FLAG_LOSSYTX); /* so we retry next connect */ in ceph_con_close()
701 con_flag_clear(con, CON_FLAG_KEEPALIVE_PENDING); in ceph_con_close()
702 con_flag_clear(con, CON_FLAG_WRITE_PENDING); in ceph_con_close()
703 con_flag_clear(con, CON_FLAG_BACKOFF); in ceph_con_close()
705 reset_connection(con); in ceph_con_close()
706 con->peer_global_seq = 0; in ceph_con_close()
707 cancel_con(con); in ceph_con_close()
708 con_close_socket(con); in ceph_con_close()
709 mutex_unlock(&con->mutex); in ceph_con_close()
716 void ceph_con_open(struct ceph_connection *con, in ceph_con_open() argument
720 mutex_lock(&con->mutex); in ceph_con_open()
721 dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr)); in ceph_con_open()
723 WARN_ON(con->state != CON_STATE_CLOSED); in ceph_con_open()
724 con->state = CON_STATE_PREOPEN; in ceph_con_open()
726 con->peer_name.type = (__u8) entity_type; in ceph_con_open()
727 con->peer_name.num = cpu_to_le64(entity_num); in ceph_con_open()
729 memcpy(&con->peer_addr, addr, sizeof(*addr)); in ceph_con_open()
730 con->delay = 0; /* reset backoff memory */ in ceph_con_open()
731 mutex_unlock(&con->mutex); in ceph_con_open()
732 queue_con(con); in ceph_con_open()
739 bool ceph_con_opened(struct ceph_connection *con) in ceph_con_opened() argument
741 return con->connect_seq > 0; in ceph_con_opened()
747 void ceph_con_init(struct ceph_connection *con, void *private, in ceph_con_init() argument
751 dout("con_init %p\n", con); in ceph_con_init()
752 memset(con, 0, sizeof(*con)); in ceph_con_init()
753 con->private = private; in ceph_con_init()
754 con->ops = ops; in ceph_con_init()
755 con->msgr = msgr; in ceph_con_init()
757 con_sock_state_init(con); in ceph_con_init()
759 mutex_init(&con->mutex); in ceph_con_init()
760 INIT_LIST_HEAD(&con->out_queue); in ceph_con_init()
761 INIT_LIST_HEAD(&con->out_sent); in ceph_con_init()
762 INIT_DELAYED_WORK(&con->work, ceph_con_workfn); in ceph_con_init()
764 con->state = CON_STATE_CLOSED; in ceph_con_init()
785 static void con_out_kvec_reset(struct ceph_connection *con) in con_out_kvec_reset() argument
787 BUG_ON(con->out_skip); in con_out_kvec_reset()
789 con->out_kvec_left = 0; in con_out_kvec_reset()
790 con->out_kvec_bytes = 0; in con_out_kvec_reset()
791 con->out_kvec_cur = &con->out_kvec[0]; in con_out_kvec_reset()
794 static void con_out_kvec_add(struct ceph_connection *con, in con_out_kvec_add() argument
797 int index = con->out_kvec_left; in con_out_kvec_add()
799 BUG_ON(con->out_skip); in con_out_kvec_add()
800 BUG_ON(index >= ARRAY_SIZE(con->out_kvec)); in con_out_kvec_add()
802 con->out_kvec[index].iov_len = size; in con_out_kvec_add()
803 con->out_kvec[index].iov_base = data; in con_out_kvec_add()
804 con->out_kvec_left++; in con_out_kvec_add()
805 con->out_kvec_bytes += size; in con_out_kvec_add()
813 static int con_out_kvec_skip(struct ceph_connection *con) in con_out_kvec_skip() argument
815 int off = con->out_kvec_cur - con->out_kvec; in con_out_kvec_skip()
818 if (con->out_kvec_bytes > 0) { in con_out_kvec_skip()
819 skip = con->out_kvec[off + con->out_kvec_left - 1].iov_len; in con_out_kvec_skip()
820 BUG_ON(con->out_kvec_bytes < skip); in con_out_kvec_skip()
821 BUG_ON(!con->out_kvec_left); in con_out_kvec_skip()
822 con->out_kvec_bytes -= skip; in con_out_kvec_skip()
823 con->out_kvec_left--; in con_out_kvec_skip()
1242 static size_t sizeof_footer(struct ceph_connection *con) in sizeof_footer() argument
1244 return (con->peer_features & CEPH_FEATURE_MSG_AUTH) ? in sizeof_footer()
1263 static void prepare_write_message_footer(struct ceph_connection *con) in prepare_write_message_footer() argument
1265 struct ceph_msg *m = con->out_msg; in prepare_write_message_footer()
1269 dout("prepare_write_message_footer %p\n", con); in prepare_write_message_footer()
1270 con_out_kvec_add(con, sizeof_footer(con), &m->footer); in prepare_write_message_footer()
1271 if (con->peer_features & CEPH_FEATURE_MSG_AUTH) { in prepare_write_message_footer()
1272 if (con->ops->sign_message) in prepare_write_message_footer()
1273 con->ops->sign_message(m); in prepare_write_message_footer()
1279 con->out_more = m->more_to_follow; in prepare_write_message_footer()
1280 con->out_msg_done = true; in prepare_write_message_footer()
1286 static void prepare_write_message(struct ceph_connection *con) in prepare_write_message() argument
1291 con_out_kvec_reset(con); in prepare_write_message()
1292 con->out_msg_done = false; in prepare_write_message()
1296 if (con->in_seq > con->in_seq_acked) { in prepare_write_message()
1297 con->in_seq_acked = con->in_seq; in prepare_write_message()
1298 con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); in prepare_write_message()
1299 con->out_temp_ack = cpu_to_le64(con->in_seq_acked); in prepare_write_message()
1300 con_out_kvec_add(con, sizeof (con->out_temp_ack), in prepare_write_message()
1301 &con->out_temp_ack); in prepare_write_message()
1304 BUG_ON(list_empty(&con->out_queue)); in prepare_write_message()
1305 m = list_first_entry(&con->out_queue, struct ceph_msg, list_head); in prepare_write_message()
1306 con->out_msg = m; in prepare_write_message()
1307 BUG_ON(m->con != con); in prepare_write_message()
1311 list_move_tail(&m->list_head, &con->out_sent); in prepare_write_message()
1318 m->hdr.seq = cpu_to_le64(++con->out_seq); in prepare_write_message()
1321 if (con->ops->reencode_message) in prepare_write_message()
1322 con->ops->reencode_message(m); in prepare_write_message()
1326 m, con->out_seq, le16_to_cpu(m->hdr.type), in prepare_write_message()
1333 con_out_kvec_add(con, sizeof (tag_msg), &tag_msg); in prepare_write_message()
1334 con_out_kvec_add(con, sizeof(con->out_hdr), &con->out_hdr); in prepare_write_message()
1335 con_out_kvec_add(con, m->front.iov_len, m->front.iov_base); in prepare_write_message()
1338 con_out_kvec_add(con, m->middle->vec.iov_len, in prepare_write_message()
1343 con->out_msg->hdr.crc = cpu_to_le32(crc); in prepare_write_message()
1344 memcpy(&con->out_hdr, &con->out_msg->hdr, sizeof(con->out_hdr)); in prepare_write_message()
1348 con->out_msg->footer.front_crc = cpu_to_le32(crc); in prepare_write_message()
1352 con->out_msg->footer.middle_crc = cpu_to_le32(crc); in prepare_write_message()
1354 con->out_msg->footer.middle_crc = 0; in prepare_write_message()
1356 le32_to_cpu(con->out_msg->footer.front_crc), in prepare_write_message()
1357 le32_to_cpu(con->out_msg->footer.middle_crc)); in prepare_write_message()
1358 con->out_msg->footer.flags = 0; in prepare_write_message()
1361 con->out_msg->footer.data_crc = 0; in prepare_write_message()
1363 prepare_message_data(con->out_msg, m->data_length); in prepare_write_message()
1364 con->out_more = 1; /* data + footer will follow */ in prepare_write_message()
1367 prepare_write_message_footer(con); in prepare_write_message()
1370 con_flag_set(con, CON_FLAG_WRITE_PENDING); in prepare_write_message()
1376 static void prepare_write_ack(struct ceph_connection *con) in prepare_write_ack() argument
1378 dout("prepare_write_ack %p %llu -> %llu\n", con, in prepare_write_ack()
1379 con->in_seq_acked, con->in_seq); in prepare_write_ack()
1380 con->in_seq_acked = con->in_seq; in prepare_write_ack()
1382 con_out_kvec_reset(con); in prepare_write_ack()
1384 con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); in prepare_write_ack()
1386 con->out_temp_ack = cpu_to_le64(con->in_seq_acked); in prepare_write_ack()
1387 con_out_kvec_add(con, sizeof (con->out_temp_ack), in prepare_write_ack()
1388 &con->out_temp_ack); in prepare_write_ack()
1390 con->out_more = 1; /* more will follow.. eventually.. */ in prepare_write_ack()
1391 con_flag_set(con, CON_FLAG_WRITE_PENDING); in prepare_write_ack()
1397 static void prepare_write_seq(struct ceph_connection *con) in prepare_write_seq() argument
1399 dout("prepare_write_seq %p %llu -> %llu\n", con, in prepare_write_seq()
1400 con->in_seq_acked, con->in_seq); in prepare_write_seq()
1401 con->in_seq_acked = con->in_seq; in prepare_write_seq()
1403 con_out_kvec_reset(con); in prepare_write_seq()
1405 con->out_temp_ack = cpu_to_le64(con->in_seq_acked); in prepare_write_seq()
1406 con_out_kvec_add(con, sizeof (con->out_temp_ack), in prepare_write_seq()
1407 &con->out_temp_ack); in prepare_write_seq()
1409 con_flag_set(con, CON_FLAG_WRITE_PENDING); in prepare_write_seq()
1415 static void prepare_write_keepalive(struct ceph_connection *con) in prepare_write_keepalive() argument
1417 dout("prepare_write_keepalive %p\n", con); in prepare_write_keepalive()
1418 con_out_kvec_reset(con); in prepare_write_keepalive()
1419 if (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2) { in prepare_write_keepalive()
1423 con_out_kvec_add(con, sizeof(tag_keepalive2), &tag_keepalive2); in prepare_write_keepalive()
1424 ceph_encode_timespec64(&con->out_temp_keepalive2, &now); in prepare_write_keepalive()
1425 con_out_kvec_add(con, sizeof(con->out_temp_keepalive2), in prepare_write_keepalive()
1426 &con->out_temp_keepalive2); in prepare_write_keepalive()
1428 con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive); in prepare_write_keepalive()
1430 con_flag_set(con, CON_FLAG_WRITE_PENDING); in prepare_write_keepalive()
1437 static int get_connect_authorizer(struct ceph_connection *con) in get_connect_authorizer() argument
1442 if (!con->ops->get_authorizer) { in get_connect_authorizer()
1443 con->auth = NULL; in get_connect_authorizer()
1444 con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN; in get_connect_authorizer()
1445 con->out_connect.authorizer_len = 0; in get_connect_authorizer()
1449 auth = con->ops->get_authorizer(con, &auth_proto, con->auth_retry); in get_connect_authorizer()
1453 con->auth = auth; in get_connect_authorizer()
1454 con->out_connect.authorizer_protocol = cpu_to_le32(auth_proto); in get_connect_authorizer()
1455 con->out_connect.authorizer_len = cpu_to_le32(auth->authorizer_buf_len); in get_connect_authorizer()
1462 static void prepare_write_banner(struct ceph_connection *con) in prepare_write_banner() argument
1464 con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER); in prepare_write_banner()
1465 con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr), in prepare_write_banner()
1466 &con->msgr->my_enc_addr); in prepare_write_banner()
1468 con->out_more = 0; in prepare_write_banner()
1469 con_flag_set(con, CON_FLAG_WRITE_PENDING); in prepare_write_banner()
1472 static void __prepare_write_connect(struct ceph_connection *con) in __prepare_write_connect() argument
1474 con_out_kvec_add(con, sizeof(con->out_connect), &con->out_connect); in __prepare_write_connect()
1475 if (con->auth) in __prepare_write_connect()
1476 con_out_kvec_add(con, con->auth->authorizer_buf_len, in __prepare_write_connect()
1477 con->auth->authorizer_buf); in __prepare_write_connect()
1479 con->out_more = 0; in __prepare_write_connect()
1480 con_flag_set(con, CON_FLAG_WRITE_PENDING); in __prepare_write_connect()
1483 static int prepare_write_connect(struct ceph_connection *con) in prepare_write_connect() argument
1485 unsigned int global_seq = get_global_seq(con->msgr, 0); in prepare_write_connect()
1489 switch (con->peer_name.type) { in prepare_write_connect()
1503 dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con, in prepare_write_connect()
1504 con->connect_seq, global_seq, proto); in prepare_write_connect()
1506 con->out_connect.features = in prepare_write_connect()
1507 cpu_to_le64(from_msgr(con->msgr)->supported_features); in prepare_write_connect()
1508 con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT); in prepare_write_connect()
1509 con->out_connect.connect_seq = cpu_to_le32(con->connect_seq); in prepare_write_connect()
1510 con->out_connect.global_seq = cpu_to_le32(global_seq); in prepare_write_connect()
1511 con->out_connect.protocol_version = cpu_to_le32(proto); in prepare_write_connect()
1512 con->out_connect.flags = 0; in prepare_write_connect()
1514 ret = get_connect_authorizer(con); in prepare_write_connect()
1518 __prepare_write_connect(con); in prepare_write_connect()
1528 static int write_partial_kvec(struct ceph_connection *con) in write_partial_kvec() argument
1532 dout("write_partial_kvec %p %d left\n", con, con->out_kvec_bytes); in write_partial_kvec()
1533 while (con->out_kvec_bytes > 0) { in write_partial_kvec()
1534 ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur, in write_partial_kvec()
1535 con->out_kvec_left, con->out_kvec_bytes, in write_partial_kvec()
1536 con->out_more); in write_partial_kvec()
1539 con->out_kvec_bytes -= ret; in write_partial_kvec()
1540 if (con->out_kvec_bytes == 0) in write_partial_kvec()
1544 while (ret >= con->out_kvec_cur->iov_len) { in write_partial_kvec()
1545 BUG_ON(!con->out_kvec_left); in write_partial_kvec()
1546 ret -= con->out_kvec_cur->iov_len; in write_partial_kvec()
1547 con->out_kvec_cur++; in write_partial_kvec()
1548 con->out_kvec_left--; in write_partial_kvec()
1552 con->out_kvec_cur->iov_len -= ret; in write_partial_kvec()
1553 con->out_kvec_cur->iov_base += ret; in write_partial_kvec()
1556 con->out_kvec_left = 0; in write_partial_kvec()
1559 dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con, in write_partial_kvec()
1560 con->out_kvec_bytes, con->out_kvec_left, ret); in write_partial_kvec()
1584 static int write_partial_message_data(struct ceph_connection *con) in write_partial_message_data() argument
1586 struct ceph_msg *msg = con->out_msg; in write_partial_message_data()
1588 bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); in write_partial_message_data()
1591 dout("%s %p msg %p\n", __func__, con, msg); in write_partial_message_data()
1619 ret = ceph_tcp_sendpage(con->sock, page, page_offset, in write_partial_message_data()
1632 dout("%s %p msg %p done\n", __func__, con, msg); in write_partial_message_data()
1639 con_out_kvec_reset(con); in write_partial_message_data()
1640 prepare_write_message_footer(con); in write_partial_message_data()
1648 static int write_partial_skip(struct ceph_connection *con) in write_partial_skip() argument
1652 dout("%s %p %d left\n", __func__, con, con->out_skip); in write_partial_skip()
1653 while (con->out_skip > 0) { in write_partial_skip()
1654 size_t size = min(con->out_skip, (int) PAGE_SIZE); in write_partial_skip()
1656 ret = ceph_tcp_sendpage(con->sock, zero_page, 0, size, true); in write_partial_skip()
1659 con->out_skip -= ret; in write_partial_skip()
1669 static void prepare_read_banner(struct ceph_connection *con) in prepare_read_banner() argument
1671 dout("prepare_read_banner %p\n", con); in prepare_read_banner()
1672 con->in_base_pos = 0; in prepare_read_banner()
1675 static void prepare_read_connect(struct ceph_connection *con) in prepare_read_connect() argument
1677 dout("prepare_read_connect %p\n", con); in prepare_read_connect()
1678 con->in_base_pos = 0; in prepare_read_connect()
1681 static void prepare_read_ack(struct ceph_connection *con) in prepare_read_ack() argument
1683 dout("prepare_read_ack %p\n", con); in prepare_read_ack()
1684 con->in_base_pos = 0; in prepare_read_ack()
1687 static void prepare_read_seq(struct ceph_connection *con) in prepare_read_seq() argument
1689 dout("prepare_read_seq %p\n", con); in prepare_read_seq()
1690 con->in_base_pos = 0; in prepare_read_seq()
1691 con->in_tag = CEPH_MSGR_TAG_SEQ; in prepare_read_seq()
1694 static void prepare_read_tag(struct ceph_connection *con) in prepare_read_tag() argument
1696 dout("prepare_read_tag %p\n", con); in prepare_read_tag()
1697 con->in_base_pos = 0; in prepare_read_tag()
1698 con->in_tag = CEPH_MSGR_TAG_READY; in prepare_read_tag()
1701 static void prepare_read_keepalive_ack(struct ceph_connection *con) in prepare_read_keepalive_ack() argument
1703 dout("prepare_read_keepalive_ack %p\n", con); in prepare_read_keepalive_ack()
1704 con->in_base_pos = 0; in prepare_read_keepalive_ack()
1710 static int prepare_read_message(struct ceph_connection *con) in prepare_read_message() argument
1712 dout("prepare_read_message %p\n", con); in prepare_read_message()
1713 BUG_ON(con->in_msg != NULL); in prepare_read_message()
1714 con->in_base_pos = 0; in prepare_read_message()
1715 con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0; in prepare_read_message()
1720 static int read_partial(struct ceph_connection *con, in read_partial() argument
1723 while (con->in_base_pos < end) { in read_partial()
1724 int left = end - con->in_base_pos; in read_partial()
1726 int ret = ceph_tcp_recvmsg(con->sock, object + have, left); in read_partial()
1729 con->in_base_pos += ret; in read_partial()
1738 static int read_partial_banner(struct ceph_connection *con) in read_partial_banner() argument
1744 dout("read_partial_banner %p at %d\n", con, con->in_base_pos); in read_partial_banner()
1749 ret = read_partial(con, end, size, con->in_banner); in read_partial_banner()
1753 size = sizeof (con->actual_peer_addr); in read_partial_banner()
1755 ret = read_partial(con, end, size, &con->actual_peer_addr); in read_partial_banner()
1759 size = sizeof (con->peer_addr_for_me); in read_partial_banner()
1761 ret = read_partial(con, end, size, &con->peer_addr_for_me); in read_partial_banner()
1769 static int read_partial_connect(struct ceph_connection *con) in read_partial_connect() argument
1775 dout("read_partial_connect %p at %d\n", con, con->in_base_pos); in read_partial_connect()
1777 size = sizeof (con->in_reply); in read_partial_connect()
1779 ret = read_partial(con, end, size, &con->in_reply); in read_partial_connect()
1783 if (con->auth) { in read_partial_connect()
1784 size = le32_to_cpu(con->in_reply.authorizer_len); in read_partial_connect()
1785 if (size > con->auth->authorizer_reply_buf_len) { in read_partial_connect()
1787 con->auth->authorizer_reply_buf_len); in read_partial_connect()
1793 ret = read_partial(con, end, size, in read_partial_connect()
1794 con->auth->authorizer_reply_buf); in read_partial_connect()
1800 con, (int)con->in_reply.tag, in read_partial_connect()
1801 le32_to_cpu(con->in_reply.connect_seq), in read_partial_connect()
1802 le32_to_cpu(con->in_reply.global_seq)); in read_partial_connect()
1810 static int verify_hello(struct ceph_connection *con) in verify_hello() argument
1812 if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) { in verify_hello()
1814 ceph_pr_addr(&con->peer_addr.in_addr)); in verify_hello()
1815 con->error_msg = "protocol error, bad banner"; in verify_hello()
2032 static int process_banner(struct ceph_connection *con) in process_banner() argument
2034 dout("process_banner on %p\n", con); in process_banner()
2036 if (verify_hello(con) < 0) in process_banner()
2039 ceph_decode_addr(&con->actual_peer_addr); in process_banner()
2040 ceph_decode_addr(&con->peer_addr_for_me); in process_banner()
2047 if (memcmp(&con->peer_addr, &con->actual_peer_addr, in process_banner()
2048 sizeof(con->peer_addr)) != 0 && in process_banner()
2049 !(addr_is_blank(&con->actual_peer_addr.in_addr) && in process_banner()
2050 con->actual_peer_addr.nonce == con->peer_addr.nonce)) { in process_banner()
2052 ceph_pr_addr(&con->peer_addr.in_addr), in process_banner()
2053 (int)le32_to_cpu(con->peer_addr.nonce), in process_banner()
2054 ceph_pr_addr(&con->actual_peer_addr.in_addr), in process_banner()
2055 (int)le32_to_cpu(con->actual_peer_addr.nonce)); in process_banner()
2056 con->error_msg = "wrong peer at address"; in process_banner()
2063 if (addr_is_blank(&con->msgr->inst.addr.in_addr)) { in process_banner()
2064 int port = addr_port(&con->msgr->inst.addr.in_addr); in process_banner()
2066 memcpy(&con->msgr->inst.addr.in_addr, in process_banner()
2067 &con->peer_addr_for_me.in_addr, in process_banner()
2068 sizeof(con->peer_addr_for_me.in_addr)); in process_banner()
2069 addr_set_port(&con->msgr->inst.addr.in_addr, port); in process_banner()
2070 encode_my_addr(con->msgr); in process_banner()
2072 ceph_pr_addr(&con->msgr->inst.addr.in_addr)); in process_banner()
2078 static int process_connect(struct ceph_connection *con) in process_connect() argument
2080 u64 sup_feat = from_msgr(con->msgr)->supported_features; in process_connect()
2081 u64 req_feat = from_msgr(con->msgr)->required_features; in process_connect()
2082 u64 server_feat = le64_to_cpu(con->in_reply.features); in process_connect()
2085 dout("process_connect on %p tag %d\n", con, (int)con->in_tag); in process_connect()
2087 if (con->auth) { in process_connect()
2095 if (con->in_reply.tag == CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) { in process_connect()
2096 ret = con->ops->add_authorizer_challenge( in process_connect()
2097 con, con->auth->authorizer_reply_buf, in process_connect()
2098 le32_to_cpu(con->in_reply.authorizer_len)); in process_connect()
2102 con_out_kvec_reset(con); in process_connect()
2103 __prepare_write_connect(con); in process_connect()
2104 prepare_read_connect(con); in process_connect()
2108 ret = con->ops->verify_authorizer_reply(con); in process_connect()
2110 con->error_msg = "bad authorize reply"; in process_connect()
2115 switch (con->in_reply.tag) { in process_connect()
2119 ENTITY_NAME(con->peer_name), in process_connect()
2120 ceph_pr_addr(&con->peer_addr.in_addr), in process_connect()
2122 con->error_msg = "missing required protocol features"; in process_connect()
2123 reset_connection(con); in process_connect()
2129 ENTITY_NAME(con->peer_name), in process_connect()
2130 ceph_pr_addr(&con->peer_addr.in_addr), in process_connect()
2131 le32_to_cpu(con->out_connect.protocol_version), in process_connect()
2132 le32_to_cpu(con->in_reply.protocol_version)); in process_connect()
2133 con->error_msg = "protocol version mismatch"; in process_connect()
2134 reset_connection(con); in process_connect()
2138 con->auth_retry++; in process_connect()
2139 dout("process_connect %p got BADAUTHORIZER attempt %d\n", con, in process_connect()
2140 con->auth_retry); in process_connect()
2141 if (con->auth_retry == 2) { in process_connect()
2142 con->error_msg = "connect authorization failure"; in process_connect()
2145 con_out_kvec_reset(con); in process_connect()
2146 ret = prepare_write_connect(con); in process_connect()
2149 prepare_read_connect(con); in process_connect()
2161 le32_to_cpu(con->in_reply.connect_seq)); in process_connect()
2163 ENTITY_NAME(con->peer_name), in process_connect()
2164 ceph_pr_addr(&con->peer_addr.in_addr)); in process_connect()
2165 reset_connection(con); in process_connect()
2166 con_out_kvec_reset(con); in process_connect()
2167 ret = prepare_write_connect(con); in process_connect()
2170 prepare_read_connect(con); in process_connect()
2173 mutex_unlock(&con->mutex); in process_connect()
2174 pr_info("reset on %s%lld\n", ENTITY_NAME(con->peer_name)); in process_connect()
2175 if (con->ops->peer_reset) in process_connect()
2176 con->ops->peer_reset(con); in process_connect()
2177 mutex_lock(&con->mutex); in process_connect()
2178 if (con->state != CON_STATE_NEGOTIATING) in process_connect()
2188 le32_to_cpu(con->out_connect.connect_seq), in process_connect()
2189 le32_to_cpu(con->in_reply.connect_seq)); in process_connect()
2190 con->connect_seq = le32_to_cpu(con->in_reply.connect_seq); in process_connect()
2191 con_out_kvec_reset(con); in process_connect()
2192 ret = prepare_write_connect(con); in process_connect()
2195 prepare_read_connect(con); in process_connect()
2204 con->peer_global_seq, in process_connect()
2205 le32_to_cpu(con->in_reply.global_seq)); in process_connect()
2206 get_global_seq(con->msgr, in process_connect()
2207 le32_to_cpu(con->in_reply.global_seq)); in process_connect()
2208 con_out_kvec_reset(con); in process_connect()
2209 ret = prepare_write_connect(con); in process_connect()
2212 prepare_read_connect(con); in process_connect()
2220 ENTITY_NAME(con->peer_name), in process_connect()
2221 ceph_pr_addr(&con->peer_addr.in_addr), in process_connect()
2223 con->error_msg = "missing required protocol features"; in process_connect()
2224 reset_connection(con); in process_connect()
2228 WARN_ON(con->state != CON_STATE_NEGOTIATING); in process_connect()
2229 con->state = CON_STATE_OPEN; in process_connect()
2230 con->auth_retry = 0; /* we authenticated; clear flag */ in process_connect()
2231 con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); in process_connect()
2232 con->connect_seq++; in process_connect()
2233 con->peer_features = server_feat; in process_connect()
2235 con->peer_global_seq, in process_connect()
2236 le32_to_cpu(con->in_reply.connect_seq), in process_connect()
2237 con->connect_seq); in process_connect()
2238 WARN_ON(con->connect_seq != in process_connect()
2239 le32_to_cpu(con->in_reply.connect_seq)); in process_connect()
2241 if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY) in process_connect()
2242 con_flag_set(con, CON_FLAG_LOSSYTX); in process_connect()
2244 con->delay = 0; /* reset backoff memory */ in process_connect()
2246 if (con->in_reply.tag == CEPH_MSGR_TAG_SEQ) { in process_connect()
2247 prepare_write_seq(con); in process_connect()
2248 prepare_read_seq(con); in process_connect()
2250 prepare_read_tag(con); in process_connect()
2261 con->error_msg = "protocol error, got WAIT as client"; in process_connect()
2265 con->error_msg = "protocol error, garbage tag during connect"; in process_connect()
2275 static int read_partial_ack(struct ceph_connection *con) in read_partial_ack() argument
2277 int size = sizeof (con->in_temp_ack); in read_partial_ack()
2280 return read_partial(con, end, size, &con->in_temp_ack); in read_partial_ack()
2286 static void process_ack(struct ceph_connection *con) in process_ack() argument
2289 u64 ack = le64_to_cpu(con->in_temp_ack); in process_ack()
2291 bool reconnect = (con->in_tag == CEPH_MSGR_TAG_SEQ); in process_ack()
2292 struct list_head *list = reconnect ? &con->out_queue : &con->out_sent; in process_ack()
2312 prepare_read_tag(con); in process_ack()
2316 static int read_partial_message_section(struct ceph_connection *con, in read_partial_message_section() argument
2327 ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base + in read_partial_message_section()
2339 static int read_partial_msg_data(struct ceph_connection *con) in read_partial_msg_data() argument
2341 struct ceph_msg *msg = con->in_msg; in read_partial_msg_data()
2343 bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); in read_partial_msg_data()
2355 crc = con->in_data_crc; in read_partial_msg_data()
2363 ret = ceph_tcp_recvpage(con->sock, page, page_offset, length); in read_partial_msg_data()
2366 con->in_data_crc = crc; in read_partial_msg_data()
2376 con->in_data_crc = crc; in read_partial_msg_data()
2384 static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip);
2386 static int read_partial_message(struct ceph_connection *con) in read_partial_message() argument
2388 struct ceph_msg *m = con->in_msg; in read_partial_message()
2393 bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); in read_partial_message()
2394 bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH); in read_partial_message()
2398 dout("read_partial_message con %p msg %p\n", con, m); in read_partial_message()
2401 size = sizeof (con->in_hdr); in read_partial_message()
2403 ret = read_partial(con, end, size, &con->in_hdr); in read_partial_message()
2407 crc = crc32c(0, &con->in_hdr, offsetof(struct ceph_msg_header, crc)); in read_partial_message()
2408 if (cpu_to_le32(crc) != con->in_hdr.crc) { in read_partial_message()
2410 crc, con->in_hdr.crc); in read_partial_message()
2414 front_len = le32_to_cpu(con->in_hdr.front_len); in read_partial_message()
2417 middle_len = le32_to_cpu(con->in_hdr.middle_len); in read_partial_message()
2420 data_len = le32_to_cpu(con->in_hdr.data_len); in read_partial_message()
2425 seq = le64_to_cpu(con->in_hdr.seq); in read_partial_message()
2426 if ((s64)seq - (s64)con->in_seq < 1) { in read_partial_message()
2428 ENTITY_NAME(con->peer_name), in read_partial_message()
2429 ceph_pr_addr(&con->peer_addr.in_addr), in read_partial_message()
2430 seq, con->in_seq + 1); in read_partial_message()
2431 con->in_base_pos = -front_len - middle_len - data_len - in read_partial_message()
2432 sizeof_footer(con); in read_partial_message()
2433 con->in_tag = CEPH_MSGR_TAG_READY; in read_partial_message()
2435 } else if ((s64)seq - (s64)con->in_seq > 1) { in read_partial_message()
2437 seq, con->in_seq + 1); in read_partial_message()
2438 con->error_msg = "bad message sequence # for incoming message"; in read_partial_message()
2443 if (!con->in_msg) { in read_partial_message()
2446 dout("got hdr type %d front %d data %d\n", con->in_hdr.type, in read_partial_message()
2448 ret = ceph_con_in_msg_alloc(con, &skip); in read_partial_message()
2452 BUG_ON(!con->in_msg ^ skip); in read_partial_message()
2456 con->in_base_pos = -front_len - middle_len - data_len - in read_partial_message()
2457 sizeof_footer(con); in read_partial_message()
2458 con->in_tag = CEPH_MSGR_TAG_READY; in read_partial_message()
2459 con->in_seq++; in read_partial_message()
2463 BUG_ON(!con->in_msg); in read_partial_message()
2464 BUG_ON(con->in_msg->con != con); in read_partial_message()
2465 m = con->in_msg; in read_partial_message()
2473 prepare_message_data(con->in_msg, data_len); in read_partial_message()
2477 ret = read_partial_message_section(con, &m->front, front_len, in read_partial_message()
2478 &con->in_front_crc); in read_partial_message()
2484 ret = read_partial_message_section(con, &m->middle->vec, in read_partial_message()
2486 &con->in_middle_crc); in read_partial_message()
2493 ret = read_partial_msg_data(con); in read_partial_message()
2499 size = sizeof_footer(con); in read_partial_message()
2501 ret = read_partial(con, end, size, &m->footer); in read_partial_message()
2515 if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) { in read_partial_message()
2517 m, con->in_front_crc, m->footer.front_crc); in read_partial_message()
2520 if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) { in read_partial_message()
2522 m, con->in_middle_crc, m->footer.middle_crc); in read_partial_message()
2527 con->in_data_crc != le32_to_cpu(m->footer.data_crc)) { in read_partial_message()
2529 con->in_data_crc, le32_to_cpu(m->footer.data_crc)); in read_partial_message()
2533 if (need_sign && con->ops->check_message_signature && in read_partial_message()
2534 con->ops->check_message_signature(m)) { in read_partial_message()
2547 static void process_message(struct ceph_connection *con) in process_message() argument
2549 struct ceph_msg *msg = con->in_msg; in process_message()
2551 BUG_ON(con->in_msg->con != con); in process_message()
2552 con->in_msg = NULL; in process_message()
2555 if (con->peer_name.type == 0) in process_message()
2556 con->peer_name = msg->hdr.src; in process_message()
2558 con->in_seq++; in process_message()
2559 mutex_unlock(&con->mutex); in process_message()
2568 con->in_front_crc, con->in_middle_crc, con->in_data_crc); in process_message()
2569 con->ops->dispatch(con, msg); in process_message()
2571 mutex_lock(&con->mutex); in process_message()
2574 static int read_keepalive_ack(struct ceph_connection *con) in read_keepalive_ack() argument
2578 int ret = read_partial(con, size, size, &ceph_ts); in read_keepalive_ack()
2581 ceph_decode_timespec64(&con->last_keepalive_ack, &ceph_ts); in read_keepalive_ack()
2582 prepare_read_tag(con); in read_keepalive_ack()
2590 static int try_write(struct ceph_connection *con) in try_write() argument
2594 dout("try_write start %p state %lu\n", con, con->state); in try_write()
2595 if (con->state != CON_STATE_PREOPEN && in try_write()
2596 con->state != CON_STATE_CONNECTING && in try_write()
2597 con->state != CON_STATE_NEGOTIATING && in try_write()
2598 con->state != CON_STATE_OPEN) in try_write()
2602 if (con->state == CON_STATE_PREOPEN) { in try_write()
2603 BUG_ON(con->sock); in try_write()
2604 con->state = CON_STATE_CONNECTING; in try_write()
2606 con_out_kvec_reset(con); in try_write()
2607 prepare_write_banner(con); in try_write()
2608 prepare_read_banner(con); in try_write()
2610 BUG_ON(con->in_msg); in try_write()
2611 con->in_tag = CEPH_MSGR_TAG_READY; in try_write()
2613 con, con->state); in try_write()
2614 ret = ceph_tcp_connect(con); in try_write()
2616 con->error_msg = "connect error"; in try_write()
2622 dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); in try_write()
2623 BUG_ON(!con->sock); in try_write()
2626 if (con->out_kvec_left) { in try_write()
2627 ret = write_partial_kvec(con); in try_write()
2631 if (con->out_skip) { in try_write()
2632 ret = write_partial_skip(con); in try_write()
2638 if (con->out_msg) { in try_write()
2639 if (con->out_msg_done) { in try_write()
2640 ceph_msg_put(con->out_msg); in try_write()
2641 con->out_msg = NULL; /* we're done with this one */ in try_write()
2645 ret = write_partial_message_data(con); in try_write()
2658 if (con->state == CON_STATE_OPEN) { in try_write()
2659 if (con_flag_test_and_clear(con, CON_FLAG_KEEPALIVE_PENDING)) { in try_write()
2660 prepare_write_keepalive(con); in try_write()
2664 if (!list_empty(&con->out_queue)) { in try_write()
2665 prepare_write_message(con); in try_write()
2668 if (con->in_seq > con->in_seq_acked) { in try_write()
2669 prepare_write_ack(con); in try_write()
2675 con_flag_clear(con, CON_FLAG_WRITE_PENDING); in try_write()
2679 dout("try_write done on %p ret %d\n", con, ret); in try_write()
2686 static int try_read(struct ceph_connection *con) in try_read() argument
2691 dout("try_read start on %p state %lu\n", con, con->state); in try_read()
2692 if (con->state != CON_STATE_CONNECTING && in try_read()
2693 con->state != CON_STATE_NEGOTIATING && in try_read()
2694 con->state != CON_STATE_OPEN) in try_read()
2697 BUG_ON(!con->sock); in try_read()
2699 dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, in try_read()
2700 con->in_base_pos); in try_read()
2702 if (con->state == CON_STATE_CONNECTING) { in try_read()
2704 ret = read_partial_banner(con); in try_read()
2707 ret = process_banner(con); in try_read()
2711 con->state = CON_STATE_NEGOTIATING; in try_read()
2718 ret = prepare_write_connect(con); in try_read()
2721 prepare_read_connect(con); in try_read()
2727 if (con->state == CON_STATE_NEGOTIATING) { in try_read()
2729 ret = read_partial_connect(con); in try_read()
2732 ret = process_connect(con); in try_read()
2738 WARN_ON(con->state != CON_STATE_OPEN); in try_read()
2740 if (con->in_base_pos < 0) { in try_read()
2744 ret = ceph_tcp_recvmsg(con->sock, NULL, -con->in_base_pos); in try_read()
2747 dout("skipped %d / %d bytes\n", ret, -con->in_base_pos); in try_read()
2748 con->in_base_pos += ret; in try_read()
2749 if (con->in_base_pos) in try_read()
2752 if (con->in_tag == CEPH_MSGR_TAG_READY) { in try_read()
2756 ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1); in try_read()
2759 dout("try_read got tag %d\n", (int)con->in_tag); in try_read()
2760 switch (con->in_tag) { in try_read()
2762 prepare_read_message(con); in try_read()
2765 prepare_read_ack(con); in try_read()
2768 prepare_read_keepalive_ack(con); in try_read()
2771 con_close_socket(con); in try_read()
2772 con->state = CON_STATE_CLOSED; in try_read()
2778 if (con->in_tag == CEPH_MSGR_TAG_MSG) { in try_read()
2779 ret = read_partial_message(con); in try_read()
2783 con->error_msg = "bad crc/signature"; in try_read()
2789 con->error_msg = "io error"; in try_read()
2794 if (con->in_tag == CEPH_MSGR_TAG_READY) in try_read()
2796 process_message(con); in try_read()
2797 if (con->state == CON_STATE_OPEN) in try_read()
2798 prepare_read_tag(con); in try_read()
2801 if (con->in_tag == CEPH_MSGR_TAG_ACK || in try_read()
2802 con->in_tag == CEPH_MSGR_TAG_SEQ) { in try_read()
2807 ret = read_partial_ack(con); in try_read()
2810 process_ack(con); in try_read()
2813 if (con->in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) { in try_read()
2814 ret = read_keepalive_ack(con); in try_read()
2821 dout("try_read done on %p ret %d\n", con, ret); in try_read()
2825 pr_err("try_read bad con->in_tag = %d\n", (int)con->in_tag); in try_read()
2826 con->error_msg = "protocol error, garbage tag"; in try_read()
2837 static int queue_con_delay(struct ceph_connection *con, unsigned long delay) in queue_con_delay() argument
2839 if (!con->ops->get(con)) { in queue_con_delay()
2840 dout("%s %p ref count 0\n", __func__, con); in queue_con_delay()
2844 if (!queue_delayed_work(ceph_msgr_wq, &con->work, delay)) { in queue_con_delay()
2845 dout("%s %p - already queued\n", __func__, con); in queue_con_delay()
2846 con->ops->put(con); in queue_con_delay()
2850 dout("%s %p %lu\n", __func__, con, delay); in queue_con_delay()
2854 static void queue_con(struct ceph_connection *con) in queue_con() argument
2856 (void) queue_con_delay(con, 0); in queue_con()
2859 static void cancel_con(struct ceph_connection *con) in cancel_con() argument
2861 if (cancel_delayed_work(&con->work)) { in cancel_con()
2862 dout("%s %p\n", __func__, con); in cancel_con()
2863 con->ops->put(con); in cancel_con()
2867 static bool con_sock_closed(struct ceph_connection *con) in con_sock_closed() argument
2869 if (!con_flag_test_and_clear(con, CON_FLAG_SOCK_CLOSED)) in con_sock_closed()
2874 con->error_msg = "socket closed (con state " #x ")"; \ in con_sock_closed()
2877 switch (con->state) { in con_sock_closed()
2886 __func__, con, con->state); in con_sock_closed()
2887 con->error_msg = "unrecognized con state"; in con_sock_closed()
2896 static bool con_backoff(struct ceph_connection *con) in con_backoff() argument
2900 if (!con_flag_test_and_clear(con, CON_FLAG_BACKOFF)) in con_backoff()
2903 ret = queue_con_delay(con, round_jiffies_relative(con->delay)); in con_backoff()
2906 con, con->delay); in con_backoff()
2908 con_flag_set(con, CON_FLAG_BACKOFF); in con_backoff()
2916 static void con_fault_finish(struct ceph_connection *con) in con_fault_finish() argument
2918 dout("%s %p\n", __func__, con); in con_fault_finish()
2924 if (con->auth_retry) { in con_fault_finish()
2925 dout("auth_retry %d, invalidating\n", con->auth_retry); in con_fault_finish()
2926 if (con->ops->invalidate_authorizer) in con_fault_finish()
2927 con->ops->invalidate_authorizer(con); in con_fault_finish()
2928 con->auth_retry = 0; in con_fault_finish()
2931 if (con->ops->fault) in con_fault_finish()
2932 con->ops->fault(con); in con_fault_finish()
2940 struct ceph_connection *con = container_of(work, struct ceph_connection, in ceph_con_workfn() local
2944 mutex_lock(&con->mutex); in ceph_con_workfn()
2948 if ((fault = con_sock_closed(con))) { in ceph_con_workfn()
2949 dout("%s: con %p SOCK_CLOSED\n", __func__, con); in ceph_con_workfn()
2952 if (con_backoff(con)) { in ceph_con_workfn()
2953 dout("%s: con %p BACKOFF\n", __func__, con); in ceph_con_workfn()
2956 if (con->state == CON_STATE_STANDBY) { in ceph_con_workfn()
2957 dout("%s: con %p STANDBY\n", __func__, con); in ceph_con_workfn()
2960 if (con->state == CON_STATE_CLOSED) { in ceph_con_workfn()
2961 dout("%s: con %p CLOSED\n", __func__, con); in ceph_con_workfn()
2962 BUG_ON(con->sock); in ceph_con_workfn()
2965 if (con->state == CON_STATE_PREOPEN) { in ceph_con_workfn()
2966 dout("%s: con %p PREOPEN\n", __func__, con); in ceph_con_workfn()
2967 BUG_ON(con->sock); in ceph_con_workfn()
2970 ret = try_read(con); in ceph_con_workfn()
2974 if (!con->error_msg) in ceph_con_workfn()
2975 con->error_msg = "socket error on read"; in ceph_con_workfn()
2980 ret = try_write(con); in ceph_con_workfn()
2984 if (!con->error_msg) in ceph_con_workfn()
2985 con->error_msg = "socket error on write"; in ceph_con_workfn()
2992 con_fault(con); in ceph_con_workfn()
2993 mutex_unlock(&con->mutex); in ceph_con_workfn()
2996 con_fault_finish(con); in ceph_con_workfn()
2998 con->ops->put(con); in ceph_con_workfn()
3005 static void con_fault(struct ceph_connection *con) in con_fault() argument
3008 con, con->state, ceph_pr_addr(&con->peer_addr.in_addr)); in con_fault()
3010 pr_warn("%s%lld %s %s\n", ENTITY_NAME(con->peer_name), in con_fault()
3011 ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg); in con_fault()
3012 con->error_msg = NULL; in con_fault()
3014 WARN_ON(con->state != CON_STATE_CONNECTING && in con_fault()
3015 con->state != CON_STATE_NEGOTIATING && in con_fault()
3016 con->state != CON_STATE_OPEN); in con_fault()
3018 con_close_socket(con); in con_fault()
3020 if (con_flag_test(con, CON_FLAG_LOSSYTX)) { in con_fault()
3022 con->state = CON_STATE_CLOSED; in con_fault()
3026 if (con->in_msg) { in con_fault()
3027 BUG_ON(con->in_msg->con != con); in con_fault()
3028 ceph_msg_put(con->in_msg); in con_fault()
3029 con->in_msg = NULL; in con_fault()
3033 list_splice_init(&con->out_sent, &con->out_queue); in con_fault()
3037 if (list_empty(&con->out_queue) && in con_fault()
3038 !con_flag_test(con, CON_FLAG_KEEPALIVE_PENDING)) { in con_fault()
3039 dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); in con_fault()
3040 con_flag_clear(con, CON_FLAG_WRITE_PENDING); in con_fault()
3041 con->state = CON_STATE_STANDBY; in con_fault()
3044 con->state = CON_STATE_PREOPEN; in con_fault()
3045 if (con->delay == 0) in con_fault()
3046 con->delay = BASE_DELAY_INTERVAL; in con_fault()
3047 else if (con->delay < MAX_DELAY_INTERVAL) in con_fault()
3048 con->delay *= 2; in con_fault()
3049 con_flag_set(con, CON_FLAG_BACKOFF); in con_fault()
3050 queue_con(con); in con_fault()
3085 static void msg_con_set(struct ceph_msg *msg, struct ceph_connection *con) in msg_con_set() argument
3087 if (msg->con) in msg_con_set()
3088 msg->con->ops->put(msg->con); in msg_con_set()
3090 msg->con = con ? con->ops->get(con) : NULL; in msg_con_set()
3091 BUG_ON(msg->con != con); in msg_con_set()
3094 static void clear_standby(struct ceph_connection *con) in clear_standby() argument
3097 if (con->state == CON_STATE_STANDBY) { in clear_standby()
3098 dout("clear_standby %p and ++connect_seq\n", con); in clear_standby()
3099 con->state = CON_STATE_PREOPEN; in clear_standby()
3100 con->connect_seq++; in clear_standby()
3101 WARN_ON(con_flag_test(con, CON_FLAG_WRITE_PENDING)); in clear_standby()
3102 WARN_ON(con_flag_test(con, CON_FLAG_KEEPALIVE_PENDING)); in clear_standby()
3109 void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) in ceph_con_send() argument
3112 msg->hdr.src = con->msgr->inst.name; in ceph_con_send()
3116 mutex_lock(&con->mutex); in ceph_con_send()
3118 if (con->state == CON_STATE_CLOSED) { in ceph_con_send()
3119 dout("con_send %p closed, dropping %p\n", con, msg); in ceph_con_send()
3121 mutex_unlock(&con->mutex); in ceph_con_send()
3125 msg_con_set(msg, con); in ceph_con_send()
3128 list_add_tail(&msg->list_head, &con->out_queue); in ceph_con_send()
3130 ENTITY_NAME(con->peer_name), le16_to_cpu(msg->hdr.type), in ceph_con_send()
3136 clear_standby(con); in ceph_con_send()
3137 mutex_unlock(&con->mutex); in ceph_con_send()
3141 if (con_flag_test_and_set(con, CON_FLAG_WRITE_PENDING) == 0) in ceph_con_send()
3142 queue_con(con); in ceph_con_send()
3151 struct ceph_connection *con = msg->con; in ceph_msg_revoke() local
3153 if (!con) { in ceph_msg_revoke()
3158 mutex_lock(&con->mutex); in ceph_msg_revoke()
3160 dout("%s %p msg %p - was on queue\n", __func__, con, msg); in ceph_msg_revoke()
3166 if (con->out_msg == msg) { in ceph_msg_revoke()
3167 BUG_ON(con->out_skip); in ceph_msg_revoke()
3169 if (con->out_msg_done) { in ceph_msg_revoke()
3170 con->out_skip += con_out_kvec_skip(con); in ceph_msg_revoke()
3173 con->out_skip += sizeof_footer(con); in ceph_msg_revoke()
3177 con->out_skip += msg->cursor.total_resid; in ceph_msg_revoke()
3179 con->out_skip += con_out_kvec_skip(con); in ceph_msg_revoke()
3180 con->out_skip += con_out_kvec_skip(con); in ceph_msg_revoke()
3183 __func__, con, msg, con->out_kvec_bytes, con->out_skip); in ceph_msg_revoke()
3185 con->out_msg = NULL; in ceph_msg_revoke()
3189 mutex_unlock(&con->mutex); in ceph_msg_revoke()
3197 struct ceph_connection *con = msg->con; in ceph_msg_revoke_incoming() local
3199 if (!con) { in ceph_msg_revoke_incoming()
3204 mutex_lock(&con->mutex); in ceph_msg_revoke_incoming()
3205 if (con->in_msg == msg) { in ceph_msg_revoke_incoming()
3206 unsigned int front_len = le32_to_cpu(con->in_hdr.front_len); in ceph_msg_revoke_incoming()
3207 unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len); in ceph_msg_revoke_incoming()
3208 unsigned int data_len = le32_to_cpu(con->in_hdr.data_len); in ceph_msg_revoke_incoming()
3211 dout("%s %p msg %p revoked\n", __func__, con, msg); in ceph_msg_revoke_incoming()
3212 con->in_base_pos = con->in_base_pos - in ceph_msg_revoke_incoming()
3218 ceph_msg_put(con->in_msg); in ceph_msg_revoke_incoming()
3219 con->in_msg = NULL; in ceph_msg_revoke_incoming()
3220 con->in_tag = CEPH_MSGR_TAG_READY; in ceph_msg_revoke_incoming()
3221 con->in_seq++; in ceph_msg_revoke_incoming()
3224 __func__, con, con->in_msg, msg); in ceph_msg_revoke_incoming()
3226 mutex_unlock(&con->mutex); in ceph_msg_revoke_incoming()
3232 void ceph_con_keepalive(struct ceph_connection *con) in ceph_con_keepalive() argument
3234 dout("con_keepalive %p\n", con); in ceph_con_keepalive()
3235 mutex_lock(&con->mutex); in ceph_con_keepalive()
3236 clear_standby(con); in ceph_con_keepalive()
3237 mutex_unlock(&con->mutex); in ceph_con_keepalive()
3238 if (con_flag_test_and_set(con, CON_FLAG_KEEPALIVE_PENDING) == 0 && in ceph_con_keepalive()
3239 con_flag_test_and_set(con, CON_FLAG_WRITE_PENDING) == 0) in ceph_con_keepalive()
3240 queue_con(con); in ceph_con_keepalive()
3244 bool ceph_con_keepalive_expired(struct ceph_connection *con, in ceph_con_keepalive_expired() argument
3248 (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2)) { in ceph_con_keepalive_expired()
3253 ts = timespec64_add(con->last_keepalive_ack, ts); in ceph_con_keepalive_expired()
3413 static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg) in ceph_alloc_middle() argument
3444 static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip) in ceph_con_in_msg_alloc() argument
3446 struct ceph_msg_header *hdr = &con->in_hdr; in ceph_con_in_msg_alloc()
3451 BUG_ON(con->in_msg != NULL); in ceph_con_in_msg_alloc()
3452 BUG_ON(!con->ops->alloc_msg); in ceph_con_in_msg_alloc()
3454 mutex_unlock(&con->mutex); in ceph_con_in_msg_alloc()
3455 msg = con->ops->alloc_msg(con, hdr, skip); in ceph_con_in_msg_alloc()
3456 mutex_lock(&con->mutex); in ceph_con_in_msg_alloc()
3457 if (con->state != CON_STATE_OPEN) { in ceph_con_in_msg_alloc()
3464 msg_con_set(msg, con); in ceph_con_in_msg_alloc()
3465 con->in_msg = msg; in ceph_con_in_msg_alloc()
3475 con->error_msg = "error allocating memory for incoming message"; in ceph_con_in_msg_alloc()
3478 memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr)); in ceph_con_in_msg_alloc()
3480 if (middle_len && !con->in_msg->middle) { in ceph_con_in_msg_alloc()
3481 ret = ceph_alloc_middle(con, con->in_msg); in ceph_con_in_msg_alloc()
3483 ceph_msg_put(con->in_msg); in ceph_con_in_msg_alloc()
3484 con->in_msg = NULL; in ceph_con_in_msg_alloc()