Lines Matching refs:con
119 static void con_flag_clear(struct ceph_connection *con, unsigned long con_flag) in con_flag_clear() argument
123 clear_bit(con_flag, &con->flags); in con_flag_clear()
126 static void con_flag_set(struct ceph_connection *con, unsigned long con_flag) in con_flag_set() argument
130 set_bit(con_flag, &con->flags); in con_flag_set()
133 static bool con_flag_test(struct ceph_connection *con, unsigned long con_flag) in con_flag_test() argument
137 return test_bit(con_flag, &con->flags); in con_flag_test()
140 static bool con_flag_test_and_clear(struct ceph_connection *con, in con_flag_test_and_clear() argument
145 return test_and_clear_bit(con_flag, &con->flags); in con_flag_test_and_clear()
148 static bool con_flag_test_and_set(struct ceph_connection *con, in con_flag_test_and_set() argument
153 return test_and_set_bit(con_flag, &con->flags); in con_flag_test_and_set()
170 static void queue_con(struct ceph_connection *con);
171 static void cancel_con(struct ceph_connection *con);
173 static void con_fault(struct ceph_connection *con);
302 static void con_sock_state_init(struct ceph_connection *con) in con_sock_state_init() argument
306 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED); in con_sock_state_init()
309 dout("%s con %p sock %d -> %d\n", __func__, con, old_state, in con_sock_state_init()
313 static void con_sock_state_connecting(struct ceph_connection *con) in con_sock_state_connecting() argument
317 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTING); in con_sock_state_connecting()
320 dout("%s con %p sock %d -> %d\n", __func__, con, old_state, in con_sock_state_connecting()
324 static void con_sock_state_connected(struct ceph_connection *con) in con_sock_state_connected() argument
328 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTED); in con_sock_state_connected()
331 dout("%s con %p sock %d -> %d\n", __func__, con, old_state, in con_sock_state_connected()
335 static void con_sock_state_closing(struct ceph_connection *con) in con_sock_state_closing() argument
339 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSING); in con_sock_state_closing()
344 dout("%s con %p sock %d -> %d\n", __func__, con, old_state, in con_sock_state_closing()
348 static void con_sock_state_closed(struct ceph_connection *con) in con_sock_state_closed() argument
352 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED); in con_sock_state_closed()
358 dout("%s con %p sock %d -> %d\n", __func__, con, old_state, in con_sock_state_closed()
369 struct ceph_connection *con = sk->sk_user_data; in ceph_sock_data_ready() local
370 if (atomic_read(&con->msgr->stopping)) { in ceph_sock_data_ready()
376 con, con->state); in ceph_sock_data_ready()
377 queue_con(con); in ceph_sock_data_ready()
384 struct ceph_connection *con = sk->sk_user_data; in ceph_sock_write_space() local
393 if (con_flag_test(con, CON_FLAG_WRITE_PENDING)) { in ceph_sock_write_space()
395 dout("%s %p queueing write work\n", __func__, con); in ceph_sock_write_space()
397 queue_con(con); in ceph_sock_write_space()
400 dout("%s %p nothing to write\n", __func__, con); in ceph_sock_write_space()
407 struct ceph_connection *con = sk->sk_user_data; in ceph_sock_state_change() local
410 con, con->state, sk->sk_state); in ceph_sock_state_change()
418 con_sock_state_closing(con); in ceph_sock_state_change()
419 con_flag_set(con, CON_FLAG_SOCK_CLOSED); in ceph_sock_state_change()
420 queue_con(con); in ceph_sock_state_change()
424 con_sock_state_connected(con); in ceph_sock_state_change()
425 queue_con(con); in ceph_sock_state_change()
436 struct ceph_connection *con) in set_sock_callbacks() argument
439 sk->sk_user_data = con; in set_sock_callbacks()
453 static int ceph_tcp_connect(struct ceph_connection *con) in ceph_tcp_connect() argument
455 struct sockaddr_storage ss = con->peer_addr.in_addr; /* align */ in ceph_tcp_connect()
460 BUG_ON(con->sock); in ceph_tcp_connect()
464 ret = sock_create_kern(read_pnet(&con->msgr->net), ss.ss_family, in ceph_tcp_connect()
475 set_sock_callbacks(sock, con); in ceph_tcp_connect()
477 dout("connect %s\n", ceph_pr_addr(&con->peer_addr)); in ceph_tcp_connect()
479 con_sock_state_connecting(con); in ceph_tcp_connect()
484 ceph_pr_addr(&con->peer_addr), in ceph_tcp_connect()
488 ceph_pr_addr(&con->peer_addr), ret); in ceph_tcp_connect()
493 if (ceph_test_opt(from_msgr(con->msgr), TCP_NODELAY)) { in ceph_tcp_connect()
503 con->sock = sock; in ceph_tcp_connect()
600 static int con_close_socket(struct ceph_connection *con) in con_close_socket() argument
604 dout("con_close_socket on %p sock %p\n", con, con->sock); in con_close_socket()
605 if (con->sock) { in con_close_socket()
606 rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR); in con_close_socket()
607 sock_release(con->sock); in con_close_socket()
608 con->sock = NULL; in con_close_socket()
617 con_flag_clear(con, CON_FLAG_SOCK_CLOSED); in con_close_socket()
619 con_sock_state_closed(con); in con_close_socket()
642 static void reset_connection(struct ceph_connection *con) in reset_connection() argument
646 dout("reset_connection %p\n", con); in reset_connection()
647 ceph_msg_remove_list(&con->out_queue); in reset_connection()
648 ceph_msg_remove_list(&con->out_sent); in reset_connection()
650 if (con->in_msg) { in reset_connection()
651 BUG_ON(con->in_msg->con != con); in reset_connection()
652 ceph_msg_put(con->in_msg); in reset_connection()
653 con->in_msg = NULL; in reset_connection()
656 con->connect_seq = 0; in reset_connection()
657 con->out_seq = 0; in reset_connection()
658 if (con->out_msg) { in reset_connection()
659 BUG_ON(con->out_msg->con != con); in reset_connection()
660 ceph_msg_put(con->out_msg); in reset_connection()
661 con->out_msg = NULL; in reset_connection()
663 con->in_seq = 0; in reset_connection()
664 con->in_seq_acked = 0; in reset_connection()
666 con->out_skip = 0; in reset_connection()
672 void ceph_con_close(struct ceph_connection *con) in ceph_con_close() argument
674 mutex_lock(&con->mutex); in ceph_con_close()
675 dout("con_close %p peer %s\n", con, ceph_pr_addr(&con->peer_addr)); in ceph_con_close()
676 con->state = CON_STATE_CLOSED; in ceph_con_close()
678 con_flag_clear(con, CON_FLAG_LOSSYTX); /* so we retry next connect */ in ceph_con_close()
679 con_flag_clear(con, CON_FLAG_KEEPALIVE_PENDING); in ceph_con_close()
680 con_flag_clear(con, CON_FLAG_WRITE_PENDING); in ceph_con_close()
681 con_flag_clear(con, CON_FLAG_BACKOFF); in ceph_con_close()
683 reset_connection(con); in ceph_con_close()
684 con->peer_global_seq = 0; in ceph_con_close()
685 cancel_con(con); in ceph_con_close()
686 con_close_socket(con); in ceph_con_close()
687 mutex_unlock(&con->mutex); in ceph_con_close()
694 void ceph_con_open(struct ceph_connection *con, in ceph_con_open() argument
698 mutex_lock(&con->mutex); in ceph_con_open()
699 dout("con_open %p %s\n", con, ceph_pr_addr(addr)); in ceph_con_open()
701 WARN_ON(con->state != CON_STATE_CLOSED); in ceph_con_open()
702 con->state = CON_STATE_PREOPEN; in ceph_con_open()
704 con->peer_name.type = (__u8) entity_type; in ceph_con_open()
705 con->peer_name.num = cpu_to_le64(entity_num); in ceph_con_open()
707 memcpy(&con->peer_addr, addr, sizeof(*addr)); in ceph_con_open()
708 con->delay = 0; /* reset backoff memory */ in ceph_con_open()
709 mutex_unlock(&con->mutex); in ceph_con_open()
710 queue_con(con); in ceph_con_open()
717 bool ceph_con_opened(struct ceph_connection *con) in ceph_con_opened() argument
719 return con->connect_seq > 0; in ceph_con_opened()
725 void ceph_con_init(struct ceph_connection *con, void *private, in ceph_con_init() argument
729 dout("con_init %p\n", con); in ceph_con_init()
730 memset(con, 0, sizeof(*con)); in ceph_con_init()
731 con->private = private; in ceph_con_init()
732 con->ops = ops; in ceph_con_init()
733 con->msgr = msgr; in ceph_con_init()
735 con_sock_state_init(con); in ceph_con_init()
737 mutex_init(&con->mutex); in ceph_con_init()
738 INIT_LIST_HEAD(&con->out_queue); in ceph_con_init()
739 INIT_LIST_HEAD(&con->out_sent); in ceph_con_init()
740 INIT_DELAYED_WORK(&con->work, ceph_con_workfn); in ceph_con_init()
742 con->state = CON_STATE_CLOSED; in ceph_con_init()
763 static void con_out_kvec_reset(struct ceph_connection *con) in con_out_kvec_reset() argument
765 BUG_ON(con->out_skip); in con_out_kvec_reset()
767 con->out_kvec_left = 0; in con_out_kvec_reset()
768 con->out_kvec_bytes = 0; in con_out_kvec_reset()
769 con->out_kvec_cur = &con->out_kvec[0]; in con_out_kvec_reset()
772 static void con_out_kvec_add(struct ceph_connection *con, in con_out_kvec_add() argument
775 int index = con->out_kvec_left; in con_out_kvec_add()
777 BUG_ON(con->out_skip); in con_out_kvec_add()
778 BUG_ON(index >= ARRAY_SIZE(con->out_kvec)); in con_out_kvec_add()
780 con->out_kvec[index].iov_len = size; in con_out_kvec_add()
781 con->out_kvec[index].iov_base = data; in con_out_kvec_add()
782 con->out_kvec_left++; in con_out_kvec_add()
783 con->out_kvec_bytes += size; in con_out_kvec_add()
791 static int con_out_kvec_skip(struct ceph_connection *con) in con_out_kvec_skip() argument
793 int off = con->out_kvec_cur - con->out_kvec; in con_out_kvec_skip()
796 if (con->out_kvec_bytes > 0) { in con_out_kvec_skip()
797 skip = con->out_kvec[off + con->out_kvec_left - 1].iov_len; in con_out_kvec_skip()
798 BUG_ON(con->out_kvec_bytes < skip); in con_out_kvec_skip()
799 BUG_ON(!con->out_kvec_left); in con_out_kvec_skip()
800 con->out_kvec_bytes -= skip; in con_out_kvec_skip()
801 con->out_kvec_left--; in con_out_kvec_skip()
1220 static size_t sizeof_footer(struct ceph_connection *con) in sizeof_footer() argument
1222 return (con->peer_features & CEPH_FEATURE_MSG_AUTH) ? in sizeof_footer()
1238 static void prepare_write_message_footer(struct ceph_connection *con) in prepare_write_message_footer() argument
1240 struct ceph_msg *m = con->out_msg; in prepare_write_message_footer()
1244 dout("prepare_write_message_footer %p\n", con); in prepare_write_message_footer()
1245 con_out_kvec_add(con, sizeof_footer(con), &m->footer); in prepare_write_message_footer()
1246 if (con->peer_features & CEPH_FEATURE_MSG_AUTH) { in prepare_write_message_footer()
1247 if (con->ops->sign_message) in prepare_write_message_footer()
1248 con->ops->sign_message(m); in prepare_write_message_footer()
1254 con->out_more = m->more_to_follow; in prepare_write_message_footer()
1255 con->out_msg_done = true; in prepare_write_message_footer()
1261 static void prepare_write_message(struct ceph_connection *con) in prepare_write_message() argument
1266 con_out_kvec_reset(con); in prepare_write_message()
1267 con->out_msg_done = false; in prepare_write_message()
1271 if (con->in_seq > con->in_seq_acked) { in prepare_write_message()
1272 con->in_seq_acked = con->in_seq; in prepare_write_message()
1273 con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); in prepare_write_message()
1274 con->out_temp_ack = cpu_to_le64(con->in_seq_acked); in prepare_write_message()
1275 con_out_kvec_add(con, sizeof (con->out_temp_ack), in prepare_write_message()
1276 &con->out_temp_ack); in prepare_write_message()
1279 BUG_ON(list_empty(&con->out_queue)); in prepare_write_message()
1280 m = list_first_entry(&con->out_queue, struct ceph_msg, list_head); in prepare_write_message()
1281 con->out_msg = m; in prepare_write_message()
1282 BUG_ON(m->con != con); in prepare_write_message()
1286 list_move_tail(&m->list_head, &con->out_sent); in prepare_write_message()
1293 m->hdr.seq = cpu_to_le64(++con->out_seq); in prepare_write_message()
1296 if (con->ops->reencode_message) in prepare_write_message()
1297 con->ops->reencode_message(m); in prepare_write_message()
1301 m, con->out_seq, le16_to_cpu(m->hdr.type), in prepare_write_message()
1308 con_out_kvec_add(con, sizeof (tag_msg), &tag_msg); in prepare_write_message()
1309 con_out_kvec_add(con, sizeof(con->out_hdr), &con->out_hdr); in prepare_write_message()
1310 con_out_kvec_add(con, m->front.iov_len, m->front.iov_base); in prepare_write_message()
1313 con_out_kvec_add(con, m->middle->vec.iov_len, in prepare_write_message()
1318 con->out_msg->hdr.crc = cpu_to_le32(crc); in prepare_write_message()
1319 memcpy(&con->out_hdr, &con->out_msg->hdr, sizeof(con->out_hdr)); in prepare_write_message()
1323 con->out_msg->footer.front_crc = cpu_to_le32(crc); in prepare_write_message()
1327 con->out_msg->footer.middle_crc = cpu_to_le32(crc); in prepare_write_message()
1329 con->out_msg->footer.middle_crc = 0; in prepare_write_message()
1331 le32_to_cpu(con->out_msg->footer.front_crc), in prepare_write_message()
1332 le32_to_cpu(con->out_msg->footer.middle_crc)); in prepare_write_message()
1333 con->out_msg->footer.flags = 0; in prepare_write_message()
1336 con->out_msg->footer.data_crc = 0; in prepare_write_message()
1338 prepare_message_data(con->out_msg, m->data_length); in prepare_write_message()
1339 con->out_more = 1; /* data + footer will follow */ in prepare_write_message()
1342 prepare_write_message_footer(con); in prepare_write_message()
1345 con_flag_set(con, CON_FLAG_WRITE_PENDING); in prepare_write_message()
1351 static void prepare_write_ack(struct ceph_connection *con) in prepare_write_ack() argument
1353 dout("prepare_write_ack %p %llu -> %llu\n", con, in prepare_write_ack()
1354 con->in_seq_acked, con->in_seq); in prepare_write_ack()
1355 con->in_seq_acked = con->in_seq; in prepare_write_ack()
1357 con_out_kvec_reset(con); in prepare_write_ack()
1359 con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); in prepare_write_ack()
1361 con->out_temp_ack = cpu_to_le64(con->in_seq_acked); in prepare_write_ack()
1362 con_out_kvec_add(con, sizeof (con->out_temp_ack), in prepare_write_ack()
1363 &con->out_temp_ack); in prepare_write_ack()
1365 con->out_more = 1; /* more will follow.. eventually.. */ in prepare_write_ack()
1366 con_flag_set(con, CON_FLAG_WRITE_PENDING); in prepare_write_ack()
1372 static void prepare_write_seq(struct ceph_connection *con) in prepare_write_seq() argument
1374 dout("prepare_write_seq %p %llu -> %llu\n", con, in prepare_write_seq()
1375 con->in_seq_acked, con->in_seq); in prepare_write_seq()
1376 con->in_seq_acked = con->in_seq; in prepare_write_seq()
1378 con_out_kvec_reset(con); in prepare_write_seq()
1380 con->out_temp_ack = cpu_to_le64(con->in_seq_acked); in prepare_write_seq()
1381 con_out_kvec_add(con, sizeof (con->out_temp_ack), in prepare_write_seq()
1382 &con->out_temp_ack); in prepare_write_seq()
1384 con_flag_set(con, CON_FLAG_WRITE_PENDING); in prepare_write_seq()
1390 static void prepare_write_keepalive(struct ceph_connection *con) in prepare_write_keepalive() argument
1392 dout("prepare_write_keepalive %p\n", con); in prepare_write_keepalive()
1393 con_out_kvec_reset(con); in prepare_write_keepalive()
1394 if (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2) { in prepare_write_keepalive()
1398 con_out_kvec_add(con, sizeof(tag_keepalive2), &tag_keepalive2); in prepare_write_keepalive()
1399 ceph_encode_timespec64(&con->out_temp_keepalive2, &now); in prepare_write_keepalive()
1400 con_out_kvec_add(con, sizeof(con->out_temp_keepalive2), in prepare_write_keepalive()
1401 &con->out_temp_keepalive2); in prepare_write_keepalive()
1403 con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive); in prepare_write_keepalive()
1405 con_flag_set(con, CON_FLAG_WRITE_PENDING); in prepare_write_keepalive()
1412 static int get_connect_authorizer(struct ceph_connection *con) in get_connect_authorizer() argument
1417 if (!con->ops->get_authorizer) { in get_connect_authorizer()
1418 con->auth = NULL; in get_connect_authorizer()
1419 con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN; in get_connect_authorizer()
1420 con->out_connect.authorizer_len = 0; in get_connect_authorizer()
1424 auth = con->ops->get_authorizer(con, &auth_proto, con->auth_retry); in get_connect_authorizer()
1428 con->auth = auth; in get_connect_authorizer()
1429 con->out_connect.authorizer_protocol = cpu_to_le32(auth_proto); in get_connect_authorizer()
1430 con->out_connect.authorizer_len = cpu_to_le32(auth->authorizer_buf_len); in get_connect_authorizer()
1437 static void prepare_write_banner(struct ceph_connection *con) in prepare_write_banner() argument
1439 con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER); in prepare_write_banner()
1440 con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr), in prepare_write_banner()
1441 &con->msgr->my_enc_addr); in prepare_write_banner()
1443 con->out_more = 0; in prepare_write_banner()
1444 con_flag_set(con, CON_FLAG_WRITE_PENDING); in prepare_write_banner()
1447 static void __prepare_write_connect(struct ceph_connection *con) in __prepare_write_connect() argument
1449 con_out_kvec_add(con, sizeof(con->out_connect), &con->out_connect); in __prepare_write_connect()
1450 if (con->auth) in __prepare_write_connect()
1451 con_out_kvec_add(con, con->auth->authorizer_buf_len, in __prepare_write_connect()
1452 con->auth->authorizer_buf); in __prepare_write_connect()
1454 con->out_more = 0; in __prepare_write_connect()
1455 con_flag_set(con, CON_FLAG_WRITE_PENDING); in __prepare_write_connect()
1458 static int prepare_write_connect(struct ceph_connection *con) in prepare_write_connect() argument
1460 unsigned int global_seq = get_global_seq(con->msgr, 0); in prepare_write_connect()
1464 switch (con->peer_name.type) { in prepare_write_connect()
1478 dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con, in prepare_write_connect()
1479 con->connect_seq, global_seq, proto); in prepare_write_connect()
1481 con->out_connect.features = in prepare_write_connect()
1482 cpu_to_le64(from_msgr(con->msgr)->supported_features); in prepare_write_connect()
1483 con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT); in prepare_write_connect()
1484 con->out_connect.connect_seq = cpu_to_le32(con->connect_seq); in prepare_write_connect()
1485 con->out_connect.global_seq = cpu_to_le32(global_seq); in prepare_write_connect()
1486 con->out_connect.protocol_version = cpu_to_le32(proto); in prepare_write_connect()
1487 con->out_connect.flags = 0; in prepare_write_connect()
1489 ret = get_connect_authorizer(con); in prepare_write_connect()
1493 __prepare_write_connect(con); in prepare_write_connect()
1503 static int write_partial_kvec(struct ceph_connection *con) in write_partial_kvec() argument
1507 dout("write_partial_kvec %p %d left\n", con, con->out_kvec_bytes); in write_partial_kvec()
1508 while (con->out_kvec_bytes > 0) { in write_partial_kvec()
1509 ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur, in write_partial_kvec()
1510 con->out_kvec_left, con->out_kvec_bytes, in write_partial_kvec()
1511 con->out_more); in write_partial_kvec()
1514 con->out_kvec_bytes -= ret; in write_partial_kvec()
1515 if (con->out_kvec_bytes == 0) in write_partial_kvec()
1519 while (ret >= con->out_kvec_cur->iov_len) { in write_partial_kvec()
1520 BUG_ON(!con->out_kvec_left); in write_partial_kvec()
1521 ret -= con->out_kvec_cur->iov_len; in write_partial_kvec()
1522 con->out_kvec_cur++; in write_partial_kvec()
1523 con->out_kvec_left--; in write_partial_kvec()
1527 con->out_kvec_cur->iov_len -= ret; in write_partial_kvec()
1528 con->out_kvec_cur->iov_base += ret; in write_partial_kvec()
1531 con->out_kvec_left = 0; in write_partial_kvec()
1534 dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con, in write_partial_kvec()
1535 con->out_kvec_bytes, con->out_kvec_left, ret); in write_partial_kvec()
1559 static int write_partial_message_data(struct ceph_connection *con) in write_partial_message_data() argument
1561 struct ceph_msg *msg = con->out_msg; in write_partial_message_data()
1563 bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); in write_partial_message_data()
1567 dout("%s %p msg %p\n", __func__, con, msg); in write_partial_message_data()
1595 ret = ceph_tcp_sendpage(con->sock, page, page_offset, length, in write_partial_message_data()
1608 dout("%s %p msg %p done\n", __func__, con, msg); in write_partial_message_data()
1615 con_out_kvec_reset(con); in write_partial_message_data()
1616 prepare_write_message_footer(con); in write_partial_message_data()
1624 static int write_partial_skip(struct ceph_connection *con) in write_partial_skip() argument
1629 dout("%s %p %d left\n", __func__, con, con->out_skip); in write_partial_skip()
1630 while (con->out_skip > 0) { in write_partial_skip()
1631 size_t size = min(con->out_skip, (int) PAGE_SIZE); in write_partial_skip()
1633 if (size == con->out_skip) in write_partial_skip()
1635 ret = ceph_tcp_sendpage(con->sock, zero_page, 0, size, more); in write_partial_skip()
1638 con->out_skip -= ret; in write_partial_skip()
1648 static void prepare_read_banner(struct ceph_connection *con) in prepare_read_banner() argument
1650 dout("prepare_read_banner %p\n", con); in prepare_read_banner()
1651 con->in_base_pos = 0; in prepare_read_banner()
1654 static void prepare_read_connect(struct ceph_connection *con) in prepare_read_connect() argument
1656 dout("prepare_read_connect %p\n", con); in prepare_read_connect()
1657 con->in_base_pos = 0; in prepare_read_connect()
1660 static void prepare_read_ack(struct ceph_connection *con) in prepare_read_ack() argument
1662 dout("prepare_read_ack %p\n", con); in prepare_read_ack()
1663 con->in_base_pos = 0; in prepare_read_ack()
1666 static void prepare_read_seq(struct ceph_connection *con) in prepare_read_seq() argument
1668 dout("prepare_read_seq %p\n", con); in prepare_read_seq()
1669 con->in_base_pos = 0; in prepare_read_seq()
1670 con->in_tag = CEPH_MSGR_TAG_SEQ; in prepare_read_seq()
1673 static void prepare_read_tag(struct ceph_connection *con) in prepare_read_tag() argument
1675 dout("prepare_read_tag %p\n", con); in prepare_read_tag()
1676 con->in_base_pos = 0; in prepare_read_tag()
1677 con->in_tag = CEPH_MSGR_TAG_READY; in prepare_read_tag()
1680 static void prepare_read_keepalive_ack(struct ceph_connection *con) in prepare_read_keepalive_ack() argument
1682 dout("prepare_read_keepalive_ack %p\n", con); in prepare_read_keepalive_ack()
1683 con->in_base_pos = 0; in prepare_read_keepalive_ack()
1689 static int prepare_read_message(struct ceph_connection *con) in prepare_read_message() argument
1691 dout("prepare_read_message %p\n", con); in prepare_read_message()
1692 BUG_ON(con->in_msg != NULL); in prepare_read_message()
1693 con->in_base_pos = 0; in prepare_read_message()
1694 con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0; in prepare_read_message()
1699 static int read_partial(struct ceph_connection *con, in read_partial() argument
1702 while (con->in_base_pos < end) { in read_partial()
1703 int left = end - con->in_base_pos; in read_partial()
1705 int ret = ceph_tcp_recvmsg(con->sock, object + have, left); in read_partial()
1708 con->in_base_pos += ret; in read_partial()
1717 static int read_partial_banner(struct ceph_connection *con) in read_partial_banner() argument
1723 dout("read_partial_banner %p at %d\n", con, con->in_base_pos); in read_partial_banner()
1728 ret = read_partial(con, end, size, con->in_banner); in read_partial_banner()
1732 size = sizeof (con->actual_peer_addr); in read_partial_banner()
1734 ret = read_partial(con, end, size, &con->actual_peer_addr); in read_partial_banner()
1737 ceph_decode_banner_addr(&con->actual_peer_addr); in read_partial_banner()
1739 size = sizeof (con->peer_addr_for_me); in read_partial_banner()
1741 ret = read_partial(con, end, size, &con->peer_addr_for_me); in read_partial_banner()
1744 ceph_decode_banner_addr(&con->peer_addr_for_me); in read_partial_banner()
1750 static int read_partial_connect(struct ceph_connection *con) in read_partial_connect() argument
1756 dout("read_partial_connect %p at %d\n", con, con->in_base_pos); in read_partial_connect()
1758 size = sizeof (con->in_reply); in read_partial_connect()
1760 ret = read_partial(con, end, size, &con->in_reply); in read_partial_connect()
1764 if (con->auth) { in read_partial_connect()
1765 size = le32_to_cpu(con->in_reply.authorizer_len); in read_partial_connect()
1766 if (size > con->auth->authorizer_reply_buf_len) { in read_partial_connect()
1768 con->auth->authorizer_reply_buf_len); in read_partial_connect()
1774 ret = read_partial(con, end, size, in read_partial_connect()
1775 con->auth->authorizer_reply_buf); in read_partial_connect()
1781 con, (int)con->in_reply.tag, in read_partial_connect()
1782 le32_to_cpu(con->in_reply.connect_seq), in read_partial_connect()
1783 le32_to_cpu(con->in_reply.global_seq)); in read_partial_connect()
1791 static int verify_hello(struct ceph_connection *con) in verify_hello() argument
1793 if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) { in verify_hello()
1795 ceph_pr_addr(&con->peer_addr)); in verify_hello()
1796 con->error_msg = "protocol error, bad banner"; in verify_hello()
2012 static int process_banner(struct ceph_connection *con) in process_banner() argument
2014 dout("process_banner on %p\n", con); in process_banner()
2016 if (verify_hello(con) < 0) in process_banner()
2024 if (memcmp(&con->peer_addr, &con->actual_peer_addr, in process_banner()
2025 sizeof(con->peer_addr)) != 0 && in process_banner()
2026 !(addr_is_blank(&con->actual_peer_addr) && in process_banner()
2027 con->actual_peer_addr.nonce == con->peer_addr.nonce)) { in process_banner()
2029 ceph_pr_addr(&con->peer_addr), in process_banner()
2030 (int)le32_to_cpu(con->peer_addr.nonce), in process_banner()
2031 ceph_pr_addr(&con->actual_peer_addr), in process_banner()
2032 (int)le32_to_cpu(con->actual_peer_addr.nonce)); in process_banner()
2033 con->error_msg = "wrong peer at address"; in process_banner()
2040 if (addr_is_blank(&con->msgr->inst.addr)) { in process_banner()
2041 int port = addr_port(&con->msgr->inst.addr); in process_banner()
2043 memcpy(&con->msgr->inst.addr.in_addr, in process_banner()
2044 &con->peer_addr_for_me.in_addr, in process_banner()
2045 sizeof(con->peer_addr_for_me.in_addr)); in process_banner()
2046 addr_set_port(&con->msgr->inst.addr, port); in process_banner()
2047 encode_my_addr(con->msgr); in process_banner()
2049 ceph_pr_addr(&con->msgr->inst.addr)); in process_banner()
2055 static int process_connect(struct ceph_connection *con) in process_connect() argument
2057 u64 sup_feat = from_msgr(con->msgr)->supported_features; in process_connect()
2058 u64 req_feat = from_msgr(con->msgr)->required_features; in process_connect()
2059 u64 server_feat = le64_to_cpu(con->in_reply.features); in process_connect()
2062 dout("process_connect on %p tag %d\n", con, (int)con->in_tag); in process_connect()
2064 if (con->auth) { in process_connect()
2065 int len = le32_to_cpu(con->in_reply.authorizer_len); in process_connect()
2074 if (con->in_reply.tag == CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) { in process_connect()
2075 ret = con->ops->add_authorizer_challenge( in process_connect()
2076 con, con->auth->authorizer_reply_buf, len); in process_connect()
2080 con_out_kvec_reset(con); in process_connect()
2081 __prepare_write_connect(con); in process_connect()
2082 prepare_read_connect(con); in process_connect()
2087 ret = con->ops->verify_authorizer_reply(con); in process_connect()
2089 con->error_msg = "bad authorize reply"; in process_connect()
2095 switch (con->in_reply.tag) { in process_connect()
2099 ENTITY_NAME(con->peer_name), in process_connect()
2100 ceph_pr_addr(&con->peer_addr), in process_connect()
2102 con->error_msg = "missing required protocol features"; in process_connect()
2103 reset_connection(con); in process_connect()
2109 ENTITY_NAME(con->peer_name), in process_connect()
2110 ceph_pr_addr(&con->peer_addr), in process_connect()
2111 le32_to_cpu(con->out_connect.protocol_version), in process_connect()
2112 le32_to_cpu(con->in_reply.protocol_version)); in process_connect()
2113 con->error_msg = "protocol version mismatch"; in process_connect()
2114 reset_connection(con); in process_connect()
2118 con->auth_retry++; in process_connect()
2119 dout("process_connect %p got BADAUTHORIZER attempt %d\n", con, in process_connect()
2120 con->auth_retry); in process_connect()
2121 if (con->auth_retry == 2) { in process_connect()
2122 con->error_msg = "connect authorization failure"; in process_connect()
2125 con_out_kvec_reset(con); in process_connect()
2126 ret = prepare_write_connect(con); in process_connect()
2129 prepare_read_connect(con); in process_connect()
2141 le32_to_cpu(con->in_reply.connect_seq)); in process_connect()
2143 ENTITY_NAME(con->peer_name), in process_connect()
2144 ceph_pr_addr(&con->peer_addr)); in process_connect()
2145 reset_connection(con); in process_connect()
2146 con_out_kvec_reset(con); in process_connect()
2147 ret = prepare_write_connect(con); in process_connect()
2150 prepare_read_connect(con); in process_connect()
2153 mutex_unlock(&con->mutex); in process_connect()
2154 pr_info("reset on %s%lld\n", ENTITY_NAME(con->peer_name)); in process_connect()
2155 if (con->ops->peer_reset) in process_connect()
2156 con->ops->peer_reset(con); in process_connect()
2157 mutex_lock(&con->mutex); in process_connect()
2158 if (con->state != CON_STATE_NEGOTIATING) in process_connect()
2168 le32_to_cpu(con->out_connect.connect_seq), in process_connect()
2169 le32_to_cpu(con->in_reply.connect_seq)); in process_connect()
2170 con->connect_seq = le32_to_cpu(con->in_reply.connect_seq); in process_connect()
2171 con_out_kvec_reset(con); in process_connect()
2172 ret = prepare_write_connect(con); in process_connect()
2175 prepare_read_connect(con); in process_connect()
2184 con->peer_global_seq, in process_connect()
2185 le32_to_cpu(con->in_reply.global_seq)); in process_connect()
2186 get_global_seq(con->msgr, in process_connect()
2187 le32_to_cpu(con->in_reply.global_seq)); in process_connect()
2188 con_out_kvec_reset(con); in process_connect()
2189 ret = prepare_write_connect(con); in process_connect()
2192 prepare_read_connect(con); in process_connect()
2200 ENTITY_NAME(con->peer_name), in process_connect()
2201 ceph_pr_addr(&con->peer_addr), in process_connect()
2203 con->error_msg = "missing required protocol features"; in process_connect()
2204 reset_connection(con); in process_connect()
2208 WARN_ON(con->state != CON_STATE_NEGOTIATING); in process_connect()
2209 con->state = CON_STATE_OPEN; in process_connect()
2210 con->auth_retry = 0; /* we authenticated; clear flag */ in process_connect()
2211 con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); in process_connect()
2212 con->connect_seq++; in process_connect()
2213 con->peer_features = server_feat; in process_connect()
2215 con->peer_global_seq, in process_connect()
2216 le32_to_cpu(con->in_reply.connect_seq), in process_connect()
2217 con->connect_seq); in process_connect()
2218 WARN_ON(con->connect_seq != in process_connect()
2219 le32_to_cpu(con->in_reply.connect_seq)); in process_connect()
2221 if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY) in process_connect()
2222 con_flag_set(con, CON_FLAG_LOSSYTX); in process_connect()
2224 con->delay = 0; /* reset backoff memory */ in process_connect()
2226 if (con->in_reply.tag == CEPH_MSGR_TAG_SEQ) { in process_connect()
2227 prepare_write_seq(con); in process_connect()
2228 prepare_read_seq(con); in process_connect()
2230 prepare_read_tag(con); in process_connect()
2241 con->error_msg = "protocol error, got WAIT as client"; in process_connect()
2245 con->error_msg = "protocol error, garbage tag during connect"; in process_connect()
2255 static int read_partial_ack(struct ceph_connection *con) in read_partial_ack() argument
2257 int size = sizeof (con->in_temp_ack); in read_partial_ack()
2260 return read_partial(con, end, size, &con->in_temp_ack); in read_partial_ack()
2266 static void process_ack(struct ceph_connection *con) in process_ack() argument
2269 u64 ack = le64_to_cpu(con->in_temp_ack); in process_ack()
2271 bool reconnect = (con->in_tag == CEPH_MSGR_TAG_SEQ); in process_ack()
2272 struct list_head *list = reconnect ? &con->out_queue : &con->out_sent; in process_ack()
2292 prepare_read_tag(con); in process_ack()
2296 static int read_partial_message_section(struct ceph_connection *con, in read_partial_message_section() argument
2307 ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base + in read_partial_message_section()
2319 static int read_partial_msg_data(struct ceph_connection *con) in read_partial_msg_data() argument
2321 struct ceph_msg *msg = con->in_msg; in read_partial_msg_data()
2323 bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); in read_partial_msg_data()
2334 crc = con->in_data_crc; in read_partial_msg_data()
2342 ret = ceph_tcp_recvpage(con->sock, page, page_offset, length); in read_partial_msg_data()
2345 con->in_data_crc = crc; in read_partial_msg_data()
2355 con->in_data_crc = crc; in read_partial_msg_data()
2363 static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip);
2365 static int read_partial_message(struct ceph_connection *con) in read_partial_message() argument
2367 struct ceph_msg *m = con->in_msg; in read_partial_message()
2372 bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); in read_partial_message()
2373 bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH); in read_partial_message()
2377 dout("read_partial_message con %p msg %p\n", con, m); in read_partial_message()
2380 size = sizeof (con->in_hdr); in read_partial_message()
2382 ret = read_partial(con, end, size, &con->in_hdr); in read_partial_message()
2386 crc = crc32c(0, &con->in_hdr, offsetof(struct ceph_msg_header, crc)); in read_partial_message()
2387 if (cpu_to_le32(crc) != con->in_hdr.crc) { in read_partial_message()
2389 crc, con->in_hdr.crc); in read_partial_message()
2393 front_len = le32_to_cpu(con->in_hdr.front_len); in read_partial_message()
2396 middle_len = le32_to_cpu(con->in_hdr.middle_len); in read_partial_message()
2399 data_len = le32_to_cpu(con->in_hdr.data_len); in read_partial_message()
2404 seq = le64_to_cpu(con->in_hdr.seq); in read_partial_message()
2405 if ((s64)seq - (s64)con->in_seq < 1) { in read_partial_message()
2407 ENTITY_NAME(con->peer_name), in read_partial_message()
2408 ceph_pr_addr(&con->peer_addr), in read_partial_message()
2409 seq, con->in_seq + 1); in read_partial_message()
2410 con->in_base_pos = -front_len - middle_len - data_len - in read_partial_message()
2411 sizeof_footer(con); in read_partial_message()
2412 con->in_tag = CEPH_MSGR_TAG_READY; in read_partial_message()
2414 } else if ((s64)seq - (s64)con->in_seq > 1) { in read_partial_message()
2416 seq, con->in_seq + 1); in read_partial_message()
2417 con->error_msg = "bad message sequence # for incoming message"; in read_partial_message()
2422 if (!con->in_msg) { in read_partial_message()
2425 dout("got hdr type %d front %d data %d\n", con->in_hdr.type, in read_partial_message()
2427 ret = ceph_con_in_msg_alloc(con, &skip); in read_partial_message()
2431 BUG_ON(!con->in_msg ^ skip); in read_partial_message()
2435 con->in_base_pos = -front_len - middle_len - data_len - in read_partial_message()
2436 sizeof_footer(con); in read_partial_message()
2437 con->in_tag = CEPH_MSGR_TAG_READY; in read_partial_message()
2438 con->in_seq++; in read_partial_message()
2442 BUG_ON(!con->in_msg); in read_partial_message()
2443 BUG_ON(con->in_msg->con != con); in read_partial_message()
2444 m = con->in_msg; in read_partial_message()
2452 prepare_message_data(con->in_msg, data_len); in read_partial_message()
2456 ret = read_partial_message_section(con, &m->front, front_len, in read_partial_message()
2457 &con->in_front_crc); in read_partial_message()
2463 ret = read_partial_message_section(con, &m->middle->vec, in read_partial_message()
2465 &con->in_middle_crc); in read_partial_message()
2472 ret = read_partial_msg_data(con); in read_partial_message()
2478 size = sizeof_footer(con); in read_partial_message()
2480 ret = read_partial(con, end, size, &m->footer); in read_partial_message()
2494 if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) { in read_partial_message()
2496 m, con->in_front_crc, m->footer.front_crc); in read_partial_message()
2499 if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) { in read_partial_message()
2501 m, con->in_middle_crc, m->footer.middle_crc); in read_partial_message()
2506 con->in_data_crc != le32_to_cpu(m->footer.data_crc)) { in read_partial_message()
2508 con->in_data_crc, le32_to_cpu(m->footer.data_crc)); in read_partial_message()
2512 if (need_sign && con->ops->check_message_signature && in read_partial_message()
2513 con->ops->check_message_signature(m)) { in read_partial_message()
2526 static void process_message(struct ceph_connection *con) in process_message() argument
2528 struct ceph_msg *msg = con->in_msg; in process_message()
2530 BUG_ON(con->in_msg->con != con); in process_message()
2531 con->in_msg = NULL; in process_message()
2534 if (con->peer_name.type == 0) in process_message()
2535 con->peer_name = msg->hdr.src; in process_message()
2537 con->in_seq++; in process_message()
2538 mutex_unlock(&con->mutex); in process_message()
2547 con->in_front_crc, con->in_middle_crc, con->in_data_crc); in process_message()
2548 con->ops->dispatch(con, msg); in process_message()
2550 mutex_lock(&con->mutex); in process_message()
2553 static int read_keepalive_ack(struct ceph_connection *con) in read_keepalive_ack() argument
2557 int ret = read_partial(con, size, size, &ceph_ts); in read_keepalive_ack()
2560 ceph_decode_timespec64(&con->last_keepalive_ack, &ceph_ts); in read_keepalive_ack()
2561 prepare_read_tag(con); in read_keepalive_ack()
2569 static int try_write(struct ceph_connection *con) in try_write() argument
2573 dout("try_write start %p state %lu\n", con, con->state); in try_write()
2574 if (con->state != CON_STATE_PREOPEN && in try_write()
2575 con->state != CON_STATE_CONNECTING && in try_write()
2576 con->state != CON_STATE_NEGOTIATING && in try_write()
2577 con->state != CON_STATE_OPEN) in try_write()
2581 if (con->state == CON_STATE_PREOPEN) { in try_write()
2582 BUG_ON(con->sock); in try_write()
2583 con->state = CON_STATE_CONNECTING; in try_write()
2585 con_out_kvec_reset(con); in try_write()
2586 prepare_write_banner(con); in try_write()
2587 prepare_read_banner(con); in try_write()
2589 BUG_ON(con->in_msg); in try_write()
2590 con->in_tag = CEPH_MSGR_TAG_READY; in try_write()
2592 con, con->state); in try_write()
2593 ret = ceph_tcp_connect(con); in try_write()
2595 con->error_msg = "connect error"; in try_write()
2601 dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); in try_write()
2602 BUG_ON(!con->sock); in try_write()
2605 if (con->out_kvec_left) { in try_write()
2606 ret = write_partial_kvec(con); in try_write()
2610 if (con->out_skip) { in try_write()
2611 ret = write_partial_skip(con); in try_write()
2617 if (con->out_msg) { in try_write()
2618 if (con->out_msg_done) { in try_write()
2619 ceph_msg_put(con->out_msg); in try_write()
2620 con->out_msg = NULL; /* we're done with this one */ in try_write()
2624 ret = write_partial_message_data(con); in try_write()
2637 if (con->state == CON_STATE_OPEN) { in try_write()
2638 if (con_flag_test_and_clear(con, CON_FLAG_KEEPALIVE_PENDING)) { in try_write()
2639 prepare_write_keepalive(con); in try_write()
2643 if (!list_empty(&con->out_queue)) { in try_write()
2644 prepare_write_message(con); in try_write()
2647 if (con->in_seq > con->in_seq_acked) { in try_write()
2648 prepare_write_ack(con); in try_write()
2654 con_flag_clear(con, CON_FLAG_WRITE_PENDING); in try_write()
2658 dout("try_write done on %p ret %d\n", con, ret); in try_write()
2665 static int try_read(struct ceph_connection *con) in try_read() argument
2670 dout("try_read start on %p state %lu\n", con, con->state); in try_read()
2671 if (con->state != CON_STATE_CONNECTING && in try_read()
2672 con->state != CON_STATE_NEGOTIATING && in try_read()
2673 con->state != CON_STATE_OPEN) in try_read()
2676 BUG_ON(!con->sock); in try_read()
2678 dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, in try_read()
2679 con->in_base_pos); in try_read()
2681 if (con->state == CON_STATE_CONNECTING) { in try_read()
2683 ret = read_partial_banner(con); in try_read()
2686 ret = process_banner(con); in try_read()
2690 con->state = CON_STATE_NEGOTIATING; in try_read()
2697 ret = prepare_write_connect(con); in try_read()
2700 prepare_read_connect(con); in try_read()
2706 if (con->state == CON_STATE_NEGOTIATING) { in try_read()
2708 ret = read_partial_connect(con); in try_read()
2711 ret = process_connect(con); in try_read()
2717 WARN_ON(con->state != CON_STATE_OPEN); in try_read()
2719 if (con->in_base_pos < 0) { in try_read()
2723 ret = ceph_tcp_recvmsg(con->sock, NULL, -con->in_base_pos); in try_read()
2726 dout("skipped %d / %d bytes\n", ret, -con->in_base_pos); in try_read()
2727 con->in_base_pos += ret; in try_read()
2728 if (con->in_base_pos) in try_read()
2731 if (con->in_tag == CEPH_MSGR_TAG_READY) { in try_read()
2735 ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1); in try_read()
2738 dout("try_read got tag %d\n", (int)con->in_tag); in try_read()
2739 switch (con->in_tag) { in try_read()
2741 prepare_read_message(con); in try_read()
2744 prepare_read_ack(con); in try_read()
2747 prepare_read_keepalive_ack(con); in try_read()
2750 con_close_socket(con); in try_read()
2751 con->state = CON_STATE_CLOSED; in try_read()
2757 if (con->in_tag == CEPH_MSGR_TAG_MSG) { in try_read()
2758 ret = read_partial_message(con); in try_read()
2762 con->error_msg = "bad crc/signature"; in try_read()
2768 con->error_msg = "io error"; in try_read()
2773 if (con->in_tag == CEPH_MSGR_TAG_READY) in try_read()
2775 process_message(con); in try_read()
2776 if (con->state == CON_STATE_OPEN) in try_read()
2777 prepare_read_tag(con); in try_read()
2780 if (con->in_tag == CEPH_MSGR_TAG_ACK || in try_read()
2781 con->in_tag == CEPH_MSGR_TAG_SEQ) { in try_read()
2786 ret = read_partial_ack(con); in try_read()
2789 process_ack(con); in try_read()
2792 if (con->in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) { in try_read()
2793 ret = read_keepalive_ack(con); in try_read()
2800 dout("try_read done on %p ret %d\n", con, ret); in try_read()
2804 pr_err("try_read bad con->in_tag = %d\n", (int)con->in_tag); in try_read()
2805 con->error_msg = "protocol error, garbage tag"; in try_read()
2816 static int queue_con_delay(struct ceph_connection *con, unsigned long delay) in queue_con_delay() argument
2818 if (!con->ops->get(con)) { in queue_con_delay()
2819 dout("%s %p ref count 0\n", __func__, con); in queue_con_delay()
2823 if (!queue_delayed_work(ceph_msgr_wq, &con->work, delay)) { in queue_con_delay()
2824 dout("%s %p - already queued\n", __func__, con); in queue_con_delay()
2825 con->ops->put(con); in queue_con_delay()
2829 dout("%s %p %lu\n", __func__, con, delay); in queue_con_delay()
2833 static void queue_con(struct ceph_connection *con) in queue_con() argument
2835 (void) queue_con_delay(con, 0); in queue_con()
2838 static void cancel_con(struct ceph_connection *con) in cancel_con() argument
2840 if (cancel_delayed_work(&con->work)) { in cancel_con()
2841 dout("%s %p\n", __func__, con); in cancel_con()
2842 con->ops->put(con); in cancel_con()
2846 static bool con_sock_closed(struct ceph_connection *con) in con_sock_closed() argument
2848 if (!con_flag_test_and_clear(con, CON_FLAG_SOCK_CLOSED)) in con_sock_closed()
2853 con->error_msg = "socket closed (con state " #x ")"; \ in con_sock_closed()
2856 switch (con->state) { in con_sock_closed()
2865 __func__, con, con->state); in con_sock_closed()
2866 con->error_msg = "unrecognized con state"; in con_sock_closed()
2875 static bool con_backoff(struct ceph_connection *con) in con_backoff() argument
2879 if (!con_flag_test_and_clear(con, CON_FLAG_BACKOFF)) in con_backoff()
2882 ret = queue_con_delay(con, round_jiffies_relative(con->delay)); in con_backoff()
2885 con, con->delay); in con_backoff()
2887 con_flag_set(con, CON_FLAG_BACKOFF); in con_backoff()
2895 static void con_fault_finish(struct ceph_connection *con) in con_fault_finish() argument
2897 dout("%s %p\n", __func__, con); in con_fault_finish()
2903 if (con->auth_retry) { in con_fault_finish()
2904 dout("auth_retry %d, invalidating\n", con->auth_retry); in con_fault_finish()
2905 if (con->ops->invalidate_authorizer) in con_fault_finish()
2906 con->ops->invalidate_authorizer(con); in con_fault_finish()
2907 con->auth_retry = 0; in con_fault_finish()
2910 if (con->ops->fault) in con_fault_finish()
2911 con->ops->fault(con); in con_fault_finish()
2919 struct ceph_connection *con = container_of(work, struct ceph_connection, in ceph_con_workfn() local
2923 mutex_lock(&con->mutex); in ceph_con_workfn()
2927 if ((fault = con_sock_closed(con))) { in ceph_con_workfn()
2928 dout("%s: con %p SOCK_CLOSED\n", __func__, con); in ceph_con_workfn()
2931 if (con_backoff(con)) { in ceph_con_workfn()
2932 dout("%s: con %p BACKOFF\n", __func__, con); in ceph_con_workfn()
2935 if (con->state == CON_STATE_STANDBY) { in ceph_con_workfn()
2936 dout("%s: con %p STANDBY\n", __func__, con); in ceph_con_workfn()
2939 if (con->state == CON_STATE_CLOSED) { in ceph_con_workfn()
2940 dout("%s: con %p CLOSED\n", __func__, con); in ceph_con_workfn()
2941 BUG_ON(con->sock); in ceph_con_workfn()
2944 if (con->state == CON_STATE_PREOPEN) { in ceph_con_workfn()
2945 dout("%s: con %p PREOPEN\n", __func__, con); in ceph_con_workfn()
2946 BUG_ON(con->sock); in ceph_con_workfn()
2949 ret = try_read(con); in ceph_con_workfn()
2953 if (!con->error_msg) in ceph_con_workfn()
2954 con->error_msg = "socket error on read"; in ceph_con_workfn()
2959 ret = try_write(con); in ceph_con_workfn()
2963 if (!con->error_msg) in ceph_con_workfn()
2964 con->error_msg = "socket error on write"; in ceph_con_workfn()
2971 con_fault(con); in ceph_con_workfn()
2972 mutex_unlock(&con->mutex); in ceph_con_workfn()
2975 con_fault_finish(con); in ceph_con_workfn()
2977 con->ops->put(con); in ceph_con_workfn()
2984 static void con_fault(struct ceph_connection *con) in con_fault() argument
2987 con, con->state, ceph_pr_addr(&con->peer_addr)); in con_fault()
2989 pr_warn("%s%lld %s %s\n", ENTITY_NAME(con->peer_name), in con_fault()
2990 ceph_pr_addr(&con->peer_addr), con->error_msg); in con_fault()
2991 con->error_msg = NULL; in con_fault()
2993 WARN_ON(con->state != CON_STATE_CONNECTING && in con_fault()
2994 con->state != CON_STATE_NEGOTIATING && in con_fault()
2995 con->state != CON_STATE_OPEN); in con_fault()
2997 con_close_socket(con); in con_fault()
2999 if (con_flag_test(con, CON_FLAG_LOSSYTX)) { in con_fault()
3001 con->state = CON_STATE_CLOSED; in con_fault()
3005 if (con->in_msg) { in con_fault()
3006 BUG_ON(con->in_msg->con != con); in con_fault()
3007 ceph_msg_put(con->in_msg); in con_fault()
3008 con->in_msg = NULL; in con_fault()
3012 list_splice_init(&con->out_sent, &con->out_queue); in con_fault()
3016 if (list_empty(&con->out_queue) && in con_fault()
3017 !con_flag_test(con, CON_FLAG_KEEPALIVE_PENDING)) { in con_fault()
3018 dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); in con_fault()
3019 con_flag_clear(con, CON_FLAG_WRITE_PENDING); in con_fault()
3020 con->state = CON_STATE_STANDBY; in con_fault()
3023 con->state = CON_STATE_PREOPEN; in con_fault()
3024 if (con->delay == 0) in con_fault()
3025 con->delay = BASE_DELAY_INTERVAL; in con_fault()
3026 else if (con->delay < MAX_DELAY_INTERVAL) in con_fault()
3027 con->delay *= 2; in con_fault()
3028 con_flag_set(con, CON_FLAG_BACKOFF); in con_fault()
3029 queue_con(con); in con_fault()
3070 static void msg_con_set(struct ceph_msg *msg, struct ceph_connection *con) in msg_con_set() argument
3072 if (msg->con) in msg_con_set()
3073 msg->con->ops->put(msg->con); in msg_con_set()
3075 msg->con = con ? con->ops->get(con) : NULL; in msg_con_set()
3076 BUG_ON(msg->con != con); in msg_con_set()
3079 static void clear_standby(struct ceph_connection *con) in clear_standby() argument
3082 if (con->state == CON_STATE_STANDBY) { in clear_standby()
3083 dout("clear_standby %p and ++connect_seq\n", con); in clear_standby()
3084 con->state = CON_STATE_PREOPEN; in clear_standby()
3085 con->connect_seq++; in clear_standby()
3086 WARN_ON(con_flag_test(con, CON_FLAG_WRITE_PENDING)); in clear_standby()
3087 WARN_ON(con_flag_test(con, CON_FLAG_KEEPALIVE_PENDING)); in clear_standby()
3094 void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) in ceph_con_send() argument
3097 msg->hdr.src = con->msgr->inst.name; in ceph_con_send()
3101 mutex_lock(&con->mutex); in ceph_con_send()
3103 if (con->state == CON_STATE_CLOSED) { in ceph_con_send()
3104 dout("con_send %p closed, dropping %p\n", con, msg); in ceph_con_send()
3106 mutex_unlock(&con->mutex); in ceph_con_send()
3110 msg_con_set(msg, con); in ceph_con_send()
3113 list_add_tail(&msg->list_head, &con->out_queue); in ceph_con_send()
3115 ENTITY_NAME(con->peer_name), le16_to_cpu(msg->hdr.type), in ceph_con_send()
3121 clear_standby(con); in ceph_con_send()
3122 mutex_unlock(&con->mutex); in ceph_con_send()
3126 if (con_flag_test_and_set(con, CON_FLAG_WRITE_PENDING) == 0) in ceph_con_send()
3127 queue_con(con); in ceph_con_send()
3136 struct ceph_connection *con = msg->con; in ceph_msg_revoke() local
3138 if (!con) { in ceph_msg_revoke()
3143 mutex_lock(&con->mutex); in ceph_msg_revoke()
3145 dout("%s %p msg %p - was on queue\n", __func__, con, msg); in ceph_msg_revoke()
3151 if (con->out_msg == msg) { in ceph_msg_revoke()
3152 BUG_ON(con->out_skip); in ceph_msg_revoke()
3154 if (con->out_msg_done) { in ceph_msg_revoke()
3155 con->out_skip += con_out_kvec_skip(con); in ceph_msg_revoke()
3158 con->out_skip += sizeof_footer(con); in ceph_msg_revoke()
3162 con->out_skip += msg->cursor.total_resid; in ceph_msg_revoke()
3164 con->out_skip += con_out_kvec_skip(con); in ceph_msg_revoke()
3165 con->out_skip += con_out_kvec_skip(con); in ceph_msg_revoke()
3168 __func__, con, msg, con->out_kvec_bytes, con->out_skip); in ceph_msg_revoke()
3170 con->out_msg = NULL; in ceph_msg_revoke()
3174 mutex_unlock(&con->mutex); in ceph_msg_revoke()
3182 struct ceph_connection *con = msg->con; in ceph_msg_revoke_incoming() local
3184 if (!con) { in ceph_msg_revoke_incoming()
3189 mutex_lock(&con->mutex); in ceph_msg_revoke_incoming()
3190 if (con->in_msg == msg) { in ceph_msg_revoke_incoming()
3191 unsigned int front_len = le32_to_cpu(con->in_hdr.front_len); in ceph_msg_revoke_incoming()
3192 unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len); in ceph_msg_revoke_incoming()
3193 unsigned int data_len = le32_to_cpu(con->in_hdr.data_len); in ceph_msg_revoke_incoming()
3196 dout("%s %p msg %p revoked\n", __func__, con, msg); in ceph_msg_revoke_incoming()
3197 con->in_base_pos = con->in_base_pos - in ceph_msg_revoke_incoming()
3203 ceph_msg_put(con->in_msg); in ceph_msg_revoke_incoming()
3204 con->in_msg = NULL; in ceph_msg_revoke_incoming()
3205 con->in_tag = CEPH_MSGR_TAG_READY; in ceph_msg_revoke_incoming()
3206 con->in_seq++; in ceph_msg_revoke_incoming()
3209 __func__, con, con->in_msg, msg); in ceph_msg_revoke_incoming()
3211 mutex_unlock(&con->mutex); in ceph_msg_revoke_incoming()
3217 void ceph_con_keepalive(struct ceph_connection *con) in ceph_con_keepalive() argument
3219 dout("con_keepalive %p\n", con); in ceph_con_keepalive()
3220 mutex_lock(&con->mutex); in ceph_con_keepalive()
3221 clear_standby(con); in ceph_con_keepalive()
3222 con_flag_set(con, CON_FLAG_KEEPALIVE_PENDING); in ceph_con_keepalive()
3223 mutex_unlock(&con->mutex); in ceph_con_keepalive()
3225 if (con_flag_test_and_set(con, CON_FLAG_WRITE_PENDING) == 0) in ceph_con_keepalive()
3226 queue_con(con); in ceph_con_keepalive()
3230 bool ceph_con_keepalive_expired(struct ceph_connection *con, in ceph_con_keepalive_expired() argument
3234 (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2)) { in ceph_con_keepalive_expired()
3239 ts = timespec64_add(con->last_keepalive_ack, ts); in ceph_con_keepalive_expired()
3395 static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg) in ceph_alloc_middle() argument
3426 static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip) in ceph_con_in_msg_alloc() argument
3428 struct ceph_msg_header *hdr = &con->in_hdr; in ceph_con_in_msg_alloc()
3433 BUG_ON(con->in_msg != NULL); in ceph_con_in_msg_alloc()
3434 BUG_ON(!con->ops->alloc_msg); in ceph_con_in_msg_alloc()
3436 mutex_unlock(&con->mutex); in ceph_con_in_msg_alloc()
3437 msg = con->ops->alloc_msg(con, hdr, skip); in ceph_con_in_msg_alloc()
3438 mutex_lock(&con->mutex); in ceph_con_in_msg_alloc()
3439 if (con->state != CON_STATE_OPEN) { in ceph_con_in_msg_alloc()
3446 msg_con_set(msg, con); in ceph_con_in_msg_alloc()
3447 con->in_msg = msg; in ceph_con_in_msg_alloc()
3457 con->error_msg = "error allocating memory for incoming message"; in ceph_con_in_msg_alloc()
3460 memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr)); in ceph_con_in_msg_alloc()
3462 if (middle_len && !con->in_msg->middle) { in ceph_con_in_msg_alloc()
3463 ret = ceph_alloc_middle(con, con->in_msg); in ceph_con_in_msg_alloc()
3465 ceph_msg_put(con->in_msg); in ceph_con_in_msg_alloc()
3466 con->in_msg = NULL; in ceph_con_in_msg_alloc()