1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 /**
 * HTTP transport implementation, modelled after the C++ one.
22  *
23  * Unfortunately, libcurl is quite heavyweight and supports only client-side
24  * applications. This is an implementation of the basic HTTP/1.1 parts
25  * supporting HTTP 100 Continue, chunked transfer encoding, keepalive, etc.
26  */
27 module thrift.transport.http;
28 
29 import std.algorithm : canFind, countUntil, endsWith, findSplit, min, startsWith;
30 import std.ascii : toLower;
31 import std.array : empty;
32 import std.conv : parse, to;
33 import std.datetime : Clock, UTC;
34 import std.string : stripLeft;
35 import thrift.base : VERSION;
36 import thrift.transport.base;
37 import thrift.transport.memory;
38 import thrift.transport.socket;
39 
40 /**
41  * Base class for both client- and server-side HTTP transports.
42  */
abstract class THttpTransport : TBaseTransport {
  /**
   * Sets up the HTTP layer on top of the passed underlying transport and
   * allocates the HTTP parsing buffer and the message read/write buffers.
   *
   * Params:
   *   transport = The underlying transport used for the actual I/O.
   */
  this(TTransport transport) {
    transport_ = transport;
    readHeaders_ = true;
    httpBuf_ = new ubyte[HTTP_BUFFER_SIZE];
    httpBufRemaining_ = httpBuf_[0 .. 0];
    readBuffer_ = new TMemoryBuffer;
    writeBuffer_ = new TMemoryBuffer;
  }

  /// Forwards to the underlying transport.
  override bool isOpen() {
    return transport_.isOpen();
  }

  /// Forwards to the underlying transport.
  override bool peek() {
    return transport_.peek();
  }

  /// Forwards to the underlying transport.
  override void open() {
    transport_.open();
  }

  /// Forwards to the underlying transport.
  override void close() {
    transport_.close();
  }

  /**
   * Reads HTTP message body data into buf.
   *
   * If no decoded body data is pending, the next piece of the message is
   * parsed first: the headers (when due), followed by either one chunk
   * (chunked transfer encoding) or the whole Content-Length sized body.
   *
   * Returns: The number of bytes stored in buf; 0 if the underlying
   *   transport delivered no data or the parsed body piece was empty.
   *
   * Throws: TTransportException on malformed or truncated HTTP data.
   */
  override size_t read(ubyte[] buf) {
    if (!readBuffer_.peek()) {
      readBuffer_.reset();

      // Only go to the underlying transport if nothing unparsed is left in
      // the HTTP buffer. Otherwise already received data (e.g. further
      // chunks that arrived together with the first one) would wait behind
      // a potentially blocking read.
      if (httpBufRemaining_.empty && !refill()) return 0;

      if (readHeaders_) {
        readHeaders();
      }

      size_t got;
      if (chunked_) {
        // For chunked bodies, readChunkedFooters() re-arms header parsing
        // once the terminating chunk has been consumed. Doing it after
        // every chunk would make the next chunk size line be parsed as a
        // status line.
        got = readChunked();
      } else {
        got = readContent(contentLength_);
        readHeaders_ = true;
      }

      if (got == 0) return 0;
    }
    return readBuffer_.read(buf);
  }

  /**
   * Consumes what is left of a chunked message (remaining chunks and
   * footers). Does nothing for non-chunked messages.
   *
   * Returns: Always 0.
   */
  override size_t readEnd() {
    // Read any pending chunked data (footers etc.)
    if (chunked_) {
      while (!chunkedDone_) {
        readChunked();
      }
    }
    return 0;
  }

  /// Appends buf to the message body; nothing is sent until flush().
  override void write(in ubyte[] buf) {
    writeBuffer_.write(buf);
  }

  /**
   * Sends the buffered body, preceded by the header block produced by
   * getHeader(), over the underlying transport and flushes it.
   */
  override void flush() {
    auto data = writeBuffer_.getContents();
    string header = getHeader(data.length);

    transport_.write(cast(const(ubyte)[]) header);
    transport_.write(data);
    transport_.flush();

    // Reset the buffer and header variables.
    writeBuffer_.reset();
    readHeaders_ = true;
  }

