1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 #include <thrift/thrift-config.h>
21 #include <cstring>
22
23 #include <thrift/transport/TPipe.h>
24 #include <thrift/transport/TPipeServer.h>
25 #include <thrift/TNonCopyable.h>
26
27 #ifdef _WIN32
28 #include <thrift/windows/OverlappedSubmissionThread.h>
29 #include <thrift/windows/Sync.h>
30 #include <AccCtrl.h>
31 #include <Aclapi.h>
32 #include <sddl.h>
33 #endif //_WIN32
34
35 namespace apache {
36 namespace thrift {
37 namespace transport {
38
39 #ifdef _WIN32
40
41 using std::shared_ptr;
42
43 class TPipeServerImpl : apache::thrift::TNonCopyable {
44 public:
TPipeServerImpl()45 TPipeServerImpl() {}
~TPipeServerImpl()46 virtual ~TPipeServerImpl() {}
47 virtual void interrupt() = 0;
48 virtual std::shared_ptr<TTransport> acceptImpl() = 0;
49
50 virtual HANDLE getPipeHandle() = 0;
51 virtual HANDLE getWrtPipeHandle() = 0;
52 virtual HANDLE getClientRdPipeHandle() = 0;
53 virtual HANDLE getClientWrtPipeHandle() = 0;
getNativeWaitHandle()54 virtual HANDLE getNativeWaitHandle() { return nullptr; }
55 };
56
57 class TAnonPipeServer : public TPipeServerImpl {
58 public:
TAnonPipeServer()59 TAnonPipeServer() {
60 // The anonymous pipe needs to be created first so that the server can
61 // pass the handles on to the client before the serve (acceptImpl)
62 // blocking call.
63 if (!createAnonPipe()) {
64 GlobalOutput.perror("TPipeServer Create(Anon)Pipe failed, GLE=", GetLastError());
65 throw TTransportException(TTransportException::NOT_OPEN,
66 " TPipeServer Create(Anon)Pipe failed");
67 }
68 }
69
~TAnonPipeServer()70 virtual ~TAnonPipeServer() {
71 PipeR_.reset();
72 PipeW_.reset();
73 ClientAnonRead_.reset();
74 ClientAnonWrite_.reset();
75 }
76
interrupt()77 virtual void interrupt() {} // not currently implemented
78
79 virtual std::shared_ptr<TTransport> acceptImpl();
80
getPipeHandle()81 virtual HANDLE getPipeHandle() { return PipeR_.h; }
getWrtPipeHandle()82 virtual HANDLE getWrtPipeHandle() { return PipeW_.h; }
getClientRdPipeHandle()83 virtual HANDLE getClientRdPipeHandle() { return ClientAnonRead_.h; }
getClientWrtPipeHandle()84 virtual HANDLE getClientWrtPipeHandle() { return ClientAnonWrite_.h; }
85
86 private:
87 bool createAnonPipe();
88
89 TAutoHandle PipeR_; // Anonymous Pipe (R)
90 TAutoHandle PipeW_; // Anonymous Pipe (W)
91
92 // Client side anonymous pipe handles
93 //? Do we need duplicates to send to client?
94 TAutoHandle ClientAnonRead_;
95 TAutoHandle ClientAnonWrite_;
96 };
97
98 class TNamedPipeServer : public TPipeServerImpl {
99 public:
TNamedPipeServer(const std::string & pipename,uint32_t bufsize,uint32_t maxconnections,const std::string & securityDescriptor)100 TNamedPipeServer(const std::string& pipename,
101 uint32_t bufsize,
102 uint32_t maxconnections,
103 const std::string& securityDescriptor)
104 : stopping_(false),
105 pipename_(pipename),
106 bufsize_(bufsize),
107 maxconns_(maxconnections),
108 securityDescriptor_(securityDescriptor) {
109 connectOverlap_.action = TOverlappedWorkItem::CONNECT;
110 cancelOverlap_.action = TOverlappedWorkItem::CANCELIO;
111 TAutoCrit lock(pipe_protect_);
112 initiateNamedConnect(lock);
113 }
~TNamedPipeServer()114 virtual ~TNamedPipeServer() {}
115
interrupt()116 virtual void interrupt() {
117 TAutoCrit lock(pipe_protect_);
118 cached_client_.reset();
119 if (Pipe_.h != INVALID_HANDLE_VALUE) {
120 stopping_ = true;
121 cancelOverlap_.h = Pipe_.h;
122 // This should wake up GetOverlappedResult
123 thread_->addWorkItem(&cancelOverlap_);
124 }
125 }
126
127 virtual std::shared_ptr<TTransport> acceptImpl();
128
getPipeHandle()129 virtual HANDLE getPipeHandle() { return Pipe_.h; }
getWrtPipeHandle()130 virtual HANDLE getWrtPipeHandle() { return INVALID_HANDLE_VALUE; }
getClientRdPipeHandle()131 virtual HANDLE getClientRdPipeHandle() { return INVALID_HANDLE_VALUE; }
getClientWrtPipeHandle()132 virtual HANDLE getClientWrtPipeHandle() { return INVALID_HANDLE_VALUE; }
getNativeWaitHandle()133 virtual HANDLE getNativeWaitHandle() { return listen_event_.h; }
134
135 private:
136 bool createNamedPipe(const TAutoCrit &lockProof);
137 void initiateNamedConnect(const TAutoCrit &lockProof);
138
139 TAutoOverlapThread thread_;
140 TOverlappedWorkItem connectOverlap_;
141 TOverlappedWorkItem cancelOverlap_;
142
143 bool stopping_;
144 std::string pipename_;
145 std::string securityDescriptor_;
146 uint32_t bufsize_;
147 uint32_t maxconns_;
148 TManualResetEvent listen_event_;
149
150 TCriticalSection pipe_protect_;
151 // only read or write these variables underneath a locked pipe_protect_
152 std::shared_ptr<TPipe> cached_client_;
153 TAutoHandle Pipe_;
154 };
155
getNativeWaitHandle()156 HANDLE TPipeServer::getNativeWaitHandle() {
157 if (impl_)
158 return impl_->getNativeWaitHandle();
159 return nullptr;
160 }
161
162 //---- Constructors ----
TPipeServer(const std::string & pipename,uint32_t bufsize)163 TPipeServer::TPipeServer(const std::string& pipename, uint32_t bufsize)
164 : bufsize_(bufsize), isAnonymous_(false) {
165 setMaxConnections(TPIPE_SERVER_MAX_CONNS_DEFAULT);
166 setPipename(pipename);
167 setSecurityDescriptor(DEFAULT_PIPE_SECURITY);
168 }
169
TPipeServer(const std::string & pipename,uint32_t bufsize,uint32_t maxconnections)170 TPipeServer::TPipeServer(const std::string& pipename, uint32_t bufsize, uint32_t maxconnections)
171 : bufsize_(bufsize), isAnonymous_(false) {
172 setMaxConnections(maxconnections);
173 setPipename(pipename);
174 setSecurityDescriptor(DEFAULT_PIPE_SECURITY);
175 }
176
TPipeServer(const std::string & pipename,uint32_t bufsize,uint32_t maxconnections,const std::string & securityDescriptor)177 TPipeServer::TPipeServer(const std::string& pipename,
178 uint32_t bufsize,
179 uint32_t maxconnections,
180 const std::string& securityDescriptor)
181 : bufsize_(bufsize), isAnonymous_(false) {
182 setMaxConnections(maxconnections);
183 setPipename(pipename);
184 setSecurityDescriptor(securityDescriptor);
185 }
186
TPipeServer(const std::string & pipename)187 TPipeServer::TPipeServer(const std::string& pipename) : bufsize_(1024), isAnonymous_(false) {
188 setMaxConnections(TPIPE_SERVER_MAX_CONNS_DEFAULT);
189 setPipename(pipename);
190 setSecurityDescriptor(DEFAULT_PIPE_SECURITY);
191 }
192
TPipeServer(int bufsize)193 TPipeServer::TPipeServer(int bufsize) : bufsize_(bufsize), isAnonymous_(true) {
194 setMaxConnections(1);
195 impl_.reset(new TAnonPipeServer);
196 }
197
TPipeServer()198 TPipeServer::TPipeServer() : bufsize_(1024), isAnonymous_(true) {
199 setMaxConnections(1);
200 impl_.reset(new TAnonPipeServer);
201 }
202
203 //---- Destructor ----
~TPipeServer()204 TPipeServer::~TPipeServer() {}
205
isOpen() const206 bool TPipeServer::isOpen() const {
207 return (impl_->getPipeHandle() != INVALID_HANDLE_VALUE);
208 }
209
210 //---------------------------------------------------------
211 // Transport callbacks
212 //---------------------------------------------------------
listen()213 void TPipeServer::listen() {
214 if (isAnonymous_)
215 return;
216 impl_.reset(new TNamedPipeServer(pipename_, bufsize_, maxconns_, securityDescriptor_));
217 }
218
acceptImpl()219 shared_ptr<TTransport> TPipeServer::acceptImpl() {
220 return impl_->acceptImpl();
221 }
222
acceptImpl()223 shared_ptr<TTransport> TAnonPipeServer::acceptImpl() {
224 // This 0-byte read serves merely as a blocking call.
225 byte buf;
226 DWORD br;
227 int fSuccess = ReadFile(PipeR_.h, // pipe handle
228 &buf, // buffer to receive reply
229 0, // size of buffer
230 &br, // number of bytes read
231 nullptr); // not overlapped
232
233 if (!fSuccess && GetLastError() != ERROR_MORE_DATA) {
234 GlobalOutput.perror("TPipeServer unable to initiate pipe comms, GLE=", GetLastError());
235 throw TTransportException(TTransportException::NOT_OPEN,
236 " TPipeServer unable to initiate pipe comms");
237 }
238 shared_ptr<TPipe> client(new TPipe(PipeR_.h, PipeW_.h));
239 return client;
240 }
241
initiateNamedConnect(const TAutoCrit & lockProof)242 void TNamedPipeServer::initiateNamedConnect(const TAutoCrit &lockProof) {
243 if (stopping_)
244 return;
245 if (!createNamedPipe(lockProof)) {
246 GlobalOutput.perror("TPipeServer CreateNamedPipe failed, GLE=", GetLastError());
247 throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer CreateNamedPipe failed");
248 }
249
250 // The prior connection has been handled, so close the gate
251 ResetEvent(listen_event_.h);
252 connectOverlap_.reset(nullptr, 0, listen_event_.h);
253 connectOverlap_.h = Pipe_.h;
254 thread_->addWorkItem(&connectOverlap_);
255
256 // Wait for the client to connect; if it succeeds, the
257 // function returns a nonzero value. If the function returns
258 // zero, GetLastError should return ERROR_PIPE_CONNECTED.
259 if (connectOverlap_.success) {
260 GlobalOutput.printf("Client connected.");
261 cached_client_.reset(new TPipe(Pipe_));
262 // make sure people know that a connection is ready
263 SetEvent(listen_event_.h);
264 return;
265 }
266
267 DWORD dwErr = connectOverlap_.last_error;
268 switch (dwErr) {
269 case ERROR_PIPE_CONNECTED:
270 GlobalOutput.printf("Client connected.");
271 cached_client_.reset(new TPipe(Pipe_));
272 // make sure people know that a connection is ready
273 SetEvent(listen_event_.h);
274 return;
275 case ERROR_IO_PENDING:
276 return; // acceptImpl will do the appropriate WaitForMultipleObjects
277 default:
278 GlobalOutput.perror("TPipeServer ConnectNamedPipe failed, GLE=", dwErr);
279 throw TTransportException(TTransportException::NOT_OPEN,
280 " TPipeServer ConnectNamedPipe failed");
281 }
282 }
283
acceptImpl()284 shared_ptr<TTransport> TNamedPipeServer::acceptImpl() {
285 {
286 TAutoCrit lock(pipe_protect_);
287 if (cached_client_.get() != nullptr) {
288 shared_ptr<TPipe> client;
289 // zero out cached_client, since we are about to return it.
290 client.swap(cached_client_);
291
292 // kick off the next connection before returning
293 initiateNamedConnect(lock);
294 return client; // success!
295 }
296 }
297
298 if (Pipe_.h == INVALID_HANDLE_VALUE) {
299 throw TTransportException(TTransportException::NOT_OPEN,
300 "TNamedPipeServer: someone called accept on a closed pipe server");
301 }
302
303 DWORD dwDummy = 0;
304
305 // For the most part, Pipe_ should be protected with pipe_protect_. We can't
306 // reasonably do that here though without breaking interruptability. However,
307 // this should be safe, though I'm not happy about it. We only need to ensure
308 // that no one writes / modifies Pipe_.h while we are reading it. Well, the
309 // only two things that should be modifying Pipe_ are acceptImpl, the
310 // functions it calls, and the destructor. Those things shouldn't be run
311 // concurrently anyway. So this call is 'really' just a read that may happen
312 // concurrently with interrupt, and that should be fine.
313 if (GetOverlappedResult(Pipe_.h, &connectOverlap_.overlap, &dwDummy, TRUE)) {
314 TAutoCrit lock(pipe_protect_);
315 shared_ptr<TPipe> client;
316 try {
317 client.reset(new TPipe(Pipe_));
318 } catch (TTransportException& ttx) {
319 if (ttx.getType() == TTransportException::INTERRUPTED) {
320 throw;
321 }
322
323 GlobalOutput.perror("Client connection failed. TTransportExceptionType=", ttx.getType());
324 // kick off the next connection before throwing
325 initiateNamedConnect(lock);
326 throw TTransportException(TTransportException::CLIENT_DISCONNECT, ttx.what());
327 }
328 GlobalOutput.printf("Client connected.");
329 // kick off the next connection before returning
330 initiateNamedConnect(lock);
331 return client; // success!
332 }
333 // if we got here, then we are in an error / shutdown case
334 DWORD gle = GetLastError(); // save error before doing cleanup
335 GlobalOutput.perror("TPipeServer ConnectNamedPipe GLE=", gle);
336 if(gle == ERROR_OPERATION_ABORTED) {
337 TAutoCrit lock(pipe_protect_); // Needed to insure concurrent thread to be out of interrupt.
338 throw TTransportException(TTransportException::INTERRUPTED, "TPipeServer: server interupted");
339 }
340 throw TTransportException(TTransportException::NOT_OPEN, "TPipeServer: client connection failed");
341 }
342
interrupt()343 void TPipeServer::interrupt() {
344 if (impl_)
345 impl_->interrupt();
346 }
347
close()348 void TPipeServer::close() {
349 impl_.reset();
350 }
351
createNamedPipe(const TAutoCrit &)352 bool TNamedPipeServer::createNamedPipe(const TAutoCrit& /*lockProof*/) {
353
354 PSECURITY_DESCRIPTOR psd = nullptr;
355 ULONG size = 0;
356
357 if (!ConvertStringSecurityDescriptorToSecurityDescriptorA(securityDescriptor_.c_str(),
358 SDDL_REVISION_1, &psd, &size)) {
359 DWORD lastError = GetLastError();
360 GlobalOutput.perror("TPipeServer::ConvertStringSecurityDescriptorToSecurityDescriptorA() GLE=",
361 lastError);
362 throw TTransportException(
363 TTransportException::NOT_OPEN,
364 "TPipeServer::ConvertStringSecurityDescriptorToSecurityDescriptorA() failed", lastError);
365 }
366
367 SECURITY_ATTRIBUTES sa;
368 sa.nLength = sizeof(SECURITY_ATTRIBUTES);
369 sa.lpSecurityDescriptor = psd;
370 sa.bInheritHandle = FALSE;
371
372 // Create an instance of the named pipe
373 TAutoHandle hPipe(CreateNamedPipeA(pipename_.c_str(), // pipe name
374 PIPE_ACCESS_DUPLEX | // read/write access
375 FILE_FLAG_OVERLAPPED, // async mode
376 PIPE_TYPE_BYTE | // byte type pipe
377 PIPE_READMODE_BYTE, // byte read mode
378 maxconns_, // max. instances
379 bufsize_, // output buffer size
380 bufsize_, // input buffer size
381 0, // client time-out
382 &sa)); // security attributes
383
384 auto lastError = GetLastError();
385 if (psd)
386 LocalFree(psd);
387
388 if (hPipe.h == INVALID_HANDLE_VALUE) {
389 Pipe_.reset();
390 GlobalOutput.perror("TPipeServer::TCreateNamedPipe() GLE=", lastError);
391 throw TTransportException(TTransportException::NOT_OPEN, "TCreateNamedPipe() failed",
392 lastError);
393 }
394
395 Pipe_.reset(hPipe.release());
396 return true;
397 }
398
createAnonPipe()399 bool TAnonPipeServer::createAnonPipe() {
400 SECURITY_ATTRIBUTES sa;
401 SECURITY_DESCRIPTOR sd; // security information for pipes
402
403 if (!InitializeSecurityDescriptor(&sd, SECURITY_DESCRIPTOR_REVISION)) {
404 GlobalOutput.perror("TPipeServer InitializeSecurityDescriptor (anon) failed, GLE=",
405 GetLastError());
406 return false;
407 }
408 if (!SetSecurityDescriptorDacl(&sd, true, nullptr, false)) {
409 GlobalOutput.perror("TPipeServer SetSecurityDescriptorDacl (anon) failed, GLE=",
410 GetLastError());
411 return false;
412 }
413 sa.lpSecurityDescriptor = &sd;
414 sa.nLength = sizeof(SECURITY_ATTRIBUTES);
415 sa.bInheritHandle = true; // allow passing handle to child
416
417 HANDLE ClientAnonReadH, PipeW_H, ClientAnonWriteH, Pipe_H;
418 if (!CreatePipe(&ClientAnonReadH, &PipeW_H, &sa, 0)) // create stdin pipe
419 {
420 GlobalOutput.perror("TPipeServer CreatePipe (anon) failed, GLE=", GetLastError());
421 return false;
422 }
423 if (!CreatePipe(&Pipe_H, &ClientAnonWriteH, &sa, 0)) // create stdout pipe
424 {
425 GlobalOutput.perror("TPipeServer CreatePipe (anon) failed, GLE=", GetLastError());
426 CloseHandle(ClientAnonReadH);
427 CloseHandle(PipeW_H);
428 return false;
429 }
430
431 ClientAnonRead_.reset(ClientAnonReadH);
432 ClientAnonWrite_.reset(ClientAnonWriteH);
433 PipeR_.reset(Pipe_H);
434 PipeW_.reset(PipeW_H);
435
436 return true;
437 }
438
439 //---------------------------------------------------------
440 // Accessors
441 //---------------------------------------------------------
getPipename()442 std::string TPipeServer::getPipename() {
443 return pipename_;
444 }
445
setPipename(const std::string & pipename)446 void TPipeServer::setPipename(const std::string& pipename) {
447 if (pipename.find("\\\\") == std::string::npos)
448 pipename_ = "\\\\.\\pipe\\" + pipename;
449 else
450 pipename_ = pipename;
451 }
452
getBufferSize()453 int TPipeServer::getBufferSize() {
454 return bufsize_;
455 }
setBufferSize(int bufsize)456 void TPipeServer::setBufferSize(int bufsize) {
457 bufsize_ = bufsize;
458 }
459
getPipeHandle()460 HANDLE TPipeServer::getPipeHandle() {
461 return impl_ ? impl_->getPipeHandle() : INVALID_HANDLE_VALUE;
462 }
getWrtPipeHandle()463 HANDLE TPipeServer::getWrtPipeHandle() {
464 return impl_ ? impl_->getWrtPipeHandle() : INVALID_HANDLE_VALUE;
465 }
getClientRdPipeHandle()466 HANDLE TPipeServer::getClientRdPipeHandle() {
467 return impl_ ? impl_->getClientRdPipeHandle() : INVALID_HANDLE_VALUE;
468 }
getClientWrtPipeHandle()469 HANDLE TPipeServer::getClientWrtPipeHandle() {
470 return impl_ ? impl_->getClientWrtPipeHandle() : INVALID_HANDLE_VALUE;
471 }
472
getAnonymous()473 bool TPipeServer::getAnonymous() {
474 return isAnonymous_;
475 }
setAnonymous(bool anon)476 void TPipeServer::setAnonymous(bool anon) {
477 isAnonymous_ = anon;
478 }
479
setSecurityDescriptor(const std::string & securityDescriptor)480 void TPipeServer::setSecurityDescriptor(const std::string& securityDescriptor) {
481 securityDescriptor_ = securityDescriptor;
482 }
483
setMaxConnections(uint32_t maxconnections)484 void TPipeServer::setMaxConnections(uint32_t maxconnections) {
485 if (maxconnections == 0)
486 maxconns_ = 1;
487 else if (maxconnections > PIPE_UNLIMITED_INSTANCES)
488 maxconns_ = PIPE_UNLIMITED_INSTANCES;
489 else
490 maxconns_ = maxconnections;
491 }
492
493 #endif //_WIN32
494 }
495 }
496 } // apache::thrift::transport
497