1 (*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 *)
19
20 unit Thrift.Socket;
21
22 {$I Thrift.Defines.inc}
23 {$I-} // prevent annoying errors with default log delegate and no console
24
25 interface
26 {$IFNDEF OLD_SOCKETS} // not for OLD_SOCKETS
27
28 uses
29 Winapi.Windows, Winapi.Winsock2;
30
const
  // Values for the ai_flags member of the address-info records declared below.
  AI_PASSIVE                = $00000001;  // address is going to be handed to bind()
  AI_CANONNAME              = $00000002;  // first ai_canonname receives the canonical name
  AI_NUMERICHOST            = $00000004;  // node name has to be a numeric address string
  AI_NUMERICSERV            = $00000008;  // service name has to be a numeric port number

  AI_ALL                    = $00000100;  // query IP6 and IP4 together with AI_V4MAPPED
  AI_ADDRCONFIG             = $00000400;  // resolve only when a global address is configured
  AI_V4MAPPED               = $00000800;  // on v6 failure query v4 and convert to V4MAPPED format

  AI_NON_AUTHORITATIVE      = $00004000;  // LUP_NON_AUTHORITATIVE
  AI_SECURE                 = $00008000;  // LUP_SECURE
  AI_RETURN_PREFERRED_NAMES = $00010000;  // LUP_RETURN_PREFERRED_NAMES

  AI_FQDN                   = $00020000;  // ai_canonname receives the FQDN
  AI_FILESERVER             = $00040000;  // resolving fileserver name resolution
47
type
  // ANSI address-information record used by getaddrinfo/freeaddrinfo.
  // Entries are chained through ai_next. The member order is part of the
  // binary interface of the external routines and must stay as it is.
  PAddrInfoA = ^TAddrInfoA;
  TAddrInfoA = record
    ai_flags:     Integer;     // combination of the AI_* constants
    ai_family:    Integer;
    ai_socktype:  Integer;
    ai_protocol:  Integer;
    ai_addrlen:   NativeUInt;  // size of the data ai_addr points to
    ai_canonname: PAnsiChar;
    ai_addr:      PSockAddr;
    ai_next:      PAddrInfoA;  // next entry, nil at the end of the list
  end;
60
// Wide-character address-information record used by GetAddrInfoW and
// FreeAddrInfoW. Same member order as TAddrInfoA; only the canonical-name
// pointer and the link type differ.
PAddrInfoW = ^TAddrInfoW;
TAddrInfoW = record
  ai_flags:     Integer;     // combination of the AI_* constants
  ai_family:    Integer;
  ai_socktype:  Integer;
  ai_protocol:  Integer;
  ai_addrlen:   NativeUInt;  // size of the data ai_addr points to
  ai_canonname: PChar;
  ai_addr:      PSockAddr;
  ai_next:      PAddrInfoW;  // next entry, nil at the end of the list
end;
72
// Type of the address-family member of TSockAddrIn6.
TAddressFamily = USHORT;

// 16 bytes of address data, viewable either as 16 bytes or as 8 words.
TIn6Addr = record
  case Integer of
    0: (_Byte: array[0..15] of UCHAR);
    1: (_Word: array[0..7] of USHORT);
end;
80
// Scope identifier stored as a single 32-bit Value with two bit fields,
// Zone (28 bits) and Level (4 bits), matching the Windows SCOPE_ID layout.
//
// Each property index encodes the field position for GetBitField and
// SetBitField as (bit offset shl 8) or (bit width): the accessors take the
// offset from "Loc shr 8" and the width from "Loc and $FF".
TScopeId = record
public
  Value: ULONG;
strict private
  function GetBitField(Loc: Integer): Integer; inline;
  procedure SetBitField(Loc: Integer; const aValue: Integer); inline;
public
  // The indices used to be $0028 and $2804, i.e. the decimal number 28 typed
  // as hex digits ($28 = 40). That described a 40-bit field and a field at
  // bit offset 40, neither of which exists in a 32-bit value.
  property Zone: Integer index $001C read GetBitField write SetBitField;   // offset 0,  width 28
  property Level: Integer index $1C04 read GetBitField write SetBitField;  // offset 28, width 4
end;
91
// IPv6 socket address. The trailing variant part lets the scope id be read
// either as a plain ULONG or through the TScopeId bit-field record.
TSockAddrIn6 = record
  sin6_family:   TAddressFamily;
  sin6_port:     USHORT;
  sin6_flowinfo: ULONG;
  sin6_addr:     TIn6Addr;
  case Integer of
    0: (sin6_scope_id: ULONG);
    1: (sin6_scope_struct: TScopeId);
end;
PSockAddrIn6 = ^TSockAddrIn6;
102
const
  // Values for the Flags argument of getnameinfo / GetNameInfoW.
  NI_NOFQDN      = $01;  // local hosts: return only the nodename portion
  NI_NUMERICHOST = $02;  // return the host's address in numeric form
  NI_NAMEREQD    = $04;  // fail when the host's name is not in DNS
  NI_NUMERICSERV = $08;  // return the service (port #) in numeric form
  NI_DGRAM       = $10;  // the service is a datagram service

  // Buffer sizes for the name routines.
  NI_MAXHOST = 1025;     // max size of a fully-qualified domain name
  NI_MAXSERV = 32;       // max size of a service name
112
// Name-resolution routines; the implementation section binds each of them
// to ws2_32.dll. The A/W pairs differ only in the character type used.

function getaddrinfo(pNodeName, pServiceName: PAnsiChar;
  const pHints: TAddrInfoA;
  var ppResult: PAddrInfoA): Integer; stdcall;

function GetAddrInfoW(pNodeName, pServiceName: PWideChar;
  const pHints: TAddrInfoW;
  var ppResult: PAddrInfoW): Integer; stdcall;

procedure freeaddrinfo(pAddrInfo: PAddrInfoA); stdcall;

procedure FreeAddrInfoW(pAddrInfo: PAddrInfoW); stdcall;

function getnameinfo(const pSockaddr: TSockAddr; SockaddrLength: Integer;
  pNodeBuffer: PAnsiChar; NodeBufferSize: DWORD;
  pServiceBuffer: PAnsiChar; ServiceBufferSize: DWORD;
  Flags: Integer): Integer; stdcall;

function GetNameInfoW(const pSockaddr: TSockAddr; SockaddrLength: Integer;
  pNodeBuffer: PWideChar; NodeBufferSize: DWORD;
  pServiceBuffer: PWideChar; ServiceBufferSize: DWORD;
  Flags: Integer): Integer; stdcall;
121
type
  // Clean-up callback that a TSmartPointer runs on its value when destroyed.
  TSmartPointerDestroyer<T> = reference to procedure(Value: T);

  // Reference-counted holder; invoking the reference returns the held value.
  ISmartPointer<T> = reference to function: T;

  // Implementation of ISmartPointer<T>: keeps a value and an optional
  // destroyer which is applied to the value in Destroy.
  TSmartPointer<T> = class(TInterfacedObject, ISmartPointer<T>)
  strict private
    FValue: T;
    FDestroyer: TSmartPointerDestroyer<T>;
  public
    constructor Create(AValue: T; ADestroyer: TSmartPointerDestroyer<T>);
    destructor Destroy; override;
    function Invoke: T;
  end;
136
// Abstract base of the socket classes in this unit. It owns the socket
// handle, the port, the timeout / keep-alive settings and the log delegate.
// Its class constructor and class destructor start up and clean up Winsock.
TBaseSocket = class abstract
public type
  // Receiver of log lines produced by the socket classes.
  TLogDelegate = reference to procedure( const str: string);
strict private
  FPort: Integer;
  FSocket: Winapi.Winsock2.TSocket;
  FSendTimeout: Longword;
  FRecvTimeout: Longword;
  FKeepAlive: Boolean;
  FLogDelegate: TLogDelegate;
  class constructor Create;
  class destructor Destroy;
  class procedure DefaultLogDelegate(const Str: string);
strict protected type
  // Reference-counted owner of a GetAddrInfoW result list.
  IGetAddrInfoWrapper = interface
    function Init: Integer;
    function GetRes: PAddrInfoW;
    property Res: PAddrInfoW read GetRes;
  end;

  // Runs GetAddrInfoW in Init and releases the list with FreeAddrInfoW
  // when the object is destroyed.
  TGetAddrInfoWrapper = class(TInterfacedObject, IGetAddrInfoWrapper)
  strict private
    FNode: string;
    FService: string;
    FHints: PAddrInfoW;
    FRes: PAddrInfoW;
  public
    constructor Create(ANode, AService: string; AHints: PAddrInfoW);
    destructor Destroy; override;
    function Init: Integer;
    function GetRes: PAddrInfoW;
    property Res: PAddrInfoW read GetRes;
  end;
strict protected
  // Puts every field into its default state; called by the constructors.
  procedure CommonInit; virtual;

  // Resolves the address and creates the socket handle for it.
  function CreateSocket(AAddress: string; APort: Integer): IGetAddrInfoWrapper;

  procedure SetRecvTimeout(ARecvTimeout: Longword); virtual;
  procedure SetSendTimeout(ASendTimeout: Longword); virtual;
  procedure SetKeepAlive(AKeepAlive: Boolean); virtual;
  procedure SetSocket(ASocket: Winapi.Winsock2.TSocket);
  property LogDelegate: TLogDelegate read FLogDelegate;
public
  //
  // Creates the socket object only; no connection is made here.
  //
  constructor Create(ALogDelegate: TLogDelegate = nil); overload;
  constructor Create(APort: Integer; ALogDelegate: TLogDelegate = nil); overload;

  //
  // Closes the socket if it is still open, then destroys the object.
  //
  destructor Destroy; override;

  //
  // Shuts down communications on the socket.
  //
  procedure Close; virtual;

  // Port the socket is connected to
  property Port: Integer read FPort write FPort;

  // Receive timeout
  property RecvTimeout: Longword read FRecvTimeout write SetRecvTimeout;

  // Send timeout
  property SendTimeout: Longword read FSendTimeout write SetSendTimeout;

  // SO_KEEPALIVE setting
  property KeepAlive: Boolean read FKeepAlive write SetKeepAlive;

  // Underlying socket descriptor
  property Socket: Winapi.Winsock2.TSocket read FSocket write SetSocket;
