1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 use std::convert::From;
19 use std::io;
20 use std::io::{ErrorKind, Read, Write};
21 use std::net::{Shutdown, TcpStream, ToSocketAddrs};
22 
23 #[cfg(unix)]
24 use std::os::unix::net::UnixStream;
25 
26 use super::{ReadHalf, TIoChannel, WriteHalf};
27 use crate::{new_transport_error, TransportErrorKind};
28 
29 /// Bidirectional TCP/IP channel.
30 ///
31 /// # Examples
32 ///
33 /// Create a `TTcpChannel`.
34 ///
35 /// ```no_run
36 /// use std::io::{Read, Write};
37 /// use thrift::transport::TTcpChannel;
38 ///
39 /// let mut c = TTcpChannel::new();
40 /// c.open("localhost:9090").unwrap();
41 ///
42 /// let mut buf = vec![0u8; 4];
43 /// c.read(&mut buf).unwrap();
44 /// c.write(&vec![0, 1, 2]).unwrap();
45 /// ```
46 ///
47 /// Create a `TTcpChannel` by wrapping an existing `TcpStream`.
48 ///
49 /// ```no_run
50 /// use std::io::{Read, Write};
51 /// use std::net::TcpStream;
52 /// use thrift::transport::TTcpChannel;
53 ///
54 /// let stream = TcpStream::connect("127.0.0.1:9189").unwrap();
55 ///
56 /// // no need to call c.open() since we've already connected above
57 /// let mut c = TTcpChannel::with_stream(stream);
58 ///
59 /// let mut buf = vec![0u8; 4];
60 /// c.read(&mut buf).unwrap();
61 /// c.write(&vec![0, 1, 2]).unwrap();
62 /// ```
63 #[derive(Debug, Default)]
64 pub struct TTcpChannel {
65     stream: Option<TcpStream>,
66 }
67 
68 impl TTcpChannel {
69     /// Create an uninitialized `TTcpChannel`.
70     ///
71     /// The returned instance must be opened using `TTcpChannel::open(...)`
72     /// before it can be used.
new() -> TTcpChannel73     pub fn new() -> TTcpChannel {
74         TTcpChannel { stream: None }
75     }
76 
77     /// Create a `TTcpChannel` that wraps an existing `TcpStream`.
78     ///
79     /// The passed-in stream is assumed to have been opened before being wrapped
80     /// by the created `TTcpChannel` instance.
with_stream(stream: TcpStream) -> TTcpChannel81     pub fn with_stream(stream: TcpStream) -> TTcpChannel {
82         TTcpChannel {
83             stream: Some(stream),
84         }
85     }
86 
87     /// Connect to `remote_address`, which should implement `ToSocketAddrs` trait.
open<A: ToSocketAddrs>(&mut self, remote_address: A) -> crate::Result<()>88     pub fn open<A: ToSocketAddrs>(&mut self, remote_address: A) -> crate::Result<()> {
89         if self.stream.is_some() {
90             Err(new_transport_error(
91                 TransportErrorKind::AlreadyOpen,
92                 "tcp connection previously opened",
93             ))
94         } else {
95             match TcpStream::connect(&remote_address) {
96                 Ok(s) => {
97                     self.stream = Some(s);
98                     Ok(())
99                 }
100                 Err(e) => Err(From::from(e)),
101             }
102         }
103     }
104 
105     /// Shut down this channel.
106     ///
107     /// Both send and receive halves are closed, and this instance can no
108     /// longer be used to communicate with another endpoint.
close(&mut self) -> crate::Result<()>109     pub fn close(&mut self) -> crate::Result<()> {
110         self.if_set(|s| s.shutdown(Shutdown::Both))
111             .map_err(From::from)
112     }
113 
if_set<F, T>(&mut self, mut stream_operation: F) -> io::Result<T> where F: FnMut(&mut TcpStream) -> io::Result<T>,114     fn if_set<F, T>(&mut self, mut stream_operation: F) -> io::Result<T>
115     where
116         F: FnMut(&mut TcpStream) -> io::Result<T>,
117     {
118         if let Some(ref mut s) = self.stream {
119             stream_operation(s)
120         } else {
121             Err(io::Error::new(
122                 ErrorKind::NotConnected,
123                 "tcp endpoint not connected",
124             ))
125         }
126     }
127 }
128 
129 impl TIoChannel for TTcpChannel {
split(self) -> crate::Result<(ReadHalf<Self>, WriteHalf<Self>)> where Self: Sized,130     fn split(self) -> crate::Result<(ReadHalf<Self>, WriteHalf<Self>)>
131     where
132         Self: Sized,
133     {
134         let mut s = self;
135 
136         s.stream
137             .as_mut()
138             .and_then(|s| s.try_clone().ok())
139             .map(|cloned| {
140                 let read_half = ReadHalf::new(TTcpChannel {
141                     stream: s.stream.take(),
142                 });
143                 let write_half = WriteHalf::new(TTcpChannel {
144                     stream: Some(cloned),
145                 });
146                 (read_half, write_half)
147             })
148             .ok_or_else(|| {
149                 new_transport_error(
150                     TransportErrorKind::Unknown,
151                     "cannot clone underlying tcp stream",
152                 )
153             })
154     }
155 }
156 
157 impl Read for TTcpChannel {
read(&mut self, b: &mut [u8]) -> io::Result<usize>158     fn read(&mut self, b: &mut [u8]) -> io::Result<usize> {
159         self.if_set(|s| s.read(b))
160     }
161 }
162 
163 impl Write for TTcpChannel {
write(&mut self, b: &[u8]) -> io::Result<usize>164     fn write(&mut self, b: &[u8]) -> io::Result<usize> {
165         self.if_set(|s| s.write(b))
166     }
167 
flush(&mut self) -> io::Result<()>168     fn flush(&mut self) -> io::Result<()> {
169         self.if_set(|s| s.flush())
170     }
171 }
172 
173 #[cfg(unix)]
174 impl TIoChannel for UnixStream {
split(self) -> crate::Result<(ReadHalf<Self>, WriteHalf<Self>)> where Self: Sized,175     fn split(self) -> crate::Result<(ReadHalf<Self>, WriteHalf<Self>)>
176     where
177         Self: Sized,
178     {
179         let socket_rx = self.try_clone().unwrap();
180 
181         Ok((ReadHalf::new(self), WriteHalf::new(socket_rx)))
182     }
183 }
184