1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 #include <thrift/async/TEvhttpServer.h>
21 #include <thrift/async/TAsyncBufferProcessor.h>
22 #include <thrift/transport/TBufferTransports.h>
23 #include <memory>
24 #include <evhttp.h>
25 #include <event2/buffer.h>
26 #include <event2/buffer_compat.h>
27 #include <iostream>
28
29 #ifndef HTTP_INTERNAL // libevent < 2
30 #define HTTP_INTERNAL 500
31 #endif
32
33 using apache::thrift::transport::TMemoryBuffer;
34 using std::shared_ptr;
35
36 namespace apache {
37 namespace thrift {
38 namespace async {
39
40 struct TEvhttpServer::RequestContext {
41 struct evhttp_request* req;
42 std::shared_ptr<apache::thrift::transport::TMemoryBuffer> ibuf;
43 std::shared_ptr<apache::thrift::transport::TMemoryBuffer> obuf;
44
45 RequestContext(struct evhttp_request* req);
46 };
47
TEvhttpServer(std::shared_ptr<TAsyncBufferProcessor> processor)48 TEvhttpServer::TEvhttpServer(std::shared_ptr<TAsyncBufferProcessor> processor)
49 : processor_(processor), eb_(nullptr), eh_(nullptr) {
50 }
51
TEvhttpServer(std::shared_ptr<TAsyncBufferProcessor> processor,int port)52 TEvhttpServer::TEvhttpServer(std::shared_ptr<TAsyncBufferProcessor> processor, int port)
53 : processor_(processor), eb_(nullptr), eh_(nullptr) {
54 // Create event_base and evhttp.
55 eb_ = event_base_new();
56 if (eb_ == nullptr) {
57 throw TException("event_base_new failed");
58 }
59 eh_ = evhttp_new(eb_);
60 if (eh_ == nullptr) {
61 event_base_free(eb_);
62 throw TException("evhttp_new failed");
63 }
64
65 // Bind to port.
66 int ret = evhttp_bind_socket(eh_, nullptr, port);
67 if (ret < 0) {
68 evhttp_free(eh_);
69 event_base_free(eb_);
70 throw TException("evhttp_bind_socket failed");
71 }
72
73 // Register a handler. If you use the other constructor,
74 // you will want to do this yourself.
75 // Don't forget to unregister before destorying this TEvhttpServer.
76 evhttp_set_cb(eh_, "/", request, (void*)this);
77 }
78
~TEvhttpServer()79 TEvhttpServer::~TEvhttpServer() {
80 if (eh_ != nullptr) {
81 evhttp_free(eh_);
82 }
83 if (eb_ != nullptr) {
84 event_base_free(eb_);
85 }
86 }
87
serve()88 int TEvhttpServer::serve() {
89 if (eb_ == nullptr) {
90 throw TException("Unexpected call to TEvhttpServer::serve");
91 }
92 return event_base_dispatch(eb_);
93 }
94
RequestContext(struct evhttp_request * req)95 TEvhttpServer::RequestContext::RequestContext(struct evhttp_request* req)
96 : req(req),
97 ibuf(new TMemoryBuffer(EVBUFFER_DATA(req->input_buffer),
98 static_cast<uint32_t>(EVBUFFER_LENGTH(req->input_buffer)))),
99 obuf(new TMemoryBuffer()) {
100 }
101
request(struct evhttp_request * req,void * self)102 void TEvhttpServer::request(struct evhttp_request* req, void* self) {
103 try {
104 static_cast<TEvhttpServer*>(self)->process(req);
105 } catch (std::exception& e) {
106 evhttp_send_reply(req, HTTP_INTERNAL, e.what(), nullptr);
107 }
108 }
109
process(struct evhttp_request * req)110 void TEvhttpServer::process(struct evhttp_request* req) {
111 auto* ctx = new RequestContext(req);
112 return processor_->process(std::bind(&TEvhttpServer::complete,
113 this,
114 ctx,
115 std::placeholders::_1),
116 ctx->ibuf,
117 ctx->obuf);
118 }
119
complete(RequestContext * ctx,bool success)120 void TEvhttpServer::complete(RequestContext* ctx, bool success) {
121 (void)success;
122 std::unique_ptr<RequestContext> ptr(ctx);
123
124 int code = success ? 200 : 400;
125 const char* reason = success ? "OK" : "Bad Request";
126
127 int rv = evhttp_add_header(ctx->req->output_headers, "Content-Type", "application/x-thrift");
128 if (rv != 0) {
129 // TODO: Log an error.
130 std::cerr << "evhttp_add_header failed " << __FILE__ << ":" << __LINE__ << std::endl;
131 }
132
133 struct evbuffer* buf = evbuffer_new();
134 if (buf == nullptr) {
135 // TODO: Log an error.
136 std::cerr << "evbuffer_new failed " << __FILE__ << ":" << __LINE__ << std::endl;
137 } else {
138 uint8_t* obuf;
139 uint32_t sz;
140 ctx->obuf->getBuffer(&obuf, &sz);
141 int ret = evbuffer_add(buf, obuf, sz);
142 if (ret != 0) {
143 // TODO: Log an error.
144 std::cerr << "evhttp_add failed with " << ret << " " << __FILE__ << ":" << __LINE__
145 << std::endl;
146 }
147 }
148
149 evhttp_send_reply(ctx->req, code, reason, buf);
150 if (buf != nullptr) {
151 evbuffer_free(buf);
152 }
153 }
154
getEventBase()155 struct event_base* TEvhttpServer::getEventBase() {
156 return eb_;
157 }
158 }
159 }
160 } // apache::thrift::async
161