1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 *   http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19var util = require('util');
20var http = require('http');
21var https = require('https');
22var EventEmitter = require('events').EventEmitter;
23var thrift = require('./thrift');
24
25var TBufferedTransport = require('./buffered_transport');
26var TBinaryProtocol = require('./binary_protocol');
27var InputBufferUnderrunError = require('./input_buffer_underrun_error');
28
29var createClient = require('./create_client');
30
31/**
32 * @class
33 * @name ConnectOptions
34 * @property {function} transport - The Thrift layered transport class to use (TBufferedTransport, etc).
35 * @property {function} protocol - The Thrift serialization protocol class to use (TBinaryProtocol, etc.).
36 * @property {string} path - The URL path to POST to (e.g. "/", "/mySvc", "/thrift/quoteSvc", etc.).
37 * @property {object} headers - A standard Node.js header hash, an object hash containing key/value
38 *        pairs where the key is the header name string and the value is the header value string.
39 * @property {boolean} https - True causes the connection to use https, otherwise http is used.
40 * @property {object} nodeOptions - Options passed on to node.
41 * @example
42 *     //Use a connection that requires ssl/tls, closes the connection after each request,
43 *     //  uses the buffered transport layer, uses the JSON protocol and directs RPC traffic
44 *     //  to https://thrift.example.com:9090/hello
45 *     var thrift = require('thrift');
46 *     var options = {
47 *        transport: thrift.TBufferedTransport,
48 *        protocol: thrift.TJSONProtocol,
49 *        path: "/hello",
50 *        headers: {"Connection": "close"},
51 *        https: true
52 *     };
53 *     var connection = thrift.createHttpConnection("thrift.example.com", 9090, options);
54 *     var client = thrift.createHttpClient(myService, connection);
55 *     client.myServiceFunction();
56 */
57
/**
 * Initializes a Thrift HttpConnection instance (use createHttpConnection() rather than
 *    instantiating directly).
 * @constructor
 * @param {ConnectOptions} options - The configuration options to use. In addition to the
 *     documented ConnectOptions properties, host, port, socketPath and responseType are
 *     read from this object, and every key of options.nodeOptions is copied over the
 *     request options handed to Node.js.
 * @event {error} The "error" event is fired when a Node.js error event occurs during
 *     request or response processing, in which case the node error is passed on. An "error"
 *     event is also fired when a response arrives with an HTTP status code other than 200,
 *     when the connection can not map a response back to the appropriate client function
 *     (generating a TApplicationException), and when decoding a response throws anything
 *     other than InputBufferUnderrunError (the thrown value is emitted, not rethrown).
 * @classdesc HttpConnection objects provide Thrift end point transport
 *     semantics implemented over the Node.js http.request() method.
 * @see {@link createHttpConnection}
 */