  /**
   * The size of the buffer to read HTTP requests into, in bytes. Will expand
   * as required.
   */
  enum HTTP_BUFFER_SIZE = 1024;

protected:
  /**
   * Builds the complete header block (including the terminating empty
   * line) to send in front of a body of dataLength bytes.
   */
  abstract string getHeader(size_t dataLength);

  /**
   * Parses the first line of a message.
   *
   * Returns: true if the headers following it belong to the message that
   *   carries the body, false if another status line is to be expected
   *   after the next empty line.
   */
  abstract bool parseStatusLine(const(ubyte)[] status);

  /**
   * Evaluates a single header line. Only Transfer-Encoding (chunked) and
   * Content-Length are acted upon; lines without a colon and all other
   * headers are ignored. Header names are matched case-insensitively.
   */
  void parseHeader(const(ubyte)[] header) {
    auto split = findSplit(header, [':']);
    if (split[1].empty) {
      // No colon found.
      return;
    }

    static bool compToLower(ubyte a, ubyte b) {
      return toLower(cast(char)a) == toLower(cast(char)b);
    }

    if (startsWith!compToLower(split[0], cast(ubyte[])"transfer-encoding")) {
      if (endsWith!compToLower(split[2], cast(ubyte[])"chunked")) {
        chunked_ = true;
      }
    } else if (startsWith!compToLower(split[0], cast(ubyte[])"content-length")) {
      chunked_ = false;
      auto lengthString = stripLeft(cast(const(char)[])split[2]);
      contentLength_ = parse!size_t(lengthString);
    }
  }

private:
  /**
   * Returns the next CRLF-terminated line from the HTTP buffer (without the
   * CRLF), fetching more data from the underlying transport as needed.
   *
   * If the transport runs dry in the middle of a line, the partial line is
   * returned.
   *
   * Throws: TTransportException if the transport runs dry and no data at
   *   all is left. An empty line cannot be returned in that case, as
   *   callers could not tell it apart from a real empty line and would wait
   *   for more lines forever.
   */
  ubyte[] readLine() {
    while (true) {
      auto split = findSplit(httpBufRemaining_, cast(ubyte[])"\r\n");

      if (split[1].empty) {
        // No CRLF yet, move whatever we have now to front and refill.
        if (httpBufRemaining_.empty) {
          httpBufRemaining_ = httpBuf_[0 .. 0];
        } else {
          // Source and destination may overlap (or be identical), which a
          // slice assignment does not permit. The source never starts
          // before the destination, so a forward element-wise copy is safe.
          foreach (i, b; httpBufRemaining_) {
            httpBuf_[i] = b;
          }
          httpBufRemaining_ = httpBuf_[0 .. httpBufRemaining_.length];
        }

        if (!refill()) {
          if (httpBufRemaining_.empty) {
            throw new TTransportException(
              "Unexpected end of data while reading HTTP line",
              TTransportException.Type.CORRUPTED_DATA);
          }
          auto buf = httpBufRemaining_;
          httpBufRemaining_ = httpBufRemaining_[$ .. $];
          return buf;
        }
      } else {
        // Set the remaining buffer to the part after \r\n and return the part
        // (line) before it.
        httpBufRemaining_ = split[2];
        return split[0];
      }
    }
  }

  /**
   * Parses status line(s) and headers up to the empty line which ends the
   * header block of the message carrying the body, updating the
   * content-length/chunked state on the way.
   */
  void readHeaders() {
    // Initialize headers state variables
    contentLength_ = 0;
    chunked_ = false;
    chunkedDone_ = false;
    chunkSize_ = 0;

    // Control state flow
    bool statusLine = true;
    bool finished;

    // Loop until headers are finished
    while (true) {
      auto line = readLine();

      if (line.length == 0) {
        if (finished) {
          readHeaders_ = false;
          return;
        } else {
          // Must have been an HTTP 100, keep going for another status line
          statusLine = true;
        }
      } else {
        if (statusLine) {
          statusLine = false;
          finished = parseStatusLine(line);
        } else {
          parseHeader(line);
        }
      }
    }
  }

