1# 2# Licensed to the Apache Software Foundation (ASF) under one 3# or more contributor license agreements. See the NOTICE file 4# distributed with this work for additional information 5# regarding copyright ownership. The ASF licenses this file 6# to you under the Apache License, Version 2.0 (the 7# "License"); you may not use this file except in compliance 8# with the License. You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, 13# software distributed under the License is distributed on an 14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15# KIND, either express or implied. See the License for the 16# specific language governing permissions and limitations 17# under the License. 18# 19 20import errno 21import logging 22import os 23import socket 24import sys 25 26from .TTransport import TTransportBase, TTransportException, TServerTransportBase 27 28logger = logging.getLogger(__name__) 29 30 31class TSocketBase(TTransportBase): 32 def _resolveAddr(self): 33 if self._unix_socket is not None: 34 return [(socket.AF_UNIX, socket.SOCK_STREAM, None, None, 35 self._unix_socket)] 36 else: 37 return socket.getaddrinfo(self.host, 38 self.port, 39 self._socket_family, 40 socket.SOCK_STREAM, 41 0, 42 socket.AI_PASSIVE) 43 44 def close(self): 45 if self.handle: 46 self.handle.close() 47 self.handle = None 48 49 50class TSocket(TSocketBase): 51 """Socket implementation of TTransport base.""" 52 53 def __init__(self, host='localhost', port=9090, unix_socket=None, 54 socket_family=socket.AF_UNSPEC, 55 socket_keepalive=False): 56 """Initialize a TSocket 57 58 @param host(str) The host to connect to. 59 @param port(int) The (TCP) port to connect to. 60 @param unix_socket(str) The filename of a unix socket to connect to. 61 (host and port will be ignored.) 62 @param socket_family(int) The socket family to use with this socket. 63 @param socket_keepalive(bool) enable TCP keepalive, default off. 64 """ 65 self.host = host 66 self.port = port 67 self.handle = None 68 self._unix_socket = unix_socket 69 self._timeout = None 70 self._socket_family = socket_family 71 self._socket_keepalive = socket_keepalive 72 73 def setHandle(self, h): 74 self.handle = h 75 76 def isOpen(self): 77 if self.handle is None: 78 return False 79 80 # this lets us cheaply see if the other end of the socket is still 81 # connected. if disconnected, we'll get EOF back (expressed as zero 82 # bytes of data) otherwise we'll get one byte or an error indicating 83 # we'd have to block for data. 84 # 85 # note that we're not doing this with socket.MSG_DONTWAIT because 1) 86 # it's linux-specific and 2) gevent-patched sockets hide EAGAIN from us 87 # when timeout is non-zero. 88 original_timeout = self.handle.gettimeout() 89 try: 90 self.handle.settimeout(0) 91 try: 92 peeked_bytes = self.handle.recv(1, socket.MSG_PEEK) 93 except (socket.error, OSError) as exc: # on modern python this is just BlockingIOError 94 if exc.errno in (errno.EWOULDBLOCK, errno.EAGAIN): 95 return True 96 return False 97 except ValueError: 98 # SSLSocket fails on recv with non-zero flags; fallback to the old behavior 99 return True 100 finally: 101 self.handle.settimeout(original_timeout) 102 103 # the length will be zero if we got EOF (indicating connection closed) 104 return len(peeked_bytes) == 1 105 106 def setTimeout(self, ms): 107 if ms is None: 108 self._timeout = None 109 else: 110 self._timeout = ms / 1000.0 111 112 if self.handle is not None: 113 self.handle.settimeout(self._timeout) 114 115 def _do_open(self, family, socktype): 116 return socket.socket(family, socktype) 117 118 @property 119 def _address(self): 120 return self._unix_socket if self._unix_socket else '%s:%d' % (self.host, self.port) 121 122 def open(self): 123 if self.handle: 124 raise TTransportException(type=TTransportException.ALREADY_OPEN, message="already open") 125 try: 126 addrs = self._resolveAddr() 127 except socket.gaierror as gai: 128 msg = 'failed to resolve sockaddr for ' + str(self._address) 129 logger.exception(msg) 130 raise TTransportException(type=TTransportException.NOT_OPEN, message=msg, inner=gai) 131 for family, socktype, _, _, sockaddr in addrs: 132 handle = self._do_open(family, socktype) 133 134 # TCP keep-alive 135 if self._socket_keepalive: 136 handle.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) 137 138 handle.settimeout(self._timeout) 139 try: 140 handle.connect(sockaddr) 141 self.handle = handle 142 return 143 except socket.error: 144 handle.close() 145 logger.info('Could not connect to %s', sockaddr, exc_info=True) 146 msg = 'Could not connect to any of %s' % list(map(lambda a: a[4], 147 addrs)) 148 logger.error(msg) 149 raise TTransportException(type=TTransportException.NOT_OPEN, message=msg) 150 151 def read(self, sz): 152 try: 153 buff = self.handle.recv(sz) 154 except socket.error as e: 155 if (e.args[0] == errno.ECONNRESET and 156 (sys.platform == 'darwin' or sys.platform.startswith('freebsd'))): 157 # freebsd and Mach don't follow POSIX semantic of recv 158 # and fail with ECONNRESET if peer performed shutdown. 159 # See corresponding comment and code in TSocket::read() 160 # in lib/cpp/src/transport/TSocket.cpp. 161 self.close() 162 # Trigger the check to raise the END_OF_FILE exception below. 163 buff = '' 164 elif e.args[0] == errno.ETIMEDOUT: 165 raise TTransportException(type=TTransportException.TIMED_OUT, message="read timeout", inner=e) 166 else: 167 raise TTransportException(message="unexpected exception", inner=e) 168 if len(buff) == 0: 169 raise TTransportException(type=TTransportException.END_OF_FILE, 170 message='TSocket read 0 bytes') 171 return buff 172 173 def write(self, buff): 174 if not self.handle: 175 raise TTransportException(type=TTransportException.NOT_OPEN, 176 message='Transport not open') 177 sent = 0 178 have = len(buff) 179 while sent < have: 180 try: 181 plus = self.handle.send(buff) 182 if plus == 0: 183 raise TTransportException(type=TTransportException.END_OF_FILE, 184 message='TSocket sent 0 bytes') 185 sent += plus 186 buff = buff[plus:] 187 except socket.error as e: 188 raise TTransportException(message="unexpected exception", inner=e) 189 190 def flush(self): 191 pass 192 193 194class TServerSocket(TSocketBase, TServerTransportBase): 195 """Socket implementation of TServerTransport base.""" 196 197 def __init__(self, host=None, port=9090, unix_socket=None, socket_family=socket.AF_UNSPEC): 198 self.host = host 199 self.port = port 200 self._unix_socket = unix_socket 201 self._socket_family = socket_family 202 self.handle = None 203 self._backlog = 128 204 205 def setBacklog(self, backlog=None): 206 if not self.handle: 207 self._backlog = backlog 208 else: 209 # We cann't update backlog when it is already listening, since the 210 # handle has been created. 211 logger.warn('You have to set backlog before listen.') 212 213 def listen(self): 214 res0 = self._resolveAddr() 215 socket_family = self._socket_family == socket.AF_UNSPEC and socket.AF_INET6 or self._socket_family 216 for res in res0: 217 if res[0] is socket_family or res is res0[-1]: 218 break 219 220 # We need remove the old unix socket if the file exists and 221 # nobody is listening on it. 222 if self._unix_socket: 223 tmp = socket.socket(res[0], res[1]) 224 try: 225 tmp.connect(res[4]) 226 except socket.error as err: 227 eno, message = err.args 228 if eno == errno.ECONNREFUSED: 229 os.unlink(res[4]) 230 231 self.handle = socket.socket(res[0], res[1]) 232 self.handle.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0) 233 self.handle.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 234 if hasattr(self.handle, 'settimeout'): 235 self.handle.settimeout(None) 236 self.handle.bind(res[4]) 237 self.handle.listen(self._backlog) 238 239 def accept(self): 240 client, addr = self.handle.accept() 241 result = TSocket() 242 result.setHandle(client) 243 return result 244