1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 */ 19 20 #ifndef THRIFT_TMULTIPLEXEDPROCESSOR_H_ 21 #define THRIFT_TMULTIPLEXEDPROCESSOR_H_ 1 22 23 #include <thrift/protocol/TProtocolDecorator.h> 24 #include <thrift/TApplicationException.h> 25 #include <thrift/TProcessor.h> 26 #include <boost/tokenizer.hpp> 27 28 namespace apache { 29 namespace thrift { 30 namespace protocol { 31 32 /** 33 * To be able to work with any protocol, we needed 34 * to allow them to call readMessageBegin() and get a TMessage in exactly 35 * the standard format, without the service name prepended to TMessage.name. 36 */ 37 class StoredMessageProtocol : public TProtocolDecorator { 38 public: StoredMessageProtocol(std::shared_ptr<protocol::TProtocol> _protocol,const std::string & _name,const TMessageType _type,const int32_t _seqid)39 StoredMessageProtocol(std::shared_ptr<protocol::TProtocol> _protocol, 40 const std::string& _name, 41 const TMessageType _type, 42 const int32_t _seqid) 43 : TProtocolDecorator(_protocol), name(_name), type(_type), seqid(_seqid) {} 44 readMessageBegin_virt(std::string & _name,TMessageType & _type,int32_t & _seqid)45 uint32_t readMessageBegin_virt(std::string& _name, TMessageType& _type, int32_t& _seqid) override { 46 47 _name = name; 48 _type = type; 49 _seqid = seqid; 50 51 return 0; // (Normal TProtocol read functions return number of bytes read) 52 } 53 54 std::string name; 55 TMessageType type; 56 int32_t seqid; 57 }; 58 } // namespace protocol 59 60 /** 61 * <code>TMultiplexedProcessor</code> is a <code>TProcessor</code> allowing 62 * a single <code>TServer</code> to provide multiple services. 63 * 64 * <p>To do so, you instantiate the processor and then register additional 65 * processors with it, as shown in the following example:</p> 66 * 67 * <blockquote><code> 68 * std::shared_ptr<TMultiplexedProcessor> processor(new TMultiplexedProcessor()); 69 * 70 * processor->registerProcessor( 71 * "Calculator", 72 * std::shared_ptr<TProcessor>( new CalculatorProcessor( 73 * std::shared_ptr<CalculatorHandler>( new CalculatorHandler())))); 74 * 75 * processor->registerProcessor( 76 * "WeatherReport", 77 * std::shared_ptr<TProcessor>( new WeatherReportProcessor( 78 * std::shared_ptr<WeatherReportHandler>( new WeatherReportHandler())))); 79 * 80 * std::shared_ptr<TServerTransport> transport(new TServerSocket(9090)); 81 * TSimpleServer server(processor, transport); 82 * 83 * server.serve(); 84 * </code></blockquote> 85 */ 86 class TMultiplexedProcessor : public TProcessor { 87 public: 88 typedef std::map<std::string, std::shared_ptr<TProcessor> > services_t; 89 90 /** 91 * 'Register' a service with this <code>TMultiplexedProcessor</code>. This 92 * allows us to broker requests to individual services by using the service 93 * name to select them at request time. 94 * 95 * \param [in] serviceName Name of a service, has to be identical to the name 96 * declared in the Thrift IDL, e.g. "WeatherReport". 97 * \param [in] processor Implementation of a service, usually referred to 98 * as "handlers", e.g. WeatherReportHandler, 99 * implementing WeatherReportIf interface. 100 */ registerProcessor(const std::string & serviceName,std::shared_ptr<TProcessor> processor)101 void registerProcessor(const std::string& serviceName, std::shared_ptr<TProcessor> processor) { 102 services[serviceName] = processor; 103 } 104 105 /** 106 * Register a service to be called to process queries without service name 107 * \param [in] processor Implementation of a service. 108 */ registerDefault(const std::shared_ptr<TProcessor> & processor)109 void registerDefault(const std::shared_ptr<TProcessor>& processor) { 110 defaultProcessor = processor; 111 } 112 113 /** 114 * Chew up invalid input and return an exception to throw. 115 */ protocol_error(std::shared_ptr<protocol::TProtocol> in,std::shared_ptr<protocol::TProtocol> out,const std::string & name,int32_t seqid,const std::string & msg)116 TException protocol_error(std::shared_ptr<protocol::TProtocol> in, 117 std::shared_ptr<protocol::TProtocol> out, 118 const std::string& name, 119 int32_t seqid, 120 const std::string& msg) const { 121 in->skip(::apache::thrift::protocol::T_STRUCT); 122 in->readMessageEnd(); 123 in->getTransport()->readEnd(); 124 ::apache::thrift::TApplicationException 125 x(::apache::thrift::TApplicationException::PROTOCOL_ERROR, 126 "TMultiplexedProcessor: " + msg); 127 out->writeMessageBegin(name, ::apache::thrift::protocol::T_EXCEPTION, seqid); 128 x.write(out.get()); 129 out->writeMessageEnd(); 130 out->getTransport()->writeEnd(); 131 out->getTransport()->flush(); 132 return TException(msg); 133 } 134 135 /** 136 * This implementation of <code>process</code> performs the following steps: 137 * 138 * <ol> 139 * <li>Read the beginning of the message.</li> 140 * <li>Extract the service name from the message.</li> 141 * <li>Using the service name to locate the appropriate processor.</li> 142 * <li>Dispatch to the processor, with a decorated instance of TProtocol 143 * that allows readMessageBegin() to return the original TMessage.</li> 144 * </ol> 145 * 146 * \throws TException If the message type is not T_CALL or T_ONEWAY, if 147 * the service name was not found in the message, or if the service 148 * name was not found in the service map. 149 */ process(std::shared_ptr<protocol::TProtocol> in,std::shared_ptr<protocol::TProtocol> out,void * connectionContext)150 bool process(std::shared_ptr<protocol::TProtocol> in, 151 std::shared_ptr<protocol::TProtocol> out, 152 void* connectionContext) override { 153 std::string name; 154 protocol::TMessageType type; 155 int32_t seqid; 156 157 // Use the actual underlying protocol (e.g. TBinaryProtocol) to read the 158 // message header. This pulls the message "off the wire", which we'll 159 // deal with at the end of this method. 160 in->readMessageBegin(name, type, seqid); 161 162 if (type != protocol::T_CALL && type != protocol::T_ONEWAY) { 163 // Unexpected message type. 164 throw protocol_error(in, out, name, seqid, "Unexpected message type"); 165 } 166 167 // Extract the service name 168 boost::tokenizer<boost::char_separator<char> > tok(name, boost::char_separator<char>(":")); 169 170 std::vector<std::string> tokens; 171 std::copy(tok.begin(), tok.end(), std::back_inserter(tokens)); 172 173 // A valid message should consist of two tokens: the service 174 // name and the name of the method to call. 175 if (tokens.size() == 2) { 176 // Search for a processor associated with this service name. 177 auto it = services.find(tokens[0]); 178 179 if (it != services.end()) { 180 std::shared_ptr<TProcessor> processor = it->second; 181 // Let the processor registered for this service name 182 // process the message. 183 return processor 184 ->process(std::shared_ptr<protocol::TProtocol>( 185 new protocol::StoredMessageProtocol(in, tokens[1], type, seqid)), 186 out, 187 connectionContext); 188 } else { 189 // Unknown service. 190 throw protocol_error(in, out, name, seqid, 191 "Unknown service: " + tokens[0] + 192 ". Did you forget to call registerProcessor()?"); 193 } 194 } else if (tokens.size() == 1) { 195 if (defaultProcessor) { 196 // non-multiplexed client forwards to default processor 197 return defaultProcessor 198 ->process(std::shared_ptr<protocol::TProtocol>( 199 new protocol::StoredMessageProtocol(in, tokens[0], type, seqid)), 200 out, 201 connectionContext); 202 } else { 203 throw protocol_error(in, out, name, seqid, 204 "Non-multiplexed client request dropped. " 205 "Did you forget to call defaultProcessor()?"); 206 } 207 } else { 208 throw protocol_error(in, out, name, seqid, 209 "Wrong number of tokens."); 210 } 211 } 212 213 private: 214 /** Map of service processor objects, indexed by service names. */ 215 services_t services; 216 217 //! If a non-multi client requests something, it goes to the 218 //! default processor (if one is defined) for backwards compatibility. 219 std::shared_ptr<TProcessor> defaultProcessor; 220 }; 221 } 222 } 223 224 #endif // THRIFT_TMULTIPLEXEDPROCESSOR_H_ 225