var HttpConnection = exports.HttpConnection = function(options) {
  //Initialize the emitter base object
  EventEmitter.call(this);

  //Set configuration
  var self = this;
  this.options = options || {};
  this.host = this.options.host;
  this.port = this.options.port;
  this.socketPath = this.options.socketPath;
  this.https = this.options.https || false;
  this.transport = this.options.transport || TBufferedTransport;
  this.protocol = this.options.protocol || TBinaryProtocol;

  //Prepare Node.js options
  this.nodeOptions = {
    host: this.host,
    port: this.port,
    socketPath: this.socketPath,
    path: this.options.path || '/',
    method: 'POST',
    headers: this.options.headers || {},
    responseType: this.options.responseType || null
  };
  //Caller supplied nodeOptions win over the defaults computed above
  for (var attrname in this.options.nodeOptions) {
    this.nodeOptions[attrname] = this.options.nodeOptions[attrname];
  }
  /*jshint -W069 */
  if (! this.nodeOptions.headers['Connection']) {
    this.nodeOptions.headers['Connection'] = 'keep-alive';
  }
  /*jshint +W069 */

  //The sequence map is used to map seqIDs back to the
  //  calling client in multiplexed scenarios
  this.seqId2Service = {};

  //Decodes every complete message found in the transport and routes each one
  //  to the matching client recv_ function. Reading stops when the protocol
  //  throws InputBufferUnderrunError (no further complete message available).
  function decodeCallback(transport_with_data) {
    var proto = new self.protocol(transport_with_data);
    try {
      while (true) {
        var header = proto.readMessageBegin();
        //The negated seqid is a temporary slot in client._reqs, used to
        //  intercept the result before the caller's callback is invoked
        var dummy_seqid = header.rseqid * -1;
        var client = self.client;
        //The Multiplexed Protocol stores a hash of seqid to service names
        //  in seqId2Service. If the SeqId is found in the hash we need to
        //  lookup the appropriate client for this call.
        //  The client var is a single client object when not multiplexing,
        //  when using multiplexing it is a service name keyed hash of client
        //  objects.
        //NOTE: The 2 way interdependencies between protocols, transports,
        //  connections and clients in the Node.js implementation are irregular
        //  and make the implementation difficult to extend and maintain. We
        //  should bring this stuff inline with typical thrift I/O stack
        //  operation soon.
        //  --ra
        var service_name = self.seqId2Service[header.rseqid];
        if (service_name) {
          client = self.client[service_name];
          delete self.seqId2Service[header.rseqid];
        }
        /*jshint -W083 */
        client._reqs[dummy_seqid] = function(err, success){
          //The message was fully consumed, so the read position can be committed
          transport_with_data.commitPosition();
          var clientCallback = client._reqs[header.rseqid];
          delete client._reqs[header.rseqid];
          if (clientCallback) {
            //Defer the user callback so it runs outside the decode loop
            process.nextTick(function() {
              clientCallback(err, success);
            });
          }
        };
        /*jshint +W083 */
        if(client['recv_' + header.fname]) {
          client['recv_' + header.fname](proto, header.mtype, dummy_seqid);
        } else {
          delete client._reqs[dummy_seqid];
          self.emit("error",
                    new thrift.TApplicationException(
                       thrift.TApplicationExceptionType.WRONG_METHOD_NAME,
                       "Received a response to an unknown RPC function"));
        }
      }
    }
    catch (e) {
      if (e instanceof InputBufferUnderrunError) {
        transport_with_data.rollbackPosition();
      } else {
        self.emit('error', e);
      }
    }
  }


  //Response handler
  //////////////////////////////////////////////////
  this.responseCallback = function(response) {
    var data = [];
    var dataLen = 0;

    if (response.statusCode !== 200) {
      //Emit on the connection itself. When this function runs as a request
      //  'response' listener, `this` is the request object, not the connection.
      //  The response body is still collected and decoded below.
      self.emit("error", new THTTPException(response));
    }

    response.on('error', function (e) {
      self.emit("error", e);
    });

    // When running directly under node, chunk will be a buffer,
    // however, when running in a Browser (e.g. Browserify), chunk
    // will be a string or an ArrayBuffer.
    response.on('data', function (chunk) {
      // Wrap anything that is not already a Buffer so the pieces can be joined
      var piece = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
      data.push(piece);
      // Count bytes of the wrapped Buffer, not chunk.length: a string chunk
      // holding multi-byte characters encodes to more bytes than its length.
      dataLen += piece.length;
    });

    response.on('end', function(){
      var buf = Buffer.concat(data, dataLen);
      //Get the receiver function for the transport and
      //  call it with the buffer
      self.transport.receiver(decodeCallback)(buf);
    });
  };
};
util.inherits(HttpConnection, EventEmitter);
207
/**
 * Sends a serialized Thrift message to the server as the body of one
 *    HTTP (or HTTPS, when the connection was configured with https) POST.
 * The shared request headers are updated in place: "Content-length" is set
 *    on every call and "Content-Type" is defaulted to "application/x-thrift"
 *    when the caller has not supplied one.
 * @param {Buffer} data - A Node.js Buffer containing the data to write
 * @returns {void} No return value.
 * @event {error} the "error" event is raised upon request failure passing the
 *     Node.js error object to the listener.
 */
HttpConnection.prototype.write = function(data) {
  var connection = this;
  var requestOptions = connection.nodeOptions;

  requestOptions.headers["Content-length"] = data.length;
  if (!requestOptions.headers["Content-Type"]) {
    requestOptions.headers["Content-Type"] = "application/x-thrift";
  }

  //Pick the Node.js module matching the configured scheme
  var requester = connection.https ? https : http;
  var request = requester.request(requestOptions, connection.responseCallback);

  //Forward request level failures to listeners of this connection
  request.on('error', function(err) {
    connection.emit("error", err);
  });

  request.write(data);
  request.end();
};
230
231/**
232 * Creates a new HttpConnection object, used by Thrift clients to connect
233 *    to Thrift HTTP based servers.
234 * @param {string} host - The host name or IP to connect to.
235 * @param {number} port - The TCP port to connect to.
236 * @param {ConnectOptions} options - The configuration options to use.
237 * @returns {HttpConnection} The connection object.
238 * @see {@link ConnectOptions}
239 */
240exports.createHttpConnection = function(host, port, options) {
241  options.host = host;
242  options.port = port || 80;
243  return new HttpConnection(options);
244};
245
246exports.createHttpUDSConnection = function(path, options) {
247  options.socketPath = path;
248  return new HttpConnection(options);
249};
250
/**
 * Client factory for HTTP connections; this is the createClient function
 *    loaded from './create_client', re-exported under the HTTP specific name.
 */
exports.createHttpClient = createClient;
252
253
/**
 * Exception describing an HTTP response with an unexpected status code.
 * Extends thrift.TApplicationException with type PROTOCOL_ERROR and keeps both
 *    the status code and the full response object for inspection by listeners.
 * @constructor
 * @param {object} response - The HTTP response; its statusCode property is read.
 */
function THTTPException(response) {
  thrift.TApplicationException.call(this);

  //Stack capture is not available on every JavaScript engine
  if (Error.captureStackTrace !== undefined) {
    Error.captureStackTrace(this, this.constructor);
  }

  var status = response.statusCode;
  this.name = this.constructor.name;
  this.type = thrift.TApplicationExceptionType.PROTOCOL_ERROR;
  this.response = response;
  this.statusCode = status;
  this.message = "Received a response with a bad HTTP status code: " + status;
}
util.inherits(THTTPException, thrift.TApplicationException);
267