end;
210
// Client-side TCP socket: connects to a host/port or wraps an existing
// descriptor, and offers read / write / peek plus peer information.
TSocket = class(TBaseSocket)
strict private type
  // Storage for the cached peer address, IPv4 or IPv6.
  TCachedPeerAddr = record
    case Integer of
      0: (ipv4: TSockAddrIn);
      1: (ipv6: TSockAddrIn6);
  end;
strict private
  FHost: string;
  FPeerHost: string;
  FPeerAddress: string;
  FPeerPort: Integer;
  FInterruptListener: ISmartPointer<Winapi.Winsock2.TSocket>;
  FConnTimeout: Longword;
  FLingerOn: Boolean;
  FLingerVal: Integer;
  FNoDelay: Boolean;
  FMaxRecvRetries: Longword;
  FCachedPeerAddr: TCachedPeerAddr;
  procedure InitPeerInfo;
  procedure OpenConnection(Res: TBaseSocket.IGetAddrInfoWrapper);
  procedure LocalOpen;
  procedure SetGenericTimeout(S: Winapi.Winsock2.TSocket; Timeout: Longword; OptName: Integer);
  function GetIsOpen: Boolean;
  procedure SetNoDelay(ANoDelay: Boolean);
  function GetSocketInfo: string;
  function GetPeerHost: string;
  function GetPeerAddress: string;
  function GetPeerPort: Integer;
  function GetOrigin: string;
strict protected
  procedure CommonInit; override;
  procedure SetRecvTimeout(ARecvTimeout: Longword); override;
  procedure SetSendTimeout(ASendTimeout: Longword); override;
  procedure SetKeepAlive(AKeepAlive: Boolean); override;
public
  //
  // Creates the socket object only; no connection is made here.
  //
  constructor Create(ALogDelegate: TBaseSocket.TLogDelegate = nil); overload;

  //
  // Creates the socket object only; no connection is made here.
  //
  // @param AHost An IP address or hostname to connect to
  // @param APort The port to connect on
  //
  constructor Create(AHost: string; APort: Integer; ALogDelegate: TBaseSocket.TLogDelegate = nil); overload;

  //
  // Wraps an existing socket descriptor.
  //
  constructor Create(ASocket: Winapi.Winsock2.TSocket; ALogDelegate: TBaseSocket.TLogDelegate = nil); overload;

  //
  // Wraps an existing socket descriptor in a way that can be
  // interrupted safely.
  //
  constructor Create(ASocket: Winapi.Winsock2.TSocket; AInterruptListener: ISmartPointer<Winapi.Winsock2.TSocket>;
    ALogDelegate: TBaseSocket.TLogDelegate = nil); overload;

  //
  // Creates and opens the socket
  //
  // @throws ETransportationException If the socket could not connect
  //
  procedure Open;

  //
  // Shuts down communications on the socket
  //
  procedure Close; override;

  //
  // Reads from the underlying socket.
  // \returns the number of bytes read; 0 indicates EOF
  // \throws TTransportException of types:
  //   Interrupted  the socket was interrupted out of a blocking call
  //   NotOpen      the socket has been closed
  //   TimedOut     the receive timeout expired
  //   Unknown      something unexpected happened
  //
  function Read(var Buf; Len: Integer): Integer;

  //
  // Writes to the underlying socket. Loops until done or fail.
  //
  procedure Write(const Buf; Len: Integer);

  //
  // Writes to the underlying socket. Does single send() and returns result.
  //
  function WritePartial(const Buf; Len: Integer): Integer;

  //
  // Returns a cached copy of the peer address.
  //
  function GetCachedAddress(out Len: Integer): PSockAddr;

  //
  // Set a cache of the peer address (used when trivially available: e.g.
  // accept() or connect()). Only caches IPV4 and IPV6; unset for others.
  //
  procedure SetCachedAddress(const Addr: TSockAddr; Len: Integer);

  //
  // Controls whether the linger option is set on the socket.
  //
  // @param LingerOn  Whether SO_LINGER is on
  // @param LingerVal If linger is active, the number of seconds to linger for
  //
  procedure SetLinger(LingerOn: Boolean; LingerVal: Integer);

  //
  // Calls select() on the socket to see if there is more data available.
  //
  function Peek: Boolean;

  // Whether the socket is alive
  property IsOpen: Boolean read GetIsOpen;

  // The host that the socket is connected to
  property Host: string read FHost write FHost;

  // Whether to enable or disable Nagle's algorithm
  property NoDelay: Boolean read FNoDelay write SetNoDelay;

  // Connect timeout
  property ConnTimeout: Longword read FConnTimeout write FConnTimeout;

  // The max number of recv retries in the case of a WSAEWOULDBLOCK
  property MaxRecvRetries: Longword read FMaxRecvRetries write FMaxRecvRetries;

  // Socket information formatted as a string <Host: x Port: x>
  property SocketInfo: string read GetSocketInfo;

  // The DNS name of the host to which the socket is connected
  property PeerHost: string read GetPeerHost;

  // The address of the host to which the socket is connected
  property PeerAddress: string read GetPeerAddress;

  // The port of the host to which the socket is connected
  property PeerPort: Integer read GetPeerPort;

  // The origin the socket is connected to
  property Origin: string read GetOrigin;
end;
362
// Callback type of TServerSocket.ListenCallback / AcceptCallback; it
// receives a raw socket descriptor.
TServerSocketFunc = reference to procedure(sock: Winapi.Winsock2.TSocket);
364
// Listening socket: binds to an address/port, accepts clients as TSocket
// objects and can be interrupted, optionally together with its children.
TServerSocket = class(TBaseSocket)
strict private
  FAddress: string;
  FAcceptBacklog: Integer;
  FRetryLimit: Integer;
  FRetryDelay: Integer;
  FTcpSendBuffer: Integer;
  FTcpRecvBuffer: Integer;
  FAcceptTimeout: Longword;
  FListening: Boolean;
  FInterruptableChildren: Boolean;
  // is notified on Interrupt()
  FInterruptSockWriter: Winapi.Winsock2.TSocket;
  // is used in select with FSocket for interruptability
  FInterruptSockReader: Winapi.Winsock2.TSocket;
  // is notified on InterruptChildren()
  FChildInterruptSockWriter: Winapi.Winsock2.TSocket;
  // if FInterruptableChildren this is shared with child TSockets
  FChildInterruptSockReader: ISmartPointer<Winapi.Winsock2.TSocket>;
  FListenCallback: TServerSocketFunc;
  FAcceptCallback: TServerSocketFunc;
  function CreateSocketObj(Client: Winapi.Winsock2.TSocket): TSocket;
  procedure Notify(NotifySocket: Winapi.Winsock2.TSocket);
  procedure SetInterruptableChildren(AValue: Boolean);
strict protected
  procedure CommonInit; override;
public const
  DEFAULT_BACKLOG = 1024;
public
  //
  // Constructor.
  //
  // @param APort Port number to bind to
  //
  constructor Create(APort: Integer; ALogDelegate: TBaseSocket.TLogDelegate = nil); overload;

  //
  // Constructor.
  //
  // @param APort        Port number to bind to
  // @param ASendTimeout Socket send timeout
  // @param ARecvTimeout Socket receive timeout
  //
  constructor Create(APort: Integer; ASendTimeout, ARecvTimeout: Longword; ALogDelegate: TBaseSocket.TLogDelegate = nil); overload;

  //
  // Constructor.
  //
  // @param AAddress Address to bind to
  // @param APort    Port number to bind to
  //
  constructor Create(AAddress: string; APort: Integer; ALogDelegate: TBaseSocket.TLogDelegate = nil); overload;

  procedure Listen;
  function Accept: TSocket;
  procedure Interrupt;
  procedure InterruptChildren;
  procedure Close; override;

  property AcceptBacklog: Integer read FAcceptBacklog write FAcceptBacklog;
  property AcceptTimeout: Longword read FAcceptTimeout write FAcceptTimeout;
  property RetryLimit: Integer read FRetryLimit write FRetryLimit;
  property RetryDelay: Integer read FRetryDelay write FRetryDelay;
  property TcpSendBuffer: Integer read FTcpSendBuffer write FTcpSendBuffer;
  property TcpRecvBuffer: Integer read FTcpRecvBuffer write FTcpRecvBuffer;

  // When enabled (the default), new children TSockets will be constructed so
  // they can be interrupted by TServerTransport.InterruptChildren().
  // This is more expensive in terms of system calls (poll + recv) however
  // ensures a connected client cannot interfere with TServer.Stop().
  //
  // When disabled, TSocket children do not incur an additional poll() call.
  // Server-side reads are more efficient, however a client can interfere with
  // the server's ability to shutdown properly by staying connected.
  //
  // Must be called before listen(); mode cannot be switched after that.
  // \throws EPropertyError if listen() has been called
  property InterruptableChildren: Boolean read FInterruptableChildren write SetInterruptableChildren;

  // listenCallback gets called just before listen, and after all Thrift
  // setsockopt calls have been made. If you have custom setsockopt
  // things that need to happen on the listening socket, this is the place to do it.
  property ListenCallback: TServerSocketFunc read FListenCallback write FListenCallback;

  // acceptCallback gets called after each accept call, on the newly created socket.
  // It is called after all Thrift setsockopt calls have been made. If you have
  // custom setsockopt things that need to happen on the accepted
  // socket, this is the place to do it.
  property AcceptCallback: TServerSocketFunc read FAcceptCallback write FAcceptCallback;
end;
451
452 {$ENDIF} // not for OLD_SOCKETS
453 implementation
454 {$IFNDEF OLD_SOCKETS} // not for OLD_SOCKETS
455
456 uses
457 System.SysUtils, System.Math, System.DateUtils, Thrift.Transport;
458
// Remembers the lookup arguments. No resolution happens here; that is
// done by Init. AHints is kept as a pointer, the record is not copied.
constructor TBaseSocket.TGetAddrInfoWrapper.Create(ANode, AService: string; AHints: PAddrInfoW);
begin
  inherited Create;
  FRes := nil;  // no result list yet
  FHints := AHints;
  FService := AService;
  FNode := ANode;
end;
467
// Releases the result list, if one was obtained, with FreeAddrInfoW.
destructor TBaseSocket.TGetAddrInfoWrapper.Destroy;
begin
  if FRes <> nil then
  begin
    FreeAddrInfoW(FRes);
  end;
  inherited;
