Lines Matching refs:con
85 void (*shutdown_action)(struct connection *con); /* What to do to shutdown */
108 struct connection *con; member
156 struct connection *con; in __find_con() local
161 hlist_for_each_entry_rcu(con, &connection_hash[r], list) { in __find_con()
162 if (con->nodeid == nodeid) { in __find_con()
164 return con; in __find_con()
178 struct connection *con, *tmp; in nodeid2con() local
181 con = __find_con(nodeid); in nodeid2con()
182 if (con || !alloc) in nodeid2con()
183 return con; in nodeid2con()
185 con = kzalloc(sizeof(*con), alloc); in nodeid2con()
186 if (!con) in nodeid2con()
189 con->rx_buflen = dlm_config.ci_buffer_size; in nodeid2con()
190 con->rx_buf = kmalloc(con->rx_buflen, GFP_NOFS); in nodeid2con()
191 if (!con->rx_buf) { in nodeid2con()
192 kfree(con); in nodeid2con()
196 con->nodeid = nodeid; in nodeid2con()
197 mutex_init(&con->sock_mutex); in nodeid2con()
198 INIT_LIST_HEAD(&con->writequeue); in nodeid2con()
199 spin_lock_init(&con->writequeue_lock); in nodeid2con()
200 INIT_WORK(&con->swork, process_send_sockets); in nodeid2con()
201 INIT_WORK(&con->rwork, process_recv_sockets); in nodeid2con()
202 init_waitqueue_head(&con->shutdown_wait); in nodeid2con()
205 if (con->nodeid) { in nodeid2con()
208 con->connect_action = zerocon->connect_action; in nodeid2con()
209 if (!con->rx_action) in nodeid2con()
210 con->rx_action = zerocon->rx_action; in nodeid2con()
225 kfree(con->rx_buf); in nodeid2con()
226 kfree(con); in nodeid2con()
230 hlist_add_head_rcu(&con->list, &connection_hash[r]); in nodeid2con()
233 return con; in nodeid2con()
240 struct connection *con; in foreach_conn() local
244 hlist_for_each_entry_rcu(con, &connection_hash[i], list) in foreach_conn()
245 conn_func(con); in foreach_conn()
404 struct connection *con; in lowcomms_data_ready() local
407 con = sock2con(sk); in lowcomms_data_ready()
408 if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags)) in lowcomms_data_ready()
409 queue_work(recv_workqueue, &con->rwork); in lowcomms_data_ready()
415 struct connection *con; in lowcomms_write_space() local
418 con = sock2con(sk); in lowcomms_write_space()
419 if (!con) in lowcomms_write_space()
422 clear_bit(SOCK_NOSPACE, &con->sock->flags); in lowcomms_write_space()
424 if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) { in lowcomms_write_space()
425 con->sock->sk->sk_write_pending--; in lowcomms_write_space()
426 clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags); in lowcomms_write_space()
429 queue_work(send_workqueue, &con->swork); in lowcomms_write_space()
434 static inline void lowcomms_connect_sock(struct connection *con) in lowcomms_connect_sock() argument
436 if (test_bit(CF_CLOSE, &con->flags)) in lowcomms_connect_sock()
438 queue_work(send_workqueue, &con->swork); in lowcomms_connect_sock()
459 struct connection *con; in dlm_lowcomms_connect_node() local
464 con = nodeid2con(nodeid, GFP_NOFS); in dlm_lowcomms_connect_node()
465 if (!con) in dlm_lowcomms_connect_node()
467 lowcomms_connect_sock(con); in dlm_lowcomms_connect_node()
473 struct connection *con; in lowcomms_error_report() local
478 con = sock2con(sk); in lowcomms_error_report()
479 if (con == NULL) in lowcomms_error_report()
483 if (con->sock == NULL || in lowcomms_error_report()
484 kernel_getpeername(con->sock, (struct sockaddr *)&saddr) < 0) { in lowcomms_error_report()
488 con->nodeid, dlm_config.ci_tcp_port, in lowcomms_error_report()
496 con->nodeid, &sin4->sin_addr.s_addr, in lowcomms_error_report()
505 con->nodeid, sin6->sin6_addr.s6_addr32[0], in lowcomms_error_report()
543 static void add_sock(struct socket *sock, struct connection *con) in add_sock() argument
548 con->sock = sock; in add_sock()
550 sk->sk_user_data = con; in add_sock()
580 static void close_connection(struct connection *con, bool and_other, in close_connection() argument
583 bool closing = test_and_set_bit(CF_CLOSING, &con->flags); in close_connection()
585 if (tx && !closing && cancel_work_sync(&con->swork)) { in close_connection()
586 log_print("canceled swork for node %d", con->nodeid); in close_connection()
587 clear_bit(CF_WRITE_PENDING, &con->flags); in close_connection()
589 if (rx && !closing && cancel_work_sync(&con->rwork)) { in close_connection()
590 log_print("canceled rwork for node %d", con->nodeid); in close_connection()
591 clear_bit(CF_READ_PENDING, &con->flags); in close_connection()
594 mutex_lock(&con->sock_mutex); in close_connection()
595 if (con->sock) { in close_connection()
596 restore_callbacks(con->sock); in close_connection()
597 sock_release(con->sock); in close_connection()
598 con->sock = NULL; in close_connection()
600 if (con->othercon && and_other) { in close_connection()
602 close_connection(con->othercon, false, true, true); in close_connection()
605 con->rx_leftover = 0; in close_connection()
606 con->retries = 0; in close_connection()
607 mutex_unlock(&con->sock_mutex); in close_connection()
608 clear_bit(CF_CLOSING, &con->flags); in close_connection()
611 static void shutdown_connection(struct connection *con) in shutdown_connection() argument
615 if (cancel_work_sync(&con->swork)) { in shutdown_connection()
616 log_print("canceled swork for node %d", con->nodeid); in shutdown_connection()
617 clear_bit(CF_WRITE_PENDING, &con->flags); in shutdown_connection()
620 mutex_lock(&con->sock_mutex); in shutdown_connection()
622 if (!con->sock) { in shutdown_connection()
623 mutex_unlock(&con->sock_mutex); in shutdown_connection()
627 set_bit(CF_SHUTDOWN, &con->flags); in shutdown_connection()
628 ret = kernel_sock_shutdown(con->sock, SHUT_WR); in shutdown_connection()
629 mutex_unlock(&con->sock_mutex); in shutdown_connection()
632 con, ret); in shutdown_connection()
635 ret = wait_event_timeout(con->shutdown_wait, in shutdown_connection()
636 !test_bit(CF_SHUTDOWN, &con->flags), in shutdown_connection()
640 con); in shutdown_connection()
648 clear_bit(CF_SHUTDOWN, &con->flags); in shutdown_connection()
649 close_connection(con, false, true, true); in shutdown_connection()
652 static void dlm_tcp_shutdown(struct connection *con) in dlm_tcp_shutdown() argument
654 if (con->othercon) in dlm_tcp_shutdown()
655 shutdown_connection(con->othercon); in dlm_tcp_shutdown()
656 shutdown_connection(con); in dlm_tcp_shutdown()
659 static int con_realloc_receive_buf(struct connection *con, int newlen) in con_realloc_receive_buf() argument
668 if (con->rx_leftover) in con_realloc_receive_buf()
669 memmove(newbuf, con->rx_buf, con->rx_leftover); in con_realloc_receive_buf()
672 kfree(con->rx_buf); in con_realloc_receive_buf()
673 con->rx_buflen = newlen; in con_realloc_receive_buf()
674 con->rx_buf = newbuf; in con_realloc_receive_buf()
680 static int receive_from_sock(struct connection *con) in receive_from_sock() argument
687 mutex_lock(&con->sock_mutex); in receive_from_sock()
689 if (con->sock == NULL) { in receive_from_sock()
694 if (con->nodeid == 0) { in receive_from_sock()
701 if (con->rx_buflen != buflen && con->rx_leftover <= buflen) { in receive_from_sock()
702 ret = con_realloc_receive_buf(con, buflen); in receive_from_sock()
710 iov.iov_base = con->rx_buf + con->rx_leftover; in receive_from_sock()
711 iov.iov_len = con->rx_buflen - con->rx_leftover; in receive_from_sock()
715 ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len, in receive_from_sock()
723 buflen = ret + con->rx_leftover; in receive_from_sock()
724 ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen); in receive_from_sock()
732 con->rx_leftover = buflen - ret; in receive_from_sock()
733 if (con->rx_leftover) { in receive_from_sock()
734 memmove(con->rx_buf, con->rx_buf + ret, in receive_from_sock()
735 con->rx_leftover); in receive_from_sock()
742 mutex_unlock(&con->sock_mutex); in receive_from_sock()
746 if (!test_and_set_bit(CF_READ_PENDING, &con->flags)) in receive_from_sock()
747 queue_work(recv_workqueue, &con->rwork); in receive_from_sock()
748 mutex_unlock(&con->sock_mutex); in receive_from_sock()
752 mutex_unlock(&con->sock_mutex); in receive_from_sock()
755 close_connection(con, false, true, false); in receive_from_sock()
758 con, con->nodeid); in receive_from_sock()
760 clear_bit(CF_SHUTDOWN, &con->flags); in receive_from_sock()
761 wake_up(&con->shutdown_wait); in receive_from_sock()
770 static int accept_from_sock(struct connection *con) in accept_from_sock() argument
785 mutex_lock_nested(&con->sock_mutex, 0); in accept_from_sock()
787 if (!con->sock) { in accept_from_sock()
788 mutex_unlock(&con->sock_mutex); in accept_from_sock()
792 result = kernel_accept(con->sock, &newsock, O_NONBLOCK); in accept_from_sock()
812 mutex_unlock(&con->sock_mutex); in accept_from_sock()
892 mutex_unlock(&con->sock_mutex); in accept_from_sock()
897 mutex_unlock(&con->sock_mutex); in accept_from_sock()
933 static int sctp_bind_addrs(struct connection *con, uint16_t port) in sctp_bind_addrs() argument
944 result = kernel_bind(con->sock, addr, addr_len); in sctp_bind_addrs()
946 result = sock_bind_add(con->sock->sk, addr, addr_len); in sctp_bind_addrs()
962 static void sctp_connect_to_sock(struct connection *con) in sctp_connect_to_sock() argument
970 if (con->nodeid == 0) { in sctp_connect_to_sock()
975 dlm_comm_mark(con->nodeid, &mark); in sctp_connect_to_sock()
977 mutex_lock(&con->sock_mutex); in sctp_connect_to_sock()
980 if (con->retries++ > MAX_CONNECT_RETRIES) in sctp_connect_to_sock()
983 if (con->sock) { in sctp_connect_to_sock()
984 log_print("node %d already connected.", con->nodeid); in sctp_connect_to_sock()
989 result = nodeid_to_addr(con->nodeid, &daddr, NULL, true); in sctp_connect_to_sock()
991 log_print("no address for nodeid %d", con->nodeid); in sctp_connect_to_sock()
1003 con->rx_action = receive_from_sock; in sctp_connect_to_sock()
1004 con->connect_action = sctp_connect_to_sock; in sctp_connect_to_sock()
1005 add_sock(sock, con); in sctp_connect_to_sock()
1008 if (sctp_bind_addrs(con, 0)) in sctp_connect_to_sock()
1013 log_print("connecting to %d", con->nodeid); in sctp_connect_to_sock()
1034 con->sock = NULL; in sctp_connect_to_sock()
1047 log_print("connect %d try %d error %d", con->nodeid, in sctp_connect_to_sock()
1048 con->retries, result); in sctp_connect_to_sock()
1049 mutex_unlock(&con->sock_mutex); in sctp_connect_to_sock()
1051 lowcomms_connect_sock(con); in sctp_connect_to_sock()
1056 mutex_unlock(&con->sock_mutex); in sctp_connect_to_sock()
1060 static void tcp_connect_to_sock(struct connection *con) in tcp_connect_to_sock() argument
1068 if (con->nodeid == 0) { in tcp_connect_to_sock()
1073 dlm_comm_mark(con->nodeid, &mark); in tcp_connect_to_sock()
1075 mutex_lock(&con->sock_mutex); in tcp_connect_to_sock()
1076 if (con->retries++ > MAX_CONNECT_RETRIES) in tcp_connect_to_sock()
1080 if (con->sock) in tcp_connect_to_sock()
1092 result = nodeid_to_addr(con->nodeid, &saddr, NULL, false); in tcp_connect_to_sock()
1094 log_print("no address for nodeid %d", con->nodeid); in tcp_connect_to_sock()
1098 con->rx_action = receive_from_sock; in tcp_connect_to_sock()
1099 con->connect_action = tcp_connect_to_sock; in tcp_connect_to_sock()
1100 con->shutdown_action = dlm_tcp_shutdown; in tcp_connect_to_sock()
1101 add_sock(sock, con); in tcp_connect_to_sock()
1116 log_print("connecting to %d", con->nodeid); in tcp_connect_to_sock()
1129 if (con->sock) { in tcp_connect_to_sock()
1130 sock_release(con->sock); in tcp_connect_to_sock()
1131 con->sock = NULL; in tcp_connect_to_sock()
1144 log_print("connect %d try %d error %d", con->nodeid, in tcp_connect_to_sock()
1145 con->retries, result); in tcp_connect_to_sock()
1146 mutex_unlock(&con->sock_mutex); in tcp_connect_to_sock()
1148 lowcomms_connect_sock(con); in tcp_connect_to_sock()
1152 mutex_unlock(&con->sock_mutex); in tcp_connect_to_sock()
1156 static struct socket *tcp_create_listen_sock(struct connection *con, in tcp_create_listen_sock() argument
1184 sock->sk->sk_user_data = con; in tcp_create_listen_sock()
1186 con->rx_action = accept_from_sock; in tcp_create_listen_sock()
1187 con->connect_action = tcp_connect_to_sock; in tcp_create_listen_sock()
1197 con->sock = NULL; in tcp_create_listen_sock()
1245 struct connection *con = nodeid2con(0, GFP_NOFS); in sctp_listen_for_all() local
1247 if (!con) in sctp_listen_for_all()
1265 sock->sk->sk_user_data = con; in sctp_listen_for_all()
1267 con->sock = sock; in sctp_listen_for_all()
1268 con->sock->sk->sk_data_ready = lowcomms_data_ready; in sctp_listen_for_all()
1269 con->rx_action = accept_from_sock; in sctp_listen_for_all()
1270 con->connect_action = sctp_connect_to_sock; in sctp_listen_for_all()
1275 if (sctp_bind_addrs(con, dlm_config.ci_tcp_port)) in sctp_listen_for_all()
1288 con->sock = NULL; in sctp_listen_for_all()
1296 struct connection *con = nodeid2con(0, GFP_NOFS); in tcp_listen_for_all() local
1299 if (!con) in tcp_listen_for_all()
1311 sock = tcp_create_listen_sock(con, dlm_local_addr[0]); in tcp_listen_for_all()
1313 add_sock(sock, con); in tcp_listen_for_all()
1325 static struct writequeue_entry *new_writequeue_entry(struct connection *con, in new_writequeue_entry() argument
1344 entry->con = con; in new_writequeue_entry()
1351 struct connection *con; in dlm_lowcomms_get_buffer() local
1355 con = nodeid2con(nodeid, allocation); in dlm_lowcomms_get_buffer()
1356 if (!con) in dlm_lowcomms_get_buffer()
1359 spin_lock(&con->writequeue_lock); in dlm_lowcomms_get_buffer()
1360 e = list_entry(con->writequeue.prev, struct writequeue_entry, list); in dlm_lowcomms_get_buffer()
1361 if ((&e->list == &con->writequeue) || in dlm_lowcomms_get_buffer()
1369 spin_unlock(&con->writequeue_lock); in dlm_lowcomms_get_buffer()
1377 e = new_writequeue_entry(con, allocation); in dlm_lowcomms_get_buffer()
1379 spin_lock(&con->writequeue_lock); in dlm_lowcomms_get_buffer()
1383 list_add_tail(&e->list, &con->writequeue); in dlm_lowcomms_get_buffer()
1384 spin_unlock(&con->writequeue_lock); in dlm_lowcomms_get_buffer()
1393 struct connection *con = e->con; in dlm_lowcomms_commit_buffer() local
1396 spin_lock(&con->writequeue_lock); in dlm_lowcomms_commit_buffer()
1401 spin_unlock(&con->writequeue_lock); in dlm_lowcomms_commit_buffer()
1403 queue_work(send_workqueue, &con->swork); in dlm_lowcomms_commit_buffer()
1407 spin_unlock(&con->writequeue_lock); in dlm_lowcomms_commit_buffer()
1412 static void send_to_sock(struct connection *con) in send_to_sock() argument
1420 mutex_lock(&con->sock_mutex); in send_to_sock()
1421 if (con->sock == NULL) in send_to_sock()
1424 spin_lock(&con->writequeue_lock); in send_to_sock()
1426 e = list_entry(con->writequeue.next, struct writequeue_entry, in send_to_sock()
1428 if ((struct list_head *) e == &con->writequeue) in send_to_sock()
1434 spin_unlock(&con->writequeue_lock); in send_to_sock()
1438 ret = kernel_sendpage(con->sock, e->page, offset, len, in send_to_sock()
1442 test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) && in send_to_sock()
1443 !test_and_set_bit(CF_APP_LIMITED, &con->flags)) { in send_to_sock()
1447 set_bit(SOCK_NOSPACE, &con->sock->flags); in send_to_sock()
1448 con->sock->sk->sk_write_pending++; in send_to_sock()
1462 spin_lock(&con->writequeue_lock); in send_to_sock()
1465 spin_unlock(&con->writequeue_lock); in send_to_sock()
1467 mutex_unlock(&con->sock_mutex); in send_to_sock()
1471 mutex_unlock(&con->sock_mutex); in send_to_sock()
1472 close_connection(con, false, false, true); in send_to_sock()
1475 queue_work(send_workqueue, &con->swork); in send_to_sock()
1479 mutex_unlock(&con->sock_mutex); in send_to_sock()
1480 queue_work(send_workqueue, &con->swork); in send_to_sock()
1484 static void clean_one_writequeue(struct connection *con) in clean_one_writequeue() argument
1488 spin_lock(&con->writequeue_lock); in clean_one_writequeue()
1489 list_for_each_entry_safe(e, safe, &con->writequeue, list) { in clean_one_writequeue()
1493 spin_unlock(&con->writequeue_lock); in clean_one_writequeue()
1500 struct connection *con; in dlm_lowcomms_close() local
1504 con = nodeid2con(nodeid, 0); in dlm_lowcomms_close()
1505 if (con) { in dlm_lowcomms_close()
1506 set_bit(CF_CLOSE, &con->flags); in dlm_lowcomms_close()
1507 close_connection(con, true, true, true); in dlm_lowcomms_close()
1508 clean_one_writequeue(con); in dlm_lowcomms_close()
1527 struct connection *con = container_of(work, struct connection, rwork); in process_recv_sockets() local
1530 clear_bit(CF_READ_PENDING, &con->flags); in process_recv_sockets()
1532 err = con->rx_action(con); in process_recv_sockets()
1539 struct connection *con = container_of(work, struct connection, swork); in process_send_sockets() local
1541 clear_bit(CF_WRITE_PENDING, &con->flags); in process_send_sockets()
1542 if (con->sock == NULL) /* not mutex protected so check it inside too */ in process_send_sockets()
1543 con->connect_action(con); in process_send_sockets()
1544 if (!list_empty(&con->writequeue)) in process_send_sockets()
1545 send_to_sock(con); in process_send_sockets()
1576 static void _stop_conn(struct connection *con, bool and_other) in _stop_conn() argument
1578 mutex_lock(&con->sock_mutex); in _stop_conn()
1579 set_bit(CF_CLOSE, &con->flags); in _stop_conn()
1580 set_bit(CF_READ_PENDING, &con->flags); in _stop_conn()
1581 set_bit(CF_WRITE_PENDING, &con->flags); in _stop_conn()
1582 if (con->sock && con->sock->sk) { in _stop_conn()
1583 write_lock_bh(&con->sock->sk->sk_callback_lock); in _stop_conn()
1584 con->sock->sk->sk_user_data = NULL; in _stop_conn()
1585 write_unlock_bh(&con->sock->sk->sk_callback_lock); in _stop_conn()
1587 if (con->othercon && and_other) in _stop_conn()
1588 _stop_conn(con->othercon, false); in _stop_conn()
1589 mutex_unlock(&con->sock_mutex); in _stop_conn()
1592 static void stop_conn(struct connection *con) in stop_conn() argument
1594 _stop_conn(con, true); in stop_conn()
1597 static void shutdown_conn(struct connection *con) in shutdown_conn() argument
1599 if (con->shutdown_action) in shutdown_conn()
1600 con->shutdown_action(con); in shutdown_conn()
1605 struct connection *con = container_of(rcu, struct connection, rcu); in connection_release() local
1607 kfree(con->rx_buf); in connection_release()
1608 kfree(con); in connection_release()
1611 static void free_conn(struct connection *con) in free_conn() argument
1613 close_connection(con, true, true, true); in free_conn()
1615 hlist_del_rcu(&con->list); in free_conn()
1617 if (con->othercon) { in free_conn()
1618 clean_one_writequeue(con->othercon); in free_conn()
1619 call_rcu(&con->othercon->rcu, connection_release); in free_conn()
1621 clean_one_writequeue(con); in free_conn()
1622 call_rcu(&con->rcu, connection_release); in free_conn()
1629 struct connection *con; in work_flush() local
1640 hlist_for_each_entry_rcu(con, &connection_hash[i], in work_flush()
1642 ok &= test_bit(CF_READ_PENDING, &con->flags); in work_flush()
1643 ok &= test_bit(CF_WRITE_PENDING, &con->flags); in work_flush()
1644 if (con->othercon) { in work_flush()
1646 &con->othercon->flags); in work_flush()
1648 &con->othercon->flags); in work_flush()
1678 struct connection *con; in dlm_lowcomms_start() local
1709 con = nodeid2con(0,0); in dlm_lowcomms_start()
1710 if (con) in dlm_lowcomms_start()
1711 free_conn(con); in dlm_lowcomms_start()