1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 //! Types used to send and receive bytes over an I/O channel.
19 //!
20 //! The core types are the `TReadTransport`, `TWriteTransport` and the
21 //! `TIoChannel` traits, through which `TInputProtocol` or
//! `TOutputProtocol` can receive and send primitives over the wire. While
//! `TInputProtocol` and `TOutputProtocol` instances deal with language primitives,
//! the types in this module understand only bytes.
25 
26 use std::io;
27 use std::io::{Read, Write};
28 use std::ops::{Deref, DerefMut};
29 
/// Test-only assertion on the *number* of bytes written through a transport.
///
/// `$t` must be an identifier naming a value that has a `channel` field whose
/// `write_bytes()` result exposes `len()`; `$expected_len` is the expression
/// that length is compared against with `assert_eq!`.
#[cfg(test)]
macro_rules! assert_eq_transport_num_written_bytes {
    ($t:ident, $expected_len:expr) => {{
        // Capture the written bytes first, then compare their length.
        let written = $t.channel.write_bytes();
        assert_eq!(written.len(), $expected_len);
    }};
}
36 
/// Test-only assertion on the exact bytes written through a transport.
///
/// `$t` must be an identifier naming a value that has a `channel` field with
/// a `write_bytes()` method; `$expected` must be an identifier naming the
/// expected bytes. The comparison is made against a reference to `$expected`.
#[cfg(test)]
macro_rules! assert_eq_transport_written_bytes {
    ($t:ident, $expected:ident) => {{
        // Capture the written bytes first, then compare against `&$expected`.
        let written = $t.channel.write_bytes();
        assert_eq!(written, &$expected);
    }};
}
43 
44 mod buffered;
45 mod framed;
46 mod mem;
47 mod socket;
48 
49 pub use self::buffered::{
50     TBufferedReadTransport, TBufferedReadTransportFactory, TBufferedWriteTransport,
51     TBufferedWriteTransportFactory,
52 };
53 pub use self::framed::{
54     TFramedReadTransport, TFramedReadTransportFactory, TFramedWriteTransport,
55     TFramedWriteTransportFactory,
56 };
57 pub use self::mem::TBufferChannel;
58 pub use self::socket::TTcpChannel;
59 
/// Identifies a transport used by a `TInputProtocol` to receive bytes.
///
/// This is a marker trait: it declares no methods of its own and only
/// requires that implementors also implement `std::io::Read`.
pub trait TReadTransport: Read {}
62 
/// Helper type used by a server to create `TReadTransport` instances for
/// accepted client connections.
pub trait TReadTransportFactory {
    /// Create a `TReadTransport` that wraps `channel`, the channel over which
    /// bytes are to be read.
    ///
    /// The channel is supplied as a boxed `Read + Send` trait object, and the
    /// new transport is returned as a boxed `TReadTransport + Send` trait
    /// object.
    fn create(&self, channel: Box<dyn Read + Send>) -> Box<dyn TReadTransport + Send>;
}
69 
/// Identifies a transport used by `TOutputProtocol` to send bytes.
///
/// This is a marker trait: it declares no methods of its own and only
/// requires that implementors also implement `std::io::Write`.
pub trait TWriteTransport: Write {}
72 
/// Helper type used by a server to create `TWriteTransport` instances for
/// accepted client connections.
pub trait TWriteTransportFactory {
    /// Create a `TWriteTransport` that wraps `channel`, the channel over which
    /// bytes are to be sent.
    ///
    /// The channel is supplied as a boxed `Write + Send` trait object, and the
    /// new transport is returned as a boxed `TWriteTransport + Send` trait
    /// object.
    fn create(&self, channel: Box<dyn Write + Send>) -> Box<dyn TWriteTransport + Send>;
}
79 
80 impl<T> TReadTransport for T where T: Read {}
81 
82 impl<T> TWriteTransport for T where T: Write {}
83 
84 // FIXME: implement the Debug trait for boxed transports
85 
86 impl<T> TReadTransportFactory for Box<T>
87 where
88     T: TReadTransportFactory + ?Sized,
89 {
create(&self, channel: Box<dyn Read + Send>) -> Box<dyn TReadTransport + Send>90     fn create(&self, channel: Box<dyn Read + Send>) -> Box<dyn TReadTransport + Send> {
91         (**self).create(channel)
92     }
93 }
94 
95 impl<T> TWriteTransportFactory for Box<T>
96 where
97     T: TWriteTransportFactory + ?Sized,
98 {
create(&self, channel: Box<dyn Write + Send>) -> Box<dyn TWriteTransport + Send>99     fn create(&self, channel: Box<dyn Write + Send>) -> Box<dyn TWriteTransport + Send> {
100         (**self).create(channel)
101     }
102 }
103 
/// Identifies a splittable bidirectional I/O channel used to send and receive bytes.
///
/// Implementors must also implement both `io::Read` and `io::Write`.
pub trait TIoChannel: Read + Write {
    /// Split the channel into a readable half and a writable half, where the
    /// readable half implements `io::Read` and the writable half implements
    /// `io::Write`. Returns an error if the channel was not initialized, or if
    /// it cannot be split safely.
    ///
    /// The channel is consumed (`self` is taken by value), and the method is
    /// only available when `Self: Sized`.
    ///
    /// Returned halves may share the underlying OS channel or buffer resources.
    /// Implementations **should ensure** that these two halves can be safely
    /// used independently by concurrent threads.
    fn split(
        self,
    ) -> crate::Result<(
        crate::transport::ReadHalf<Self>,
        crate::transport::WriteHalf<Self>,
    )>
    where
        Self: Sized;
}
123 
/// The readable half of an object returned from `TIoChannel::split`.
///
/// A thin wrapper that owns a single readable handle of type `C`.
#[derive(Debug)]
pub struct ReadHalf<C: Read> {
    // The wrapped readable handle.
    handle: C,
}
132 
/// The writable half of an object returned from `TIoChannel::split`.
///
/// A thin wrapper that owns a single writable handle of type `C`.
#[derive(Debug)]
pub struct WriteHalf<C: Write> {
    // The wrapped writable handle.
    handle: C,
}
141 
142 impl<C> ReadHalf<C>
143 where
144     C: Read,
145 {
146     /// Create a `ReadHalf` associated with readable `handle`
new(handle: C) -> ReadHalf<C>147     pub fn new(handle: C) -> ReadHalf<C> {
148         ReadHalf { handle }
149     }
150 }
151 
152 impl<C> WriteHalf<C>
153 where
154     C: Write,
155 {
156     /// Create a `WriteHalf` associated with writable `handle`
new(handle: C) -> WriteHalf<C>157     pub fn new(handle: C) -> WriteHalf<C> {
158         WriteHalf { handle }
159     }
160 }
161 
162 impl<C> Read for ReadHalf<C>
163 where
164     C: Read,
165 {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>166     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
167         self.handle.read(buf)
168     }
169 }
170 
171 impl<C> Write for WriteHalf<C>
172 where
173     C: Write,
174 {
write(&mut self, buf: &[u8]) -> io::Result<usize>175     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
176         self.handle.write(buf)
177     }
178 
flush(&mut self) -> io::Result<()>179     fn flush(&mut self) -> io::Result<()> {
180         self.handle.flush()
181     }
182 }
183 
184 impl<C> Deref for ReadHalf<C>
185 where
186     C: Read,
187 {
188     type Target = C;
189 
deref(&self) -> &Self::Target190     fn deref(&self) -> &Self::Target {
191         &self.handle
192     }
193 }
194 
195 impl<C> DerefMut for ReadHalf<C>
196 where
197     C: Read,
198 {
deref_mut(&mut self) -> &mut C199     fn deref_mut(&mut self) -> &mut C {
200         &mut self.handle
201     }
202 }
203 
204 impl<C> Deref for WriteHalf<C>
205 where
206     C: Write,
207 {
208     type Target = C;
209 
deref(&self) -> &Self::Target210     fn deref(&self) -> &Self::Target {
211         &self.handle
212     }
213 }
214 
215 impl<C> DerefMut for WriteHalf<C>
216 where
217     C: Write,
218 {
deref_mut(&mut self) -> &mut C219     fn deref_mut(&mut self) -> &mut C {
220         &mut self.handle
221     }
222 }
223 
#[cfg(test)]
mod tests {
    // Smoke tests: each case only needs to type-check and run without
    // panicking, showing that concrete and boxed readers/writers can be
    // wrapped in buffered transports and passed where a transport is expected.