end;
474
// Runs GetAddrInfoW for the stored node / service / hints unless a result
// list is already present.
//
// Returns 0 when a result list already exists, otherwise the return value
// of GetAddrInfoW.
function TBaseSocket.TGetAddrInfoWrapper.Init: Integer;
begin
  if FRes <> nil then
    Exit(0);

  // Pointer(S) is nil for an empty string and the address of the first
  // character otherwise. The previous "@FNode[1]" indexed character 1 even
  // when the string was empty, which is out of range for an empty string.
  Result := GetAddrInfoW(PWideChar(Pointer(FNode)), PWideChar(Pointer(FService)), FHints^, FRes);
end;
481
// Getter of the Res property: the stored result list (nil until Init
// has obtained one).
function TBaseSocket.TGetAddrInfoWrapper.GetRes: PAddrInfoW;
begin
  Exit(FRes);
end;
486
// Closes the given socket descriptor with closesocket(); the result of the
// call is not examined. The signature matches
// TSmartPointerDestroyer<Winapi.Winsock2.TSocket>.
procedure DestroyerOfFineSockets(ssock: Winapi.Winsock2.TSocket);
begin
  closesocket(ssock);
end;
491
// Reads one bit field out of Value.
// Loc carries the bit offset in "Loc shr 8" and the bit width in its low byte.
function TScopeId.GetBitField(Loc: Integer): Integer;
var
  BitOffset: Integer;
  BitWidth: Integer;
begin
  BitOffset := Loc shr 8;
  BitWidth := Loc and $FF;
  Result := (Value shr BitOffset) and ((1 shl BitWidth) - 1);
end;
496
// Writes one bit field of Value and leaves all other bits untouched.
// Loc carries the bit offset in "Loc shr 8" and the bit width in its low byte.
// Bits of aValue that do not fit into the field width are discarded.
procedure TScopeId.SetBitField(Loc: Integer; const aValue: Integer);
var
  BitOffset: Integer;
  BitWidth: Integer;
  FieldMask: ULONG;
begin
  BitOffset := Loc shr 8;
  BitWidth := Loc and $FF;

  // Mask of the field at its real position. The previous code built the mask
  // at bit 0 regardless of the offset, so writing a field that does not start
  // at bit 0 cleared the low bits of Value and OR-ed the new bits over the
  // old ones; it also let an oversized aValue spill into neighbouring bits.
  FieldMask := ((ULONG(1) shl BitWidth) - 1) shl BitOffset;

  Value := (Value and (not FieldMask)) or ((ULONG(aValue) shl BitOffset) and FieldMask);
end;
501
const
  // Library that exports the name-resolution routines bound below.
  Ws2_32Dll = 'ws2_32.dll';

// Bindings for the routines declared in the interface section.
function getaddrinfo;    external Ws2_32Dll name 'getaddrinfo';
function GetAddrInfoW;   external Ws2_32Dll name 'GetAddrInfoW';
procedure freeaddrinfo;  external Ws2_32Dll name 'freeaddrinfo';
procedure FreeAddrInfoW; external Ws2_32Dll name 'FreeAddrInfoW';
function getnameinfo;    external Ws2_32Dll name 'getnameinfo';
function GetNameInfoW;   external Ws2_32Dll name 'GetNameInfoW';
508
// Takes the value to hold and the callback Destroy applies to it.
// ADestroyer may be unassigned, in which case Destroy does no clean-up.
constructor TSmartPointer<T>.Create(AValue: T; ADestroyer: TSmartPointerDestroyer<T>);
begin
  inherited Create;
  FDestroyer := ADestroyer;
  FValue := AValue;
end;
515
// Applies the destroyer callback (when one was supplied) to the held value
// before the object goes away.
destructor TSmartPointer<T>.Destroy;
begin
  if Assigned(FDestroyer) then
  begin
    FDestroyer(FValue);
  end;
  inherited;
end;
521
// Invocation of the ISmartPointer<T> reference: hands out the held value.
function TSmartPointer<T>.Invoke: T;
begin
  Exit(FValue);
end;
526
// Class constructor: starts up Winsock, requesting version $0202.
// Raises Exception when WSAStartup reports an error.
class constructor TBaseSocket.Create;
const
  RequestedVersion: WORD = $0202;
var
  StartupData: WSAData;
begin
  FillChar(StartupData, SizeOf(StartupData), 0);
  if WSAStartup(RequestedVersion, StartupData) <> 0 then
    raise Exception.Create('Failed to initialize Winsock.');
end;
539
// Class destructor: counterpart of the WSAStartup call made by the class
// constructor.
class destructor TBaseSocket.Destroy;
begin
  WSACleanup;
end;
544
// Log delegate used when the caller supplies none. The line is prefixed
// with 'Thrift:' and the current date/time and written with Writeln; when
// that fails (non-zero IoResult, or an exception) it goes to
// OutputDebugString instead.
class procedure TBaseSocket.DefaultLogDelegate(const Str: string);
var
  Stamp: string;
  Line: string;
begin
  Stamp := DateTimeToStr(Now, TFormatSettings.Create);
  Line := Format('Thrift: %s %s', [Stamp, Str]);
  try
    Writeln(Line);
    // The unit is compiled with I/O checking switched off, so a failed
    // Writeln shows up in IoResult rather than as an exception.
    if IoResult <> 0 then
    begin
      OutputDebugString(PChar(Line));
    end;
  except
    OutputDebugString(PChar(Line));
  end;
end;
557
// Default state: no socket handle, port 0, timeouts 0, keep-alive off and
// the built-in log delegate.
procedure TBaseSocket.CommonInit;
begin
  FLogDelegate := DefaultLogDelegate;
  FKeepAlive := False;
  FRecvTimeout := 0;
  FSendTimeout := 0;
  FPort := 0;
  FSocket := INVALID_SOCKET;
end;
567
// Resolves AAddress / APort with GetAddrInfoW (stream sockets, any address
// family, AI_PASSIVE) and creates the socket handle for the chosen entry.
//
// The first AF_INET6 entry of the result list is preferred; when there is
// none, the last entry is used.
//
// @param AAddress Node name handed to GetAddrInfoW
// @param APort    Port number; handed to GetAddrInfoW as the service name
// @returns the wrapper that owns the GetAddrInfoW result list
// @throws TTransportExceptionNotOpen when resolving fails or socket() fails;
//         Close is called before the exception is raised
function TBaseSocket.CreateSocket(AAddress: string; APort: Integer): IGetAddrInfoWrapper;
var
  Hints: TAddrInfoW;
  Res: PAddrInfoW;
  Error: Integer;
begin
  FillChar(Hints, SizeOf(Hints), 0);
  Hints.ai_family := PF_UNSPEC;
  Hints.ai_socktype := SOCK_STREAM;
  Hints.ai_flags := AI_PASSIVE;

  // The service name is built from the APort argument. The previous code
  // formatted FPort and ignored the parameter, and wrote into a fixed
  // 6-character buffer, which a negative or over-long value overflows.
  //
  // NOTE(review): the wrapper stores @Hints, which points into this stack
  // frame. That is safe for the Init call below; the pointer must not be
  // dereferenced after this function has returned.
  Result := TGetAddrInfoWrapper.Create(AAddress, Format('%d', [APort]), @Hints);
  Error := Result.Init;
  if Error <> 0 then begin
    LogDelegate(Format('GetAddrInfoW %d: %s', [Error, SysErrorMessage(Error)]));
    Close;
    raise TTransportExceptionNotOpen.Create('Could not resolve host for server socket.');
  end;

  // Pick the ipv6 address first since ipv4 addresses can be mapped
  // into ipv6 space.
  Res := Result.Res;
  while Assigned(Res) do begin
    if (Res^.ai_family = AF_INET6) or (not Assigned(Res^.ai_next)) then
      Break;
    Res := Res^.ai_next;
  end;

  FSocket := Winapi.Winsock2.socket(Res^.ai_family, Res^.ai_socktype, Res^.ai_protocol);
  if FSocket = INVALID_SOCKET then begin
    // Take the error code before any further call can overwrite it.
    Error := WSAGetLastError;
    LogDelegate(Format('TBaseSocket.CreateSocket() socket() %s', [SysErrorMessage(Error)]));
    Close;
    raise TTransportExceptionNotOpen.Create(Format('socket(): %s', [SysErrorMessage(Error)]));
  end;
end;
606
// Base implementation: only records the receive timeout in the object.
procedure TBaseSocket.SetRecvTimeout(ARecvTimeout: Longword);
begin
  Self.FRecvTimeout := ARecvTimeout;
end;
611
// Base implementation: only records the send timeout in the object.
procedure TBaseSocket.SetSendTimeout(ASendTimeout: Longword);
begin
  Self.FSendTimeout := ASendTimeout;
end;
616
// Base implementation: only records the keep-alive flag in the object.
procedure TBaseSocket.SetKeepAlive(AKeepAlive: Boolean);
begin
  Self.FKeepAlive := AKeepAlive;
end;
621
// Setter of the Socket property: a handle that is currently held is closed
// first, then the new descriptor is taken over.
procedure TBaseSocket.SetSocket(ASocket: Winapi.Winsock2.TSocket);
begin
  if FSocket <> INVALID_SOCKET then
  begin
    Close;
  end;
  FSocket := ASocket;
end;
628
// Initialises the defaults through CommonInit; a supplied log delegate
// replaces the default one.
constructor TBaseSocket.Create(ALogDelegate: TLogDelegate);
begin
  inherited Create;
  CommonInit;
  if Assigned(ALogDelegate) then
  begin
    FLogDelegate := ALogDelegate;
  end;
end;
635
// Initialises the defaults through CommonInit, then stores the port; a
// supplied log delegate replaces the default one.
constructor TBaseSocket.Create(APort: Integer; ALogDelegate: TLogDelegate);
begin
  inherited Create;
  CommonInit;
  if Assigned(ALogDelegate) then
  begin
    FLogDelegate := ALogDelegate;
  end;
  FPort := APort;
end;
643
// Calls the (virtual) Close before the object is destroyed.
destructor TBaseSocket.Destroy;
begin
  Close;
  inherited;
end;
649
// Shuts the socket down in both directions, closes the handle and marks
// the object as holding no socket. Does nothing when no handle is held.
procedure TBaseSocket.Close;
begin
  if FSocket = INVALID_SOCKET then
    Exit;

  shutdown(FSocket, SD_BOTH);
  closesocket(FSocket);
  FSocket := INVALID_SOCKET;