  /**
   * Reads one chunk of a chunked body into the read buffer, or, for the
   * terminating zero-sized chunk, the footers.
   *
   * Returns: The number of body bytes read (0 for the terminating chunk).
   *
   * Throws: TTransportException if the chunk size line cannot be parsed.
   */
  size_t readChunked() {
    size_t length;

    auto line = readLine();
    size_t chunkSize;
    try {
      auto charLine = cast(char[])line;
      chunkSize = parse!size_t(charLine, 16);
    } catch (Exception e) {
      // Show the offending line as text rather than as a list of numbers.
      throw new TTransportException("Invalid chunk size: " ~
        to!string(cast(const(char)[])line),
        TTransportException.Type.CORRUPTED_DATA);
    }

    if (chunkSize == 0) {
      readChunkedFooters();
    } else {
      // Read data content
      length += readContent(chunkSize);
      // Read trailing CRLF after content
      readLine();
    }
    return length;
  }

  /**
   * Skips the footer lines after the terminating chunk up to and including
   * the final empty line, then marks the chunked message as complete and
   * re-arms header parsing for the next message.
   */
  void readChunkedFooters() {
    while (true) {
      auto line = readLine();
      if (line.length == 0) {
        chunkedDone_ = true;
        readHeaders_ = true;
        break;
      }
    }
  }

  /**
   * Moves up to size bytes of body data from the HTTP buffer (refilled from
   * the underlying transport as needed) into the read buffer.
   *
   * Returns: size, or the number of bytes actually moved if the transport
   *   ran dry before.
   */
  size_t readContent(size_t size) {
    auto need = size;
    while (need > 0) {
      if (httpBufRemaining_.length == 0) {
        // We have given all the data, reset position to head of the buffer.
        httpBufRemaining_ = httpBuf_[0 .. 0];
        if (!refill()) return size - need;
      }

      auto give = min(httpBufRemaining_.length, need);
      readBuffer_.write(cast(ubyte[])httpBufRemaining_[0 .. give]);
      httpBufRemaining_ = httpBufRemaining_[give .. $];
      need -= give;
    }
    return size;
  }

  /**
   * Reads more data from the underlying transport and appends it to the
   * unparsed part of the HTTP buffer, growing the buffer if less than a
   * quarter of it is free behind the unparsed data.
   *
   * Returns: false if the transport delivered no data, true otherwise.
   */
  bool refill() {
    // Is there a nicer way to do this?
    auto indexBegin = httpBufRemaining_.ptr - httpBuf_.ptr;
    auto indexEnd = indexBegin + httpBufRemaining_.length;

    if (httpBuf_.length - indexEnd <= (httpBuf_.length / 4)) {
      httpBuf_.length *= 2;
    }

    // Growing may have moved the buffer. Re-anchor the unparsed slice right
    // away so that it refers to the current allocation even if the read
    // below returns nothing or throws.
    httpBufRemaining_ = httpBuf_[indexBegin .. indexEnd];

    // Read more data.
    auto got = transport_.read(cast(ubyte[])httpBuf_[indexEnd .. $]);
    if (got == 0) return false;
    httpBufRemaining_ = httpBuf_[indexBegin .. indexEnd + got];
    return true;
  }

  /// The underlying transport all I/O goes through.
  TTransport transport_;

  /// Outgoing body data collected until flush().
  TMemoryBuffer writeBuffer_;
  /// Decoded incoming body data not yet handed out by read().
  TMemoryBuffer readBuffer_;