    use std::io::Cursor;

    use super::*;

    // Generic consumer accepting any `TReadTransport`. It only constructs the
    // `bytes()` adaptor and discards it.
    fn takes_read_transport<R: TReadTransport>(t: &mut R) {
        let _ = t.bytes();
    }

    // Generic consumer accepting any `TWriteTransport`; the flush must succeed.
    fn takes_write_transport<W: TWriteTransport>(t: &mut W) {
        t.flush().unwrap();
    }

    #[test]
    fn must_create_usable_read_channel_from_concrete_read_type() {
        let source = Cursor::new([0, 1, 2]);
        let _ = TBufferedReadTransport::new(source);
    }

    #[test]
    fn must_create_usable_read_channel_from_boxed_read() {
        let source: Box<dyn Read> = Box::new(Cursor::new([0, 1, 2]));
        let _ = TBufferedReadTransport::new(source);
    }

    #[test]
    fn must_create_usable_write_channel_from_concrete_write_type() {
        let sink = vec![0u8; 10];
        let _ = TBufferedWriteTransport::new(sink);
    }

    #[test]
    fn must_create_usable_write_channel_from_boxed_write() {
        let sink: Box<dyn Write> = Box::new(vec![0u8; 10]);
        let _ = TBufferedWriteTransport::new(sink);
    }

    #[test]
    fn must_create_usable_read_transport_from_concrete_read_transport() {
        let source = Cursor::new([0, 1, 2]);
        let mut transport = TBufferedReadTransport::new(source);
        takes_read_transport(&mut transport);
    }

    #[test]
    fn must_create_usable_read_transport_from_boxed_read() {
        let source = Cursor::new([0, 1, 2]);
        let mut transport: Box<dyn TReadTransport> = Box::new(TBufferedReadTransport::new(source));
        takes_read_transport(&mut transport);
    }

    #[test]
    fn must_create_usable_write_transport_from_concrete_write_transport() {
        let sink = vec![0u8; 10];
        let mut transport = TBufferedWriteTransport::new(sink);
        takes_write_transport(&mut transport);
    }

    #[test]
    fn must_create_usable_write_transport_from_boxed_write() {
        let sink = vec![0u8; 10];
        let mut transport: Box<dyn TWriteTransport> = Box::new(TBufferedWriteTransport::new(sink));
        takes_write_transport(&mut transport);
    }
}
297