end;
658
// Forgets everything known about the peer: host name, address, port and
// the cached socket address (its family is set to AF_UNSPEC).
procedure TSocket.InitPeerInfo;
begin
  FPeerPort := 0;
  FPeerAddress := '';
  FPeerHost := '';
  FCachedPeerAddr.ipv4.sin_family := AF_UNSPEC;
end;
666
// Defaults of a client socket on top of the inherited ones: no host, no
// interrupt listener, no connect timeout, linger on with value 0,
// no-delay on, 5 receive retries and empty peer information.
procedure TSocket.CommonInit;
begin
  inherited CommonInit;

  InitPeerInfo;

  FHost := '';
  FInterruptListener := nil;

  FConnTimeout := 0;
  FMaxRecvRetries := 5;

  FLingerOn := True;
  FLingerVal := 0;
  FNoDelay := True;
end;
679
// Applies the stored socket options and connects the socket to an address
// out of the resolved list.
//
// With a connect timeout the socket is switched to non-blocking mode for
// the duration of connect() and select() waits for the outcome; afterwards
// the socket is put back into blocking mode and the peer address is cached.
//
// @param Res wrapper holding the GetAddrInfoW result list
// @throws TTransportExceptionNotOpen when ioctlsocket(), connect(),
//         select() or getsockopt() fail, when the socket reports an error
//         after select(), or when the connect timeout expires
procedure TSocket.OpenConnection(Res: TBaseSocket.IGetAddrInfoWrapper);
label
  Done;
var
  ErrnoCopy: Integer;
  Ret,
  Ret2: Integer;
  WriteFds,
  ExceptFds: TFdSet;
  TVal: TTimeVal;
  PTVal: PTimeVal;
  Val,
  Lon: Integer;
  Mode,
  Zero: Cardinal;
  Target: PAddrInfoW;
begin
  if SendTimeout > 0 then SetSendTimeout(SendTimeout);
  if RecvTimeout > 0 then SetRecvTimeout(RecvTimeout);
  if KeepAlive then SetKeepAlive(KeepAlive);
  SetLinger(FLingerOn, FLingerVal);
  SetNoDelay(FNoDelay);

  // Use the same list entry CreateSocket used to create the handle (first
  // AF_INET6 entry, otherwise the last entry). Connecting to the head of
  // the list, as before, can pair the socket with an address of a
  // different family.
  Target := Res.Res;
  while Assigned(Target) do begin
    if (Target^.ai_family = AF_INET6) or (not Assigned(Target^.ai_next)) then
      Break;
    Target := Target^.ai_next;
  end;

  // Non-blocking for connect if a timeout exists, blocking otherwise
  Zero := 0;
  if FConnTimeout > 0 then
    Mode := 1
  else
    Mode := 0;
  if ioctlsocket(Socket, Integer(FIONBIO), Mode) = SOCKET_ERROR then begin
    ErrnoCopy := WSAGetLastError;
    LogDelegate(Format('TSocket.OpenConnection() ioctlsocket() %s %s', [SocketInfo, SysErrorMessage(ErrnoCopy)]));
    raise TTransportExceptionNotOpen.Create(Format('ioctlsocket() failed: %s', [SysErrorMessage(ErrnoCopy)]));
  end;

  Ret := connect(Socket, Target^.ai_addr^, Target^.ai_addrlen);
  if Ret = 0 then goto Done;

  ErrnoCopy := WSAGetLastError;
  if (ErrnoCopy <> WSAEINPROGRESS) and (ErrnoCopy <> WSAEWOULDBLOCK) then begin
    // The log format strings in this routine used to contain no
    // placeholders, so the socket info and the error text were dropped.
    LogDelegate(Format('TSocket.OpenConnection() connect() %s %s', [SocketInfo, SysErrorMessage(ErrnoCopy)]));
    raise TTransportExceptionNotOpen.Create(Format('connect() failed: %s', [SysErrorMessage(ErrnoCopy)]));
  end;

  // Winsock reports a failed non-blocking connect through the exception
  // set, not the write set, so both are watched. Without the exception
  // set a refused connection is only noticed when the timeout expires.
  FD_ZERO(WriteFds);
  _FD_SET(Socket, WriteFds);
  FD_ZERO(ExceptFds);
  _FD_SET(Socket, ExceptFds);
  if FConnTimeout > 0 then begin
    TVal.tv_sec := FConnTimeout div 1000;
    TVal.tv_usec := (FConnTimeout mod 1000) * 1000;
    PTVal := @TVal;
  end
  else
    PTVal := nil;
  Ret := select(1, nil, @WriteFds, @ExceptFds, PTVal);

  if Ret > 0 then begin
    // Ensure the socket is connected and that there are no errors set
    Lon := SizeOf(Val);
    Ret2 := getsockopt(Socket, SOL_SOCKET, SO_ERROR, @Val, Lon);
    if Ret2 = SOCKET_ERROR then begin
      ErrnoCopy := WSAGetLastError;
      LogDelegate(Format('TSocket.OpenConnection() getsockopt() %s %s', [SocketInfo, SysErrorMessage(ErrnoCopy)]));
      raise TTransportExceptionNotOpen.Create(Format('getsockopt(): %s', [SysErrorMessage(ErrnoCopy)]));
    end;
    // no errors on socket, go to town
    if Val = 0 then goto Done;
    // The pending socket error is in Val; ErrnoCopy still holds the
    // would-block code from connect() at this point.
    LogDelegate(Format('TSocket.OpenConnection() error on socket (after select()) %s %s', [SocketInfo, SysErrorMessage(Val)]));
    raise TTransportExceptionNotOpen.Create(Format('socket OpenConnection() error: %s', [SysErrorMessage(Val)]));
  end
  else if Ret = 0 then begin
    // socket timed out
    LogDelegate(Format('TSocket.OpenConnection() timed out %s', [SocketInfo]));
    raise TTransportExceptionNotOpen.Create('OpenConnection() timed out');
  end
  else begin
    // error on select()
    ErrnoCopy := WSAGetLastError;
    LogDelegate(Format('TSocket.OpenConnection() select() %s %s', [SocketInfo, SysErrorMessage(ErrnoCopy)]));
    raise TTransportExceptionNotOpen.Create(Format('select() failed: %s', [SysErrorMessage(ErrnoCopy)]));
  end;

Done:
  // Set socket back to normal mode (blocking)
  ioctlsocket(Socket, Integer(FIONBIO), Zero);
  SetCachedAddress(Target^.ai_addr^, Target^.ai_addrlen);
end;
770
// Resolves Host / Port, creates the socket handle and connects it.
// Does nothing when the socket is already open.
//
// @throws TTransportExceptionBadArgs when Port is outside 0..$FFFF
// @throws whatever CreateSocket / OpenConnection raise
procedure TSocket.LocalOpen;
var
  Res: TBaseSocket.IGetAddrInfoWrapper;
begin
  if IsOpen then Exit;

  // Validate port number
  if (Port < 0) or (Port > $FFFF) then
    raise TTransportExceptionBadArgs.Create('Specified port is invalid');

  Res := CreateSocket(Host, Port);

  try
    OpenConnection(Res);
  except
    // CreateSocket has already stored a valid handle. Without closing it
    // here a failed connect would leave IsOpen = True on a socket that is
    // not connected, and a later Open would return without retrying.
    Close;
    raise;
  end;
end;
785
// Applies a timeout option (OptName at level SOL_SOCKET) to socket S.
// Nothing is done for INVALID_SOCKET; a setsockopt failure is only logged.
procedure TSocket.SetGenericTimeout(S: Winapi.Winsock2.TSocket; Timeout: Longword; OptName: Integer);
var
  OptValue: DWORD;
begin
  if S <> INVALID_SOCKET then
  begin
    OptValue := Timeout;
    if setsockopt(S, SOL_SOCKET, OptName, @OptValue, SizeOf(OptValue)) = SOCKET_ERROR then
    begin
      LogDelegate(Format('SetGenericTimeout() setsockopt() %s', [SysErrorMessage(WSAGetLastError)]));
    end;
  end;
end;
798
// Getter of IsOpen: True while the object holds a socket handle.
function TSocket.GetIsOpen: Boolean;
begin
  Result := not (Socket = INVALID_SOCKET);
end;
803
// Setter of NoDelay: stores the flag and, when a socket handle is held,
// sets TCP_NODELAY accordingly. A setsockopt failure is only logged.
procedure TSocket.SetNoDelay(ANoDelay: Boolean);
var
  OptValue: Integer;
begin
  FNoDelay := ANoDelay;
  if Socket <> INVALID_SOCKET then
  begin
    if FNoDelay then
      OptValue := 1
    else
      OptValue := 0;

    if setsockopt(Socket, IPPROTO_TCP, TCP_NODELAY, @OptValue, SizeOf(OptValue)) = SOCKET_ERROR then
    begin
      LogDelegate(Format('TSocket.SetNoDelay() setsockopt() %s %s', [SocketInfo, SysErrorMessage(WSAGetLastError)]));
    end;
  end;
end;
816
// Getter of SocketInfo: '<Host: x Port: x>' built from the configured host
// and port, or from the peer address and peer port when the host is empty
// or the port is 0.
function TSocket.GetSocketInfo: string;
begin
  if (FHost <> '') and (Port <> 0) then
    Result := '<Host: ' + FHost + ' Port: ' + Port.ToString + '>'
  else
    Result := '<Host: ' + GetPeerAddress + ' Port: ' + GetPeerPort.ToString + '>';
end;
824
// Getter of PeerHost: name of the connected peer as returned by
// GetNameInfoW (flags 0). The name is looked up once and then cached in
// FPeerHost.
//
// Returns '' when there is no socket, when getpeername fails or when
// GetNameInfoW fails; in those cases nothing is cached, so a later call
// tries again.
function TSocket.GetPeerHost: string;
var
  Addr: TSockAddrStorage;
  AddrPtr: PSockAddr;
  AddrLen: Integer;
  ClientHost: array[0..NI_MAXHOST-1] of Char;
  ClientService: array[0..NI_MAXSERV-1] of Char;