  /// Whether the next read() has to parse a header block first.
  bool readHeaders_;
  /// Whether the current message uses chunked transfer encoding.
  bool chunked_;
  /// Whether the terminating chunk and the footers have been consumed.
  bool chunkedDone_;
  /// Reset by readHeaders(); not otherwise used in this class.
  size_t chunkSize_;
  /// Value of the Content-Length header of the current message.
  size_t contentLength_;

  /// Raw data as received from the underlying transport.
  ubyte[] httpBuf_;
  /// The part of httpBuf_ which has been received, but not yet parsed.
  ubyte[] httpBufRemaining_;
}
294 
295 /**
296  * HTTP client transport.
297  */
final class TClientHttpTransport : THttpTransport {
  /**
   * Constructs a client http transport operating on the passed underlying
   * transport.
   *
   * Params:
   *   transport = The underlying transport used for the actual I/O.
   *   host = The HTTP host string.
   *   path = The HTTP path string.
   */
  this(TTransport transport, string host, string path) {
    super(transport);
    host_ = host;
    path_ = path;
  }

  /**
   * Convenience overload for constructing a client HTTP transport using a
   * TSocket connecting to the specified host and port.
   *
   * Params:
   *   host = The server to connect to, also used as HTTP host string.
   *   port = The port to connect to.
   *   path = The HTTP path string.
   */
  this(string host, ushort port, string path) {
    this(new TSocket(host, port), host, path);
  }

protected:
  /**
   * Builds the header block of a POST request to the configured host and
   * path, announcing a body of dataLength bytes.
   */
  override string getHeader(size_t dataLength) {
    return "POST " ~ path_ ~ " HTTP/1.1\r\n" ~
      "Host: " ~ host_ ~ "\r\n" ~
      "Content-Type: application/x-thrift\r\n" ~
      "Content-Length: " ~ to!string(dataLength) ~ "\r\n" ~
      "Accept: application/x-thrift\r\n" ~
      "User-Agent: Thrift/" ~ VERSION ~ " (D/TClientHttpTransport)\r\n" ~
      "\r\n";
  }

  /**
   * Parses the status line of a response.
   *
   * Returns: true for status code 200 (the response proper follows), false
   *   for status code 100 (another status line follows).
   *
   * Throws: TTransportException if the line is malformed or carries any
   *   other status code.
   */
  override bool parseStatusLine(const(ubyte)[] status) {
    // HTTP-Version SP Status-Code SP Reason-Phrase CRLF
    auto firstSplit = findSplit(status, [' ']);
    if (firstSplit[1].empty) {
      throw new TTransportException("Bad status: " ~
        to!string(cast(const(char)[])status),
        TTransportException.Type.CORRUPTED_DATA);
    }

    // Skip any additional spaces in front of the status code. countUntil
    // yields -1 if nothing but spaces (or nothing at all) follows the HTTP
    // version, which must not be used as a slice index.
    auto codeStart = countUntil!"a != b"(firstSplit[2], ' ');
    if (codeStart < 0) {
      throw new TTransportException("Bad status: " ~
        to!string(cast(const(char)[])status),
        TTransportException.Type.CORRUPTED_DATA);
    }

    auto codeReason = firstSplit[2][codeStart .. $];
    auto secondSplit = findSplit(codeReason, [' ']);
    if (secondSplit[1].empty) {
      throw new TTransportException("Bad status: " ~
        to!string(cast(const(char)[])status),
        TTransportException.Type.CORRUPTED_DATA);
    }

    if (secondSplit[0] == "200") {
      // HTTP 200 = OK, we got the response
      return true;
    } else if (secondSplit[0] == "100") {
      // HTTP 100 = continue, just keep reading
      return false;
    }

    throw new TTransportException("Bad status (unhandled status code): " ~
      to!string(cast(const(char[]))status), TTransportException.Type.CORRUPTED_DATA);
  }

private:
  /// Value sent in the Host header.
  string host_;
  /// Request path used in the request line.
  string path_;
}
369 
370 /**
371  * HTTP server transport.
372  */
final class TServerHttpTransport : THttpTransport {
  /**
   * Constructs a new instance.
   *
   * Params:
   *   transport = The underlying transport used for the actual I/O.
   */
  this(TTransport transport) {
    super(transport);
  }

protected:
  /**
   * Builds the header block of a "200 OK" response carrying the current
   * date and announcing a body of dataLength bytes.
   */
  override string getHeader(size_t dataLength) {
    return "HTTP/1.1 200 OK\r\n" ~
      "Date: " ~ getRFC1123Time() ~ "\r\n" ~
      "Server: Thrift/" ~ VERSION ~ "\r\n" ~
      "Content-Type: application/x-thrift\r\n" ~
      "Content-Length: " ~ to!string(dataLength) ~ "\r\n" ~
      "Connection: Keep-Alive\r\n" ~
      "\r\n";
  }

