1 // Licensed to the Apache Software Foundation (ASF) under one 2 // or more contributor license agreements. See the NOTICE file 3 // distributed with this work for additional information 4 // regarding copyright ownership. The ASF licenses this file 5 // to you under the Apache License, Version 2.0 (the 6 // "License"); you may not use this file except in compliance 7 // with the License. You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, 12 // software distributed under the License is distributed on an 13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 14 // KIND, either express or implied. See the License for the 15 // specific language governing permissions and limitations 16 // under the License. 17 18 //! Types used to send and receive bytes over an I/O channel. 19 //! 20 //! The core types are the `TReadTransport`, `TWriteTransport` and the 21 //! `TIoChannel` traits, through which `TInputProtocol` or 22 //! `TOutputProtocol` can receive and send primitives over the wire. While 23 //! `TInputProtocol` and `TOutputProtocol` instances deal with language primitives 24 //! the types in this module understand only bytes. 25 26 use std::io; 27 use std::io::{Read, Write}; 28 use std::ops::{Deref, DerefMut}; 29 30 #[cfg(test)] 31 macro_rules! assert_eq_transport_num_written_bytes { 32 ($transport:ident, $num_written_bytes:expr) => {{ 33 assert_eq!($transport.channel.write_bytes().len(), $num_written_bytes); 34 }}; 35 } 36 37 #[cfg(test)] 38 macro_rules! assert_eq_transport_written_bytes { 39 ($transport:ident, $expected_bytes:ident) => {{ 40 assert_eq!($transport.channel.write_bytes(), &$expected_bytes); 41 }}; 42 } 43 44 mod buffered; 45 mod framed; 46 mod mem; 47 mod socket; 48 49 pub use self::buffered::{ 50 TBufferedReadTransport, TBufferedReadTransportFactory, TBufferedWriteTransport, 51 TBufferedWriteTransportFactory, 52 }; 53 pub use self::framed::{ 54 TFramedReadTransport, TFramedReadTransportFactory, TFramedWriteTransport, 55 TFramedWriteTransportFactory, 56 }; 57 pub use self::mem::TBufferChannel; 58 pub use self::socket::TTcpChannel; 59 60 /// Identifies a transport used by a `TInputProtocol` to receive bytes. 61 pub trait TReadTransport: Read {} 62 63 /// Helper type used by a server to create `TReadTransport` instances for 64 /// accepted client connections. 65 pub trait TReadTransportFactory { 66 /// Create a `TTransport` that wraps a channel over which bytes are to be read. create(&self, channel: Box<dyn Read + Send>) -> Box<dyn TReadTransport + Send>67 fn create(&self, channel: Box<dyn Read + Send>) -> Box<dyn TReadTransport + Send>; 68 } 69 70 /// Identifies a transport used by `TOutputProtocol` to send bytes. 71 pub trait TWriteTransport: Write {} 72 73 /// Helper type used by a server to create `TWriteTransport` instances for 74 /// accepted client connections. 75 pub trait TWriteTransportFactory { 76 /// Create a `TTransport` that wraps a channel over which bytes are to be sent. create(&self, channel: Box<dyn Write + Send>) -> Box<dyn TWriteTransport + Send>77 fn create(&self, channel: Box<dyn Write + Send>) -> Box<dyn TWriteTransport + Send>; 78 } 79 80 impl<T> TReadTransport for T where T: Read {} 81 82 impl<T> TWriteTransport for T where T: Write {} 83 84 // FIXME: implement the Debug trait for boxed transports 85 86 impl<T> TReadTransportFactory for Box<T> 87 where 88 T: TReadTransportFactory + ?Sized, 89 { create(&self, channel: Box<dyn Read + Send>) -> Box<dyn TReadTransport + Send>90 fn create(&self, channel: Box<dyn Read + Send>) -> Box<dyn TReadTransport + Send> { 91 (**self).create(channel) 92 } 93 } 94 95 impl<T> TWriteTransportFactory for Box<T> 96 where 97 T: TWriteTransportFactory + ?Sized, 98 { create(&self, channel: Box<dyn Write + Send>) -> Box<dyn TWriteTransport + Send>99 fn create(&self, channel: Box<dyn Write + Send>) -> Box<dyn TWriteTransport + Send> { 100 (**self).create(channel) 101 } 102 } 103 104 /// Identifies a splittable bidirectional I/O channel used to send and receive bytes. 105 pub trait TIoChannel: Read + Write { 106 /// Split the channel into a readable half and a writable half, where the 107 /// readable half implements `io::Read` and the writable half implements 108 /// `io::Write`. Returns `None` if the channel was not initialized, or if it 109 /// cannot be split safely. 110 /// 111 /// Returned halves may share the underlying OS channel or buffer resources. 112 /// Implementations **should ensure** that these two halves can be safely 113 /// used independently by concurrent threads. split( self, ) -> crate::Result<( crate::transport::ReadHalf<Self>, crate::transport::WriteHalf<Self>, )> where Self: Sized114 fn split( 115 self, 116 ) -> crate::Result<( 117 crate::transport::ReadHalf<Self>, 118 crate::transport::WriteHalf<Self>, 119 )> 120 where 121 Self: Sized; 122 } 123 124 /// The readable half of an object returned from `TIoChannel::split`. 125 #[derive(Debug)] 126 pub struct ReadHalf<C> 127 where 128 C: Read, 129 { 130 handle: C, 131 } 132 133 /// The writable half of an object returned from `TIoChannel::split`. 134 #[derive(Debug)] 135 pub struct WriteHalf<C> 136 where 137 C: Write, 138 { 139 handle: C, 140 } 141 142 impl<C> ReadHalf<C> 143 where 144 C: Read, 145 { 146 /// Create a `ReadHalf` associated with readable `handle` new(handle: C) -> ReadHalf<C>147 pub fn new(handle: C) -> ReadHalf<C> { 148 ReadHalf { handle } 149 } 150 } 151 152 impl<C> WriteHalf<C> 153 where 154 C: Write, 155 { 156 /// Create a `WriteHalf` associated with writable `handle` new(handle: C) -> WriteHalf<C>157 pub fn new(handle: C) -> WriteHalf<C> { 158 WriteHalf { handle } 159 } 160 } 161 162 impl<C> Read for ReadHalf<C> 163 where 164 C: Read, 165 { read(&mut self, buf: &mut [u8]) -> io::Result<usize>166 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { 167 self.handle.read(buf) 168 } 169 } 170 171 impl<C> Write for WriteHalf<C> 172 where 173 C: Write, 174 { write(&mut self, buf: &[u8]) -> io::Result<usize>175 fn write(&mut self, buf: &[u8]) -> io::Result<usize> { 176 self.handle.write(buf) 177 } 178 flush(&mut self) -> io::Result<()>179 fn flush(&mut self) -> io::Result<()> { 180 self.handle.flush() 181 } 182 } 183 184 impl<C> Deref for ReadHalf<C> 185 where 186 C: Read, 187 { 188 type Target = C; 189 deref(&self) -> &Self::Target190 fn deref(&self) -> &Self::Target { 191 &self.handle 192 } 193 } 194 195 impl<C> DerefMut for ReadHalf<C> 196 where 197 C: Read, 198 { deref_mut(&mut self) -> &mut C199 fn deref_mut(&mut self) -> &mut C { 200 &mut self.handle 201 } 202 } 203 204 impl<C> Deref for WriteHalf<C> 205 where 206 C: Write, 207 { 208 type Target = C; 209 deref(&self) -> &Self::Target210 fn deref(&self) -> &Self::Target { 211 &self.handle 212 } 213 } 214 215 impl<C> DerefMut for WriteHalf<C> 216 where 217 C: Write, 218 { deref_mut(&mut self) -> &mut C219 fn deref_mut(&mut self) -> &mut C { 220 &mut self.handle 221 } 222 } 223 224 #[cfg(test)] 225 mod tests { 226 227 use std::io::Cursor; 228 229 use super::*; 230 231 #[test] must_create_usable_read_channel_from_concrete_read_type()232 fn must_create_usable_read_channel_from_concrete_read_type() { 233 let r = Cursor::new([0, 1, 2]); 234 let _ = TBufferedReadTransport::new(r); 235 } 236 237 #[test] must_create_usable_read_channel_from_boxed_read()238 fn must_create_usable_read_channel_from_boxed_read() { 239 let r: Box<dyn Read> = Box::new(Cursor::new([0, 1, 2])); 240 let _ = TBufferedReadTransport::new(r); 241 } 242 243 #[test] must_create_usable_write_channel_from_concrete_write_type()244 fn must_create_usable_write_channel_from_concrete_write_type() { 245 let w = vec![0u8; 10]; 246 let _ = TBufferedWriteTransport::new(w); 247 } 248 249 #[test] must_create_usable_write_channel_from_boxed_write()250 fn must_create_usable_write_channel_from_boxed_write() { 251 let w: Box<dyn Write> = Box::new(vec![0u8; 10]); 252 let _ = TBufferedWriteTransport::new(w); 253 } 254 255 #[test] must_create_usable_read_transport_from_concrete_read_transport()256 fn must_create_usable_read_transport_from_concrete_read_transport() { 257 let r = Cursor::new([0, 1, 2]); 258 let mut t = TBufferedReadTransport::new(r); 259 takes_read_transport(&mut t) 260 } 261 262 #[test] must_create_usable_read_transport_from_boxed_read()263 fn must_create_usable_read_transport_from_boxed_read() { 264 let r = Cursor::new([0, 1, 2]); 265 let mut t: Box<dyn TReadTransport> = Box::new(TBufferedReadTransport::new(r)); 266 takes_read_transport(&mut t) 267 } 268 269 #[test] must_create_usable_write_transport_from_concrete_write_transport()270 fn must_create_usable_write_transport_from_concrete_write_transport() { 271 let w = vec![0u8; 10]; 272 let mut t = TBufferedWriteTransport::new(w); 273 takes_write_transport(&mut t) 274 } 275 276 #[test] must_create_usable_write_transport_from_boxed_write()277 fn must_create_usable_write_transport_from_boxed_write() { 278 let w = vec![0u8; 10]; 279 let mut t: Box<dyn TWriteTransport> = Box::new(TBufferedWriteTransport::new(w)); 280 takes_write_transport(&mut t) 281 } 282 takes_read_transport<R>(t: &mut R) where R: TReadTransport,283 fn takes_read_transport<R>(t: &mut R) 284 where 285 R: TReadTransport, 286 { 287 t.bytes(); 288 } 289 takes_write_transport<W>(t: &mut W) where W: TWriteTransport,290 fn takes_write_transport<W>(t: &mut W) 291 where 292 W: TWriteTransport, 293 { 294 t.flush().unwrap(); 295 } 296 } 297