1/* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 */ 19 20package thrift 21 22import ( 23 "net" 24 "sync" 25 "time" 26) 27 28type TServerSocket struct { 29 listener net.Listener 30 addr net.Addr 31 clientTimeout time.Duration 32 33 // Protects the interrupted value to make it thread safe. 34 mu sync.RWMutex 35 interrupted bool 36} 37 38func NewTServerSocket(listenAddr string) (*TServerSocket, error) { 39 return NewTServerSocketTimeout(listenAddr, 0) 40} 41 42func NewTServerSocketTimeout(listenAddr string, clientTimeout time.Duration) (*TServerSocket, error) { 43 addr, err := net.ResolveTCPAddr("tcp", listenAddr) 44 if err != nil { 45 return nil, err 46 } 47 return &TServerSocket{addr: addr, clientTimeout: clientTimeout}, nil 48} 49 50// Creates a TServerSocket from a net.Addr 51func NewTServerSocketFromAddrTimeout(addr net.Addr, clientTimeout time.Duration) *TServerSocket { 52 return &TServerSocket{addr: addr, clientTimeout: clientTimeout} 53} 54 55func (p *TServerSocket) Listen() error { 56 p.mu.Lock() 57 defer p.mu.Unlock() 58 if p.IsListening() { 59 return nil 60 } 61 l, err := net.Listen(p.addr.Network(), p.addr.String()) 62 if err != nil { 63 return err 64 } 65 p.listener = l 66 return nil 67} 68 69func (p *TServerSocket) Accept() (TTransport, error) { 70 p.mu.RLock() 71 interrupted := p.interrupted 72 p.mu.RUnlock() 73 74 if interrupted { 75 return nil, errTransportInterrupted 76 } 77 78 p.mu.Lock() 79 listener := p.listener 80 p.mu.Unlock() 81 if listener == nil { 82 return nil, NewTTransportException(NOT_OPEN, "No underlying server socket") 83 } 84 85 conn, err := listener.Accept() 86 if err != nil { 87 return nil, NewTTransportExceptionFromError(err) 88 } 89 return NewTSocketFromConnTimeout(conn, p.clientTimeout), nil 90} 91 92// Checks whether the socket is listening. 93func (p *TServerSocket) IsListening() bool { 94 return p.listener != nil 95} 96 97// Connects the socket, creating a new socket object if necessary. 98func (p *TServerSocket) Open() error { 99 p.mu.Lock() 100 defer p.mu.Unlock() 101 if p.IsListening() { 102 return NewTTransportException(ALREADY_OPEN, "Server socket already open") 103 } 104 if l, err := net.Listen(p.addr.Network(), p.addr.String()); err != nil { 105 return err 106 } else { 107 p.listener = l 108 } 109 return nil 110} 111 112func (p *TServerSocket) Addr() net.Addr { 113 if p.listener != nil { 114 return p.listener.Addr() 115 } 116 return p.addr 117} 118 119func (p *TServerSocket) Close() error { 120 var err error 121 p.mu.Lock() 122 if p.IsListening() { 123 err = p.listener.Close() 124 p.listener = nil 125 } 126 p.mu.Unlock() 127 return err 128} 129 130func (p *TServerSocket) Interrupt() error { 131 p.mu.Lock() 132 p.interrupted = true 133 p.mu.Unlock() 134 p.Close() 135 136 return nil 137} 138