1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 use clap::{clap_app, value_t};
19 use log::*;
20 
21 use thrift::protocol::{
22     TBinaryInputProtocolFactory, TBinaryOutputProtocolFactory, TCompactInputProtocolFactory,
23     TCompactOutputProtocolFactory, TInputProtocolFactory, TOutputProtocolFactory,
24 };
25 use thrift::server::TServer;
26 use thrift::transport::{
27     TFramedReadTransportFactory, TFramedWriteTransportFactory, TReadTransportFactory,
28     TWriteTransportFactory,
29 };
30 
31 use crate::Socket::{ListenAddress, UnixDomainSocket};
32 use kitchen_sink::base_one::Noodle;
33 use kitchen_sink::base_two::{
34     BrothType, Napkin, NapkinServiceSyncHandler, Ramen, RamenServiceSyncHandler,
35 };
36 use kitchen_sink::midlayer::{
37     Dessert, Meal, MealServiceSyncHandler, MealServiceSyncProcessor, Pie,
38 };
39 use kitchen_sink::recursive;
40 use kitchen_sink::ultimate::FullMealAndDrinksServiceSyncHandler;
41 use kitchen_sink::ultimate::{
42     Drink, FullMeal, FullMealAndDrinks, FullMealAndDrinksServiceSyncProcessor,
43     FullMealServiceSyncHandler,
44 };
45 
/// Where the test server should accept connections.
///
/// Built once in `run` from the command-line options and consumed by the
/// `run_*_server` functions, which pick the matching listen call.
#[derive(Debug, Clone, PartialEq, Eq)]
enum Socket {
    /// A TCP address; `run` formats it as `127.0.0.1:<port>`.
    ListenAddress(String),
    /// The Unix domain socket string given via `--domain-socket`.
    UnixDomainSocket(String),
}
50 
main()51 fn main() {
52     match run() {
53         Ok(()) => println!("kitchen sink server completed successfully"),
54         Err(e) => {
55             println!("kitchen sink server failed with error {:?}", e);
56             std::process::exit(1);
57         }
58     }
59 }
60 
run() -> thrift::Result<()>61 fn run() -> thrift::Result<()> {
62     let matches = clap_app!(rust_kitchen_sink_server =>
63         (version: "0.1.0")
64         (author: "Apache Thrift Developers <dev@thrift.apache.org>")
65         (about: "Thrift Rust kitchen sink test server")
66         (@arg port: --port +takes_value "Port on which the Thrift test server listens")
67         (@arg domain_socket: --("domain-socket") + takes_value "Unix Domain Socket on which the Thrift test server listens")
68         (@arg protocol: --protocol +takes_value "Thrift protocol implementation to use (\"binary\", \"compact\")")
69         (@arg service: --service +takes_value "Service type to contact (\"part\", \"full\", \"recursive\")")
70     )
71             .get_matches();
72 
73     let port = value_t!(matches, "port", u16).unwrap_or(9090);
74     let domain_socket = matches.value_of("domain_socket");
75     let protocol = matches.value_of("protocol").unwrap_or("compact");
76     let service = matches.value_of("service").unwrap_or("part");
77     let listen_address = format!("127.0.0.1:{}", port);
78 
79     let socket = match domain_socket {
80         None => {
81             info!("Server is binding to {}", listen_address);
82             Socket::ListenAddress(listen_address)
83         }
84         Some(domain_socket) => {
85             info!("Server is binding to {} (UDS)", domain_socket);
86             Socket::UnixDomainSocket(domain_socket.to_string())
87         }
88     };
89 
90     let r_transport_factory = TFramedReadTransportFactory::new();
91     let w_transport_factory = TFramedWriteTransportFactory::new();
92 
93     let (i_protocol_factory, o_protocol_factory): (
94         Box<dyn TInputProtocolFactory>,
95         Box<dyn TOutputProtocolFactory>,
96     ) = match &*protocol {
97         "binary" => (
98             Box::new(TBinaryInputProtocolFactory::new()),
99             Box::new(TBinaryOutputProtocolFactory::new()),
100         ),
101         "compact" => (
102             Box::new(TCompactInputProtocolFactory::new()),
103             Box::new(TCompactOutputProtocolFactory::new()),
104         ),
105         unknown => {
106             return Err(format!("unsupported transport type {}", unknown).into());
107         }
108     };
109 
110     // FIXME: should processor be boxed as well?
111     //
112     // [sigh] I hate Rust generics implementation
113     //
114     // I would have preferred to build a server here, return it, and then do
115     // the common listen-and-handle stuff, but since the server doesn't have a
116     // common type (because each match arm instantiates a server with a
117     // different processor) this isn't possible.
118     //
119     // Since what I'm doing is uncommon I'm just going to duplicate the code
120     match &*service {
121         "part" => run_meal_server(
122             socket,
123             r_transport_factory,
124             i_protocol_factory,
125             w_transport_factory,
126             o_protocol_factory,
127         ),
128         "full" => run_full_meal_server(
129             socket,
130             r_transport_factory,
131             i_protocol_factory,
132             w_transport_factory,
133             o_protocol_factory,
134         ),
135         "recursive" => run_recursive_server(
136             socket,
137             r_transport_factory,
138             i_protocol_factory,
139             w_transport_factory,
140             o_protocol_factory,
141         ),
142         unknown => Err(format!("unsupported service type {}", unknown).into()),
143     }
144 }
145 
run_meal_server<RTF, IPF, WTF, OPF>( socket: Socket, r_transport_factory: RTF, i_protocol_factory: IPF, w_transport_factory: WTF, o_protocol_factory: OPF, ) -> thrift::Result<()> where RTF: TReadTransportFactory + 'static, IPF: TInputProtocolFactory + 'static, WTF: TWriteTransportFactory + 'static, OPF: TOutputProtocolFactory + 'static,146 fn run_meal_server<RTF, IPF, WTF, OPF>(
147     socket: Socket,
148     r_transport_factory: RTF,
149     i_protocol_factory: IPF,
150     w_transport_factory: WTF,
151     o_protocol_factory: OPF,
152 ) -> thrift::Result<()>
153 where
154     RTF: TReadTransportFactory + 'static,
155     IPF: TInputProtocolFactory + 'static,
156     WTF: TWriteTransportFactory + 'static,
157     OPF: TOutputProtocolFactory + 'static,
158 {
159     let processor = MealServiceSyncProcessor::new(PartHandler {});
160     let mut server = TServer::new(
161         r_transport_factory,
162         i_protocol_factory,
163         w_transport_factory,
164         o_protocol_factory,
165         processor,
166         1,
167     );
168 
169     match socket {
170         ListenAddress(listen_address) => server.listen(listen_address),
171         UnixDomainSocket(s) => server.listen_uds(s),
172     }
173 }
174 
run_full_meal_server<RTF, IPF, WTF, OPF>( socket: Socket, r_transport_factory: RTF, i_protocol_factory: IPF, w_transport_factory: WTF, o_protocol_factory: OPF, ) -> thrift::Result<()> where RTF: TReadTransportFactory + 'static, IPF: TInputProtocolFactory + 'static, WTF: TWriteTransportFactory + 'static, OPF: TOutputProtocolFactory + 'static,175 fn run_full_meal_server<RTF, IPF, WTF, OPF>(
176     socket: Socket,
177     r_transport_factory: RTF,
178     i_protocol_factory: IPF,
179     w_transport_factory: WTF,
180     o_protocol_factory: OPF,
181 ) -> thrift::Result<()>
182 where
183     RTF: TReadTransportFactory + 'static,
184     IPF: TInputProtocolFactory + 'static,
185     WTF: TWriteTransportFactory + 'static,
186     OPF: TOutputProtocolFactory + 'static,
187 {
188     let processor = FullMealAndDrinksServiceSyncProcessor::new(FullHandler {});
189     let mut server = TServer::new(
190         r_transport_factory,
191         i_protocol_factory,
192         w_transport_factory,
193         o_protocol_factory,
194         processor,
195         1,
196     );
197 
198     match socket {
199         ListenAddress(listen_address) => server.listen(listen_address),
200         UnixDomainSocket(s) => server.listen_uds(s),
201     }
202 }
203 
204 struct PartHandler;
205 
206 impl MealServiceSyncHandler for PartHandler {
handle_meal(&self) -> thrift::Result<Meal>207     fn handle_meal(&self) -> thrift::Result<Meal> {
208         println!("part: handling meal call");
209         Ok(meal())
210     }
211 }
212 
213 impl RamenServiceSyncHandler for PartHandler {
handle_ramen(&self, _: i32) -> thrift::Result<Ramen>214     fn handle_ramen(&self, _: i32) -> thrift::Result<Ramen> {
215         println!("part: handling ramen call");
216         Ok(ramen())
217     }
218 }
219 
220 impl NapkinServiceSyncHandler for PartHandler {
handle_napkin(&self) -> thrift::Result<Napkin>221     fn handle_napkin(&self) -> thrift::Result<Napkin> {
222         println!("part: handling napkin call");
223         Ok(napkin())
224     }
225 }
226 
227 // full service
228 //
229 
230 struct FullHandler;
231 
232 impl FullMealAndDrinksServiceSyncHandler for FullHandler {
handle_full_meal_and_drinks(&self) -> thrift::Result<FullMealAndDrinks>233     fn handle_full_meal_and_drinks(&self) -> thrift::Result<FullMealAndDrinks> {
234         println!("full_meal_and_drinks: handling full meal and drinks call");
235         Ok(FullMealAndDrinks::new(full_meal(), Drink::CANADIAN_WHISKY))
236     }
237 
handle_best_pie(&self) -> thrift::Result<Pie>238     fn handle_best_pie(&self) -> thrift::Result<Pie> {
239         println!("full_meal_and_drinks: handling pie call");
240         Ok(Pie::MISSISSIPPI_MUD) // I prefer Pie::Pumpkin, but I have to check that casing works
241     }
242 }
243 
244 impl FullMealServiceSyncHandler for FullHandler {
handle_full_meal(&self) -> thrift::Result<FullMeal>245     fn handle_full_meal(&self) -> thrift::Result<FullMeal> {
246         println!("full: handling full meal call");
247         Ok(full_meal())
248     }
249 }
250 
251 impl MealServiceSyncHandler for FullHandler {
handle_meal(&self) -> thrift::Result<Meal>252     fn handle_meal(&self) -> thrift::Result<Meal> {
253         println!("full: handling meal call");
254         Ok(meal())
255     }
256 }
257 
258 impl RamenServiceSyncHandler for FullHandler {
handle_ramen(&self, _: i32) -> thrift::Result<Ramen>259     fn handle_ramen(&self, _: i32) -> thrift::Result<Ramen> {
260         println!("full: handling ramen call");
261         Ok(ramen())
262     }
263 }
264 
265 impl NapkinServiceSyncHandler for FullHandler {
handle_napkin(&self) -> thrift::Result<Napkin>266     fn handle_napkin(&self) -> thrift::Result<Napkin> {
267         println!("full: handling napkin call");
268         Ok(napkin())
269     }
270 }
271 
full_meal() -> FullMeal272 fn full_meal() -> FullMeal {
273     FullMeal::new(meal(), Dessert::Port("Graham's Tawny".to_owned()))
274 }
275 
meal() -> Meal276 fn meal() -> Meal {
277     Meal::new(noodle(), ramen())
278 }
279 
noodle() -> Noodle280 fn noodle() -> Noodle {
281     Noodle::new("spelt".to_owned(), 100)
282 }
283 
ramen() -> Ramen284 fn ramen() -> Ramen {
285     Ramen::new("Mr Ramen".to_owned(), 72, BrothType::MISO)
286 }
287 
napkin() -> Napkin288 fn napkin() -> Napkin {
289     Napkin {}
290 }
291 
run_recursive_server<RTF, IPF, WTF, OPF>( socket: Socket, r_transport_factory: RTF, i_protocol_factory: IPF, w_transport_factory: WTF, o_protocol_factory: OPF, ) -> thrift::Result<()> where RTF: TReadTransportFactory + 'static, IPF: TInputProtocolFactory + 'static, WTF: TWriteTransportFactory + 'static, OPF: TOutputProtocolFactory + 'static,292 fn run_recursive_server<RTF, IPF, WTF, OPF>(
293     socket: Socket,
294     r_transport_factory: RTF,
295     i_protocol_factory: IPF,
296     w_transport_factory: WTF,
297     o_protocol_factory: OPF,
298 ) -> thrift::Result<()>
299 where
300     RTF: TReadTransportFactory + 'static,
301     IPF: TInputProtocolFactory + 'static,
302     WTF: TWriteTransportFactory + 'static,
303     OPF: TOutputProtocolFactory + 'static,
304 {
305     let processor = recursive::TestServiceSyncProcessor::new(RecursiveTestServerHandler {});
306     let mut server = TServer::new(
307         r_transport_factory,
308         i_protocol_factory,
309         w_transport_factory,
310         o_protocol_factory,
311         processor,
312         1,
313     );
314 
315     match socket {
316         ListenAddress(listen_address) => server.listen(listen_address),
317         UnixDomainSocket(s) => server.listen_uds(s),
318     }
319 }
320 
321 struct RecursiveTestServerHandler;
322 impl recursive::TestServiceSyncHandler for RecursiveTestServerHandler {
handle_echo_tree(&self, tree: recursive::RecTree) -> thrift::Result<recursive::RecTree>323     fn handle_echo_tree(&self, tree: recursive::RecTree) -> thrift::Result<recursive::RecTree> {
324         println!("{:?}", tree);
325         Ok(tree)
326     }
327 
handle_echo_list(&self, lst: recursive::RecList) -> thrift::Result<recursive::RecList>328     fn handle_echo_list(&self, lst: recursive::RecList) -> thrift::Result<recursive::RecList> {
329         println!("{:?}", lst);
330         Ok(lst)
331     }
332 
handle_echo_co_rec(&self, item: recursive::CoRec) -> thrift::Result<recursive::CoRec>333     fn handle_echo_co_rec(&self, item: recursive::CoRec) -> thrift::Result<recursive::CoRec> {
334         println!("{:?}", item);
335         Ok(item)
336     }
337 }
338