  /**
   * Parses the request line of a request.
   *
   * Returns: true if the request is a POST request.
   *
   * Throws: TTransportException if the line is malformed or uses any other
   *   method.
   */
  override bool parseStatusLine(const(ubyte)[] status) {
    // Method SP Request-URI SP HTTP-Version CRLF.
    auto split = findSplit(status, [' ']);
    if (split[1].empty) {
      throw new TTransportException("Bad status: " ~
        to!string(cast(const(char)[])status),
        TTransportException.Type.CORRUPTED_DATA);
    }

    // Skip any additional spaces in front of the request URI. countUntil
    // yields -1 if nothing but spaces (or nothing at all) follows the
    // method, which must not be used as a slice index.
    auto uriStart = countUntil!"a != b"(split[2], ' ');
    if (uriStart < 0) {
      throw new TTransportException("Bad status: " ~
        to!string(cast(const(char)[])status),
        TTransportException.Type.CORRUPTED_DATA);
    }

    auto uriVersion = split[2][uriStart .. $];
    if (!canFind(uriVersion, ' ')) {
      throw new TTransportException("Bad status: " ~
        to!string(cast(const(char)[])status),
        TTransportException.Type.CORRUPTED_DATA);
    }

    if (split[0] == "POST") {
      // POST method ok, looking for content.
      return true;
    }

    throw new TTransportException("Bad status (unsupported method): " ~
      to!string(cast(const(char)[])status),
      TTransportException.Type.CORRUPTED_DATA);
  }
}
418 
/**
 * Wraps a transport into an HTTP server transport.
 *
 * This is TWrapperTransportFactory instantiated with TServerHttpTransport.
 * NOTE(review): the factory template is declared outside this file;
 * presumably it constructs a TServerHttpTransport around each transport it
 * is handed -- confirm against its definition.
 */
alias TWrapperTransportFactory!TServerHttpTransport TServerHttpTransportFactory;
423 
424 private {
425   import std.string : format;
getRFC1123Time()426   string getRFC1123Time() {
427     auto sysTime = Clock.currTime(UTC());
428 
429     auto dayName = capMemberName(sysTime.dayOfWeek);
430     auto monthName = capMemberName(sysTime.month);
431 
432     return format("%s, %s %s %s %s:%s:%s GMT", dayName, sysTime.day,
433       monthName, sysTime.year, sysTime.hour, sysTime.minute, sysTime.second);
434   }
435 
436   import std.ascii : toUpper;
437   import std.traits : EnumMembers;
438   string capMemberName(T)(T val) if (is(T == enum)) {
439     foreach (i, e; EnumMembers!T) {
440       enum name = __traits(derivedMembers, T)[i];
441       enum capName = cast(char) toUpper(name[0]) ~ name [1 .. $];
442       if (val == e) {
443         return capName;
444       }
445     }
446     throw new Exception("Not a member of " ~ T.stringof ~ ": " ~ to!string(val));
447   }
448 
449   unittest {
450     enum Foo {
451       bar,
452       bAZ
453     }
454 
455     import std.exception;
456     enforce(capMemberName(Foo.bar) == "Bar");
457     enforce(capMemberName(Foo.bAZ) == "BAZ");
458   }
459 }
460