1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #ifndef THRIFT_TMULTIPLEXEDPROCESSOR_H_
21 #define THRIFT_TMULTIPLEXEDPROCESSOR_H_ 1
22 
23 #include <thrift/protocol/TProtocolDecorator.h>
24 #include <thrift/TApplicationException.h>
25 #include <thrift/TProcessor.h>
26 #include <boost/tokenizer.hpp>
27 
28 namespace apache {
29 namespace thrift {
30 namespace protocol {
31 
32 /**
33  *  To be able to work with any protocol, we needed
34  *  to allow them to call readMessageBegin() and get a TMessage in exactly
35  *  the standard format, without the service name prepended to TMessage.name.
36  */
37 class StoredMessageProtocol : public TProtocolDecorator {
38 public:
StoredMessageProtocol(std::shared_ptr<protocol::TProtocol> _protocol,const std::string & _name,const TMessageType _type,const int32_t _seqid)39   StoredMessageProtocol(std::shared_ptr<protocol::TProtocol> _protocol,
40                         const std::string& _name,
41                         const TMessageType _type,
42                         const int32_t _seqid)
43     : TProtocolDecorator(_protocol), name(_name), type(_type), seqid(_seqid) {}
44 
readMessageBegin_virt(std::string & _name,TMessageType & _type,int32_t & _seqid)45   uint32_t readMessageBegin_virt(std::string& _name, TMessageType& _type, int32_t& _seqid) override {
46 
47     _name = name;
48     _type = type;
49     _seqid = seqid;
50 
51     return 0; // (Normal TProtocol read functions return number of bytes read)
52   }
53 
54   std::string name;
55   TMessageType type;
56   int32_t seqid;
57 };
58 } // namespace protocol
59 
60 /**
61  * <code>TMultiplexedProcessor</code> is a <code>TProcessor</code> allowing
62  * a single <code>TServer</code> to provide multiple services.
63  *
64  * <p>To do so, you instantiate the processor and then register additional
65  * processors with it, as shown in the following example:</p>
66  *
67  * <blockquote><code>
68  *     std::shared_ptr<TMultiplexedProcessor> processor(new TMultiplexedProcessor());
69  *
70  *     processor->registerProcessor(
71  *         "Calculator",
72  *         std::shared_ptr<TProcessor>( new CalculatorProcessor(
73  *             std::shared_ptr<CalculatorHandler>( new CalculatorHandler()))));
74  *
75  *     processor->registerProcessor(
76  *         "WeatherReport",
77  *         std::shared_ptr<TProcessor>( new WeatherReportProcessor(
78  *             std::shared_ptr<WeatherReportHandler>( new WeatherReportHandler()))));
79  *
80  *     std::shared_ptr<TServerTransport> transport(new TServerSocket(9090));
81  *     TSimpleServer server(processor, transport);
82  *
83  *     server.serve();
84  * </code></blockquote>
85  */
86 class TMultiplexedProcessor : public TProcessor {
87 public:
88   typedef std::map<std::string, std::shared_ptr<TProcessor> > services_t;
89 
90   /**
91     * 'Register' a service with this <code>TMultiplexedProcessor</code>.  This
92     * allows us to broker requests to individual services by using the service
93     * name to select them at request time.
94     *
95     * \param [in] serviceName Name of a service, has to be identical to the name
96     *                         declared in the Thrift IDL, e.g. "WeatherReport".
97     * \param [in] processor   Implementation of a service, usually referred to
98     *                         as "handlers", e.g. WeatherReportHandler,
99     *                         implementing WeatherReportIf interface.
100     */
registerProcessor(const std::string & serviceName,std::shared_ptr<TProcessor> processor)101   void registerProcessor(const std::string& serviceName, std::shared_ptr<TProcessor> processor) {
102     services[serviceName] = processor;
103   }
104 
105   /**
106    * Register a service to be called to process queries without service name
107    * \param [in] processor   Implementation of a service.
108    */
registerDefault(const std::shared_ptr<TProcessor> & processor)109   void registerDefault(const std::shared_ptr<TProcessor>& processor) {
110     defaultProcessor = processor;
111   }
112 
113   /**
114    * Chew up invalid input and return an exception to throw.
115    */
protocol_error(std::shared_ptr<protocol::TProtocol> in,std::shared_ptr<protocol::TProtocol> out,const std::string & name,int32_t seqid,const std::string & msg)116   TException protocol_error(std::shared_ptr<protocol::TProtocol> in,
117                             std::shared_ptr<protocol::TProtocol> out,
118                             const std::string& name,
119                             int32_t seqid,
120                             const std::string& msg) const {
121     in->skip(::apache::thrift::protocol::T_STRUCT);
122     in->readMessageEnd();
123     in->getTransport()->readEnd();
124     ::apache::thrift::TApplicationException
125       x(::apache::thrift::TApplicationException::PROTOCOL_ERROR,
126         "TMultiplexedProcessor: " + msg);
127     out->writeMessageBegin(name, ::apache::thrift::protocol::T_EXCEPTION, seqid);
128     x.write(out.get());
129     out->writeMessageEnd();
130     out->getTransport()->writeEnd();
131     out->getTransport()->flush();
132     return TException(msg);
133 }
134 
135   /**
136    * This implementation of <code>process</code> performs the following steps:
137    *
138    * <ol>
139    *     <li>Read the beginning of the message.</li>
140    *     <li>Extract the service name from the message.</li>
141    *     <li>Using the service name to locate the appropriate processor.</li>
142    *     <li>Dispatch to the processor, with a decorated instance of TProtocol
143    *         that allows readMessageBegin() to return the original TMessage.</li>
144    * </ol>
145    *
146    * \throws TException If the message type is not T_CALL or T_ONEWAY, if
147    * the service name was not found in the message, or if the service
148    * name was not found in the service map.
149    */
process(std::shared_ptr<protocol::TProtocol> in,std::shared_ptr<protocol::TProtocol> out,void * connectionContext)150   bool process(std::shared_ptr<protocol::TProtocol> in,
151                std::shared_ptr<protocol::TProtocol> out,
152                void* connectionContext) override {
153     std::string name;
154     protocol::TMessageType type;
155     int32_t seqid;
156 
157     // Use the actual underlying protocol (e.g. TBinaryProtocol) to read the
158     // message header.  This pulls the message "off the wire", which we'll
159     // deal with at the end of this method.
160     in->readMessageBegin(name, type, seqid);
161 
162     if (type != protocol::T_CALL && type != protocol::T_ONEWAY) {
163       // Unexpected message type.
164       throw protocol_error(in, out, name, seqid, "Unexpected message type");
165     }
166 
167     // Extract the service name
168     boost::tokenizer<boost::char_separator<char> > tok(name, boost::char_separator<char>(":"));
169 
170     std::vector<std::string> tokens;
171     std::copy(tok.begin(), tok.end(), std::back_inserter(tokens));
172 
173     // A valid message should consist of two tokens: the service
174     // name and the name of the method to call.
175     if (tokens.size() == 2) {
176       // Search for a processor associated with this service name.
177       auto it = services.find(tokens[0]);
178 
179       if (it != services.end()) {
180         std::shared_ptr<TProcessor> processor = it->second;
181         // Let the processor registered for this service name
182         // process the message.
183         return processor
184             ->process(std::shared_ptr<protocol::TProtocol>(
185                           new protocol::StoredMessageProtocol(in, tokens[1], type, seqid)),
186                       out,
187                       connectionContext);
188       } else {
189         // Unknown service.
190         throw protocol_error(in, out, name, seqid,
191             "Unknown service: " + tokens[0] +
192 				". Did you forget to call registerProcessor()?");
193       }
194     } else if (tokens.size() == 1) {
195 	  if (defaultProcessor) {
196         // non-multiplexed client forwards to default processor
197         return defaultProcessor
198             ->process(std::shared_ptr<protocol::TProtocol>(
199                           new protocol::StoredMessageProtocol(in, tokens[0], type, seqid)),
200                       out,
201                       connectionContext);
202 	  } else {
203 		throw protocol_error(in, out, name, seqid,
204 			"Non-multiplexed client request dropped. "
205 			"Did you forget to call defaultProcessor()?");
206 	  }
207     } else {
208 		throw protocol_error(in, out, name, seqid,
209 		    "Wrong number of tokens.");
210     }
211   }
212 
213 private:
214   /** Map of service processor objects, indexed by service names. */
215   services_t services;
216 
217   //! If a non-multi client requests something, it goes to the
218   //! default processor (if one is defined) for backwards compatibility.
219   std::shared_ptr<TProcessor> defaultProcessor;
220 };
221 }
222 }
223 
224 #endif // THRIFT_TMULTIPLEXEDPROCESSOR_H_
225