1 // Licensed to the Apache Software Foundation(ASF) under one
2 // or more contributor license agreements.See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 using Microsoft.Win32.SafeHandles;
19 using System;
20 using System.IO.Pipes;
21 using System.Runtime.InteropServices;
22 using System.Threading;
23 using System.Threading.Tasks;
24 using System.ComponentModel;
25 using System.Security.AccessControl;
26 using System.Security.Principal;
27 using System.Collections.Generic;
28 using System.IO;
29 using System.Diagnostics;
30 
31 #pragma warning disable CS1998  // async no await
32 
33 namespace Thrift.Transport.Server
34 {
35     [Obsolete("NamedPipeClientFlags is deprecated, use NamedPipeServerFlags instead.")]
36     [Flags]
37     public enum NamedPipeClientFlags {  // bad name
38         None = 0x00,
39         OnlyLocalClients = 0x01
40     };
41 
42     [Flags]
43     public enum NamedPipeServerFlags
44     {
45         None = 0x00,
46         OnlyLocalClients = 0x01,
47     };
48 
49 
50     // ReSharper disable once InconsistentNaming
51     public class TNamedPipeServerTransport : TServerTransport
52     {
53         // to manage incoming connections, we set up a task for each stream to listen on
54         private struct TaskStreamPair
55         {
56             public NamedPipeServerStream Stream;
57             public Task Task;
58 
TaskStreamPairThrift.Transport.Server.TNamedPipeServerTransport.TaskStreamPair59             public TaskStreamPair(NamedPipeServerStream stream, Task task)
60             {
61                 Stream = stream;
62                 Task = task;
63             }
64         }
65 
        /// <summary>
        ///     This is the address of the Pipe on the localhost.
        /// </summary>
        private readonly string _pipeAddress;
        // true while pipes are created in asynchronous mode; CreatePipeInstance() switches it to false
        // when asynchronous pipes turn out to be unsupported (NotImplementedException)
        private bool _asyncMode = true;
        // value reported by IsClientPending(); reset to false by Close()
        private volatile bool _isPending = true;
        // the pipe instances currently owned by this transport, each with its wait-for-connection task (if started)
        private readonly List<TaskStreamPair> _streams = new List<TaskStreamPair>();
        // when true, the native pipe is created with PIPE_REJECT_REMOTE_CLIENTS
        private readonly bool _onlyLocalClients = false;  // compatibility default
        // number of pipe instances EnsurePipeInstances() tries to keep available
        private readonly byte _numListenPipes = 1;  // compatibility default
75 
TNamedPipeServerTransport(string pipeAddress, TConfiguration config, NamedPipeServerFlags flags, int numListenPipes)76         public TNamedPipeServerTransport(string pipeAddress, TConfiguration config, NamedPipeServerFlags flags, int numListenPipes)
77             : base(config)
78         {
79             if ((numListenPipes < 1) || (numListenPipes > 254))
80                 throw new ArgumentOutOfRangeException(nameof(numListenPipes), "Value must be in the range of [1..254]");
81 
82             _pipeAddress = pipeAddress;
83             _onlyLocalClients = flags.HasFlag(NamedPipeServerFlags.OnlyLocalClients);
84             _numListenPipes = (byte)numListenPipes;
85         }
86 
87         [Obsolete("NamedPipeClientFlags is deprecated, use NamedPipeServerFlags instead.")]
TNamedPipeServerTransport(string pipeAddress, TConfiguration config, NamedPipeClientFlags flags, int numListenPipes = 1)88         public TNamedPipeServerTransport(string pipeAddress, TConfiguration config, NamedPipeClientFlags flags, int numListenPipes = 1)
89             : base(config)
90         {
91             if ((numListenPipes < 1) || (numListenPipes > 254))
92                 throw new ArgumentOutOfRangeException(nameof(numListenPipes), "Value must be in the range of [1..254]");
93 
94             _pipeAddress = pipeAddress;
95             _onlyLocalClients = flags.HasFlag(NamedPipeClientFlags.OnlyLocalClients);
96             _numListenPipes = (byte)numListenPipes;
97         }
98 
99 
IsOpen()100         public override bool IsOpen() {
101             return true;
102         }
103 
Listen()104         public override void Listen()
105         {
106             // nothing to do here
107         }
108 
Close(NamedPipeServerStream pipe)109         private static void Close(NamedPipeServerStream pipe)
110         {
111             if (pipe != null)
112             {
113                 try
114                 {
115                     if (pipe.IsConnected)
116                         pipe.Disconnect();
117                 }
118                 finally
119                 {
120                     pipe.Dispose();
121                 }
122             }
123         }
124 
Close()125         public override void Close()
126         {
127             try
128             {
129                 if (_streams != null)
130                 {
131                     while(_streams.Count > 0)
132                     {
133                         Close(_streams[0].Stream);
134                         _streams.RemoveAt(0);
135                     }
136                 }
137             }
138             finally
139             {
140                 _streams.Clear();
141                 _isPending = false;
142             }
143         }
144 
IsClientPending()145         public override bool IsClientPending()
146         {
147             return _isPending;
148         }
149 
EnsurePipeInstances()150         private void EnsurePipeInstances()
151         {
152             // set up a pool for accepting multiple calls when in multithread mode
153             // once connected, we hand that stream over to the processor and create a fresh one
154             try
155             {
156                 while (_streams.Count < _numListenPipes)
157                     _streams.Add(CreatePipeInstance());
158             }
159             catch
160             {
161                 // we might not be able to create all requested instances, e.g. due to some existing instances already processing calls
162                 // if we have at least one pipe to listen on -> Good Enough(tm)
163                 if (_streams.Count < 1)
164                     throw;  // no pipes is really bad
165             }
166         }
167 
CreatePipeInstance()168         private TaskStreamPair CreatePipeInstance()
169         {
170             const PipeDirection direction = PipeDirection.InOut;
171             const int maxconn = NamedPipeServerStream.MaxAllowedServerInstances;
172             const PipeTransmissionMode mode = PipeTransmissionMode.Byte;
173             const int inbuf = 4096;
174             const int outbuf = 4096;
175             var options = _asyncMode ? PipeOptions.Asynchronous : PipeOptions.None;
176 
177 
178             // TODO: "CreatePipeNative" ist only a workaround, and there are have basically two possible outcomes:
179             // - once NamedPipeServerStream() gets a CTOR that supports pipesec, remove CreatePipeNative()
180             // - if 31190 gets resolved before, use _stream.SetAccessControl(pipesec) instead of CreatePipeNative()
181             // EITHER WAY,
182             // - if CreatePipeNative() finally gets removed, also remove "allow unsafe code" from the project settings
183 
184             NamedPipeServerStream instance;
185             try
186             {
187                 var handle = CreatePipeNative(_pipeAddress, inbuf, outbuf, _onlyLocalClients);
188                 if ((handle != null) && (!handle.IsInvalid))
189                 {
190                     instance = new NamedPipeServerStream(PipeDirection.InOut, _asyncMode, false, handle);
191                     handle = null; // we don't own it any longer
192                 }
193                 else
194                 {
195                     handle?.Dispose();
196                     instance = new NamedPipeServerStream(_pipeAddress, direction, maxconn, mode, options, inbuf, outbuf/*, pipesec*/);
197                 }
198             }
199             catch (NotImplementedException) // Mono still does not support async, fallback to sync
200             {
201                 if (_asyncMode)
202                 {
203                     options &= (~PipeOptions.Asynchronous);
204                     instance = new NamedPipeServerStream(_pipeAddress, direction, maxconn, mode, options, inbuf, outbuf);
205                     _asyncMode = false;
206                 }
207                 else
208                 {
209                     throw;
210                 }
211             }
212 
213             // the task gets added later
214             return new TaskStreamPair( instance, null);
215         }
216 
217 
218         #region CreatePipeNative workaround
219 
220 
        /// <summary>
        ///     Managed counterpart of the security attributes argument passed to the native
        ///     <c>CreateNamedPipe</c> call declared in this class. The sequential layout keeps the
        ///     fields in declaration order.
        /// </summary>
        [StructLayout(LayoutKind.Sequential)]
        internal class SECURITY_ATTRIBUTES
        {
            // NOTE(review): presumably the size of the structure in bytes, as in the Win32 structure of the same name - defaults to 0 here
            internal int nLength = 0;
            // pointer to the binary security descriptor; must stay valid (pinned) for the duration of the native call
            internal IntPtr lpSecurityDescriptor = IntPtr.Zero;
            // NOTE(review): presumably a Win32 BOOL controlling handle inheritance - defaults to 0 here
            internal int bInheritHandle = 0;
        }
228 
229 
        // name of the native library CreateNamedPipe is imported from
        private const string Kernel32 = "kernel32.dll";

        /// <summary>
        ///     P/Invoke declaration of the native <c>CreateNamedPipe</c> function. Because
        ///     <c>SetLastError</c> is set, the error code of a failed call can be retrieved via
        ///     <see cref="Marshal.GetLastWin32Error"/>.
        /// </summary>
        /// <remarks>
        ///     Note the parameter order: the output buffer size comes before the input buffer size.
        /// </remarks>
        /// <returns>The raw pipe handle as returned by the native call.</returns>
        [DllImport(Kernel32, SetLastError = true, CharSet = CharSet.Unicode)]
        internal static extern IntPtr CreateNamedPipe(
            string lpName, uint dwOpenMode, uint dwPipeMode,
            uint nMaxInstances, uint nOutBufferSize, uint nInBufferSize, uint nDefaultTimeOut,
            SECURITY_ATTRIBUTES pipeSecurityDescriptor
            );
238 
239 
240 
241         // Workaround: create the pipe via API call
242         // we have to do it this way, since NamedPipeServerStream() for netstd still lacks a few CTORs and/or arguments
243         // and _stream.SetAccessControl(pipesec); only keeps throwing ACCESS_DENIED errors at us
244         // References:
245         // - https://github.com/dotnet/corefx/issues/30170 (closed, continued in 31190)
246         // - https://github.com/dotnet/corefx/issues/31190 System.IO.Pipes.AccessControl package does not work
247         // - https://github.com/dotnet/corefx/issues/24040 NamedPipeServerStream: Provide support for WRITE_DAC
248         // - https://github.com/dotnet/corefx/issues/34400 Have a mechanism for lower privileged user to connect to a privileged user's pipe
CreatePipeNative(string name, int inbuf, int outbuf, bool OnlyLocalClients)249         private static SafePipeHandle CreatePipeNative(string name, int inbuf, int outbuf, bool OnlyLocalClients)
250         {
251             if (! RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
252                 return null; // Windows only
253 
254             var pinningHandle = new GCHandle();
255             try
256             {
257                 // owner gets full access, everyone else read/write
258                 var pipesec = new PipeSecurity();
259                 using (var currentIdentity = WindowsIdentity.GetCurrent())
260                 {
261                     var sidOwner = currentIdentity.Owner;
262                     var sidWorld = new SecurityIdentifier(WellKnownSidType.WorldSid, null);
263 
264                     pipesec.SetOwner(sidOwner);
265                     pipesec.AddAccessRule(new PipeAccessRule(sidOwner, PipeAccessRights.FullControl, AccessControlType.Allow));
266                     pipesec.AddAccessRule(new PipeAccessRule(sidWorld, PipeAccessRights.ReadWrite, AccessControlType.Allow));
267                 }
268 
269                 // create a security descriptor and assign it to the security attribs
270                 var secAttrs = new SECURITY_ATTRIBUTES();
271                 byte[] sdBytes = pipesec.GetSecurityDescriptorBinaryForm();
272                 pinningHandle = GCHandle.Alloc(sdBytes, GCHandleType.Pinned);
273                 unsafe {
274                     fixed (byte* pSD = sdBytes) {
275                         secAttrs.lpSecurityDescriptor = (IntPtr)pSD;
276                     }
277                 }
278 
279                 // a bunch of constants we will need shortly
280                 const uint PIPE_ACCESS_DUPLEX = 0x00000003;
281                 const uint FILE_FLAG_OVERLAPPED = 0x40000000;
282                 const uint WRITE_DAC = 0x00040000;
283                 const uint PIPE_TYPE_BYTE = 0x00000000;
284                 const uint PIPE_READMODE_BYTE = 0x00000000;
285                 const uint PIPE_UNLIMITED_INSTANCES = 255;
286                 const uint PIPE_ACCEPT_REMOTE_CLIENTS = 0x00000000;  // Connections from remote clients can be accepted and checked against the security descriptor for the pipe.
287                 const uint PIPE_REJECT_REMOTE_CLIENTS = 0x00000008;  // Connections from remote clients are automatically rejected.
288 
289                 // any extra flags we want to add
290                 uint dwPipeModeXtra
291                     = (OnlyLocalClients ? PIPE_REJECT_REMOTE_CLIENTS : PIPE_ACCEPT_REMOTE_CLIENTS)
292                     ;
293 
294                 // create the pipe via API call
295                 var rawHandle = CreateNamedPipe(
296                     @"\\.\pipe\" + name,
297                     PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | WRITE_DAC,
298                     PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | dwPipeModeXtra,
299                     PIPE_UNLIMITED_INSTANCES, (uint)inbuf, (uint)outbuf,
300                     5 * 1000,
301                     secAttrs
302                     );
303 
304                 // make a SafePipeHandle() from it
305                 var handle = new SafePipeHandle(rawHandle, true);
306                 if (handle.IsInvalid)
307                     throw new Win32Exception(Marshal.GetLastWin32Error());
308 
309                 // return it (to be packaged)
310                 return handle;
311             }
312             finally
313             {
314                 if (pinningHandle.IsAllocated)
315                     pinningHandle.Free();
316             }
317         }
318 
319         #endregion
320 
AcceptImplementationAsync(CancellationToken cancellationToken)321         protected override async ValueTask<TTransport> AcceptImplementationAsync(CancellationToken cancellationToken)
322         {
323             try
324             {
325                 EnsurePipeInstances();
326 
327                 // fill the list and wait for any task to be completed
328                 var tasks = new List<Task>();
329                 for (var i = 0; i < _streams.Count; ++i)
330                 {
331                     if (_streams[i].Task == null)
332                     {
333                         var pair = _streams[i];
334                         pair.Task = Task.Run(async () => await pair.Stream.WaitForConnectionAsync(cancellationToken), cancellationToken);
335                         _streams[i] = pair;
336                     }
337 
338                     tasks.Add(_streams[i].Task);
339                 }
340 
341                 // there must be an exact mapping between task index and stream index
342                 Debug.Assert(_streams.Count == tasks.Count);
343                 var index = Task.WaitAny(tasks.ToArray(), cancellationToken);
344 
345                 var trans = new ServerTransport(_streams[index].Stream, Configuration);
346                 _streams.RemoveAt(index); // pass stream ownership to ServerTransport
347 
348                 return trans;
349             }
350             catch (TTransportException)
351             {
352                 Close();
353                 throw;
354             }
355             catch (TaskCanceledException)
356             {
357                 Close();
358                 throw;  // let it bubble up
359             }
360             catch (Exception e)
361             {
362                 Close();
363                 throw new TTransportException(TTransportException.ExceptionType.NotOpen, e.Message);
364             }
365         }
366 
367         private class ServerTransport : TEndpointTransport
368         {
369             private NamedPipeServerStream PipeStream;
370 
ServerTransport(NamedPipeServerStream stream, TConfiguration config)371             public ServerTransport(NamedPipeServerStream stream, TConfiguration config)
372                 : base(config)
373             {
374                 PipeStream = stream;
375             }
376 
377             public override bool IsOpen => PipeStream != null && PipeStream.IsConnected;
378 
OpenAsync(CancellationToken cancellationToken)379             public override Task OpenAsync(CancellationToken cancellationToken)
380             {
381                 cancellationToken.ThrowIfCancellationRequested();
382                 return Task.CompletedTask;
383             }
384 
Close()385             public override void Close()
386             {
387                 if (PipeStream != null)
388                 {
389                     if (PipeStream.IsConnected)
390                         PipeStream.Disconnect();
391                     PipeStream.Dispose();
392                     PipeStream = null;
393                 }
394             }
395 
ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)396             public override async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
397             {
398                 if (PipeStream == null)
399                 {
400                     throw new TTransportException(TTransportException.ExceptionType.NotOpen);
401                 }
402 
403                 CheckReadBytesAvailable(length);
404 #if NETSTANDARD2_0
405                 var numBytes = await PipeStream.ReadAsync(buffer, offset, length, cancellationToken);
406 #else
407                 var numBytes = await PipeStream.ReadAsync(buffer.AsMemory(offset, length), cancellationToken);
408 #endif
409                 CountConsumedMessageBytes(numBytes);
410                 return numBytes;
411             }
412 
WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)413             public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
414             {
415                 if (PipeStream == null)
416                 {
417                     throw new TTransportException(TTransportException.ExceptionType.NotOpen);
418                 }
419 
420                 // if necessary, send the data in chunks
421                 // there's a system limit around 0x10000 bytes that we hit otherwise
422                 // MSDN: "Pipe write operations across a network are limited to 65,535 bytes per write. For more information regarding pipes, see the Remarks section."
423                 var nBytes = Math.Min(15 * 4096, length); // 16 would exceed the limit
424                 while (nBytes > 0)
425                 {
426 #if NET5_0_OR_GREATER
427                     await PipeStream.WriteAsync(buffer.AsMemory(offset, nBytes), cancellationToken);
428 #else
429                     await PipeStream.WriteAsync(buffer, offset, nBytes, cancellationToken);
430 #endif
431                     offset += nBytes;
432                     length -= nBytes;
433                     nBytes = Math.Min(nBytes, length);
434                 }
435             }
436 
FlushAsync(CancellationToken cancellationToken)437             public override async Task FlushAsync(CancellationToken cancellationToken)
438             {
439                 await PipeStream.FlushAsync(cancellationToken);
440                 ResetConsumedMessageSize();
441             }
442 
Dispose(bool disposing)443             protected override void Dispose(bool disposing)
444             {
445                 if (disposing)
446                 {
447                     if (PipeStream != null)
448                     {
449                         if (PipeStream.IsConnected)
450                             PipeStream.Disconnect();
451                         PipeStream.Dispose();
452                         PipeStream = null;
453                     }
454                 }
455             }
456         }
457     }
458 }
459