begin
  if FPeerHost = '' then begin
    if Socket = INVALID_SOCKET then
      Exit(FPeerHost);

    // Prefer the cached peer address; ask the socket only when none is cached.
    AddrPtr := GetCachedAddress(AddrLen);
    if AddrPtr = nil then begin
      AddrLen := SizeOf(Addr);
      if getpeername(Socket, PSockAddr(@Addr)^, AddrLen) <> 0 then
        Exit(FPeerHost);
      AddrPtr := PSockAddr(@Addr);
      SetCachedAddress(AddrPtr^, AddrLen);
    end;

    // The buffers are only meaningful when GetNameInfoW returns 0. The
    // result used to be ignored, which cached whatever the uninitialised
    // stack buffer happened to contain when the lookup failed.
    if GetNameInfoW(AddrPtr^, AddrLen, ClientHost, NI_MAXHOST, ClientService, NI_MAXSERV, 0) = 0 then
      FPeerHost := ClientHost;
  end;
  Result := FPeerHost;
end;
851
// Getter of PeerAddress: numeric address of the connected peer as returned
// by GetNameInfoW (NI_NUMERICHOST or NI_NUMERICSERV). The address is looked
// up once and cached in FPeerAddress; the same lookup also fills FPeerPort.
//
// Returns '' when there is no socket, when getpeername fails or when
// GetNameInfoW fails; in those cases nothing is cached, so a later call
// tries again.
function TSocket.GetPeerAddress: string;
var
  Addr: TSockAddrStorage;
  AddrPtr: PSockAddr;
  AddrLen: Integer;
  ClientHost: array[0..NI_MAXHOST-1] of Char;
  ClientService: array[0..NI_MAXSERV-1] of Char;
begin
  if FPeerAddress = '' then begin
    if Socket = INVALID_SOCKET then
      Exit(FPeerAddress);

    // Prefer the cached peer address; ask the socket only when none is cached.
    AddrPtr := GetCachedAddress(AddrLen);
    if AddrPtr = nil then begin
      AddrLen := SizeOf(Addr);
      if getpeername(Socket, PSockAddr(@Addr)^, AddrLen) <> 0 then
        // This exit used to return FPeerHost, i.e. a host name (if one was
        // already cached) from a function that returns the address.
        Exit(FPeerAddress);
      AddrPtr := PSockAddr(@Addr);
      SetCachedAddress(AddrPtr^, AddrLen);
    end;

    // The buffers are only meaningful when GetNameInfoW returns 0. The
    // result used to be ignored, which cached whatever the uninitialised
    // stack buffers happened to contain when the lookup failed.
    if GetNameInfoW(AddrPtr^, AddrLen, ClientHost, NI_MAXHOST, ClientService, NI_MAXSERV,
                    NI_NUMERICHOST or NI_NUMERICSERV) = 0 then begin
      FPeerAddress := ClientHost;
      TryStrToInt(ClientService, FPeerPort);
    end;
  end;
  Result := FPeerAddress;
end;
879
// Getter of PeerPort. The port is determined by the address lookup, so
// that lookup is triggered first; its string result is not needed here.
function TSocket.GetPeerPort: Integer;
begin
  GetPeerAddress;
  Exit(FPeerPort);
end;
885
// Getter of Origin: peer host and peer port joined as 'host:port'.
function TSocket.GetOrigin: string;
var
  HostPart: string;
  PortPart: string;
begin
  HostPart := GetPeerHost;
  PortPart := GetPeerPort.ToString;
  Result := HostPart + ':' + PortPart;
end;
890
// Stores the receive timeout and applies it to the socket as SO_RCVTIMEO.
procedure TSocket.SetRecvTimeout(ARecvTimeout: Longword);
begin
  inherited;
  SetGenericTimeout(Socket, ARecvTimeout, SO_RCVTIMEO);
end;
896
// Stores the send timeout and applies it to the socket as SO_SNDTIMEO.
procedure TSocket.SetSendTimeout(ASendTimeout: Longword);
begin
  inherited;
  SetGenericTimeout(Socket, ASendTimeout, SO_SNDTIMEO);
end;
902
// Stores the keep-alive flag and, when a socket handle is held, sets
// SO_KEEPALIVE accordingly. A setsockopt failure is only logged.
procedure TSocket.SetKeepAlive(AKeepAlive: Boolean);
var
  Value: Integer;
begin
  inherited SetKeepAlive(AKeepAlive);

  // Same guard as SetNoDelay and SetGenericTimeout: without a handle
  // setsockopt can only fail and produce a misleading log line. The stored
  // flag is applied by OpenConnection once the socket exists.
  if Socket = INVALID_SOCKET then
    Exit;

  Value := IfThen(KeepAlive, 1, 0);
  if setsockopt(Socket, SOL_SOCKET, SO_KEEPALIVE, @Value, SizeOf(Value)) = SOCKET_ERROR then
    LogDelegate(Format('TSocket.SetKeepAlive() setsockopt() %s %s', [SocketInfo, SysErrorMessage(WSAGetLastError)]));
end;
913
// Plain pass-through to the inherited constructor; adds nothing of its own.
constructor TSocket.Create(ALogDelegate: TBaseSocket.TLogDelegate = nil);
begin
  inherited Create(ALogDelegate);
end;
919
// Creates an unconnected socket object for the given host and port.
// The port and the log delegate are handled by the inherited constructor.
constructor TSocket.Create(AHost: string; APort: Integer; ALogDelegate: TBaseSocket.TLogDelegate);
begin
  inherited Create(APort, ALogDelegate);
  Self.FHost := AHost;
end;
925
// Wraps an existing socket descriptor; it is taken over through the
// Socket property.
constructor TSocket.Create(ASocket: Winapi.Winsock2.TSocket; ALogDelegate: TBaseSocket.TLogDelegate);
begin
  inherited Create(ALogDelegate);
  Self.Socket := ASocket;
end;
931
// Wraps an existing socket descriptor and remembers the interrupt listener
// that goes with it.
constructor TSocket.Create(ASocket: Winapi.Winsock2.TSocket;
  AInterruptListener: ISmartPointer<Winapi.Winsock2.TSocket>;
  ALogDelegate: TBaseSocket.TLogDelegate);
begin
  inherited Create(ALogDelegate);
  Self.Socket := ASocket;
  Self.FInterruptListener := AInterruptListener;
end;
939
// Opens the socket through LocalOpen unless it is open already.
procedure TSocket.Open;
begin
  if not IsOpen then
  begin
    LocalOpen;
  end;
end;
945
// Closes the socket via the inherited Close and then resets the cached
// peer information.
procedure TSocket.Close;
begin
  inherited;
  InitPeerInfo;
end;
951
// Reads up to Len bytes from the socket into Buf.
//
// When an interrupt listener is attached, select() is used first to wait for
// either data on the socket or a signal on the listener.
//
// Returns: the value returned by recv(), or 0 when recv() fails with
//          WSAECONNRESET.
// Raises:  TTransportExceptionNotOpen     - socket handle is INVALID_SOCKET,
//                                           or recv() reports WSAENOTCONN
//          TTransportExceptionInterrupted - the interrupt listener is readable
//          TTransportExceptionTimedOut    - select() timed out, or recv()
//                                           reports WSAEWOULDBLOCK/WSAETIMEDOUT
//          TTransportExceptionUnknown     - any other select()/recv() failure
function TSocket.Read(var Buf; Len: Integer): Integer;
label
  TryAgain;
var
  Retries: Longword;
  EAgainThreshold,
  ReadElapsed: UInt64;
  Start: TDateTime;
  Got: Integer;
  Fds: TFdSet;
  ErrnoCopy: Integer;
  TVal: TTimeVal;
  PTVal: PTimeVal;
  Ret: Integer;
begin
  if Socket = INVALID_SOCKET then
    raise TTransportExceptionNotOpen.Create('Called read on non-open socket');

  Retries := 0;

  // THRIFT_EAGAIN can be signalled both when a timeout has occurred and when
  // the system is out of resources (an awesome undocumented feature).
  // The following is an approximation of the time interval under which
  // THRIFT_EAGAIN is taken to indicate an out of resources error.
  EAgainThreshold := 0;
  if RecvTimeout <> 0 then
    // if a readTimeout is specified along with a max number of recv retries, then
    // the threshold will ensure that the read timeout is not exceeded even in the
    // case of resource errors
    EAgainThreshold := RecvTimeout div IfThen(FMaxRecvRetries > 0, FMaxRecvRetries, 2);

TryAgain:
  // Read from the socket
  if RecvTimeout > 0 then
    Start := Now
  else
    // if there is no read timeout we don't need the TOD to determine whether
    // an THRIFT_EAGAIN is due to a timeout or an out-of-resource condition.
    Start := 0;

  if Assigned(FInterruptListener) then begin
    FD_ZERO(Fds);
    _FD_SET(Socket, Fds);
    _FD_SET(FInterruptListener, Fds);
    if RecvTimeout > 0 then begin
      TVal.tv_sec := RecvTimeout div 1000;
      TVal.tv_usec := (RecvTimeout mod 1000) * 1000;
      PTVal := @TVal;
    end
    else
      // nil timeout: select() blocks until one of the sockets is ready
      PTVal := nil;

    Ret := select(2, @Fds, nil, nil, PTVal);
    ErrnoCopy := WSAGetLastError;
    if Ret < 0 then begin
      // error cases
      if (ErrnoCopy = WSAEINTR) and (Retries < FMaxRecvRetries) then begin
        Inc(Retries);
        goto TryAgain;
      end;
      LogDelegate(Format('TSocket.Read() select() %s', [SysErrorMessage(ErrnoCopy)]));
      raise TTransportExceptionUnknown.Create(Format('Unknown: %s', [SysErrorMessage(ErrnoCopy)]));
    end
    else if Ret > 0 then begin
      // Check the interruptListener
      if FD_ISSET(FInterruptListener, Fds) then
        raise TTransportExceptionInterrupted.Create('Interrupted');
    end
    else // Ret = 0
      raise TTransportExceptionTimedOut.Create('WSAEWOULDBLOCK (timed out)');

    // falling through means there is something to recv and it cannot block
  end;

  Got := recv(Socket, Buf, Len, 0);
  // Capture the error code immediately, before any other call can replace it
  ErrnoCopy := WSAGetLastError;
  // Check for error on read
  if Got < 0 then begin
    if ErrnoCopy = WSAEWOULDBLOCK then begin
      // if no timeout we can assume that resource exhaustion has occurred.
      if RecvTimeout = 0 then
        raise TTransportExceptionTimedOut.Create('WSAEWOULDBLOCK (unavailable resources)');
      // check if this is the lack of resources or timeout case
      ReadElapsed := MilliSecondsBetween(Now, Start);
      if (EAgainThreshold = 0) or (ReadElapsed < EAgainThreshold) then begin
        if Retries < FMaxRecvRetries then begin
          Inc(Retries);
          Sleep(1);
          goto TryAgain;
        end
        else
          raise TTransportExceptionTimedOut.Create('WSAEWOULDBLOCK (unavailable resources)');
      end
      else
        // infer that timeout has been hit
        raise TTransportExceptionTimedOut.Create('WSAEWOULDBLOCK (timed out)');
    end;

    // If interrupted, try again
    if (ErrnoCopy = WSAEINTR) and (Retries < FMaxRecvRetries) then begin
      Inc(Retries);
      goto TryAgain;
    end;

    // A reset connection is reported to the caller as a zero-byte read
    if ErrnoCopy = WSAECONNRESET then
      Exit(0);

    // The socket is not connected
    if ErrnoCopy = WSAENOTCONN then
      raise TTransportExceptionNotOpen.Create('WSAENOTCONN');

    // Timed out: report it as a timeout, like the other timeout paths above,
    // rather than as a "not open" condition
    if ErrnoCopy = WSAETIMEDOUT then
      raise TTransportExceptionTimedOut.Create('WSAETIMEDOUT');

    // Not a retry case but a real problem: log it with the socket details
    LogDelegate(Format('TSocket.Read() recv() %s %s', [SocketInfo, SysErrorMessage(ErrnoCopy)]));

    // Any other error
    raise TTransportExceptionUnknown.Create(Format('Unknown: %s', [SysErrorMessage(ErrnoCopy)]));
  end;

  Result := Got;
