1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #include <thrift/thrift-config.h>
21 #include <cstring>
22 
23 #include <thrift/transport/TPipe.h>
24 #include <thrift/transport/TPipeServer.h>
25 #include <thrift/TNonCopyable.h>
26 
27 #ifdef _WIN32
28 #include <thrift/windows/OverlappedSubmissionThread.h>
29 #include <thrift/windows/Sync.h>
30 #include <AccCtrl.h>
31 #include <Aclapi.h>
32 #include <sddl.h>
33 #endif //_WIN32
34 
35 namespace apache {
36 namespace thrift {
37 namespace transport {
38 
39 #ifdef _WIN32
40 
41 using std::shared_ptr;
42 
43 class TPipeServerImpl : apache::thrift::TNonCopyable {
44 public:
TPipeServerImpl()45   TPipeServerImpl() {}
~TPipeServerImpl()46   virtual ~TPipeServerImpl() {}
47   virtual void interrupt() = 0;
48   virtual std::shared_ptr<TTransport> acceptImpl() = 0;
49 
50   virtual HANDLE getPipeHandle() = 0;
51   virtual HANDLE getWrtPipeHandle() = 0;
52   virtual HANDLE getClientRdPipeHandle() = 0;
53   virtual HANDLE getClientWrtPipeHandle() = 0;
getNativeWaitHandle()54   virtual HANDLE getNativeWaitHandle() { return nullptr; }
55 };
56 
57 class TAnonPipeServer : public TPipeServerImpl {
58 public:
TAnonPipeServer()59   TAnonPipeServer() {
60     // The anonymous pipe needs to be created first so that the server can
61     // pass the handles on to the client before the serve (acceptImpl)
62     // blocking call.
63     if (!createAnonPipe()) {
64       GlobalOutput.perror("TPipeServer Create(Anon)Pipe failed, GLE=", GetLastError());
65       throw TTransportException(TTransportException::NOT_OPEN,
66                                 " TPipeServer Create(Anon)Pipe failed");
67     }
68   }
69 
~TAnonPipeServer()70   virtual ~TAnonPipeServer() {
71     PipeR_.reset();
72     PipeW_.reset();
73     ClientAnonRead_.reset();
74     ClientAnonWrite_.reset();
75   }
76 
interrupt()77   virtual void interrupt() {} // not currently implemented
78 
79   virtual std::shared_ptr<TTransport> acceptImpl();
80 
getPipeHandle()81   virtual HANDLE getPipeHandle() { return PipeR_.h; }
getWrtPipeHandle()82   virtual HANDLE getWrtPipeHandle() { return PipeW_.h; }
getClientRdPipeHandle()83   virtual HANDLE getClientRdPipeHandle() { return ClientAnonRead_.h; }
getClientWrtPipeHandle()84   virtual HANDLE getClientWrtPipeHandle() { return ClientAnonWrite_.h; }
85 
86 private:
87   bool createAnonPipe();
88 
89   TAutoHandle PipeR_; // Anonymous Pipe (R)
90   TAutoHandle PipeW_; // Anonymous Pipe (W)
91 
92   // Client side anonymous pipe handles
93   //? Do we need duplicates to send to client?
94   TAutoHandle ClientAnonRead_;
95   TAutoHandle ClientAnonWrite_;
96 };
97 
98 class TNamedPipeServer : public TPipeServerImpl {
99 public:
TNamedPipeServer(const std::string & pipename,uint32_t bufsize,uint32_t maxconnections,const std::string & securityDescriptor)100   TNamedPipeServer(const std::string& pipename,
101                    uint32_t bufsize,
102                    uint32_t maxconnections,
103                    const std::string& securityDescriptor)
104     : stopping_(false),
105       pipename_(pipename),
106       bufsize_(bufsize),
107       maxconns_(maxconnections),
108       securityDescriptor_(securityDescriptor) {
109     connectOverlap_.action = TOverlappedWorkItem::CONNECT;
110     cancelOverlap_.action = TOverlappedWorkItem::CANCELIO;
111     TAutoCrit lock(pipe_protect_);
112     initiateNamedConnect(lock);
113   }
~TNamedPipeServer()114   virtual ~TNamedPipeServer() {}
115 
interrupt()116   virtual void interrupt() {
117     TAutoCrit lock(pipe_protect_);
118     cached_client_.reset();
119     if (Pipe_.h != INVALID_HANDLE_VALUE) {
120       stopping_ = true;
121       cancelOverlap_.h = Pipe_.h;
122       // This should wake up GetOverlappedResult
123       thread_->addWorkItem(&cancelOverlap_);
124     }
125   }
126 
127   virtual std::shared_ptr<TTransport> acceptImpl();
128 
getPipeHandle()129   virtual HANDLE getPipeHandle() { return Pipe_.h; }
getWrtPipeHandle()130   virtual HANDLE getWrtPipeHandle() { return INVALID_HANDLE_VALUE; }
getClientRdPipeHandle()131   virtual HANDLE getClientRdPipeHandle() { return INVALID_HANDLE_VALUE; }
getClientWrtPipeHandle()132   virtual HANDLE getClientWrtPipeHandle() { return INVALID_HANDLE_VALUE; }
getNativeWaitHandle()133   virtual HANDLE getNativeWaitHandle() { return listen_event_.h; }
134 
135 private:
136   bool createNamedPipe(const TAutoCrit &lockProof);
137   void initiateNamedConnect(const TAutoCrit &lockProof);
138 
139   TAutoOverlapThread thread_;
140   TOverlappedWorkItem connectOverlap_;
141   TOverlappedWorkItem cancelOverlap_;
142 
143   bool stopping_;
144   std::string pipename_;
145   std::string securityDescriptor_;
146   uint32_t bufsize_;
147   uint32_t maxconns_;
148   TManualResetEvent listen_event_;
149 
150   TCriticalSection pipe_protect_;
151   // only read or write these variables underneath a locked pipe_protect_
152   std::shared_ptr<TPipe> cached_client_;
153   TAutoHandle Pipe_;
154 };
155 
getNativeWaitHandle()156 HANDLE TPipeServer::getNativeWaitHandle() {
157   if (impl_)
158     return impl_->getNativeWaitHandle();
159   return nullptr;
160 }
161 
162 //---- Constructors ----
TPipeServer(const std::string & pipename,uint32_t bufsize)163 TPipeServer::TPipeServer(const std::string& pipename, uint32_t bufsize)
164   : bufsize_(bufsize), isAnonymous_(false) {
165   setMaxConnections(TPIPE_SERVER_MAX_CONNS_DEFAULT);
166   setPipename(pipename);
167   setSecurityDescriptor(DEFAULT_PIPE_SECURITY);
168 }
169 
TPipeServer(const std::string & pipename,uint32_t bufsize,uint32_t maxconnections)170 TPipeServer::TPipeServer(const std::string& pipename, uint32_t bufsize, uint32_t maxconnections)
171   : bufsize_(bufsize), isAnonymous_(false) {
172   setMaxConnections(maxconnections);
173   setPipename(pipename);
174   setSecurityDescriptor(DEFAULT_PIPE_SECURITY);
175 }
176 
TPipeServer(const std::string & pipename,uint32_t bufsize,uint32_t maxconnections,const std::string & securityDescriptor)177 TPipeServer::TPipeServer(const std::string& pipename,
178                          uint32_t bufsize,
179                          uint32_t maxconnections,
180                          const std::string& securityDescriptor)
181   : bufsize_(bufsize), isAnonymous_(false) {
182   setMaxConnections(maxconnections);
183   setPipename(pipename);
184   setSecurityDescriptor(securityDescriptor);
185 }
186 
TPipeServer(const std::string & pipename)187 TPipeServer::TPipeServer(const std::string& pipename) : bufsize_(1024), isAnonymous_(false) {
188   setMaxConnections(TPIPE_SERVER_MAX_CONNS_DEFAULT);
189   setPipename(pipename);
190   setSecurityDescriptor(DEFAULT_PIPE_SECURITY);
191 }
192 
TPipeServer(int bufsize)193 TPipeServer::TPipeServer(int bufsize) : bufsize_(bufsize), isAnonymous_(true) {
194   setMaxConnections(1);
195   impl_.reset(new TAnonPipeServer);
196 }
197 
TPipeServer()198 TPipeServer::TPipeServer() : bufsize_(1024), isAnonymous_(true) {
199   setMaxConnections(1);
200   impl_.reset(new TAnonPipeServer);
201 }
202 
203 //---- Destructor ----
~TPipeServer()204 TPipeServer::~TPipeServer() {}
205 
isOpen() const206 bool TPipeServer::isOpen() const {
207   return (impl_->getPipeHandle() != INVALID_HANDLE_VALUE);
208 }
209 
210 //---------------------------------------------------------
211 // Transport callbacks
212 //---------------------------------------------------------
listen()213 void TPipeServer::listen() {
214   if (isAnonymous_)
215     return;
216   impl_.reset(new TNamedPipeServer(pipename_, bufsize_, maxconns_, securityDescriptor_));
217 }
218 
acceptImpl()219 shared_ptr<TTransport> TPipeServer::acceptImpl() {
220   return impl_->acceptImpl();
221 }
222 
acceptImpl()223 shared_ptr<TTransport> TAnonPipeServer::acceptImpl() {
224   // This 0-byte read serves merely as a blocking call.
225   byte buf;
226   DWORD br;
227   int fSuccess = ReadFile(PipeR_.h, // pipe handle
228                           &buf,     // buffer to receive reply
229                           0,        // size of buffer
230                           &br,      // number of bytes read
231                           nullptr);    // not overlapped
232 
233   if (!fSuccess && GetLastError() != ERROR_MORE_DATA) {
234     GlobalOutput.perror("TPipeServer unable to initiate pipe comms, GLE=", GetLastError());
235     throw TTransportException(TTransportException::NOT_OPEN,
236                               " TPipeServer unable to initiate pipe comms");
237   }
238   shared_ptr<TPipe> client(new TPipe(PipeR_.h, PipeW_.h));
239   return client;
240 }
241 
initiateNamedConnect(const TAutoCrit & lockProof)242 void TNamedPipeServer::initiateNamedConnect(const TAutoCrit &lockProof) {
243   if (stopping_)
244     return;
245   if (!createNamedPipe(lockProof)) {
246     GlobalOutput.perror("TPipeServer CreateNamedPipe failed, GLE=", GetLastError());
247     throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer CreateNamedPipe failed");
248   }
249 
250   // The prior connection has been handled, so close the gate
251   ResetEvent(listen_event_.h);
252   connectOverlap_.reset(nullptr, 0, listen_event_.h);
253   connectOverlap_.h = Pipe_.h;
254   thread_->addWorkItem(&connectOverlap_);
255 
256   // Wait for the client to connect; if it succeeds, the
257   // function returns a nonzero value. If the function returns
258   // zero, GetLastError should return ERROR_PIPE_CONNECTED.
259   if (connectOverlap_.success) {
260     GlobalOutput.printf("Client connected.");
261     cached_client_.reset(new TPipe(Pipe_));
262     // make sure people know that a connection is ready
263     SetEvent(listen_event_.h);
264     return;
265   }
266 
267   DWORD dwErr = connectOverlap_.last_error;
268   switch (dwErr) {
269   case ERROR_PIPE_CONNECTED:
270     GlobalOutput.printf("Client connected.");
271     cached_client_.reset(new TPipe(Pipe_));
272     // make sure people know that a connection is ready
273     SetEvent(listen_event_.h);
274     return;
275   case ERROR_IO_PENDING:
276     return; // acceptImpl will do the appropriate WaitForMultipleObjects
277   default:
278     GlobalOutput.perror("TPipeServer ConnectNamedPipe failed, GLE=", dwErr);
279     throw TTransportException(TTransportException::NOT_OPEN,
280                               " TPipeServer ConnectNamedPipe failed");
281   }
282 }
283 
acceptImpl()284 shared_ptr<TTransport> TNamedPipeServer::acceptImpl() {
285   {
286     TAutoCrit lock(pipe_protect_);
287     if (cached_client_.get() != nullptr) {
288       shared_ptr<TPipe> client;
289       // zero out cached_client, since we are about to return it.
290       client.swap(cached_client_);
291 
292       // kick off the next connection before returning
293       initiateNamedConnect(lock);
294       return client; // success!
295     }
296   }
297 
298   if (Pipe_.h == INVALID_HANDLE_VALUE) {
299     throw TTransportException(TTransportException::NOT_OPEN,
300                               "TNamedPipeServer: someone called accept on a closed pipe server");
301   }
302 
303   DWORD dwDummy = 0;
304 
305   // For the most part, Pipe_ should be protected with pipe_protect_.  We can't
306   // reasonably do that here though without breaking interruptability.  However,
307   // this should be safe, though I'm not happy about it.  We only need to ensure
308   // that no one writes / modifies Pipe_.h while we are reading it.  Well, the
309   // only two things that should be modifying Pipe_ are acceptImpl, the
310   // functions it calls, and the destructor.  Those things shouldn't be run
311   // concurrently anyway.  So this call is 'really' just a read that may happen
312   // concurrently with interrupt, and that should be fine.
313   if (GetOverlappedResult(Pipe_.h, &connectOverlap_.overlap, &dwDummy, TRUE)) {
314     TAutoCrit lock(pipe_protect_);
315     shared_ptr<TPipe> client;
316     try {
317       client.reset(new TPipe(Pipe_));
318     } catch (TTransportException& ttx) {
319       if (ttx.getType() == TTransportException::INTERRUPTED) {
320         throw;
321       }
322 
323       GlobalOutput.perror("Client connection failed. TTransportExceptionType=", ttx.getType());
324       // kick off the next connection before throwing
325       initiateNamedConnect(lock);
326       throw TTransportException(TTransportException::CLIENT_DISCONNECT, ttx.what());
327     }
328     GlobalOutput.printf("Client connected.");
329     // kick off the next connection before returning
330     initiateNamedConnect(lock);
331     return client; // success!
332   }
333   // if we got here, then we are in an error / shutdown case
334   DWORD gle = GetLastError(); // save error before doing cleanup
335   GlobalOutput.perror("TPipeServer ConnectNamedPipe GLE=", gle);
336   if(gle == ERROR_OPERATION_ABORTED) {
337     TAutoCrit lock(pipe_protect_);    	// Needed to insure concurrent thread to be out of interrupt.
338     throw TTransportException(TTransportException::INTERRUPTED, "TPipeServer: server interupted");
339   }
340   throw TTransportException(TTransportException::NOT_OPEN, "TPipeServer: client connection failed");
341 }
342 
interrupt()343 void TPipeServer::interrupt() {
344   if (impl_)
345     impl_->interrupt();
346 }
347 
close()348 void TPipeServer::close() {
349   impl_.reset();
350 }
351 
createNamedPipe(const TAutoCrit &)352 bool TNamedPipeServer::createNamedPipe(const TAutoCrit& /*lockProof*/) {
353 
354   PSECURITY_DESCRIPTOR psd = nullptr;
355   ULONG size = 0;
356 
357   if (!ConvertStringSecurityDescriptorToSecurityDescriptorA(securityDescriptor_.c_str(),
358                                                             SDDL_REVISION_1, &psd, &size)) {
359     DWORD lastError = GetLastError();
360     GlobalOutput.perror("TPipeServer::ConvertStringSecurityDescriptorToSecurityDescriptorA() GLE=",
361                         lastError);
362     throw TTransportException(
363         TTransportException::NOT_OPEN,
364         "TPipeServer::ConvertStringSecurityDescriptorToSecurityDescriptorA() failed", lastError);
365   }
366 
367   SECURITY_ATTRIBUTES sa;
368   sa.nLength = sizeof(SECURITY_ATTRIBUTES);
369   sa.lpSecurityDescriptor = psd;
370   sa.bInheritHandle = FALSE;
371 
372   // Create an instance of the named pipe
373   TAutoHandle hPipe(CreateNamedPipeA(pipename_.c_str(),        // pipe name
374                                      PIPE_ACCESS_DUPLEX |      // read/write access
375                                          FILE_FLAG_OVERLAPPED, // async mode
376                                      PIPE_TYPE_BYTE |          // byte type pipe
377                                          PIPE_READMODE_BYTE,   // byte read mode
378                                      maxconns_,                // max. instances
379                                      bufsize_,                 // output buffer size
380                                      bufsize_,                 // input buffer size
381                                      0,                        // client time-out
382                                      &sa));                    // security attributes
383 
384   auto lastError = GetLastError();
385   if (psd)
386     LocalFree(psd);
387 
388   if (hPipe.h == INVALID_HANDLE_VALUE) {
389     Pipe_.reset();
390     GlobalOutput.perror("TPipeServer::TCreateNamedPipe() GLE=", lastError);
391     throw TTransportException(TTransportException::NOT_OPEN, "TCreateNamedPipe() failed",
392                               lastError);
393   }
394 
395   Pipe_.reset(hPipe.release());
396   return true;
397 }
398 
createAnonPipe()399 bool TAnonPipeServer::createAnonPipe() {
400   SECURITY_ATTRIBUTES sa;
401   SECURITY_DESCRIPTOR sd; // security information for pipes
402 
403   if (!InitializeSecurityDescriptor(&sd, SECURITY_DESCRIPTOR_REVISION)) {
404     GlobalOutput.perror("TPipeServer InitializeSecurityDescriptor (anon) failed, GLE=",
405                         GetLastError());
406     return false;
407   }
408   if (!SetSecurityDescriptorDacl(&sd, true, nullptr, false)) {
409     GlobalOutput.perror("TPipeServer SetSecurityDescriptorDacl (anon) failed, GLE=",
410                         GetLastError());
411     return false;
412   }
413   sa.lpSecurityDescriptor = &sd;
414   sa.nLength = sizeof(SECURITY_ATTRIBUTES);
415   sa.bInheritHandle = true; // allow passing handle to child
416 
417   HANDLE ClientAnonReadH, PipeW_H, ClientAnonWriteH, Pipe_H;
418   if (!CreatePipe(&ClientAnonReadH, &PipeW_H, &sa, 0)) // create stdin pipe
419   {
420     GlobalOutput.perror("TPipeServer CreatePipe (anon) failed, GLE=", GetLastError());
421     return false;
422   }
423   if (!CreatePipe(&Pipe_H, &ClientAnonWriteH, &sa, 0)) // create stdout pipe
424   {
425     GlobalOutput.perror("TPipeServer CreatePipe (anon) failed, GLE=", GetLastError());
426     CloseHandle(ClientAnonReadH);
427     CloseHandle(PipeW_H);
428     return false;
429   }
430 
431   ClientAnonRead_.reset(ClientAnonReadH);
432   ClientAnonWrite_.reset(ClientAnonWriteH);
433   PipeR_.reset(Pipe_H);
434   PipeW_.reset(PipeW_H);
435 
436   return true;
437 }
438 
439 //---------------------------------------------------------
440 // Accessors
441 //---------------------------------------------------------
getPipename()442 std::string TPipeServer::getPipename() {
443   return pipename_;
444 }
445 
setPipename(const std::string & pipename)446 void TPipeServer::setPipename(const std::string& pipename) {
447   if (pipename.find("\\\\") == std::string::npos)
448     pipename_ = "\\\\.\\pipe\\" + pipename;
449   else
450     pipename_ = pipename;
451 }
452 
getBufferSize()453 int TPipeServer::getBufferSize() {
454   return bufsize_;
455 }
setBufferSize(int bufsize)456 void TPipeServer::setBufferSize(int bufsize) {
457   bufsize_ = bufsize;
458 }
459 
getPipeHandle()460 HANDLE TPipeServer::getPipeHandle() {
461   return impl_ ? impl_->getPipeHandle() : INVALID_HANDLE_VALUE;
462 }
getWrtPipeHandle()463 HANDLE TPipeServer::getWrtPipeHandle() {
464   return impl_ ? impl_->getWrtPipeHandle() : INVALID_HANDLE_VALUE;
465 }
getClientRdPipeHandle()466 HANDLE TPipeServer::getClientRdPipeHandle() {
467   return impl_ ? impl_->getClientRdPipeHandle() : INVALID_HANDLE_VALUE;
468 }
getClientWrtPipeHandle()469 HANDLE TPipeServer::getClientWrtPipeHandle() {
470   return impl_ ? impl_->getClientWrtPipeHandle() : INVALID_HANDLE_VALUE;
471 }
472 
getAnonymous()473 bool TPipeServer::getAnonymous() {
474   return isAnonymous_;
475 }
setAnonymous(bool anon)476 void TPipeServer::setAnonymous(bool anon) {
477   isAnonymous_ = anon;
478 }
479 
setSecurityDescriptor(const std::string & securityDescriptor)480 void TPipeServer::setSecurityDescriptor(const std::string& securityDescriptor) {
481   securityDescriptor_ = securityDescriptor;
482 }
483 
setMaxConnections(uint32_t maxconnections)484 void TPipeServer::setMaxConnections(uint32_t maxconnections) {
485   if (maxconnections == 0)
486     maxconns_ = 1;
487   else if (maxconnections > PIPE_UNLIMITED_INSTANCES)
488     maxconns_ = PIPE_UNLIMITED_INSTANCES;
489   else
490     maxconns_ = maxconnections;
491 }
492 
493 #endif //_WIN32
494 }
495 }
496 } // apache::thrift::transport
497