1/* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 */ 19var util = require('util'); 20var http = require('http'); 21var https = require('https'); 22var EventEmitter = require('events').EventEmitter; 23var thrift = require('./thrift'); 24 25var TBufferedTransport = require('./buffered_transport'); 26var TBinaryProtocol = require('./binary_protocol'); 27var InputBufferUnderrunError = require('./input_buffer_underrun_error'); 28 29var createClient = require('./create_client'); 30 31/** 32 * @class 33 * @name ConnectOptions 34 * @property {string} transport - The Thrift layered transport to use (TBufferedTransport, etc). 35 * @property {string} protocol - The Thrift serialization protocol to use (TBinaryProtocol, etc.). 36 * @property {string} path - The URL path to POST to (e.g. "/", "/mySvc", "/thrift/quoteSvc", etc.). 37 * @property {object} headers - A standard Node.js header hash, an object hash containing key/value 38 * pairs where the key is the header name string and the value is the header value string. 39 * @property {boolean} https - True causes the connection to use https, otherwise http is used. 40 * @property {object} nodeOptions - Options passed on to node. 41 * @example 42 * //Use a connection that requires ssl/tls, closes the connection after each request, 43 * // uses the buffered transport layer, uses the JSON protocol and directs RPC traffic 44 * // to https://thrift.example.com:9090/hello 45 * var thrift = require('thrift'); 46 * var options = { 47 * transport: thrift.TBufferedTransport, 48 * protocol: thrift.TJSONProtocol, 49 * path: "/hello", 50 * headers: {"Connection": "close"}, 51 * https: true 52 * }; 53 * var con = thrift.createHttpConnection("thrift.example.com", 9090, options); 54 * var client = thrift.createHttpClient(myService, connection); 55 * client.myServiceFunction(); 56 */ 57 58/** 59 * Initializes a Thrift HttpConnection instance (use createHttpConnection() rather than 60 * instantiating directly). 61 * @constructor 62 * @param {ConnectOptions} options - The configuration options to use. 63 * @throws {error} Exceptions other than InputBufferUnderrunError are rethrown 64 * @event {error} The "error" event is fired when a Node.js error event occurs during 65 * request or response processing, in which case the node error is passed on. An "error" 66 * event may also be fired when the connection can not map a response back to the 67 * appropriate client (an internal error), generating a TApplicationException. 68 * @classdesc HttpConnection objects provide Thrift end point transport 69 * semantics implemented over the Node.js http.request() method. 70 * @see {@link createHttpConnection} 71 */ 72var HttpConnection = exports.HttpConnection = function(options) { 73 //Initialize the emitter base object 74 EventEmitter.call(this); 75 76 //Set configuration 77 var self = this; 78 this.options = options || {}; 79 this.host = this.options.host; 80 this.port = this.options.port; 81 this.socketPath = this.options.socketPath; 82 this.https = this.options.https || false; 83 this.transport = this.options.transport || TBufferedTransport; 84 this.protocol = this.options.protocol || TBinaryProtocol; 85 86 //Prepare Node.js options 87 this.nodeOptions = { 88 host: this.host, 89 port: this.port, 90 socketPath: this.socketPath, 91 path: this.options.path || '/', 92 method: 'POST', 93 headers: this.options.headers || {}, 94 responseType: this.options.responseType || null 95 }; 96 for (var attrname in this.options.nodeOptions) { 97 this.nodeOptions[attrname] = this.options.nodeOptions[attrname]; 98 } 99 /*jshint -W069 */ 100 if (! this.nodeOptions.headers['Connection']) { 101 this.nodeOptions.headers['Connection'] = 'keep-alive'; 102 } 103 /*jshint +W069 */ 104 105 //The sequence map is used to map seqIDs back to the 106 // calling client in multiplexed scenarios 107 this.seqId2Service = {}; 108 109 function decodeCallback(transport_with_data) { 110 var proto = new self.protocol(transport_with_data); 111 try { 112 while (true) { 113 var header = proto.readMessageBegin(); 114 var dummy_seqid = header.rseqid * -1; 115 var client = self.client; 116 //The Multiplexed Protocol stores a hash of seqid to service names 117 // in seqId2Service. If the SeqId is found in the hash we need to 118 // lookup the appropriate client for this call. 119 // The client var is a single client object when not multiplexing, 120 // when using multiplexing it is a service name keyed hash of client 121 // objects. 122 //NOTE: The 2 way interdependencies between protocols, transports, 123 // connections and clients in the Node.js implementation are irregular 124 // and make the implementation difficult to extend and maintain. We 125 // should bring this stuff inline with typical thrift I/O stack 126 // operation soon. 127 // --ra 128 var service_name = self.seqId2Service[header.rseqid]; 129 if (service_name) { 130 client = self.client[service_name]; 131 delete self.seqId2Service[header.rseqid]; 132 } 133 /*jshint -W083 */ 134 client._reqs[dummy_seqid] = function(err, success){ 135 transport_with_data.commitPosition(); 136 var clientCallback = client._reqs[header.rseqid]; 137 delete client._reqs[header.rseqid]; 138 if (clientCallback) { 139 process.nextTick(function() { 140 clientCallback(err, success); 141 }); 142 } 143 }; 144 /*jshint +W083 */ 145 if(client['recv_' + header.fname]) { 146 client['recv_' + header.fname](proto, header.mtype, dummy_seqid); 147 } else { 148 delete client._reqs[dummy_seqid]; 149 self.emit("error", 150 new thrift.TApplicationException( 151 thrift.TApplicationExceptionType.WRONG_METHOD_NAME, 152 "Received a response to an unknown RPC function")); 153 } 154 } 155 } 156 catch (e) { 157 if (e instanceof InputBufferUnderrunError) { 158 transport_with_data.rollbackPosition(); 159 } else { 160 self.emit('error', e); 161 } 162 } 163 } 164 165 166 //Response handler 167 ////////////////////////////////////////////////// 168 this.responseCallback = function(response) { 169 var data = []; 170 var dataLen = 0; 171 172 if (response.statusCode !== 200) { 173 this.emit("error", new THTTPException(response)); 174 } 175 176 response.on('error', function (e) { 177 self.emit("error", e); 178 }); 179 180 // When running directly under node, chunk will be a buffer, 181 // however, when running in a Browser (e.g. Browserify), chunk 182 // will be a string or an ArrayBuffer. 183 response.on('data', function (chunk) { 184 if ((typeof chunk == 'string') || 185 (Object.prototype.toString.call(chunk) == '[object Uint8Array]')) { 186 // Wrap ArrayBuffer/string in a Buffer so data[i].copy will work 187 data.push(new Buffer(chunk)); 188 } else { 189 data.push(chunk); 190 } 191 dataLen += chunk.length; 192 }); 193 194 response.on('end', function(){ 195 var buf = new Buffer(dataLen); 196 for (var i=0, len=data.length, pos=0; i<len; i++) { 197 data[i].copy(buf, pos); 198 pos += data[i].length; 199 } 200 //Get the receiver function for the transport and 201 // call it with the buffer 202 self.transport.receiver(decodeCallback)(buf); 203 }); 204 }; 205}; 206util.inherits(HttpConnection, EventEmitter); 207 208/** 209 * Writes Thrift message data to the connection 210 * @param {Buffer} data - A Node.js Buffer containing the data to write 211 * @returns {void} No return value. 212 * @event {error} the "error" event is raised upon request failure passing the 213 * Node.js error object to the listener. 214 */ 215HttpConnection.prototype.write = function(data) { 216 var self = this; 217 var opts = self.nodeOptions; 218 opts.headers["Content-length"] = data.length; 219 if (!opts.headers["Content-Type"]) 220 opts.headers["Content-Type"] = "application/x-thrift"; 221 var req = (self.https) ? 222 https.request(opts, self.responseCallback) : 223 http.request(opts, self.responseCallback); 224 req.on('error', function(err) { 225 self.emit("error", err); 226 }); 227 req.write(data); 228 req.end(); 229}; 230 231/** 232 * Creates a new HttpConnection object, used by Thrift clients to connect 233 * to Thrift HTTP based servers. 234 * @param {string} host - The host name or IP to connect to. 235 * @param {number} port - The TCP port to connect to. 236 * @param {ConnectOptions} options - The configuration options to use. 237 * @returns {HttpConnection} The connection object. 238 * @see {@link ConnectOptions} 239 */ 240exports.createHttpConnection = function(host, port, options) { 241 options.host = host; 242 options.port = port || 80; 243 return new HttpConnection(options); 244}; 245 246exports.createHttpUDSConnection = function(path, options) { 247 options.socketPath = path; 248 return new HttpConnection(options); 249}; 250 251exports.createHttpClient = createClient 252 253 254function THTTPException(response) { 255 thrift.TApplicationException.call(this); 256 if (Error.captureStackTrace !== undefined) { 257 Error.captureStackTrace(this, this.constructor); 258 } 259 260 this.name = this.constructor.name; 261 this.statusCode = response.statusCode; 262 this.response = response; 263 this.type = thrift.TApplicationExceptionType.PROTOCOL_ERROR; 264 this.message = "Received a response with a bad HTTP status code: " + response.statusCode; 265} 266util.inherits(THTTPException, thrift.TApplicationException); 267