end;
1076
// Sends all Len bytes of Buf, calling WritePartial repeatedly until the whole
// buffer has been handed to the socket.
// Raises TTransportExceptionTimedOut when WritePartial reports 0 bytes sent.
procedure TSocket.Write(const Buf; Len: Integer);
var
  Base: PByte;
  Offset: Integer;
  Chunk: Integer;
begin
  Base := PByte(@Buf);
  Offset := 0;
  while Offset < Len do begin
    Chunk := WritePartial((Base + Offset)^, Len - Offset);
    // A zero result should only happen if the timeout set with SO_SNDTIMEO
    // expired, so surface it as a timeout.
    if Chunk = 0 then
      raise TTransportExceptionTimedOut.Create('send timeout expired');
    Inc(Offset, Chunk);
  end;
end;
1091
// Performs a single send() of up to Len bytes from Buf.
//
// Returns: the number of bytes accepted by send(), or 0 when send() fails
//          with WSAEWOULDBLOCK.
// Raises:  TTransportExceptionNotOpen - handle is INVALID_SOCKET, send()
//                                       returned 0, or the connection was
//                                       reset / is not connected (the socket
//                                       is closed first in the latter case)
//          TTransportExceptionUnknown - any other send() failure
function TSocket.WritePartial(const Buf; Len: Integer): Integer;
var
  Written: Integer;
  LastErr: Integer;
begin
  if Socket = INVALID_SOCKET then
    raise TTransportExceptionNotOpen.Create('Called write on non-open socket');

  Written := send(Socket, Buf, Len, 0);

  if Written < 0 then begin
    LastErr := WSAGetLastError;

    // The send would block: report "nothing sent" and let the caller decide
    if LastErr = WSAEWOULDBLOCK then
      Exit(0);

    LogDelegate(Format('TSocket.WritePartial() send() %s %s', [SocketInfo, SysErrorMessage(LastErr)]));

    case LastErr of
      WSAECONNRESET, WSAENOTCONN: begin
        // The connection is gone: close our side before reporting it
        Close;
        raise TTransportExceptionNotOpen.Create(Format('write() send(): %s', [SysErrorMessage(LastErr)]));
      end;
    end;

    raise TTransportExceptionUnknown.Create(Format('write() send(): %s', [SysErrorMessage(LastErr)]));
  end;

  // send() accepting no bytes at all is treated as a failure
  if Written = 0 then
    raise TTransportExceptionNotOpen.Create('Socket send returned 0.');

  Result := Written;
end;
1124
// Returns a pointer to the cached peer address and stores its size in Len.
// The address family recorded in the cache selects the IPv4 or IPv6 variant;
// for any other family the result is nil and Len is 0.
function TSocket.GetCachedAddress(out Len: Integer): PSockAddr;
begin
  Len := 0;
  Result := nil;

  if FCachedPeerAddr.ipv4.sin_family = AF_INET then begin
    Len := SizeOf(TSockAddrIn);
    Result := PSockAddr(@FCachedPeerAddr.ipv4);
  end
  else if FCachedPeerAddr.ipv4.sin_family = AF_INET6 then begin
    Len := SizeOf(TSockAddrIn6);
    Result := PSockAddr(@FCachedPeerAddr.ipv6);
  end;
end;
1141
// Copies Addr into the peer address cache when its family is AF_INET or
// AF_INET6 and Len matches the corresponding structure size; otherwise the
// cached address is left as it was. The peer address/host/port fields are
// cleared in every case.
procedure TSocket.SetCachedAddress(const Addr: TSockAddr; Len: Integer);
begin
  if (Addr.sa_family = AF_INET) and (Len = SizeOf(TSockAddrIn)) then
    FCachedPeerAddr.ipv4 := PSockAddrIn(@Addr)^
  else if (Addr.sa_family = AF_INET6) and (Len = SizeOf(TSockAddrIn6)) then
    FCachedPeerAddr.ipv6 := PSockAddrIn6(@Addr)^;

  FPeerPort := 0;
  FPeerHost := '';
  FPeerAddress := '';
end;
1152
// Remembers the linger settings and, when the socket handle is valid, applies
// them as SO_LINGER. A setsockopt() failure is only logged.
procedure TSocket.SetLinger(LingerOn: Boolean; LingerVal: Integer);
var
  Opt: TLinger;
begin
  FLingerOn := LingerOn;
  FLingerVal := LingerVal;

  if Socket <> INVALID_SOCKET then begin
    if FLingerOn then
      Opt.l_onoff := 1
    else
      Opt.l_onoff := 0;
    Opt.l_linger := LingerVal;

    if setsockopt(Socket, SOL_SOCKET, SO_LINGER, @Opt, SizeOf(Opt)) = SOCKET_ERROR then
      LogDelegate(Format('TSocket.SetLinger() setsockopt() %s %s', [SocketInfo, SysErrorMessage(WSAGetLastError)]));
  end;
end;
1168
// Reports whether at least one byte can be read from the socket without
// consuming it (recv with MSG_PEEK).
//
// Returns False when the socket is not open, when the wait on the interrupt
// listener times out or is interrupted, when recv() reports WSAECONNRESET
// (the socket is closed in that case), or when recv() returns 0.
// Raises TTransportExceptionUnknown for any other select()/recv() failure.
function TSocket.Peek: Boolean;
var
  Attempts: Longword;
  ReadSet: TFdSet;
  Timeout: TTimeVal;
  TimeoutPtr: PTimeVal;
  Rc: Integer;
  LastErr: Integer;
  Probe: Byte;
begin
  if not IsOpen then
    Exit(False);

  if Assigned(FInterruptListener) then begin
    Attempts := 0;
    repeat
      FD_ZERO(ReadSet);
      _FD_SET(Socket, ReadSet);
      _FD_SET(FInterruptListener, ReadSet);

      TimeoutPtr := nil;
      if RecvTimeout > 0 then begin
        Timeout.tv_sec := RecvTimeout div 1000;
        Timeout.tv_usec := (RecvTimeout mod 1000) * 1000;
        TimeoutPtr := @Timeout;
      end;

      Rc := select(2, @ReadSet, nil, nil, TimeoutPtr);
      LastErr := WSAGetLastError;

      // timeout
      if Rc = 0 then
        Exit(False);

      if Rc < 0 then begin
        // Only WSAEINTR is retried, and only a limited number of times
        if (LastErr <> WSAEINTR) or (Attempts >= FMaxRecvRetries) then begin
          LogDelegate(Format('TSocket.Peek() select() %s', [SysErrorMessage(LastErr)]));
          raise TTransportExceptionUnknown.Create(Format('Unknown: %s', [SysErrorMessage(LastErr)]));
        end;
        Inc(Attempts);
      end;
    until Rc > 0;

    // Something is readable: if it is the interrupt listener, give up
    if FD_ISSET(FInterruptListener, ReadSet) then
      Exit(False);
    // Otherwise there must be data or a disconnection; go on to the peek
  end;

  // Check to see if data is available or if the remote side closed
  Rc := recv(Socket, Probe, 1, MSG_PEEK);
  if Rc = SOCKET_ERROR then begin
    LastErr := WSAGetLastError;
    if LastErr = WSAECONNRESET then begin
      Close;
      Exit(False);
    end;
    LogDelegate(Format('TSocket.Peek() recv() %s %s', [SocketInfo, SysErrorMessage(LastErr)]));
    raise TTransportExceptionUnknown.Create(Format('recv(): %s', [SysErrorMessage(LastErr)]));
  end;

  Result := Rc > 0;
end;
1232
// Creates the TSocket wrapper for an accepted client handle.
//
// The child-interrupt listener is attached only when interruptable children
// are enabled AND the listener actually holds a valid socket. Listen() stores
// a smart pointer wrapping INVALID_SOCKET when the interrupt socket pair
// could not be created; attaching that to a client would put an invalid
// handle into the fd_set used by TSocket.Read/Peek and make every select()
// call fail.
function TServerSocket.CreateSocketObj(Client: Winapi.Winsock2.TSocket): TSocket;
var
  Listener: Winapi.Winsock2.TSocket;
begin
  Listener := INVALID_SOCKET;
  if FInterruptableChildren and Assigned(FChildInterruptSockReader) then
    // Reading the smart pointer yields the wrapped socket handle
    Listener := FChildInterruptSockReader;

  if Listener <> INVALID_SOCKET then
    Result := TSocket.Create(Client, FChildInterruptSockReader)
  else
    Result := TSocket.Create(Client);
end;
1240
// Sends a single zero byte to NotifySocket to wake up whoever is waiting on
// the other end. Does nothing for INVALID_SOCKET; a send() failure is only
// logged.
procedure TServerSocket.Notify(NotifySocket: Winapi.Winsock2.TSocket);
var
  Signal: Byte;
