Lines Matching +full:multi +full:- +full:socket
1 // SPDX-License-Identifier: GPL-2.0-only
5 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
6 ** Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved.
15 * This is the "low-level" comms layer.
21 * simply 32 bit numbers to the locking module - if they need to
25 * whatever it needs for inter-node communication.
29 * up to the mid-level comms layer (which understands the
40 * cluster-wide mechanism as it must be the same on all nodes of the cluster
71 struct socket *sock; /* NULL if not connected */
103 #define sock2con(x) ((struct connection *)(x)->sk_user_data)
106 struct socket *sock;
110 #define DLM_WQ_REMAIN_BYTES(e) (PAGE_SIZE - e->end)
111 #define DLM_WQ_LENGTH_BYTES(e) (e->end - e->offset)
153 int (*connect)(struct connection *con, struct socket *sock,
155 void (*sockopts)(struct socket *sock);
156 int (*bind)(struct socket *sock);
158 void (*listen_sockopts)(struct socket *sock);
159 int (*listen_bind)(struct socket *sock);
198 INIT_LIST_HEAD(&entry->msgs); in writequeue_entry_ctor()
217 if (list_empty(&con->writequeue)) in con_next_wq()
220 e = list_first_entry(&con->writequeue, struct writequeue_entry, in con_next_wq()
225 if (e->users || e->len == 0) in con_next_wq()
236 if (con->nodeid == nodeid) in __find_con()
245 return atomic_read(&con->writequeue_cnt); in tcp_eof_condition()
250 con->rx_buflen = dlm_config.ci_buffer_size; in dlm_con_init()
251 con->rx_buf = kmalloc(con->rx_buflen, GFP_NOFS); in dlm_con_init()
252 if (!con->rx_buf) in dlm_con_init()
253 return -ENOMEM; in dlm_con_init()
255 con->nodeid = nodeid; in dlm_con_init()
256 mutex_init(&con->sock_mutex); in dlm_con_init()
257 INIT_LIST_HEAD(&con->writequeue); in dlm_con_init()
258 spin_lock_init(&con->writequeue_lock); in dlm_con_init()
259 atomic_set(&con->writequeue_cnt, 0); in dlm_con_init()
260 INIT_WORK(&con->swork, process_send_sockets); in dlm_con_init()
261 INIT_WORK(&con->rwork, process_recv_sockets); in dlm_con_init()
262 init_waitqueue_head(&con->shutdown_wait); in dlm_con_init()
301 kfree(con->rx_buf); in nodeid2con()
306 hlist_add_head_rcu(&con->list, &connection_hash[r]); in nodeid2con()
329 if (na->nodeid == nodeid) in find_node_addr()
338 switch (x->ss_family) { in addr_compare()
342 if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr) in addr_compare()
344 if (sinx->sin_port != siny->sin_port) in addr_compare()
351 if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr)) in addr_compare()
353 if (sinx->sin6_port != siny->sin6_port) in addr_compare()
371 return -1; in nodeid_to_addr()
375 if (na && na->addr_count) { in nodeid_to_addr()
376 memcpy(&sas, na->addr[na->curr_addr_index], in nodeid_to_addr()
380 na->curr_addr_index++; in nodeid_to_addr()
381 if (na->curr_addr_index == na->addr_count) in nodeid_to_addr()
382 na->curr_addr_index = 0; in nodeid_to_addr()
388 return -EEXIST; in nodeid_to_addr()
390 if (!na->addr_count) in nodeid_to_addr()
391 return -ENOENT; in nodeid_to_addr()
393 *mark = na->mark; in nodeid_to_addr()
401 if (dlm_local_addr[0]->ss_family == AF_INET) { in nodeid_to_addr()
404 ret4->sin_addr.s_addr = in4->sin_addr.s_addr; in nodeid_to_addr()
408 ret6->sin6_addr = in6->sin6_addr; in nodeid_to_addr()
418 int rv = -EEXIST; in addr_to_nodeid()
423 if (!na->addr_count) in addr_to_nodeid()
426 for (addr_i = 0; addr_i < na->addr_count; addr_i++) { in addr_to_nodeid()
427 if (addr_compare(na->addr[addr_i], addr)) { in addr_to_nodeid()
428 *nodeid = na->nodeid; in addr_to_nodeid()
429 *mark = na->mark; in addr_to_nodeid()
446 for (i = 0; i < na->addr_count; i++) { in dlm_lowcomms_na_has_addr()
447 if (addr_compare(na->addr[i], addr)) in dlm_lowcomms_na_has_addr()
462 return -ENOMEM; in dlm_lowcomms_addr()
467 return -ENOMEM; in dlm_lowcomms_addr()
475 new_node->nodeid = nodeid; in dlm_lowcomms_addr()
476 new_node->addr[0] = new_addr; in dlm_lowcomms_addr()
477 new_node->addr_count = 1; in dlm_lowcomms_addr()
478 new_node->mark = dlm_config.ci_mark; in dlm_lowcomms_addr()
479 list_add(&new_node->list, &dlm_node_addrs); in dlm_lowcomms_addr()
489 return -EEXIST; in dlm_lowcomms_addr()
492 if (na->addr_count >= DLM_MAX_ADDR_COUNT) { in dlm_lowcomms_addr()
496 return -ENOSPC; in dlm_lowcomms_addr()
499 na->addr[na->addr_count++] = new_addr; in dlm_lowcomms_addr()
505 /* Data available on socket or listen socket received a connect */
511 if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags)) in lowcomms_data_ready()
512 queue_work(recv_workqueue, &con->rwork); in lowcomms_data_ready()
531 if (!test_and_set_bit(CF_CONNECTED, &con->flags)) { in lowcomms_write_space()
532 log_print("connected to node %d", con->nodeid); in lowcomms_write_space()
533 queue_work(send_workqueue, &con->swork); in lowcomms_write_space()
537 clear_bit(SOCK_NOSPACE, &con->sock->flags); in lowcomms_write_space()
539 if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) { in lowcomms_write_space()
540 con->sock->sk->sk_write_pending--; in lowcomms_write_space()
541 clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags); in lowcomms_write_space()
544 queue_work(send_workqueue, &con->swork); in lowcomms_write_space()
549 if (test_bit(CF_CLOSE, &con->flags)) in lowcomms_connect_sock()
551 queue_work(send_workqueue, &con->swork); in lowcomms_connect_sock()
559 * doesn't switch socket state when entering shutdown, so we in lowcomms_state_change()
562 if (sk->sk_shutdown) { in lowcomms_state_change()
563 if (sk->sk_shutdown == RCV_SHUTDOWN) in lowcomms_state_change()
565 } else if (sk->sk_state == TCP_ESTABLISHED) { in lowcomms_state_change()
582 return -ENOMEM; in dlm_lowcomms_connect_node()
599 return -ENOENT; in dlm_lowcomms_nodes_set_mark()
602 na->mark = mark; in dlm_lowcomms_nodes_set_mark()
621 switch (sk->sk_family) { in lowcomms_error_report()
623 printk_ratelimited(KERN_ERR "dlm: node %d: socket error " in lowcomms_error_report()
626 con->nodeid, &inet->inet_daddr, in lowcomms_error_report()
627 ntohs(inet->inet_dport), sk->sk_err, in lowcomms_error_report()
628 sk->sk_err_soft); in lowcomms_error_report()
632 printk_ratelimited(KERN_ERR "dlm: node %d: socket error " in lowcomms_error_report()
635 con->nodeid, &sk->sk_v6_daddr, in lowcomms_error_report()
636 ntohs(inet->inet_dport), sk->sk_err, in lowcomms_error_report()
637 sk->sk_err_soft); in lowcomms_error_report()
641 printk_ratelimited(KERN_ERR "dlm: node %d: socket error " in lowcomms_error_report()
642 "invalid socket family %d set, " in lowcomms_error_report()
644 sk->sk_family, sk->sk_err, sk->sk_err_soft); in lowcomms_error_report()
649 if (test_bit(CF_IS_OTHERCON, &con->flags)) in lowcomms_error_report()
650 con = con->sendcon; in lowcomms_error_report()
652 switch (sk->sk_err) { in lowcomms_error_report()
654 set_bit(CF_DELAY_CONNECT, &con->flags); in lowcomms_error_report()
660 if (!test_and_set_bit(CF_RECONNECT, &con->flags)) in lowcomms_error_report()
661 queue_work(send_workqueue, &con->swork); in lowcomms_error_report()
669 static void save_listen_callbacks(struct socket *sock) in save_listen_callbacks()
671 struct sock *sk = sock->sk; in save_listen_callbacks()
673 listen_sock.sk_data_ready = sk->sk_data_ready; in save_listen_callbacks()
674 listen_sock.sk_state_change = sk->sk_state_change; in save_listen_callbacks()
675 listen_sock.sk_write_space = sk->sk_write_space; in save_listen_callbacks()
676 listen_sock.sk_error_report = sk->sk_error_report; in save_listen_callbacks()
679 static void restore_callbacks(struct socket *sock) in restore_callbacks()
681 struct sock *sk = sock->sk; in restore_callbacks()
684 sk->sk_user_data = NULL; in restore_callbacks()
685 sk->sk_data_ready = listen_sock.sk_data_ready; in restore_callbacks()
686 sk->sk_state_change = listen_sock.sk_state_change; in restore_callbacks()
687 sk->sk_write_space = listen_sock.sk_write_space; in restore_callbacks()
688 sk->sk_error_report = listen_sock.sk_error_report; in restore_callbacks()
692 static void add_listen_sock(struct socket *sock, struct listen_connection *con) in add_listen_sock()
694 struct sock *sk = sock->sk; in add_listen_sock()
698 con->sock = sock; in add_listen_sock()
700 sk->sk_user_data = con; in add_listen_sock()
701 sk->sk_allocation = GFP_NOFS; in add_listen_sock()
703 sk->sk_data_ready = lowcomms_listen_data_ready; in add_listen_sock()
707 /* Make a socket active */
708 static void add_sock(struct socket *sock, struct connection *con) in add_sock()
710 struct sock *sk = sock->sk; in add_sock()
713 con->sock = sock; in add_sock()
715 sk->sk_user_data = con; in add_sock()
717 sk->sk_data_ready = lowcomms_data_ready; in add_sock()
718 sk->sk_write_space = lowcomms_write_space; in add_sock()
719 sk->sk_state_change = lowcomms_state_change; in add_sock()
720 sk->sk_allocation = GFP_NOFS; in add_sock()
721 sk->sk_error_report = lowcomms_error_report; in add_sock()
730 saddr->ss_family = dlm_local_addr[0]->ss_family; in make_sockaddr()
731 if (saddr->ss_family == AF_INET) { in make_sockaddr()
733 in4_addr->sin_port = cpu_to_be16(port); in make_sockaddr()
735 memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero)); in make_sockaddr()
738 in6_addr->sin6_port = cpu_to_be16(port); in make_sockaddr()
741 memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len); in make_sockaddr()
749 __free_page(e->page); in dlm_page_release()
757 kref_put(&msg->entry->ref, dlm_page_release); in dlm_msg_release()
765 list_for_each_entry_safe(msg, tmp, &e->msgs, list) { in free_entry()
766 if (msg->orig_msg) { in free_entry()
767 msg->orig_msg->retransmit = false; in free_entry()
768 kref_put(&msg->orig_msg->ref, dlm_msg_release); in free_entry()
771 list_del(&msg->list); in free_entry()
772 kref_put(&msg->ref, dlm_msg_release); in free_entry()
775 list_del(&e->list); in free_entry()
776 atomic_dec(&e->con->writequeue_cnt); in free_entry()
777 kref_put(&e->ref, dlm_page_release); in free_entry()
780 static void dlm_close_sock(struct socket **sock) in dlm_close_sock()
793 bool closing = test_and_set_bit(CF_CLOSING, &con->flags); in close_connection()
796 if (tx && !closing && cancel_work_sync(&con->swork)) { in close_connection()
797 log_print("canceled swork for node %d", con->nodeid); in close_connection()
798 clear_bit(CF_WRITE_PENDING, &con->flags); in close_connection()
800 if (rx && !closing && cancel_work_sync(&con->rwork)) { in close_connection()
801 log_print("canceled rwork for node %d", con->nodeid); in close_connection()
802 clear_bit(CF_READ_PENDING, &con->flags); in close_connection()
805 mutex_lock(&con->sock_mutex); in close_connection()
806 dlm_close_sock(&con->sock); in close_connection()
808 if (con->othercon && and_other) { in close_connection()
809 /* Will only re-enter once. */ in close_connection()
810 close_connection(con->othercon, false, tx, rx); in close_connection()
824 spin_lock(&con->writequeue_lock); in close_connection()
825 if (!list_empty(&con->writequeue)) { in close_connection()
826 e = list_first_entry(&con->writequeue, struct writequeue_entry, in close_connection()
828 if (e->dirty) in close_connection()
831 spin_unlock(&con->writequeue_lock); in close_connection()
833 con->rx_leftover = 0; in close_connection()
834 con->retries = 0; in close_connection()
835 clear_bit(CF_APP_LIMITED, &con->flags); in close_connection()
836 clear_bit(CF_CONNECTED, &con->flags); in close_connection()
837 clear_bit(CF_DELAY_CONNECT, &con->flags); in close_connection()
838 clear_bit(CF_RECONNECT, &con->flags); in close_connection()
839 clear_bit(CF_EOF, &con->flags); in close_connection()
840 mutex_unlock(&con->sock_mutex); in close_connection()
841 clear_bit(CF_CLOSING, &con->flags); in close_connection()
848 flush_work(&con->swork); in shutdown_connection()
850 mutex_lock(&con->sock_mutex); in shutdown_connection()
852 if (!con->sock) { in shutdown_connection()
853 mutex_unlock(&con->sock_mutex); in shutdown_connection()
857 set_bit(CF_SHUTDOWN, &con->flags); in shutdown_connection()
858 ret = kernel_sock_shutdown(con->sock, SHUT_WR); in shutdown_connection()
859 mutex_unlock(&con->sock_mutex); in shutdown_connection()
865 ret = wait_event_timeout(con->shutdown_wait, in shutdown_connection()
866 !test_bit(CF_SHUTDOWN, &con->flags), in shutdown_connection()
878 clear_bit(CF_SHUTDOWN, &con->flags); in shutdown_connection()
884 if (con->othercon) in dlm_tcp_shutdown()
885 shutdown_connection(con->othercon); in dlm_tcp_shutdown()
895 return -ENOMEM; in con_realloc_receive_buf()
898 if (con->rx_leftover) in con_realloc_receive_buf()
899 memmove(newbuf, con->rx_buf, con->rx_leftover); in con_realloc_receive_buf()
902 kfree(con->rx_buf); in con_realloc_receive_buf()
903 con->rx_buflen = newlen; in con_realloc_receive_buf()
904 con->rx_buf = newbuf; in con_realloc_receive_buf()
916 mutex_lock(&con->sock_mutex); in receive_from_sock()
918 if (con->sock == NULL) { in receive_from_sock()
919 ret = -EAGAIN; in receive_from_sock()
925 if (con->rx_buflen != buflen && con->rx_leftover <= buflen) { in receive_from_sock()
935 iov.iov_base = con->rx_buf + con->rx_leftover; in receive_from_sock()
936 iov.iov_len = con->rx_buflen - con->rx_leftover; in receive_from_sock()
940 ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len, in receive_from_sock()
942 trace_dlm_recv(con->nodeid, ret); in receive_from_sock()
943 if (ret == -EAGAIN) in receive_from_sock()
949 buflen = ret + con->rx_leftover; in receive_from_sock()
950 ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen); in receive_from_sock()
958 con->rx_leftover = buflen - ret; in receive_from_sock()
959 if (con->rx_leftover) { in receive_from_sock()
960 memmove(con->rx_buf, con->rx_buf + ret, in receive_from_sock()
961 con->rx_leftover); in receive_from_sock()
965 dlm_midcomms_receive_done(con->nodeid); in receive_from_sock()
966 mutex_unlock(&con->sock_mutex); in receive_from_sock()
970 if (!test_and_set_bit(CF_READ_PENDING, &con->flags)) in receive_from_sock()
971 queue_work(recv_workqueue, &con->rwork); in receive_from_sock()
972 mutex_unlock(&con->sock_mutex); in receive_from_sock()
973 return -EAGAIN; in receive_from_sock()
978 con, con->nodeid); in receive_from_sock()
980 if (dlm_proto_ops->eof_condition && in receive_from_sock()
981 dlm_proto_ops->eof_condition(con)) { in receive_from_sock()
982 set_bit(CF_EOF, &con->flags); in receive_from_sock()
983 mutex_unlock(&con->sock_mutex); in receive_from_sock()
985 mutex_unlock(&con->sock_mutex); in receive_from_sock()
989 clear_bit(CF_SHUTDOWN, &con->flags); in receive_from_sock()
990 wake_up(&con->shutdown_wait); in receive_from_sock()
994 ret = -1; in receive_from_sock()
996 mutex_unlock(&con->sock_mutex); in receive_from_sock()
1001 /* Listening socket is busy, accept a connection */
1006 struct socket *newsock; in accept_from_sock()
1013 if (!con->sock) in accept_from_sock()
1014 return -ENOTCONN; in accept_from_sock()
1016 result = kernel_accept(con->sock, &newsock, O_NONBLOCK); in accept_from_sock()
1020 /* Get the connected socket's peer */ in accept_from_sock()
1022 len = newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, 2); in accept_from_sock()
1024 result = -ECONNABORTED; in accept_from_sock()
1036 &sin->sin_addr); in accept_from_sock()
1044 &sin6->sin6_addr); in accept_from_sock()
1054 return -1; in accept_from_sock()
1068 result = -ENOMEM; in accept_from_sock()
1072 sock_set_mark(newsock->sk, mark); in accept_from_sock()
1074 mutex_lock(&newcon->sock_mutex); in accept_from_sock()
1075 if (newcon->sock) { in accept_from_sock()
1076 struct connection *othercon = newcon->othercon; in accept_from_sock()
1081 log_print("failed to allocate incoming socket"); in accept_from_sock()
1082 mutex_unlock(&newcon->sock_mutex); in accept_from_sock()
1084 result = -ENOMEM; in accept_from_sock()
1091 mutex_unlock(&newcon->sock_mutex); in accept_from_sock()
1096 lockdep_set_subclass(&othercon->sock_mutex, 1); in accept_from_sock()
1097 set_bit(CF_IS_OTHERCON, &othercon->flags); in accept_from_sock()
1098 newcon->othercon = othercon; in accept_from_sock()
1099 othercon->sendcon = newcon; in accept_from_sock()
1105 mutex_lock(&othercon->sock_mutex); in accept_from_sock()
1108 mutex_unlock(&othercon->sock_mutex); in accept_from_sock()
1118 set_bit(CF_CONNECTED, &addcon->flags); in accept_from_sock()
1119 mutex_unlock(&newcon->sock_mutex); in accept_from_sock()
1123 * between processing the accept adding the socket in accept_from_sock()
1126 if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags)) in accept_from_sock()
1127 queue_work(recv_workqueue, &addcon->rwork); in accept_from_sock()
1137 if (result != -EAGAIN) in accept_from_sock()
1143 * writequeue_entry_complete - try to delete and free write queue entry
1151 e->offset += completed; in writequeue_entry_complete()
1152 e->len -= completed; in writequeue_entry_complete()
1154 e->dirty = true; in writequeue_entry_complete()
1156 if (e->len == 0 && e->users == 0) in writequeue_entry_complete()
1161 * sctp_bind_addrs - bind a SCTP socket to all our addresses
1163 static int sctp_bind_addrs(struct socket *sock, uint16_t port) in sctp_bind_addrs()
1176 result = sock_bind_add(sock->sk, addr, addr_len); in sctp_bind_addrs()
1221 entry->page = alloc_page(GFP_ATOMIC | __GFP_ZERO); in new_writequeue_entry()
1222 if (!entry->page) { in new_writequeue_entry()
1227 entry->offset = 0; in new_writequeue_entry()
1228 entry->len = 0; in new_writequeue_entry()
1229 entry->end = 0; in new_writequeue_entry()
1230 entry->dirty = false; in new_writequeue_entry()
1231 entry->con = con; in new_writequeue_entry()
1232 entry->users = 1; in new_writequeue_entry()
1233 kref_init(&entry->ref); in new_writequeue_entry()
1243 spin_lock(&con->writequeue_lock); in new_wq_entry()
1244 if (!list_empty(&con->writequeue)) { in new_wq_entry()
1245 e = list_last_entry(&con->writequeue, struct writequeue_entry, list); in new_wq_entry()
1247 kref_get(&e->ref); in new_wq_entry()
1249 *ppc = page_address(e->page) + e->end; in new_wq_entry()
1253 e->end += len; in new_wq_entry()
1254 e->users++; in new_wq_entry()
1263 kref_get(&e->ref); in new_wq_entry()
1264 *ppc = page_address(e->page); in new_wq_entry()
1265 e->end += len; in new_wq_entry()
1266 atomic_inc(&con->writequeue_cnt); in new_wq_entry()
1270 list_add_tail(&e->list, &con->writequeue); in new_wq_entry()
1273 spin_unlock(&con->writequeue_lock); in new_wq_entry()
1289 kref_init(&msg->ref); in dlm_lowcomms_new_msg_con()
1297 msg->retransmit = false; in dlm_lowcomms_new_msg_con()
1298 msg->orig_msg = NULL; in dlm_lowcomms_new_msg_con()
1299 msg->ppc = *ppc; in dlm_lowcomms_new_msg_con()
1300 msg->len = len; in dlm_lowcomms_new_msg_con()
1301 msg->entry = e; in dlm_lowcomms_new_msg_con()
1340 kref_get(&msg->ref); in dlm_lowcomms_new_msg()
1342 msg->idx = idx; in dlm_lowcomms_new_msg()
1349 struct writequeue_entry *e = msg->entry; in _dlm_lowcomms_commit_msg()
1350 struct connection *con = e->con; in _dlm_lowcomms_commit_msg()
1353 spin_lock(&con->writequeue_lock); in _dlm_lowcomms_commit_msg()
1354 kref_get(&msg->ref); in _dlm_lowcomms_commit_msg()
1355 list_add(&msg->list, &e->msgs); in _dlm_lowcomms_commit_msg()
1357 users = --e->users; in _dlm_lowcomms_commit_msg()
1361 e->len = DLM_WQ_LENGTH_BYTES(e); in _dlm_lowcomms_commit_msg()
1362 spin_unlock(&con->writequeue_lock); in _dlm_lowcomms_commit_msg()
1364 queue_work(send_workqueue, &con->swork); in _dlm_lowcomms_commit_msg()
1368 spin_unlock(&con->writequeue_lock); in _dlm_lowcomms_commit_msg()
1379 srcu_read_unlock(&connections_srcu, msg->idx); in dlm_lowcomms_commit_msg()
1381 kref_put(&msg->ref, dlm_msg_release); in dlm_lowcomms_commit_msg()
1387 kref_put(&msg->ref, dlm_msg_release); in dlm_lowcomms_put_msg()
1396 if (msg->retransmit) in dlm_lowcomms_resend_msg()
1399 msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len, in dlm_lowcomms_resend_msg()
1402 return -ENOMEM; in dlm_lowcomms_resend_msg()
1404 msg->retransmit = true; in dlm_lowcomms_resend_msg()
1405 kref_get(&msg->ref); in dlm_lowcomms_resend_msg()
1406 msg_resend->orig_msg = msg; in dlm_lowcomms_resend_msg()
1408 memcpy(ppc, msg->ppc, msg->len); in dlm_lowcomms_resend_msg()
1423 mutex_lock(&con->sock_mutex); in send_to_sock()
1424 if (con->sock == NULL) in send_to_sock()
1427 spin_lock(&con->writequeue_lock); in send_to_sock()
1433 len = e->len; in send_to_sock()
1434 offset = e->offset; in send_to_sock()
1435 BUG_ON(len == 0 && e->users == 0); in send_to_sock()
1436 spin_unlock(&con->writequeue_lock); in send_to_sock()
1438 ret = kernel_sendpage(con->sock, e->page, offset, len, in send_to_sock()
1440 trace_dlm_send(con->nodeid, ret); in send_to_sock()
1441 if (ret == -EAGAIN || ret == 0) { in send_to_sock()
1442 if (ret == -EAGAIN && in send_to_sock()
1443 test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) && in send_to_sock()
1444 !test_and_set_bit(CF_APP_LIMITED, &con->flags)) { in send_to_sock()
1448 set_bit(SOCK_NOSPACE, &con->sock->flags); in send_to_sock()
1449 con->sock->sk->sk_write_pending++; in send_to_sock()
1462 spin_lock(&con->writequeue_lock); in send_to_sock()
1465 spin_unlock(&con->writequeue_lock); in send_to_sock()
1468 if (test_and_clear_bit(CF_EOF, &con->flags)) { in send_to_sock()
1469 mutex_unlock(&con->sock_mutex); in send_to_sock()
1473 clear_bit(CF_SHUTDOWN, &con->flags); in send_to_sock()
1474 wake_up(&con->shutdown_wait); in send_to_sock()
1476 mutex_unlock(&con->sock_mutex); in send_to_sock()
1482 mutex_unlock(&con->sock_mutex); in send_to_sock()
1486 mutex_unlock(&con->sock_mutex); in send_to_sock()
1487 queue_work(send_workqueue, &con->swork); in send_to_sock()
1495 spin_lock(&con->writequeue_lock); in clean_one_writequeue()
1496 list_for_each_entry_safe(e, safe, &con->writequeue, list) { in clean_one_writequeue()
1499 spin_unlock(&con->writequeue_lock); in clean_one_writequeue()
1514 set_bit(CF_CLOSE, &con->flags); in dlm_lowcomms_close()
1517 if (con->othercon) in dlm_lowcomms_close()
1518 clean_one_writequeue(con->othercon); in dlm_lowcomms_close()
1525 list_del(&na->list); in dlm_lowcomms_close()
1526 while (na->addr_count--) in dlm_lowcomms_close()
1527 kfree(na->addr[na->addr_count]); in dlm_lowcomms_close()
1540 clear_bit(CF_READ_PENDING, &con->flags); in process_recv_sockets()
1553 struct socket *sock; in dlm_connect()
1556 /* Some odd races can cause double-connects, ignore them */ in dlm_connect()
1557 if (con->retries++ > MAX_CONNECT_RETRIES) in dlm_connect()
1560 if (con->sock) { in dlm_connect()
1561 log_print("node %d already connected.", con->nodeid); in dlm_connect()
1566 result = nodeid_to_addr(con->nodeid, &addr, NULL, in dlm_connect()
1567 dlm_proto_ops->try_new_addr, &mark); in dlm_connect()
1569 log_print("no address for nodeid %d", con->nodeid); in dlm_connect()
1573 /* Create a socket to communicate with */ in dlm_connect()
1574 result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family, in dlm_connect()
1575 SOCK_STREAM, dlm_proto_ops->proto, &sock); in dlm_connect()
1579 sock_set_mark(sock->sk, mark); in dlm_connect()
1580 dlm_proto_ops->sockopts(sock); in dlm_connect()
1584 result = dlm_proto_ops->bind(sock); in dlm_connect()
1588 log_print_ratelimited("connecting to %d", con->nodeid); in dlm_connect()
1590 result = dlm_proto_ops->connect(con, sock, (struct sockaddr *)&addr, in dlm_connect()
1598 dlm_close_sock(&con->sock); in dlm_connect()
1605 if (result != -EHOSTUNREACH && in dlm_connect()
1606 result != -ENETUNREACH && in dlm_connect()
1607 result != -ENETDOWN && in dlm_connect()
1608 result != -EINVAL && in dlm_connect()
1609 result != -EPROTONOSUPPORT) { in dlm_connect()
1610 log_print("connect %d try %d error %d", con->nodeid, in dlm_connect()
1611 con->retries, result); in dlm_connect()
1622 WARN_ON(test_bit(CF_IS_OTHERCON, &con->flags)); in process_send_sockets()
1624 clear_bit(CF_WRITE_PENDING, &con->flags); in process_send_sockets()
1626 if (test_and_clear_bit(CF_RECONNECT, &con->flags)) { in process_send_sockets()
1628 dlm_midcomms_unack_msg_resend(con->nodeid); in process_send_sockets()
1631 if (con->sock == NULL) { in process_send_sockets()
1632 if (test_and_clear_bit(CF_DELAY_CONNECT, &con->flags)) in process_send_sockets()
1635 mutex_lock(&con->sock_mutex); in process_send_sockets()
1637 mutex_unlock(&con->sock_mutex); in process_send_sockets()
1640 if (!list_empty(&con->writequeue)) in process_send_sockets()
1662 return -ENOMEM; in work_start()
1670 return -ENOMEM; in work_start()
1678 if (dlm_proto_ops->shutdown_action) in shutdown_conn()
1679 dlm_proto_ops->shutdown_action(con); in shutdown_conn()
1687 * socket activity. in dlm_lowcomms_shutdown()
1705 mutex_lock(&con->sock_mutex); in _stop_conn()
1706 set_bit(CF_CLOSE, &con->flags); in _stop_conn()
1707 set_bit(CF_READ_PENDING, &con->flags); in _stop_conn()
1708 set_bit(CF_WRITE_PENDING, &con->flags); in _stop_conn()
1709 if (con->sock && con->sock->sk) { in _stop_conn()
1710 lock_sock(con->sock->sk); in _stop_conn()
1711 con->sock->sk->sk_user_data = NULL; in _stop_conn()
1712 release_sock(con->sock->sk); in _stop_conn()
1714 if (con->othercon && and_other) in _stop_conn()
1715 _stop_conn(con->othercon, false); in _stop_conn()
1716 mutex_unlock(&con->sock_mutex); in _stop_conn()
1728 kfree(con->rx_buf); in connection_release()
1736 hlist_del_rcu(&con->list); in free_conn()
1738 if (con->othercon) { in free_conn()
1739 clean_one_writequeue(con->othercon); in free_conn()
1740 call_srcu(&connections_srcu, &con->othercon->rcu, in free_conn()
1744 call_srcu(&connections_srcu, &con->rcu, connection_release); in free_conn()
1763 ok &= test_bit(CF_READ_PENDING, &con->flags); in work_flush()
1764 ok &= test_bit(CF_WRITE_PENDING, &con->flags); in work_flush()
1765 if (con->othercon) { in work_flush()
1767 &con->othercon->flags); in work_flush()
1769 &con->othercon->flags); in work_flush()
1792 struct socket *sock; in dlm_listen_for_all()
1796 dlm_proto_ops->name); in dlm_listen_for_all()
1798 result = dlm_proto_ops->listen_validate(); in dlm_listen_for_all()
1802 result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family, in dlm_listen_for_all()
1803 SOCK_STREAM, dlm_proto_ops->proto, &sock); in dlm_listen_for_all()
1805 log_print("Can't create comms socket: %d", result); in dlm_listen_for_all()
1809 sock_set_mark(sock->sk, dlm_config.ci_mark); in dlm_listen_for_all()
1810 dlm_proto_ops->listen_sockopts(sock); in dlm_listen_for_all()
1812 result = dlm_proto_ops->listen_bind(sock); in dlm_listen_for_all()
1820 result = sock->ops->listen(sock, 5); in dlm_listen_for_all()
1833 static int dlm_tcp_bind(struct socket *sock) in dlm_tcp_bind()
1838 /* Bind to our cluster-known address connecting to avoid in dlm_tcp_bind()
1844 result = sock->ops->bind(sock, (struct sockaddr *)&src_addr, in dlm_tcp_bind()
1854 static int dlm_tcp_connect(struct connection *con, struct socket *sock, in dlm_tcp_connect()
1859 ret = sock->ops->connect(sock, addr, addr_len, O_NONBLOCK); in dlm_tcp_connect()
1861 case -EINPROGRESS: in dlm_tcp_connect()
1872 /* We don't support multi-homed hosts */ in dlm_tcp_listen_validate()
1874 log_print("TCP protocol can't handle multi-homed hosts, try SCTP"); in dlm_tcp_listen_validate()
1875 return -EINVAL; in dlm_tcp_listen_validate()
1881 static void dlm_tcp_sockopts(struct socket *sock) in dlm_tcp_sockopts()
1884 tcp_sock_set_nodelay(sock->sk); in dlm_tcp_sockopts()
1887 static void dlm_tcp_listen_sockopts(struct socket *sock) in dlm_tcp_listen_sockopts()
1890 sock_set_reuseaddr(sock->sk); in dlm_tcp_listen_sockopts()
1893 static int dlm_tcp_listen_bind(struct socket *sock) in dlm_tcp_listen_bind()
1899 return sock->ops->bind(sock, (struct sockaddr *)dlm_local_addr[0], in dlm_tcp_listen_bind()
1916 static int dlm_sctp_bind(struct socket *sock) in dlm_sctp_bind()
1921 static int dlm_sctp_connect(struct connection *con, struct socket *sock, in dlm_sctp_connect()
1927 * Make sock->ops->connect() function return in specified time, in dlm_sctp_connect()
1931 sock_set_sndtimeo(sock->sk, 5); in dlm_sctp_connect()
1932 ret = sock->ops->connect(sock, addr, addr_len, 0); in dlm_sctp_connect()
1933 sock_set_sndtimeo(sock->sk, 0); in dlm_sctp_connect()
1937 if (!test_and_set_bit(CF_CONNECTED, &con->flags)) in dlm_sctp_connect()
1938 log_print("connected to node %d", con->nodeid); in dlm_sctp_connect()
1947 return -EOPNOTSUPP; in dlm_sctp_listen_validate()
1954 static int dlm_sctp_bind_listen(struct socket *sock) in dlm_sctp_bind_listen()
1959 static void dlm_sctp_sockopts(struct socket *sock) in dlm_sctp_sockopts()
1962 sctp_sock_set_nodelay(sock->sk); in dlm_sctp_sockopts()
1963 sock_set_rcvbuf(sock->sk, NEEDED_RMEM); in dlm_sctp_sockopts()
1980 int error = -EINVAL; in dlm_lowcomms_start()
1988 error = -ENOTCONN; in dlm_lowcomms_start()
2012 error = -EINVAL; in dlm_lowcomms_start()
2040 list_del(&na->list); in dlm_lowcomms_exit()
2041 while (na->addr_count--) in dlm_lowcomms_exit()
2042 kfree(na->addr[na->addr_count]); in dlm_lowcomms_exit()