Lines Matching full:transport

5  * Client-side transport implementation for sockets.
16 * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com>
179 * transport connection with the server. Some servers like to drop a TCP
186 * TCP idle timeout; client drops the transport socket if it is idle
469 xs_read_header(struct sock_xprt *transport, struct xdr_buf *buf) in xs_read_header() argument
471 if (!transport->recv.copied) { in xs_read_header()
472 if (buf->head[0].iov_len >= transport->recv.offset) in xs_read_header()
474 &transport->recv.xid, in xs_read_header()
475 transport->recv.offset); in xs_read_header()
476 transport->recv.copied = transport->recv.offset; in xs_read_header()
481 xs_read_stream_request_done(struct sock_xprt *transport) in xs_read_stream_request_done() argument
483 return transport->recv.fraghdr & cpu_to_be32(RPC_LAST_STREAM_FRAGMENT); in xs_read_stream_request_done()
487 xs_read_stream_check_eor(struct sock_xprt *transport, in xs_read_stream_check_eor() argument
490 if (xs_read_stream_request_done(transport)) in xs_read_stream_check_eor()
495 xs_read_stream_request(struct sock_xprt *transport, struct msghdr *msg, in xs_read_stream_request() argument
502 xs_read_header(transport, buf); in xs_read_stream_request()
504 want = transport->recv.len - transport->recv.offset; in xs_read_stream_request()
506 ret = xs_read_xdr_buf(transport->sock, msg, flags, buf, in xs_read_stream_request()
507 transport->recv.copied + want, in xs_read_stream_request()
508 transport->recv.copied, in xs_read_stream_request()
510 transport->recv.offset += read; in xs_read_stream_request()
511 transport->recv.copied += read; in xs_read_stream_request()
514 if (transport->recv.offset == transport->recv.len) in xs_read_stream_request()
515 xs_read_stream_check_eor(transport, msg); in xs_read_stream_request()
542 xs_read_stream_header(struct sock_xprt *transport, struct msghdr *msg, in xs_read_stream_header() argument
546 .iov_base = &transport->recv.fraghdr, in xs_read_stream_header()
549 return xs_read_kvec(transport->sock, msg, flags, &kvec, want, seek); in xs_read_stream_header()
554 xs_read_stream_call(struct sock_xprt *transport, struct msghdr *msg, int flags) in xs_read_stream_call() argument
556 struct rpc_xprt *xprt = &transport->xprt; in xs_read_stream_call()
561 req = xprt_lookup_bc_request(xprt, transport->recv.xid); in xs_read_stream_call()
566 if (transport->recv.copied && !req->rq_private_buf.len) in xs_read_stream_call()
569 ret = xs_read_stream_request(transport, msg, flags, req); in xs_read_stream_call()
571 xprt_complete_bc_request(req, transport->recv.copied); in xs_read_stream_call()
573 req->rq_private_buf.len = transport->recv.copied; in xs_read_stream_call()
579 xs_read_stream_call(struct sock_xprt *transport, struct msghdr *msg, int flags) in xs_read_stream_call() argument
586 xs_read_stream_reply(struct sock_xprt *transport, struct msghdr *msg, int flags) in xs_read_stream_reply() argument
588 struct rpc_xprt *xprt = &transport->xprt; in xs_read_stream_reply()
594 req = xprt_lookup_rqst(xprt, transport->recv.xid); in xs_read_stream_reply()
595 if (!req || (transport->recv.copied && !req->rq_private_buf.len)) { in xs_read_stream_reply()
602 ret = xs_read_stream_request(transport, msg, flags, req); in xs_read_stream_reply()
606 xprt_complete_rqst(req->rq_task, transport->recv.copied); in xs_read_stream_reply()
608 req->rq_private_buf.len = transport->recv.copied; in xs_read_stream_reply()
616 xs_read_stream(struct sock_xprt *transport, int flags) in xs_read_stream() argument
622 if (transport->recv.len == 0) { in xs_read_stream()
623 want = xs_read_stream_headersize(transport->recv.copied != 0); in xs_read_stream()
624 ret = xs_read_stream_header(transport, &msg, flags, want, in xs_read_stream()
625 transport->recv.offset); in xs_read_stream()
628 transport->recv.offset = ret; in xs_read_stream()
629 if (transport->recv.offset != want) in xs_read_stream()
630 return transport->recv.offset; in xs_read_stream()
631 transport->recv.len = be32_to_cpu(transport->recv.fraghdr) & in xs_read_stream()
633 transport->recv.offset -= sizeof(transport->recv.fraghdr); in xs_read_stream()
637 switch (be32_to_cpu(transport->recv.calldir)) { in xs_read_stream()
642 ret = xs_read_stream_call(transport, &msg, flags); in xs_read_stream()
645 ret = xs_read_stream_reply(transport, &msg, flags); in xs_read_stream()
648 transport->recv.calldir = cpu_to_be32(-1); in xs_read_stream()
649 transport->recv.copied = -1; in xs_read_stream()
654 if (transport->recv.offset < transport->recv.len) { in xs_read_stream()
658 ret = xs_read_discard(transport->sock, &msg, flags, in xs_read_stream()
659 transport->recv.len - transport->recv.offset); in xs_read_stream()
662 transport->recv.offset += ret; in xs_read_stream()
664 if (transport->recv.offset != transport->recv.len) in xs_read_stream()
667 if (xs_read_stream_request_done(transport)) { in xs_read_stream()
668 trace_xs_stream_read_request(transport); in xs_read_stream()
669 transport->recv.copied = 0; in xs_read_stream()
671 transport->recv.offset = 0; in xs_read_stream()
672 transport->recv.len = 0; in xs_read_stream()
678 static __poll_t xs_poll_socket(struct sock_xprt *transport) in xs_poll_socket() argument
680 return transport->sock->ops->poll(transport->file, transport->sock, in xs_poll_socket()
684 static bool xs_poll_socket_readable(struct sock_xprt *transport) in xs_poll_socket_readable() argument
686 __poll_t events = xs_poll_socket(transport); in xs_poll_socket_readable()
691 static void xs_poll_check_readable(struct sock_xprt *transport) in xs_poll_check_readable() argument
694 clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state); in xs_poll_check_readable()
695 if (!xs_poll_socket_readable(transport)) in xs_poll_check_readable()
697 if (!test_and_set_bit(XPRT_SOCK_DATA_READY, &transport->sock_state)) in xs_poll_check_readable()
698 queue_work(xprtiod_workqueue, &transport->recv_worker); in xs_poll_check_readable()
701 static void xs_stream_data_receive(struct sock_xprt *transport) in xs_stream_data_receive() argument
706 mutex_lock(&transport->recv_mutex); in xs_stream_data_receive()
707 if (transport->sock == NULL) in xs_stream_data_receive()
710 ret = xs_read_stream(transport, MSG_DONTWAIT); in xs_stream_data_receive()
717 kernel_sock_shutdown(transport->sock, SHUT_RDWR); in xs_stream_data_receive()
719 xs_poll_check_readable(transport); in xs_stream_data_receive()
721 mutex_unlock(&transport->recv_mutex); in xs_stream_data_receive()
722 trace_xs_stream_read_data(&transport->xprt, ret, read); in xs_stream_data_receive()
727 struct sock_xprt *transport = in xs_stream_data_receive_workfn() local
731 xs_stream_data_receive(transport); in xs_stream_data_receive_workfn()
736 xs_stream_reset_connect(struct sock_xprt *transport) in xs_stream_reset_connect() argument
738 transport->recv.offset = 0; in xs_stream_reset_connect()
739 transport->recv.len = 0; in xs_stream_reset_connect()
740 transport->recv.copied = 0; in xs_stream_reset_connect()
741 transport->xmit.offset = 0; in xs_stream_reset_connect()
745 xs_stream_start_connect(struct sock_xprt *transport) in xs_stream_start_connect() argument
747 transport->xprt.stat.connect_count++; in xs_stream_start_connect()
748 transport->xprt.stat.connect_start = jiffies; in xs_stream_start_connect()
761 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); in xs_nospace() local
762 struct sock *sk = transport->inet; in xs_nospace()
765 trace_rpc_socket_nospace(req, transport); in xs_nospace()
806 xs_send_request_was_aborted(struct sock_xprt *transport, struct rpc_rqst *req) in xs_send_request_was_aborted() argument
808 return transport->xmit.offset != 0 && req->rq_bytes_sent == 0; in xs_send_request_was_aborted()
836 struct sock_xprt *transport = in xs_local_send_request() local
848 if (xs_send_request_was_aborted(transport, req)) { in xs_local_send_request()
857 status = xprt_sock_sendmsg(transport->sock, &msg, xdr, in xs_local_send_request()
858 transport->xmit.offset, rm, &sent); in xs_local_send_request()
860 __func__, xdr->len - transport->xmit.offset, status); in xs_local_send_request()
862 if (status == -EAGAIN && sock_writeable(transport->inet)) in xs_local_send_request()
866 transport->xmit.offset += sent; in xs_local_send_request()
867 req->rq_bytes_sent = transport->xmit.offset; in xs_local_send_request()
869 req->rq_xmit_bytes_sent += transport->xmit.offset; in xs_local_send_request()
870 transport->xmit.offset = 0; in xs_local_send_request()
908 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); in xs_udp_send_request() local
929 status = xprt_sock_sendmsg(transport->sock, &msg, xdr, 0, 0, &sent); in xs_udp_send_request()
938 if (status == -EAGAIN && sock_writeable(transport->inet)) in xs_udp_send_request()
991 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); in xs_tcp_send_request() local
1003 if (xs_send_request_was_aborted(transport, req)) { in xs_tcp_send_request()
1004 if (transport->sock != NULL) in xs_tcp_send_request()
1005 kernel_sock_shutdown(transport->sock, SHUT_RDWR); in xs_tcp_send_request()
1013 if (test_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state)) in xs_tcp_send_request()
1014 xs_tcp_set_socket_timeouts(xprt, transport->sock); in xs_tcp_send_request()
1021 status = xprt_sock_sendmsg(transport->sock, &msg, xdr, in xs_tcp_send_request()
1022 transport->xmit.offset, rm, &sent); in xs_tcp_send_request()
1025 xdr->len - transport->xmit.offset, status); in xs_tcp_send_request()
1029 transport->xmit.offset += sent; in xs_tcp_send_request()
1030 req->rq_bytes_sent = transport->xmit.offset; in xs_tcp_send_request()
1032 req->rq_xmit_bytes_sent += transport->xmit.offset; in xs_tcp_send_request()
1033 transport->xmit.offset = 0; in xs_tcp_send_request()
1044 if (test_bit(SOCK_NOSPACE, &transport->sock->flags)) in xs_tcp_send_request()
1088 static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk) in xs_save_old_callbacks() argument
1090 transport->old_data_ready = sk->sk_data_ready; in xs_save_old_callbacks()
1091 transport->old_state_change = sk->sk_state_change; in xs_save_old_callbacks()
1092 transport->old_write_space = sk->sk_write_space; in xs_save_old_callbacks()
1093 transport->old_error_report = sk->sk_error_report; in xs_save_old_callbacks()
1096 static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *sk) in xs_restore_old_callbacks() argument
1098 sk->sk_data_ready = transport->old_data_ready; in xs_restore_old_callbacks()
1099 sk->sk_state_change = transport->old_state_change; in xs_restore_old_callbacks()
1100 sk->sk_write_space = transport->old_write_space; in xs_restore_old_callbacks()
1101 sk->sk_error_report = transport->old_error_report; in xs_restore_old_callbacks()
1106 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); in xs_sock_reset_state_flags() local
1108 clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state); in xs_sock_reset_state_flags()
1109 clear_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state); in xs_sock_reset_state_flags()
1110 clear_bit(XPRT_SOCK_WAKE_WRITE, &transport->sock_state); in xs_sock_reset_state_flags()
1111 clear_bit(XPRT_SOCK_WAKE_DISCONNECT, &transport->sock_state); in xs_sock_reset_state_flags()
1114 static void xs_run_error_worker(struct sock_xprt *transport, unsigned int nr) in xs_run_error_worker() argument
1116 set_bit(nr, &transport->sock_state); in xs_run_error_worker()
1117 queue_work(xprtiod_workqueue, &transport->error_worker); in xs_run_error_worker()
1138 struct sock_xprt *transport; in xs_error_report() local
1145 transport = container_of(xprt, struct sock_xprt, xprt); in xs_error_report()
1146 transport->xprt_err = -sk->sk_err; in xs_error_report()
1147 if (transport->xprt_err == 0) in xs_error_report()
1150 xprt, -transport->xprt_err); in xs_error_report()
1151 trace_rpc_socket_error(xprt, sk->sk_socket, transport->xprt_err); in xs_error_report()
1155 xs_run_error_worker(transport, XPRT_SOCK_WAKE_ERROR); in xs_error_report()
1160 static void xs_reset_transport(struct sock_xprt *transport) in xs_reset_transport() argument
1162 struct socket *sock = transport->sock; in xs_reset_transport()
1163 struct sock *sk = transport->inet; in xs_reset_transport()
1164 struct rpc_xprt *xprt = &transport->xprt; in xs_reset_transport()
1165 struct file *filp = transport->file; in xs_reset_transport()
1170 if (atomic_read(&transport->xprt.swapper)) in xs_reset_transport()
1175 mutex_lock(&transport->recv_mutex); in xs_reset_transport()
1177 transport->inet = NULL; in xs_reset_transport()
1178 transport->sock = NULL; in xs_reset_transport()
1179 transport->file = NULL; in xs_reset_transport()
1183 xs_restore_old_callbacks(transport, sk); in xs_reset_transport()
1188 xs_stream_reset_connect(transport); in xs_reset_transport()
1189 mutex_unlock(&transport->recv_mutex); in xs_reset_transport()
1199 * @xprt: transport
1209 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); in xs_close() local
1213 xs_reset_transport(transport); in xs_close()
1219 dprintk("RPC: injecting transport disconnect on xprt=%p\n", in xs_inject_disconnect()
1231 * xs_destroy - prepare to shutdown a transport
1232 * @xprt: doomed transport
1237 struct sock_xprt *transport = container_of(xprt, in xs_destroy() local
1241 cancel_delayed_work_sync(&transport->connect_worker); in xs_destroy()
1243 cancel_work_sync(&transport->recv_worker); in xs_destroy()
1244 cancel_work_sync(&transport->error_worker); in xs_destroy()
1251 * @xprt: transport
1310 static void xs_udp_data_receive(struct sock_xprt *transport) in xs_udp_data_receive() argument
1316 mutex_lock(&transport->recv_mutex); in xs_udp_data_receive()
1317 sk = transport->inet; in xs_udp_data_receive()
1324 xs_udp_data_read_skb(&transport->xprt, sk, skb); in xs_udp_data_receive()
1328 xs_poll_check_readable(transport); in xs_udp_data_receive()
1330 mutex_unlock(&transport->recv_mutex); in xs_udp_data_receive()
1335 struct sock_xprt *transport = in xs_udp_data_receive_workfn() local
1339 xs_udp_data_receive(transport); in xs_udp_data_receive_workfn()
1356 struct sock_xprt *transport = container_of(xprt, in xs_data_ready() local
1358 transport->old_data_ready(sk); in xs_data_ready()
1364 if (!test_and_set_bit(XPRT_SOCK_DATA_READY, &transport->sock_state)) in xs_data_ready()
1365 queue_work(xprtiod_workqueue, &transport->recv_worker); in xs_data_ready()
1394 struct sock_xprt *transport; in xs_tcp_state_change() local
1406 transport = container_of(xprt, struct sock_xprt, xprt); in xs_tcp_state_change()
1412 clear_bit(XPRT_SOCK_CONNECTING, &transport->sock_state); in xs_tcp_state_change()
1418 xs_run_error_worker(transport, XPRT_SOCK_WAKE_PENDING); in xs_tcp_state_change()
1435 xs_run_error_worker(transport, XPRT_SOCK_WAKE_DISCONNECT); in xs_tcp_state_change()
1453 &transport->sock_state)) in xs_tcp_state_change()
1457 xs_run_error_worker(transport, XPRT_SOCK_WAKE_DISCONNECT); in xs_tcp_state_change()
1466 struct sock_xprt *transport; in xs_write_space() local
1475 transport = container_of(xprt, struct sock_xprt, xprt); in xs_write_space()
1481 xs_run_error_worker(transport, XPRT_SOCK_WAKE_WRITE); in xs_write_space()
1531 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); in xs_udp_do_set_buffer_size() local
1532 struct sock *sk = transport->inet; in xs_udp_do_set_buffer_size()
1534 if (transport->rcvsize) { in xs_udp_do_set_buffer_size()
1536 sk->sk_rcvbuf = transport->rcvsize * xprt->max_reqs * 2; in xs_udp_do_set_buffer_size()
1538 if (transport->sndsize) { in xs_udp_do_set_buffer_size()
1540 sk->sk_sndbuf = transport->sndsize * xprt->max_reqs * 2; in xs_udp_do_set_buffer_size()
1547 * @xprt: generic transport
1555 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); in xs_udp_set_buffer_size() local
1557 transport->sndsize = 0; in xs_udp_set_buffer_size()
1559 transport->sndsize = sndsize + 1024; in xs_udp_set_buffer_size()
1560 transport->rcvsize = 0; in xs_udp_set_buffer_size()
1562 transport->rcvsize = rcvsize + 1024; in xs_udp_set_buffer_size()
1568 * xs_udp_timer - called when a retransmit timeout occurs on a UDP transport
1569 * @xprt: controlling transport
1614 * @xprt: generic transport
1626 static void xs_set_srcport(struct sock_xprt *transport, struct socket *sock) in xs_set_srcport() argument
1628 if (transport->srcport == 0 && transport->xprt.reuseport) in xs_set_srcport()
1629 transport->srcport = xs_sock_getport(sock); in xs_set_srcport()
1632 static int xs_get_srcport(struct sock_xprt *transport) in xs_get_srcport() argument
1634 int port = transport->srcport; in xs_get_srcport()
1636 if (port == 0 && transport->xprt.resvport) in xs_get_srcport()
1641 static unsigned short xs_next_srcport(struct sock_xprt *transport, unsigned short port) in xs_next_srcport() argument
1643 if (transport->srcport != 0) in xs_next_srcport()
1644 transport->srcport = 0; in xs_next_srcport()
1645 if (!transport->xprt.resvport) in xs_next_srcport()
1651 static int xs_bind(struct sock_xprt *transport, struct socket *sock) in xs_bind() argument
1655 int port = xs_get_srcport(transport); in xs_bind()
1660 * transport->xprt.resvport == 0), don't bind. Let the local in xs_bind()
1670 * transport->xprt.resvport == 1) xs_get_srcport above will in xs_bind()
1676 memcpy(&myaddr, &transport->srcaddr, transport->xprt.addrlen); in xs_bind()
1680 transport->xprt.addrlen); in xs_bind()
1682 transport->srcport = port; in xs_bind()
1686 port = xs_next_srcport(transport, port); in xs_bind()
1770 struct sock_xprt *transport, int family, int type, in xs_create_sock() argument
1779 dprintk("RPC: can't create %d transport socket (%d).\n", in xs_create_sock()
1788 err = xs_bind(transport, sock); in xs_create_sock()
1797 transport->file = filp; in xs_create_sock()
1807 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, in xs_local_finish_connecting() local
1810 if (!transport->inet) { in xs_local_finish_connecting()
1815 xs_save_old_callbacks(transport, sk); in xs_local_finish_connecting()
1826 transport->sock = sock; in xs_local_finish_connecting()
1827 transport->inet = sk; in xs_local_finish_connecting()
1832 xs_stream_start_connect(transport); in xs_local_finish_connecting()
1839 * @transport: socket transport to connect
1841 static int xs_local_setup_socket(struct sock_xprt *transport) in xs_local_setup_socket() argument
1843 struct rpc_xprt *xprt = &transport->xprt; in xs_local_setup_socket()
1852 "transport socket (%d).\n", -status); in xs_local_setup_socket()
1862 transport->file = filp; in xs_local_setup_socket()
1901 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); in xs_local_connect() local
1918 ret = xs_local_setup_socket(transport); in xs_local_connect()
1931 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, in xs_set_memalloc() local
1938 if (!transport->inet) in xs_set_memalloc()
1941 sk_set_memalloc(transport->inet); in xs_set_memalloc()
1945 * xs_enable_swap - Tag this transport as being used for swap.
1946 * @xprt: transport to tag
1948 * Take a reference to this transport on behalf of the rpc_clnt, and
1967 * xs_disable_swap - Untag this transport as being used for swap.
1968 * @xprt: transport to tag
2005 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); in xs_udp_finish_connecting() local
2007 if (!transport->inet) { in xs_udp_finish_connecting()
2012 xs_save_old_callbacks(transport, sk); in xs_udp_finish_connecting()
2022 transport->sock = sock; in xs_udp_finish_connecting()
2023 transport->inet = sk; in xs_udp_finish_connecting()
2036 struct sock_xprt *transport = in xs_udp_setup_socket() local
2038 struct rpc_xprt *xprt = &transport->xprt; in xs_udp_setup_socket()
2042 sock = xs_create_sock(xprt, transport, in xs_udp_setup_socket()
2059 xprt_unlock_connect(xprt, transport); in xs_udp_setup_socket()
2065 * @xprt: transport
2072 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); in xs_tcp_shutdown() local
2073 struct socket *sock = transport->sock; in xs_tcp_shutdown()
2074 int skst = transport->inet ? transport->inet->sk_state : TCP_CLOSE; in xs_tcp_shutdown()
2085 xs_reset_transport(transport); in xs_tcp_shutdown()
2092 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); in xs_tcp_set_socket_timeouts() local
2102 clear_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state); in xs_tcp_set_socket_timeouts()
2119 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); in xs_tcp_set_connect_timeout() local
2134 memcpy(&transport->tcp_timeout, &to, in xs_tcp_set_connect_timeout()
2135 sizeof(transport->tcp_timeout)); in xs_tcp_set_connect_timeout()
2136 xprt->timeout = &transport->tcp_timeout; in xs_tcp_set_connect_timeout()
2139 set_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state); in xs_tcp_set_connect_timeout()
2145 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); in xs_tcp_finish_connecting() local
2148 if (!transport->inet) { in xs_tcp_finish_connecting()
2167 xs_save_old_callbacks(transport, sk); in xs_tcp_finish_connecting()
2183 transport->sock = sock; in xs_tcp_finish_connecting()
2184 transport->inet = sk; in xs_tcp_finish_connecting()
2194 xs_stream_start_connect(transport); in xs_tcp_finish_connecting()
2197 set_bit(XPRT_SOCK_CONNECTING, &transport->sock_state); in xs_tcp_finish_connecting()
2201 xs_set_srcport(transport, sock); in xs_tcp_finish_connecting()
2210 transport->srcport = 0; in xs_tcp_finish_connecting()
2224 struct sock_xprt *transport = in xs_tcp_setup_socket() local
2226 struct socket *sock = transport->sock; in xs_tcp_setup_socket()
2227 struct rpc_xprt *xprt = &transport->xprt; in xs_tcp_setup_socket()
2231 sock = xs_create_sock(xprt, transport, in xs_tcp_setup_socket()
2265 xprt_unlock_connect(xprt, transport); in xs_tcp_setup_socket()
2290 xprt_unlock_connect(xprt, transport); in xs_tcp_setup_socket()
2296 * @xprt: pointer to transport structure
2310 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); in xs_connect() local
2313 WARN_ON_ONCE(!xprt_lock_connect(xprt, task, transport)); in xs_connect()
2315 if (transport->sock != NULL) { in xs_connect()
2321 xs_reset_transport(transport); in xs_connect()
2330 &transport->connect_worker, in xs_connect()
2334 static void xs_wake_disconnect(struct sock_xprt *transport) in xs_wake_disconnect() argument
2336 if (test_and_clear_bit(XPRT_SOCK_WAKE_DISCONNECT, &transport->sock_state)) in xs_wake_disconnect()
2337 xs_tcp_force_close(&transport->xprt); in xs_wake_disconnect()
2340 static void xs_wake_write(struct sock_xprt *transport) in xs_wake_write() argument
2342 if (test_and_clear_bit(XPRT_SOCK_WAKE_WRITE, &transport->sock_state)) in xs_wake_write()
2343 xprt_write_space(&transport->xprt); in xs_wake_write()
2346 static void xs_wake_error(struct sock_xprt *transport) in xs_wake_error() argument
2350 if (!test_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state)) in xs_wake_error()
2352 mutex_lock(&transport->recv_mutex); in xs_wake_error()
2353 if (transport->sock == NULL) in xs_wake_error()
2355 if (!test_and_clear_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state)) in xs_wake_error()
2357 sockerr = xchg(&transport->xprt_err, 0); in xs_wake_error()
2359 xprt_wake_pending_tasks(&transport->xprt, sockerr); in xs_wake_error()
2361 mutex_unlock(&transport->recv_mutex); in xs_wake_error()
2364 static void xs_wake_pending(struct sock_xprt *transport) in xs_wake_pending() argument
2366 if (test_and_clear_bit(XPRT_SOCK_WAKE_PENDING, &transport->sock_state)) in xs_wake_pending()
2367 xprt_wake_pending_tasks(&transport->xprt, -EAGAIN); in xs_wake_pending()
2372 struct sock_xprt *transport = container_of(work, in xs_error_handle() local
2375 xs_wake_disconnect(transport); in xs_error_handle()
2376 xs_wake_write(transport); in xs_error_handle()
2377 xs_wake_error(transport); in xs_error_handle()
2378 xs_wake_pending(transport); in xs_error_handle()
2418 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); in xs_udp_print_stats() local
2422 transport->srcport, in xs_udp_print_stats()
2442 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); in xs_tcp_print_stats() local
2450 transport->srcport, in xs_tcp_print_stats()
2510 struct sock_xprt *transport = in bc_sendto() local
2521 err = xprt_sock_sendmsg(transport->sock, &msg, xdr, 0, marker, &sent); in bc_sendto()
2754 * xs_setup_local - Set up transport to use an AF_LOCAL socket
2755 * @args: rpc transport creation arguments
2757 * AF_LOCAL is a "tpi_cots_ord" transport, just like TCP
2762 struct sock_xprt *transport; in xs_setup_local() local
2770 transport = container_of(xprt, struct sock_xprt, xprt); in xs_setup_local()
2782 INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn); in xs_setup_local()
2783 INIT_WORK(&transport->error_worker, xs_error_handle); in xs_setup_local()
2784 INIT_DELAYED_WORK(&transport->connect_worker, xs_dummy_setup_socket); in xs_setup_local()
2796 ret = ERR_PTR(xs_local_setup_socket(transport)); in xs_setup_local()
2824 * xs_setup_udp - Set up transport to use a UDP socket
2825 * @args: rpc transport creation arguments
2832 struct sock_xprt *transport; in xs_setup_udp() local
2839 transport = container_of(xprt, struct sock_xprt, xprt); in xs_setup_udp()
2853 INIT_WORK(&transport->recv_worker, xs_udp_data_receive_workfn); in xs_setup_udp()
2854 INIT_WORK(&transport->error_worker, xs_error_handle); in xs_setup_udp()
2855 INIT_DELAYED_WORK(&transport->connect_worker, xs_udp_setup_socket); in xs_setup_udp()
2900 * xs_setup_tcp - Set up transport to use a TCP socket
2901 * @args: rpc transport creation arguments
2908 struct sock_xprt *transport; in xs_setup_tcp() local
2919 transport = container_of(xprt, struct sock_xprt, xprt); in xs_setup_tcp()
2935 INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn); in xs_setup_tcp()
2936 INIT_WORK(&transport->error_worker, xs_error_handle); in xs_setup_tcp()
2937 INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_setup_socket); in xs_setup_tcp()
2976 * xs_setup_bc_tcp - Set up transport to use a TCP backchannel socket
2977 * @args: rpc transport creation arguments
2984 struct sock_xprt *transport; in xs_setup_bc_tcp() local
2992 transport = container_of(xprt, struct sock_xprt, xprt); in xs_setup_bc_tcp()
3035 transport->sock = bc_sock->sk_sock; in xs_setup_bc_tcp()
3036 transport->inet = bc_sock->sk_sk; in xs_setup_bc_tcp()