1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #include <thrift/async/TEvhttpServer.h>
21 #include <thrift/async/TAsyncBufferProcessor.h>
22 #include <thrift/transport/TBufferTransports.h>
23 #include <memory>
24 #include <evhttp.h>
25 #include <event2/buffer.h>
26 #include <event2/buffer_compat.h>
27 #include <iostream>
28 
29 #ifndef HTTP_INTERNAL // libevent < 2
30 #define HTTP_INTERNAL 500
31 #endif
32 
33 using apache::thrift::transport::TMemoryBuffer;
34 using std::shared_ptr;
35 
36 namespace apache {
37 namespace thrift {
38 namespace async {
39 
40 struct TEvhttpServer::RequestContext {
41   struct evhttp_request* req;
42   std::shared_ptr<apache::thrift::transport::TMemoryBuffer> ibuf;
43   std::shared_ptr<apache::thrift::transport::TMemoryBuffer> obuf;
44 
45   RequestContext(struct evhttp_request* req);
46 };
47 
TEvhttpServer(std::shared_ptr<TAsyncBufferProcessor> processor)48 TEvhttpServer::TEvhttpServer(std::shared_ptr<TAsyncBufferProcessor> processor)
49   : processor_(processor), eb_(nullptr), eh_(nullptr) {
50 }
51 
TEvhttpServer(std::shared_ptr<TAsyncBufferProcessor> processor,int port)52 TEvhttpServer::TEvhttpServer(std::shared_ptr<TAsyncBufferProcessor> processor, int port)
53   : processor_(processor), eb_(nullptr), eh_(nullptr) {
54   // Create event_base and evhttp.
55   eb_ = event_base_new();
56   if (eb_ == nullptr) {
57     throw TException("event_base_new failed");
58   }
59   eh_ = evhttp_new(eb_);
60   if (eh_ == nullptr) {
61     event_base_free(eb_);
62     throw TException("evhttp_new failed");
63   }
64 
65   // Bind to port.
66   int ret = evhttp_bind_socket(eh_, nullptr, port);
67   if (ret < 0) {
68     evhttp_free(eh_);
69     event_base_free(eb_);
70     throw TException("evhttp_bind_socket failed");
71   }
72 
73   // Register a handler.  If you use the other constructor,
74   // you will want to do this yourself.
75   // Don't forget to unregister before destorying this TEvhttpServer.
76   evhttp_set_cb(eh_, "/", request, (void*)this);
77 }
78 
~TEvhttpServer()79 TEvhttpServer::~TEvhttpServer() {
80   if (eh_ != nullptr) {
81     evhttp_free(eh_);
82   }
83   if (eb_ != nullptr) {
84     event_base_free(eb_);
85   }
86 }
87 
serve()88 int TEvhttpServer::serve() {
89   if (eb_ == nullptr) {
90     throw TException("Unexpected call to TEvhttpServer::serve");
91   }
92   return event_base_dispatch(eb_);
93 }
94 
RequestContext(struct evhttp_request * req)95 TEvhttpServer::RequestContext::RequestContext(struct evhttp_request* req)
96   : req(req),
97     ibuf(new TMemoryBuffer(EVBUFFER_DATA(req->input_buffer),
98                            static_cast<uint32_t>(EVBUFFER_LENGTH(req->input_buffer)))),
99     obuf(new TMemoryBuffer()) {
100 }
101 
request(struct evhttp_request * req,void * self)102 void TEvhttpServer::request(struct evhttp_request* req, void* self) {
103   try {
104     static_cast<TEvhttpServer*>(self)->process(req);
105   } catch (std::exception& e) {
106     evhttp_send_reply(req, HTTP_INTERNAL, e.what(), nullptr);
107   }
108 }
109 
process(struct evhttp_request * req)110 void TEvhttpServer::process(struct evhttp_request* req) {
111   auto* ctx = new RequestContext(req);
112   return processor_->process(std::bind(&TEvhttpServer::complete,
113                                                           this,
114                                                           ctx,
115                                                           std::placeholders::_1),
116                              ctx->ibuf,
117                              ctx->obuf);
118 }
119 
complete(RequestContext * ctx,bool success)120 void TEvhttpServer::complete(RequestContext* ctx, bool success) {
121   (void)success;
122   std::unique_ptr<RequestContext> ptr(ctx);
123 
124   int code = success ? 200 : 400;
125   const char* reason = success ? "OK" : "Bad Request";
126 
127   int rv = evhttp_add_header(ctx->req->output_headers, "Content-Type", "application/x-thrift");
128   if (rv != 0) {
129     // TODO: Log an error.
130     std::cerr << "evhttp_add_header failed " << __FILE__ << ":" << __LINE__ << std::endl;
131   }
132 
133   struct evbuffer* buf = evbuffer_new();
134   if (buf == nullptr) {
135     // TODO: Log an error.
136     std::cerr << "evbuffer_new failed " << __FILE__ << ":" << __LINE__ << std::endl;
137   } else {
138     uint8_t* obuf;
139     uint32_t sz;
140     ctx->obuf->getBuffer(&obuf, &sz);
141     int ret = evbuffer_add(buf, obuf, sz);
142     if (ret != 0) {
143       // TODO: Log an error.
144       std::cerr << "evhttp_add failed with " << ret << " " << __FILE__ << ":" << __LINE__
145                 << std::endl;
146     }
147   }
148 
149   evhttp_send_reply(ctx->req, code, reason, buf);
150   if (buf != nullptr) {
151     evbuffer_free(buf);
152   }
153 }
154 
getEventBase()155 struct event_base* TEvhttpServer::getEventBase() {
156   return eb_;
157 }
158 }
159 }
160 } // apache::thrift::async
161