begin
  if NotifySocket = INVALID_SOCKET then
    Exit;

  Signal := 0;
  if send(NotifySocket, Signal, SizeOf(Signal), 0) <> SOCKET_ERROR then
    Exit;

  LogDelegate(Format('TServerSocket.Notify() send() %s', [SysErrorMessage(WSAGetLastError)]));
end;
1251
// Enables or disables interruptable child sockets.
// Raises Exception when FListening is already set.
procedure TServerSocket.SetInterruptableChildren(AValue: Boolean);
begin
  if not FListening then
    FInterruptableChildren := AValue
  else
    raise Exception.Create('InterruptableChildren cannot be set after listen()');
end;
1258
// Puts the server socket fields into their initial state after running the
// inherited initialisation.
procedure TServerSocket.CommonInit;
begin
  inherited CommonInit;

  // State flags
  FListening := False;
  FInterruptableChildren := True;

  // Accept/bind tuning; zero means "not configured" for timeout and buffers
  FAcceptBacklog := DEFAULT_BACKLOG;
  FAcceptTimeout := 0;
  FRetryLimit := 0;
  FRetryDelay := 0;
  FTcpSendBuffer := 0;
  FTcpRecvBuffer := 0;

  // No interrupt sockets exist until Listen creates them
  FInterruptSockReader := INVALID_SOCKET;
  FInterruptSockWriter := INVALID_SOCKET;
  FChildInterruptSockWriter := INVALID_SOCKET;
end;
1274
// Creates a server socket object for the given port.
constructor TServerSocket.Create(APort: Integer; ALogDelegate: TBaseSocket.TLogDelegate = nil);
begin
  // Pure pass-through, kept so the overload set is explicit.
  inherited Create(APort, ALogDelegate);
end;
1280
// Creates a server socket object for the given port and sets the send and
// receive timeouts; Accept copies non-zero timeouts to accepted clients.
constructor TServerSocket.Create(APort: Integer; ASendTimeout, ARecvTimeout: Longword; ALogDelegate: TBaseSocket.TLogDelegate);
begin
  inherited Create(APort, ALogDelegate);
  Self.SendTimeout := ASendTimeout;
  Self.RecvTimeout := ARecvTimeout;
end;
1287
// Creates a server socket object for a specific local address and port; the
// address is stored in FAddress and used by Listen.
constructor TServerSocket.Create(AAddress: string; APort: Integer; ALogDelegate: TBaseSocket.TLogDelegate);
begin
  inherited Create(APort, ALogDelegate);
  Self.FAddress := AAddress;
end;
1293
// Creates the interrupt socket pairs, creates and configures the server
// socket, binds it (with optional retries) and starts listening.
//
// Steps, in order:
//   1. create the socket pair used by Interrupt and the one used by
//      InterruptChildren (failures are logged, not fatal)
//   2. validate Port and create the socket through CreateSocket
//   3. apply socket options (exclusive address use, buffer sizes, linger off,
//      TCP_NODELAY, non-blocking mode)
//   4. bind, retrying up to FRetryLimit times with FRetryDelay seconds between
//      attempts
//   5. when Port was 0, read the port actually assigned back via getsockname
//   6. invoke FListenCallback (if assigned) and call listen()
//
// Raises: TTransportExceptionBadArgs - Port outside 0..65535
//         TTransportExceptionNotOpen - a mandatory setsockopt/ioctlsocket
//                                      call, bind or listen failed
procedure TServerSocket.Listen;

  // Emulates socketpair() with two connected loopback TCP sockets.
  // Returns 0 on success; on failure returns SOCKET_ERROR with the Winsock
  // error code preserved for WSAGetLastError.
  function CreateSocketPair(var Reader, Writer: Winapi.Winsock2.TSocket): Integer;
  label
    Error;
  type
    TSAUnion = record
    case Integer of
      0: (inaddr: TSockAddrIn);
      1: (addr: TSockAddr);
    end;
  var
    a: TSAUnion;
    listener: Winapi.Winsock2.TSocket;
    e: Integer;
    addrlen: Integer;
    flags: DWORD;
    reuse: Integer;
  begin
    addrlen := SizeOf(a.inaddr);
    flags := 0;
    reuse := 1;

    listener := Winapi.Winsock2.socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if listener = INVALID_SOCKET then
      Exit(SOCKET_ERROR);

    // Loopback address with port 0: let the system choose a free port
    FillChar(a, SizeOf(a), 0);
    a.inaddr.sin_family := AF_INET;
    a.inaddr.sin_addr.s_addr := htonl(INADDR_LOOPBACK);
    a.inaddr.sin_port := 0;
    Reader := INVALID_SOCKET;
    Writer := INVALID_SOCKET;

    // ignore errors coming out of this setsockopt. This is because
    // SO_EXCLUSIVEADDRUSE requires admin privileges on WinXP, but we don't
    // want to force socket pairs to be an admin.
    setsockopt(listener, SOL_SOCKET, Integer(SO_EXCLUSIVEADDRUSE), @reuse, SizeOf(reuse));
    if bind(listener, a.addr, SizeOf(a.inaddr)) = SOCKET_ERROR then
      goto Error;

    // Find out which port was assigned so the reader can connect to it
    if getsockname(listener, a.addr, addrlen) = SOCKET_ERROR then
      goto Error;

    if Winapi.Winsock2.listen(listener, 1) = SOCKET_ERROR then
      goto Error;

    Reader := WSASocket(AF_INET, SOCK_STREAM, 0, nil, 0, flags);
    if Reader = INVALID_SOCKET then
      goto Error;

    if connect(Reader, a.addr, SizeOf(a.inaddr)) = SOCKET_ERROR then
      goto Error;

    Writer := Winapi.Winsock2.accept(listener, nil, nil);
    if Writer = INVALID_SOCKET then
      goto Error;

    // The temporary listener is no longer needed once the pair is connected
    closesocket(listener);
    Exit(0);

  Error:
    // Preserve the original error code across the cleanup calls
    e := WSAGetLastError;
    closesocket(listener);
    if Reader <> INVALID_SOCKET then begin
      closesocket(Reader);
      Reader := INVALID_SOCKET;
    end;
    if Writer <> INVALID_SOCKET then begin
      closesocket(Writer);
      Writer := INVALID_SOCKET;
    end;
    WSASetLastError(e);
    Result := SOCKET_ERROR;
  end;

var
  TempIntReader,
  TempIntWriter: Winapi.Winsock2.TSocket;
  One: Cardinal;
  ErrnoCopy: Integer;
  BindError: Integer;
  Ling: TLinger;
  Retries: Integer;
  AddrInfo: IGetAddrInfoWrapper;
  SA: TSockAddrStorage;
  Len: Integer;
begin
  // Create the socket pair used to interrupt
  if CreateSocketPair(TempIntReader, TempIntWriter) = SOCKET_ERROR then begin
    LogDelegate(Format('TServerSocket.Listen() CreateSocketPair() Interrupt %s', [SysErrorMessage(WSAGetLastError)]));
    FInterruptSockReader := INVALID_SOCKET;
    FInterruptSockWriter := INVALID_SOCKET;
  end
  else begin
    FInterruptSockReader := TempIntReader;
    FInterruptSockWriter := TempIntWriter;
  end;

  // Create the socket pair used to interrupt all clients
  if CreateSocketPair(TempIntReader, TempIntWriter) = SOCKET_ERROR then begin
    LogDelegate(Format('TServerSocket.Listen() CreateSocketPair() ChildInterrupt %s', [SysErrorMessage(WSAGetLastError)]));
    FChildInterruptSockReader := TSmartPointer<Winapi.Winsock2.TSocket>.Create(INVALID_SOCKET, nil);
    FChildInterruptSockWriter := INVALID_SOCKET;
  end
  else begin
    FChildInterruptSockReader := TSmartPointer<Winapi.Winsock2.TSocket>.Create(TempIntReader, DestroyerOfFineSockets);
    FChildInterruptSockWriter := TempIntWriter;
  end;

  if (Port < 0) or (Port > $FFFF) then
    raise TTransportExceptionBadArgs.Create('Specified port is invalid');

  AddrInfo := CreateSocket(FAddress, Port);

  // Set SO_EXCLUSIVEADDRUSE to prevent 2MSL delay on accept
  One := 1;
  setsockopt(Socket, SOL_SOCKET, Integer(SO_EXCLUSIVEADDRUSE), @one, SizeOf(One));
  // ignore errors coming out of this setsockopt on Windows. This is because
  // SO_EXCLUSIVEADDRUSE requires admin privileges on WinXP, but we don't
  // want to force servers to be an admin.

  // Set TCP buffer sizes
  if FTcpSendBuffer > 0 then begin
    if setsockopt(Socket, SOL_SOCKET, SO_SNDBUF, @FTcpSendBuffer, SizeOf(FTcpSendBuffer)) = SOCKET_ERROR then begin
      ErrnoCopy := WSAGetLastError;
      LogDelegate(Format('TServerSocket.Listen() setsockopt() SO_SNDBUF %s', [SysErrorMessage(ErrnoCopy)]));
      raise TTransportExceptionNotOpen.Create(Format('Could not set SO_SNDBUF: %s', [SysErrorMessage(ErrnoCopy)]));
    end;
  end;

  if FTcpRecvBuffer > 0 then begin
    if setsockopt(Socket, SOL_SOCKET, SO_RCVBUF, @FTcpRecvBuffer, SizeOf(FTcpRecvBuffer)) = SOCKET_ERROR then begin
      ErrnoCopy := WSAGetLastError;
      LogDelegate(Format('TServerSocket.Listen() setsockopt() SO_RCVBUF %s', [SysErrorMessage(ErrnoCopy)]));
      raise TTransportExceptionNotOpen.Create(Format('Could not set SO_RCVBUF: %s', [SysErrorMessage(ErrnoCopy)]));
    end;
  end;

  // Turn linger off, don't want to block on calls to close
  Ling.l_onoff := 0;
  Ling.l_linger := 0;
  if setsockopt(Socket, SOL_SOCKET, SO_LINGER, @Ling, SizeOf(Ling)) = SOCKET_ERROR then begin
    ErrnoCopy := WSAGetLastError;
    LogDelegate(Format('TServerSocket.Listen() setsockopt() SO_LINGER %s', [SysErrorMessage(ErrnoCopy)]));
    raise TTransportExceptionNotOpen.Create(Format('Could not set SO_LINGER: %s', [SysErrorMessage(ErrnoCopy)]));
  end;

  // TCP Nodelay, speed over bandwidth
  if setsockopt(Socket, IPPROTO_TCP, TCP_NODELAY, @One, SizeOf(One)) = SOCKET_ERROR then begin
    ErrnoCopy := WSAGetLastError;
    LogDelegate(Format('TServerSocket.Listen() setsockopt() TCP_NODELAY %s', [SysErrorMessage(ErrnoCopy)]));
    raise TTransportExceptionNotOpen.Create(Format('Could not set TCP_NODELAY: %s', [SysErrorMessage(ErrnoCopy)]));
  end;

  // Set NONBLOCK on the accept socket
  if ioctlsocket(Socket, Integer(FIONBIO), One) = SOCKET_ERROR then begin
    ErrnoCopy := WSAGetLastError;
    LogDelegate(Format('TServerSocket.Listen() ioctlsocket() FIONBIO %s', [SysErrorMessage(ErrnoCopy)]));
    raise TTransportExceptionNotOpen.Create(Format('ioctlsocket() FIONBIO: %s', [SysErrorMessage(ErrnoCopy)]));
  end;

  // prepare the port information
  // we may want to try to bind more than once, since THRIFT_NO_SOCKET_CACHING doesn't
  // always seem to work. The client can configure the retry variables.
  Retries := 0;
  BindError := 0;
  while True do begin
    if bind(Socket, AddrInfo.Res^.ai_addr^, AddrInfo.Res^.ai_addrlen) = 0 then
      Break;
    // Remember why bind failed right away; Sleep, logging and Close below
    // may replace the thread's last Winsock error.
    BindError := WSAGetLastError;
    Inc(Retries);
    if Retries > FRetryLimit then
      Break;
    Sleep(FRetryDelay * 1000);
  end;

  // retrieve bind info
  // bind succeeded exactly when Retries <= FRetryLimit (the failure exit of
  // the loop above leaves Retries > FRetryLimit). Using "<" here would skip
  // the lookup whenever bind succeeds on the last permitted attempt -- which
  // is always the case with the default FRetryLimit of 0.
  if (Port = 0) and (Retries <= FRetryLimit) then begin
    Len := SizeOf(SA);
    FillChar(SA, Len, 0);
    if getsockname(Socket, PSockAddr(@SA)^, Len) = SOCKET_ERROR then
      LogDelegate(Format('TServerSocket.Listen() getsockname() %s', [SysErrorMessage(WSAGetLastError)]))
    else begin
      if SA.ss_family = AF_INET6 then
        Port := ntohs(PSockAddrIn6(@SA)^.sin6_port)
      else
        Port := ntohs(PSockAddrIn(@SA)^.sin_port);
    end;
  end;

  // throw an error if we failed to bind properly
  if (Retries > FRetryLimit) then begin
    LogDelegate(Format('TServerSocket.Listen() BIND %d', [Port]));
    Close;
    raise TTransportExceptionNotOpen.Create(Format('Could not bind: %s', [SysErrorMessage(BindError)]));
  end;

  if Assigned(FListenCallback) then
    FListenCallback(Socket);

  // Call listen
  if Winapi.Winsock2.listen(Socket, FAcceptBacklog) = SOCKET_ERROR then begin
    ErrnoCopy := WSAGetLastError;
    LogDelegate(Format('TServerSocket.Listen() listen() %s', [SysErrorMessage(ErrnoCopy)]));
    raise TTransportExceptionNotOpen.Create(Format('Could not listen: %s', [SysErrorMessage(ErrnoCopy)]));
  end;

  // The socket is now listening! Record it so that SetInterruptableChildren
  // can reject changes from here on (Close resets the flag).
  FListening := True;
