1/* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 */ 19var http = require('http'); 20var https = require('https'); 21var url = require("url"); 22var path = require("path"); 23var fs = require("fs"); 24var crypto = require("crypto"); 25var log = require('./log'); 26 27var MultiplexedProcessor = require('./multiplexed_processor').MultiplexedProcessor; 28 29var TBufferedTransport = require('./buffered_transport'); 30var TBinaryProtocol = require('./binary_protocol'); 31var InputBufferUnderrunError = require('./input_buffer_underrun_error'); 32 33// WSFrame constructor and prototype 34///////////////////////////////////////////////////////////////////// 35 36/** Apache Thrift RPC Web Socket Transport 37 * Frame layout conforming to RFC 6455 circa 12/2011 38 * 39 * Theoretical frame size limit is 4GB*4GB, however the Node Buffer 40 * limit is 1GB as of v0.10. The frame length encoding is also 41 * configured for a max of 4GB presently and needs to be adjusted 42 * if Node/Browsers become capabile of > 4GB frames. 43 * 44 * - FIN is 1 if the message is complete 45 * - RSV1/2/3 are always 0 46 * - Opcode is 1(TEXT) for TJSONProtocol and 2(BIN) for TBinaryProtocol 47 * - Mask Present bit is 1 sending to-server and 0 sending to-client 48 * - Payload Len: 49 * + If < 126: then represented directly 50 * + If >=126: but within range of an unsigned 16 bit integer 51 * then Payload Len is 126 and the two following bytes store 52 * the length 53 * + Else: Payload Len is 127 and the following 8 bytes store the 54 * length as an unsigned 64 bit integer 55 * - Masking key is a 32 bit key only present when sending to the server 56 * - Payload follows the masking key or length 57 * 58 * 0 1 2 3 59 * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 60 * +-+-+-+-+-------+-+-------------+-------------------------------+ 61 * |F|R|R|R| opcode|M| Payload len | Extended payload length | 62 * |I|S|S|S| (4) |A| (7) | (16/64) | 63 * |N|V|V|V| |S| | (if payload len==126/127) | 64 * | |1|2|3| |K| | | 65 * +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - + 66 * | Extended payload length continued, if payload len == 127 | 67 * + - - - - - - - - - - - - - - - +-------------------------------+ 68 * | |Masking-key, if MASK set to 1 | 69 * +-------------------------------+-------------------------------+ 70 * | Masking-key (continued) | Payload Data | 71 * +-------------------------------- - - - - - - - - - - - - - - - + 72 * : Payload Data continued ... : 73 * + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + 74 * | Payload Data continued ... | 75 * +---------------------------------------------------------------+ 76 */ 77var wsFrame = { 78 /** Encodes a WebSocket frame 79 * 80 * @param {Buffer} data - The raw data to encode 81 * @param {Buffer} mask - The mask to apply when sending to server, null for no mask 82 * @param {Boolean} binEncoding - True for binary encoding, false for text encoding 83 * @returns {Buffer} - The WebSocket frame, ready to send 84 */ 85 encode: function(data, mask, binEncoding) { 86 var frame = new Buffer(wsFrame.frameSizeFromData(data, mask)); 87 //Byte 0 - FIN & OPCODE 88 frame[0] = wsFrame.fin.FIN + 89 (binEncoding ? wsFrame.frameOpCodes.BIN : wsFrame.frameOpCodes.TEXT); 90 //Byte 1 or 1-3 or 1-9 - MASK FLAG & SIZE 91 var payloadOffset = 2; 92 if (data.length < 0x7E) { 93 frame[1] = data.length + (mask ? wsFrame.mask.TO_SERVER : wsFrame.mask.TO_CLIENT); 94 } else if (data.length < 0xFFFF) { 95 frame[1] = 0x7E + (mask ? wsFrame.mask.TO_SERVER : wsFrame.mask.TO_CLIENT); 96 frame.writeUInt16BE(data.length, 2, true); 97 payloadOffset = 4; 98 } else { 99 frame[1] = 0x7F + (mask ? wsFrame.mask.TO_SERVER : wsFrame.mask.TO_CLIENT); 100 frame.writeUInt32BE(0, 2, true); 101 frame.writeUInt32BE(data.length, 6, true); 102 payloadOffset = 10; 103 } 104 //MASK 105 if (mask) { 106 mask.copy(frame, payloadOffset, 0, 4); 107 payloadOffset += 4; 108 } 109 //Payload 110 data.copy(frame, payloadOffset); 111 if (mask) { 112 wsFrame.applyMask(frame.slice(payloadOffset), frame.slice(payloadOffset-4,payloadOffset)); 113 } 114 return frame; 115 }, 116 117 /** 118 * @class 119 * @name WSDecodeResult 120 * @property {Buffer} data - The decoded data for the first ATRPC message 121 * @property {Buffer} mask - The frame mask 122 * @property {Boolean} binEncoding - True if binary (TBinaryProtocol), 123 * False if text (TJSONProtocol) 124 * @property {Buffer} nextFrame - Multiple ATRPC messages may be sent in a 125 * single WebSocket frame, this Buffer contains 126 * any bytes remaining to be decoded 127 * @property {Boolean} FIN - True is the message is complete 128 */ 129 130 /** Decodes a WebSocket frame 131 * 132 * @param {Buffer} frame - The raw inbound frame, if this is a continuation 133 * frame it must have a mask property with the mask. 134 * @returns {WSDecodeResult} - The decoded payload 135 * 136 * @see {@link WSDecodeResult} 137 */ 138 decode: function(frame) { 139 var result = { 140 data: null, 141 mask: null, 142 binEncoding: false, 143 nextFrame: null, 144 FIN: true 145 }; 146 147 //Byte 0 - FIN & OPCODE 148 if (wsFrame.fin.FIN != (frame[0] & wsFrame.fin.FIN)) { 149 result.FIN = false; 150 } 151 result.binEncoding = (wsFrame.frameOpCodes.BIN == (frame[0] & wsFrame.frameOpCodes.BIN)); 152 //Byte 1 or 1-3 or 1-9 - SIZE 153 var lenByte = (frame[1] & 0x0000007F); 154 var len = lenByte; 155 var dataOffset = 2; 156 if (lenByte == 0x7E) { 157 len = frame.readUInt16BE(2); 158 dataOffset = 4; 159 } else if (lenByte == 0x7F) { 160 len = frame.readUInt32BE(6); 161 dataOffset = 10; 162 } 163 //MASK 164 if (wsFrame.mask.TO_SERVER == (frame[1] & wsFrame.mask.TO_SERVER)) { 165 result.mask = new Buffer(4); 166 frame.copy(result.mask, 0, dataOffset, dataOffset + 4); 167 dataOffset += 4; 168 } 169 //Payload 170 result.data = new Buffer(len); 171 frame.copy(result.data, 0, dataOffset, dataOffset+len); 172 if (result.mask) { 173 wsFrame.applyMask(result.data, result.mask); 174 } 175 //Next Frame 176 if (frame.length > dataOffset+len) { 177 result.nextFrame = new Buffer(frame.length - (dataOffset+len)); 178 frame.copy(result.nextFrame, 0, dataOffset+len, frame.length); 179 } 180 //Don't forward control frames 181 if (frame[0] & wsFrame.frameOpCodes.FINCTRL) { 182 result.data = null; 183 } 184 185 return result; 186 }, 187 188 /** Masks/Unmasks data 189 * 190 * @param {Buffer} data - data to mask/unmask in place 191 * @param {Buffer} mask - the mask 192 */ 193 applyMask: function(data, mask){ 194 //TODO: look into xoring words at a time 195 var dataLen = data.length; 196 var maskLen = mask.length; 197 for (var i = 0; i < dataLen; i++) { 198 data[i] = data[i] ^ mask[i%maskLen]; 199 } 200 }, 201 202 /** Computes frame size on the wire from data to be sent 203 * 204 * @param {Buffer} data - data.length is the assumed payload size 205 * @param {Boolean} mask - true if a mask will be sent (TO_SERVER) 206 */ 207 frameSizeFromData: function(data, mask) { 208 var headerSize = 10; 209 if (data.length < 0x7E) { 210 headerSize = 2; 211 } else if (data.length < 0xFFFF) { 212 headerSize = 4; 213 } 214 return headerSize + data.length + (mask ? 4 : 0); 215 }, 216 217 frameOpCodes: { 218 CONT: 0x00, 219 TEXT: 0x01, 220 BIN: 0x02, 221 CTRL: 0x80 222 }, 223 224 mask: { 225 TO_SERVER: 0x80, 226 TO_CLIENT: 0x00 227 }, 228 229 fin: { 230 CONT: 0x00, 231 FIN: 0x80 232 } 233}; 234 235 236// createWebServer constructor and options 237///////////////////////////////////////////////////////////////////// 238 239/** 240 * @class 241 * @name ServerOptions 242 * @property {array} cors - Array of CORS origin strings to permit requests from. 243 * @property {string} files - Path to serve static files from, if absent or "" 244 * static file service is disabled. 245 * @property {object} headers - An object hash mapping header strings to header value 246 * strings, these headers are transmitted in response to 247 * static file GET operations. 248 * @property {object} services - An object hash mapping service URI strings 249 * to ServiceOptions objects 250 * @property {object} tls - Node.js TLS options (see: nodejs.org/api/tls.html), 251 * if not present or null regular http is used, 252 * at least a key and a cert must be defined to use SSL/TLS 253 * @see {@link ServiceOptions} 254 */ 255 256/** 257 * @class 258 * @name ServiceOptions 259 * @property {object} transport - The layered transport to use (defaults 260 * to TBufferedTransport). 261 * @property {object} protocol - The serialization Protocol to use (defaults to 262 * TBinaryProtocol). 263 * @property {object} processor - The Thrift Service class/processor generated 264 * by the IDL Compiler for the service (the "cls" 265 * key can also be used for this attribute). 266 * @property {object} handler - The handler methods for the Thrift Service. 267 */ 268 269/** 270 * Create a Thrift server which can serve static files and/or one or 271 * more Thrift Services. 272 * @param {ServerOptions} options - The server configuration. 273 * @returns {object} - The Apache Thrift Web Server. 274 */ 275exports.createWebServer = function(options) { 276 var baseDir = options.files; 277 var contentTypesByExtension = { 278 '.txt': 'text/plain', 279 '.html': 'text/html', 280 '.css': 'text/css', 281 '.xml': 'application/xml', 282 '.json': 'application/json', 283 '.js': 'application/javascript', 284 '.jpg': 'image/jpeg', 285 '.jpeg': 'image/jpeg', 286 '.gif': 'image/gif', 287 '.png': 'image/png', 288 '.svg': 'image/svg+xml' 289 }; 290 291 //Setup all of the services 292 var services = options.services; 293 for (var uri in services) { 294 var svcObj = services[uri]; 295 296 //Setup the processor 297 if (svcObj.processor instanceof MultiplexedProcessor) { 298 //Multiplex processors have pre embedded processor/handler pairs, save as is 299 svcObj.processor = svcObj.processor; 300 } else { 301 //For historical reasons Node.js supports processors passed in directly or via the 302 // IDL Compiler generated class housing the processor. Also, the options property 303 // for a Processor has been called both cls and processor at different times. We 304 // support any of the four possibilities here. 305 var processor = (svcObj.processor) ? (svcObj.processor.Processor || svcObj.processor) : 306 (svcObj.cls.Processor || svcObj.cls); 307 //Processors can be supplied as constructed objects with handlers already embedded, 308 // if a handler is provided we construct a new processor, if not we use the processor 309 // object directly 310 if (svcObj.handler) { 311 svcObj.processor = new processor(svcObj.handler); 312 } else { 313 svcObj.processor = processor; 314 } 315 } 316 svcObj.transport = svcObj.transport ? svcObj.transport : TBufferedTransport; 317 svcObj.protocol = svcObj.protocol ? svcObj.protocol : TBinaryProtocol; 318 } 319 320 //Verify CORS requirements 321 function VerifyCORSAndSetHeaders(request, response) { 322 if (request.headers.origin && options.cors) { 323 if (options.cors["*"] || options.cors[request.headers.origin]) { 324 //Allow, origin allowed 325 response.setHeader("access-control-allow-origin", request.headers.origin); 326 response.setHeader("access-control-allow-methods", "GET, POST, OPTIONS"); 327 response.setHeader("access-control-allow-headers", "content-type, accept"); 328 response.setHeader("access-control-max-age", "60"); 329 return true; 330 } else { 331 //Disallow, origin denied 332 return false; 333 } 334 } 335 //Allow, CORS is not in use 336 return true; 337 } 338 339 340 //Handle OPTIONS method (CORS) 341 /////////////////////////////////////////////////// 342 function processOptions(request, response) { 343 if (VerifyCORSAndSetHeaders(request, response)) { 344 response.writeHead("204", "No Content", {"content-length": 0}); 345 } else { 346 response.writeHead("403", "Origin " + request.headers.origin + " not allowed", {}); 347 } 348 response.end(); 349 } 350 351 352 //Handle POST methods (TXHRTransport) 353 /////////////////////////////////////////////////// 354 function processPost(request, response) { 355 //Lookup service 356 var uri = url.parse(request.url).pathname; 357 var svc = services[uri]; 358 if (!svc) { 359 response.writeHead("403", "No Apache Thrift Service at " + uri, {}); 360 response.end(); 361 return; 362 } 363 364 //Verify CORS requirements 365 if (!VerifyCORSAndSetHeaders(request, response)) { 366 response.writeHead("403", "Origin " + request.headers.origin + " not allowed", {}); 367 response.end(); 368 return; 369 } 370 371 //Process XHR payload 372 request.on('data', svc.transport.receiver(function(transportWithData) { 373 var input = new svc.protocol(transportWithData); 374 var output = new svc.protocol(new svc.transport(undefined, function(buf) { 375 try { 376 response.writeHead(200); 377 response.end(buf); 378 } catch (err) { 379 response.writeHead(500); 380 response.end(); 381 } 382 })); 383 384 try { 385 svc.processor.process(input, output); 386 transportWithData.commitPosition(); 387 } catch (err) { 388 if (err instanceof InputBufferUnderrunError) { 389 transportWithData.rollbackPosition(); 390 } else { 391 response.writeHead(500); 392 response.end(); 393 } 394 } 395 })); 396 } 397 398 399 //Handle GET methods (Static Page Server) 400 /////////////////////////////////////////////////// 401 function processGet(request, response) { 402 //Undefined or empty base directory means do not serve static files 403 if (!baseDir || "" === baseDir) { 404 response.writeHead(404); 405 response.end(); 406 return; 407 } 408 409 //Verify CORS requirements 410 if (!VerifyCORSAndSetHeaders(request, response)) { 411 response.writeHead("403", "Origin " + request.headers.origin + " not allowed", {}); 412 response.end(); 413 return; 414 } 415 416 //Locate the file requested and send it 417 var uri = url.parse(request.url).pathname; 418 var filename = path.resolve(path.join(baseDir, uri)); 419 420 //Ensure the basedir path is not able to be escaped 421 if (filename.indexOf(baseDir) != 0) { 422 response.writeHead(400, "Invalid request path", {}); 423 response.end(); 424 return; 425 } 426 427 fs.exists(filename, function(exists) { 428 if(!exists) { 429 response.writeHead(404); 430 response.end(); 431 return; 432 } 433 434 if (fs.statSync(filename).isDirectory()) { 435 filename += '/index.html'; 436 } 437 438 fs.readFile(filename, "binary", function(err, file) { 439 if (err) { 440 response.writeHead(500); 441 response.end(err + "\n"); 442 return; 443 } 444 var headers = {}; 445 var contentType = contentTypesByExtension[path.extname(filename)]; 446 if (contentType) { 447 headers["Content-Type"] = contentType; 448 } 449 for (var k in options.headers) { 450 headers[k] = options.headers[k]; 451 } 452 response.writeHead(200, headers); 453 response.write(file, "binary"); 454 response.end(); 455 }); 456 }); 457 } 458 459 460 //Handle WebSocket calls (TWebSocketTransport) 461 /////////////////////////////////////////////////// 462 function processWS(data, socket, svc, binEncoding) { 463 svc.transport.receiver(function(transportWithData) { 464 var input = new svc.protocol(transportWithData); 465 var output = new svc.protocol(new svc.transport(undefined, function(buf) { 466 try { 467 var frame = wsFrame.encode(buf, null, binEncoding); 468 socket.write(frame); 469 } catch (err) { 470 //TODO: Add better error processing 471 } 472 })); 473 474 try { 475 svc.processor.process(input, output); 476 transportWithData.commitPosition(); 477 } 478 catch (err) { 479 if (err instanceof InputBufferUnderrunError) { 480 transportWithData.rollbackPosition(); 481 } 482 else { 483 //TODO: Add better error processing 484 } 485 } 486 })(data); 487 } 488 489 //Create the server (HTTP or HTTPS) 490 var server = null; 491 if (options.tls) { 492 server = https.createServer(options.tls); 493 } else { 494 server = http.createServer(); 495 } 496 497 //Wire up listeners for upgrade(to WebSocket) & request methods for: 498 // - GET static files, 499 // - POST XHR Thrift services 500 // - OPTIONS CORS requests 501 server.on('request', function(request, response) { 502 if (request.method === 'POST') { 503 processPost(request, response); 504 } else if (request.method === 'GET') { 505 processGet(request, response); 506 } else if (request.method === 'OPTIONS') { 507 processOptions(request, response); 508 } else { 509 response.writeHead(500); 510 response.end(); 511 } 512 }).on('upgrade', function(request, socket, head) { 513 //Lookup service 514 var svc; 515 try { 516 svc = services[Object.keys(services)[0]]; 517 } catch(e) { 518 socket.write("HTTP/1.1 403 No Apache Thrift Service available\r\n\r\n"); 519 return; 520 } 521 //Perform upgrade 522 var hash = crypto.createHash("sha1"); 523 hash.update(request.headers['sec-websocket-key'] + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"); 524 socket.write("HTTP/1.1 101 Switching Protocols\r\n" + 525 "Upgrade: websocket\r\n" + 526 "Connection: Upgrade\r\n" + 527 "Sec-WebSocket-Accept: " + hash.digest("base64") + "\r\n" + 528 "Sec-WebSocket-Origin: " + request.headers.origin + "\r\n" + 529 "Sec-WebSocket-Location: ws://" + request.headers.host + request.url + "\r\n" + 530 "\r\n"); 531 //Handle WebSocket traffic 532 var data = null; 533 socket.on('data', function(frame) { 534 try { 535 while (frame) { 536 var result = wsFrame.decode(frame); 537 //Prepend any existing decoded data 538 if (data) { 539 if (result.data) { 540 var newData = new Buffer(data.length + result.data.length); 541 data.copy(newData); 542 result.data.copy(newData, data.length); 543 result.data = newData; 544 } else { 545 result.data = data; 546 } 547 data = null; 548 } 549 //If this completes a message process it 550 if (result.FIN) { 551 processWS(result.data, socket, svc, result.binEncoding); 552 } else { 553 data = result.data; 554 } 555 //Prepare next frame for decoding (if any) 556 frame = result.nextFrame; 557 } 558 } catch(e) { 559 log.error('TWebSocketTransport Exception: ' + e); 560 socket.destroy(); 561 } 562 }); 563 }); 564 565 //Return the server 566 return server; 567}; 568