end;
1495
// Waits for an incoming connection (or an interrupt) and returns a TSocket
// wrapping the accepted client.
//
// The accepted handle is switched to blocking mode. The server's non-zero
// send/receive timeouts and its keep-alive setting are copied to the client,
// and the peer address reported by accept() is cached on it. FAcceptCallback,
// if assigned, is invoked with the raw client handle.
//
// Raises: TTransportExceptionInterrupted - a byte arrived on the interrupt
//                                          socket (see Interrupt)
//         TTransportExceptionUnknown     - select() failed or timed out, or
//                                          accept()/ioctlsocket() failed
function TServerSocket.Accept: TSocket;
var
  Fds: TFdSet;
  MaxEInters,
  NumEInters: Integer;
  TVal: TTimeVal;
  PTVal: PTimeVal;
  ErrnoCopy: Integer;
  Buf: Byte;
  ClientAddress: TSockAddrStorage;
  Size: Integer;
  ClientSocket: Winapi.Winsock2.TSocket;
  Zero: Cardinal;
  Client: TSocket;
  Ret: Integer;
begin
  MaxEInters := 5;
  NumEInters := 0;

  while True do begin
    FD_ZERO(Fds);
    _FD_SET(Socket, Fds);
    // The interrupt pair may not exist (Listen tolerates a failure to create
    // it). An invalid handle in the set would make select() itself fail, so
    // only add the reader when it is valid.
    if FInterruptSockReader <> INVALID_SOCKET then
      _FD_SET(FInterruptSockReader, Fds);
    if FAcceptTimeout > 0 then begin
      TVal.tv_sec := FAcceptTimeout div 1000;
      TVal.tv_usec := (FAcceptTimeout mod 1000) * 1000;
      PTVal := @TVal;
    end
    else
      // nil timeout: wait indefinitely
      PTVal := nil;

    // TODO: if WSAEINTR is received, we'll restart the timeout.
    // To be accurate, we need to fix this in the future.
    Ret := select(2, @Fds, nil, nil, PTVal);

    if Ret < 0 then begin
      // error cases; read the error code once, before anything can change it
      ErrnoCopy := WSAGetLastError;
      if (ErrnoCopy = WSAEINTR) and (NumEInters < MaxEInters) then begin
        // THRIFT_EINTR needs to be handled manually and we can tolerate
        // a certain number
        Inc(NumEInters);
        Continue;
      end;
      LogDelegate(Format('TServerSocket.Accept() select() %s', [SysErrorMessage(ErrnoCopy)]));
      raise TTransportExceptionUnknown.Create(Format('Unknown: %s', [SysErrorMessage(ErrnoCopy)]));
    end
    else if Ret > 0 then begin
      // Check for an interrupt signal
      if (FInterruptSockReader <> INVALID_SOCKET) and FD_ISSET(FInterruptSockReader, Fds) then begin
        // Consume the notification byte so the next Accept is not interrupted again
        if recv(FInterruptSockReader, Buf, SizeOf(Buf), 0) = SOCKET_ERROR then
          LogDelegate(Format('TServerSocket.Accept() recv() interrupt %s', [SysErrorMessage(WSAGetLastError)]));
        raise TTransportExceptionInterrupted.Create('interrupted');
      end;

      // Check for the actual server socket being ready
      if FD_ISSET(Socket, Fds) then
        Break;
    end
    else begin
      // select() returned 0, i.e. the accept timeout elapsed
      LogDelegate('TServerSocket.Accept() select() 0');
      raise TTransportExceptionUnknown.Create('unknown error');
    end;
  end;

  Size := SizeOf(ClientAddress);
  ClientSocket := Winapi.Winsock2.accept(Socket, @ClientAddress, @Size);
  if ClientSocket = INVALID_SOCKET then begin
    ErrnoCopy := WSAGetLastError;
    LogDelegate(Format('TServerSocket.Accept() accept() %s', [SysErrorMessage(ErrnoCopy)]));
    raise TTransportExceptionUnknown.Create(Format('accept(): %s', [SysErrorMessage(ErrnoCopy)]));
  end;

  // Make sure client socket is blocking
  Zero := 0;
  if ioctlsocket(ClientSocket, Integer(FIONBIO), Zero) = SOCKET_ERROR then begin
    ErrnoCopy := WSAGetLastError;
    // Not wrapped in an object yet, so release the raw handle here
    closesocket(ClientSocket);
    LogDelegate(Format('TServerSocket.Accept() ioctlsocket() FIONBIO %s', [SysErrorMessage(ErrnoCopy)]));
    raise TTransportExceptionUnknown.Create(Format('ioctlsocket(): %s', [SysErrorMessage(ErrnoCopy)]));
  end;

  Client := CreateSocketObj(ClientSocket);
  if SendTimeout > 0 then
    Client.SendTimeout := SendTimeout;
  if RecvTimeout > 0 then
    Client.RecvTimeout := RecvTimeout;
  if KeepAlive then
    Client.KeepAlive := KeepAlive;
  Client.SetCachedAddress(PSockAddr(@ClientAddress)^, Size);

  if Assigned(FAcceptCallback) then
    FAcceptCallback(ClientSocket);

  Result := Client;
end;
1592
// Wakes up a pending Accept by writing to the server's interrupt socket.
procedure TServerSocket.Interrupt;
begin
  Self.Notify(FInterruptSockWriter);
end;
1597
// Signals the child-interrupt socket, which client sockets created with an
// interrupt listener watch in Read and Peek.
procedure TServerSocket.InterruptChildren;
begin
  Self.Notify(FChildInterruptSockWriter);
end;
1602
// Closes the server socket (inherited) and the interrupt sockets, and clears
// the listening flag.
//
// Each handle is reset to INVALID_SOCKET after being closed so that calling
// Close again (Listen calls it on a bind failure, and callers may close
// again afterwards) never passes a stale handle value to closesocket().
procedure TServerSocket.Close;
begin
  inherited Close;

  if FInterruptSockWriter <> INVALID_SOCKET then begin
    closesocket(FInterruptSockWriter);
    FInterruptSockWriter := INVALID_SOCKET;
  end;

  if FInterruptSockReader <> INVALID_SOCKET then begin
    closesocket(FInterruptSockReader);
    FInterruptSockReader := INVALID_SOCKET;
  end;

  if FChildInterruptSockWriter <> INVALID_SOCKET then begin
    closesocket(FChildInterruptSockWriter);
    FChildInterruptSockWriter := INVALID_SOCKET;
  end;

  // Replace the shared child-interrupt reader with a placeholder holding no
  // socket; sockets that still reference the old smart pointer keep it.
  FChildInterruptSockReader := TSmartPointer<Winapi.Winsock2.TSocket>.Create(INVALID_SOCKET, nil);
  FListening := False;
end;
1615
1616 {$ENDIF} // not for OLD_SOCKETS
1617 end.
1618