1 // Licensed to the Apache Software Foundation(ASF) under one 2 // or more contributor license agreements.See the NOTICE file 3 // distributed with this work for additional information 4 // regarding copyright ownership.The ASF licenses this file 5 // to you under the Apache License, Version 2.0 (the 6 // "License"); you may not use this file except in compliance 7 // with the License. You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, 12 // software distributed under the License is distributed on an 13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 14 // KIND, either express or implied. See the License for the 15 // specific language governing permissions and limitations 16 // under the License. 17 18 using Microsoft.Win32.SafeHandles; 19 using System; 20 using System.IO.Pipes; 21 using System.Runtime.InteropServices; 22 using System.Threading; 23 using System.Threading.Tasks; 24 using System.ComponentModel; 25 using System.Security.AccessControl; 26 using System.Security.Principal; 27 using System.Collections.Generic; 28 using System.IO; 29 using System.Diagnostics; 30 31 #pragma warning disable CS1998 // async no await 32 33 namespace Thrift.Transport.Server 34 { 35 [Obsolete("NamedPipeClientFlags is deprecated, use NamedPipeServerFlags instead.")] 36 [Flags] 37 public enum NamedPipeClientFlags { // bad name 38 None = 0x00, 39 OnlyLocalClients = 0x01 40 }; 41 42 [Flags] 43 public enum NamedPipeServerFlags 44 { 45 None = 0x00, 46 OnlyLocalClients = 0x01, 47 }; 48 49 50 // ReSharper disable once InconsistentNaming 51 public class TNamedPipeServerTransport : TServerTransport 52 { 53 // to manage incoming connections, we set up a task for each stream to listen on 54 private struct TaskStreamPair 55 { 56 public NamedPipeServerStream Stream; 57 public Task Task; 58 TaskStreamPairThrift.Transport.Server.TNamedPipeServerTransport.TaskStreamPair59 public TaskStreamPair(NamedPipeServerStream stream, Task task) 60 { 61 Stream = stream; 62 Task = task; 63 } 64 } 65 66 /// <summary> 67 /// This is the address of the Pipe on the localhost. 68 /// </summary> 69 private readonly string _pipeAddress; 70 private bool _asyncMode = true; 71 private volatile bool _isPending = true; 72 private readonly List<TaskStreamPair> _streams = new List<TaskStreamPair>(); 73 private readonly bool _onlyLocalClients = false; // compatibility default 74 private readonly byte _numListenPipes = 1; // compatibility default 75 TNamedPipeServerTransport(string pipeAddress, TConfiguration config, NamedPipeServerFlags flags, int numListenPipes)76 public TNamedPipeServerTransport(string pipeAddress, TConfiguration config, NamedPipeServerFlags flags, int numListenPipes) 77 : base(config) 78 { 79 if ((numListenPipes < 1) || (numListenPipes > 254)) 80 throw new ArgumentOutOfRangeException(nameof(numListenPipes), "Value must be in the range of [1..254]"); 81 82 _pipeAddress = pipeAddress; 83 _onlyLocalClients = flags.HasFlag(NamedPipeServerFlags.OnlyLocalClients); 84 _numListenPipes = (byte)numListenPipes; 85 } 86 87 [Obsolete("NamedPipeClientFlags is deprecated, use NamedPipeServerFlags instead.")] TNamedPipeServerTransport(string pipeAddress, TConfiguration config, NamedPipeClientFlags flags, int numListenPipes = 1)88 public TNamedPipeServerTransport(string pipeAddress, TConfiguration config, NamedPipeClientFlags flags, int numListenPipes = 1) 89 : base(config) 90 { 91 if ((numListenPipes < 1) || (numListenPipes > 254)) 92 throw new ArgumentOutOfRangeException(nameof(numListenPipes), "Value must be in the range of [1..254]"); 93 94 _pipeAddress = pipeAddress; 95 _onlyLocalClients = flags.HasFlag(NamedPipeClientFlags.OnlyLocalClients); 96 _numListenPipes = (byte)numListenPipes; 97 } 98 99 IsOpen()100 public override bool IsOpen() { 101 return true; 102 } 103 Listen()104 public override void Listen() 105 { 106 // nothing to do here 107 } 108 Close(NamedPipeServerStream pipe)109 private static void Close(NamedPipeServerStream pipe) 110 { 111 if (pipe != null) 112 { 113 try 114 { 115 if (pipe.IsConnected) 116 pipe.Disconnect(); 117 } 118 finally 119 { 120 pipe.Dispose(); 121 } 122 } 123 } 124 Close()125 public override void Close() 126 { 127 try 128 { 129 if (_streams != null) 130 { 131 while(_streams.Count > 0) 132 { 133 Close(_streams[0].Stream); 134 _streams.RemoveAt(0); 135 } 136 } 137 } 138 finally 139 { 140 _streams.Clear(); 141 _isPending = false; 142 } 143 } 144 IsClientPending()145 public override bool IsClientPending() 146 { 147 return _isPending; 148 } 149 EnsurePipeInstances()150 private void EnsurePipeInstances() 151 { 152 // set up a pool for accepting multiple calls when in multithread mode 153 // once connected, we hand that stream over to the processor and create a fresh one 154 try 155 { 156 while (_streams.Count < _numListenPipes) 157 _streams.Add(CreatePipeInstance()); 158 } 159 catch 160 { 161 // we might not be able to create all requested instances, e.g. due to some existing instances already processing calls 162 // if we have at least one pipe to listen on -> Good Enough(tm) 163 if (_streams.Count < 1) 164 throw; // no pipes is really bad 165 } 166 } 167 CreatePipeInstance()168 private TaskStreamPair CreatePipeInstance() 169 { 170 const PipeDirection direction = PipeDirection.InOut; 171 const int maxconn = NamedPipeServerStream.MaxAllowedServerInstances; 172 const PipeTransmissionMode mode = PipeTransmissionMode.Byte; 173 const int inbuf = 4096; 174 const int outbuf = 4096; 175 var options = _asyncMode ? PipeOptions.Asynchronous : PipeOptions.None; 176 177 178 // TODO: "CreatePipeNative" ist only a workaround, and there are have basically two possible outcomes: 179 // - once NamedPipeServerStream() gets a CTOR that supports pipesec, remove CreatePipeNative() 180 // - if 31190 gets resolved before, use _stream.SetAccessControl(pipesec) instead of CreatePipeNative() 181 // EITHER WAY, 182 // - if CreatePipeNative() finally gets removed, also remove "allow unsafe code" from the project settings 183 184 NamedPipeServerStream instance; 185 try 186 { 187 var handle = CreatePipeNative(_pipeAddress, inbuf, outbuf, _onlyLocalClients); 188 if ((handle != null) && (!handle.IsInvalid)) 189 { 190 instance = new NamedPipeServerStream(PipeDirection.InOut, _asyncMode, false, handle); 191 handle = null; // we don't own it any longer 192 } 193 else 194 { 195 handle?.Dispose(); 196 instance = new NamedPipeServerStream(_pipeAddress, direction, maxconn, mode, options, inbuf, outbuf/*, pipesec*/); 197 } 198 } 199 catch (NotImplementedException) // Mono still does not support async, fallback to sync 200 { 201 if (_asyncMode) 202 { 203 options &= (~PipeOptions.Asynchronous); 204 instance = new NamedPipeServerStream(_pipeAddress, direction, maxconn, mode, options, inbuf, outbuf); 205 _asyncMode = false; 206 } 207 else 208 { 209 throw; 210 } 211 } 212 213 // the task gets added later 214 return new TaskStreamPair( instance, null); 215 } 216 217 218 #region CreatePipeNative workaround 219 220 221 [StructLayout(LayoutKind.Sequential)] 222 internal class SECURITY_ATTRIBUTES 223 { 224 internal int nLength = 0; 225 internal IntPtr lpSecurityDescriptor = IntPtr.Zero; 226 internal int bInheritHandle = 0; 227 } 228 229 230 private const string Kernel32 = "kernel32.dll"; 231 232 [DllImport(Kernel32, SetLastError = true, CharSet = CharSet.Unicode)] CreateNamedPipe( string lpName, uint dwOpenMode, uint dwPipeMode, uint nMaxInstances, uint nOutBufferSize, uint nInBufferSize, uint nDefaultTimeOut, SECURITY_ATTRIBUTES pipeSecurityDescriptor )233 internal static extern IntPtr CreateNamedPipe( 234 string lpName, uint dwOpenMode, uint dwPipeMode, 235 uint nMaxInstances, uint nOutBufferSize, uint nInBufferSize, uint nDefaultTimeOut, 236 SECURITY_ATTRIBUTES pipeSecurityDescriptor 237 ); 238 239 240 241 // Workaround: create the pipe via API call 242 // we have to do it this way, since NamedPipeServerStream() for netstd still lacks a few CTORs and/or arguments 243 // and _stream.SetAccessControl(pipesec); only keeps throwing ACCESS_DENIED errors at us 244 // References: 245 // - https://github.com/dotnet/corefx/issues/30170 (closed, continued in 31190) 246 // - https://github.com/dotnet/corefx/issues/31190 System.IO.Pipes.AccessControl package does not work 247 // - https://github.com/dotnet/corefx/issues/24040 NamedPipeServerStream: Provide support for WRITE_DAC 248 // - https://github.com/dotnet/corefx/issues/34400 Have a mechanism for lower privileged user to connect to a privileged user's pipe CreatePipeNative(string name, int inbuf, int outbuf, bool OnlyLocalClients)249 private static SafePipeHandle CreatePipeNative(string name, int inbuf, int outbuf, bool OnlyLocalClients) 250 { 251 if (! RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) 252 return null; // Windows only 253 254 var pinningHandle = new GCHandle(); 255 try 256 { 257 // owner gets full access, everyone else read/write 258 var pipesec = new PipeSecurity(); 259 using (var currentIdentity = WindowsIdentity.GetCurrent()) 260 { 261 var sidOwner = currentIdentity.Owner; 262 var sidWorld = new SecurityIdentifier(WellKnownSidType.WorldSid, null); 263 264 pipesec.SetOwner(sidOwner); 265 pipesec.AddAccessRule(new PipeAccessRule(sidOwner, PipeAccessRights.FullControl, AccessControlType.Allow)); 266 pipesec.AddAccessRule(new PipeAccessRule(sidWorld, PipeAccessRights.ReadWrite, AccessControlType.Allow)); 267 } 268 269 // create a security descriptor and assign it to the security attribs 270 var secAttrs = new SECURITY_ATTRIBUTES(); 271 byte[] sdBytes = pipesec.GetSecurityDescriptorBinaryForm(); 272 pinningHandle = GCHandle.Alloc(sdBytes, GCHandleType.Pinned); 273 unsafe { 274 fixed (byte* pSD = sdBytes) { 275 secAttrs.lpSecurityDescriptor = (IntPtr)pSD; 276 } 277 } 278 279 // a bunch of constants we will need shortly 280 const uint PIPE_ACCESS_DUPLEX = 0x00000003; 281 const uint FILE_FLAG_OVERLAPPED = 0x40000000; 282 const uint WRITE_DAC = 0x00040000; 283 const uint PIPE_TYPE_BYTE = 0x00000000; 284 const uint PIPE_READMODE_BYTE = 0x00000000; 285 const uint PIPE_UNLIMITED_INSTANCES = 255; 286 const uint PIPE_ACCEPT_REMOTE_CLIENTS = 0x00000000; // Connections from remote clients can be accepted and checked against the security descriptor for the pipe. 287 const uint PIPE_REJECT_REMOTE_CLIENTS = 0x00000008; // Connections from remote clients are automatically rejected. 288 289 // any extra flags we want to add 290 uint dwPipeModeXtra 291 = (OnlyLocalClients ? PIPE_REJECT_REMOTE_CLIENTS : PIPE_ACCEPT_REMOTE_CLIENTS) 292 ; 293 294 // create the pipe via API call 295 var rawHandle = CreateNamedPipe( 296 @"\\.\pipe\" + name, 297 PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | WRITE_DAC, 298 PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | dwPipeModeXtra, 299 PIPE_UNLIMITED_INSTANCES, (uint)inbuf, (uint)outbuf, 300 5 * 1000, 301 secAttrs 302 ); 303 304 // make a SafePipeHandle() from it 305 var handle = new SafePipeHandle(rawHandle, true); 306 if (handle.IsInvalid) 307 throw new Win32Exception(Marshal.GetLastWin32Error()); 308 309 // return it (to be packaged) 310 return handle; 311 } 312 finally 313 { 314 if (pinningHandle.IsAllocated) 315 pinningHandle.Free(); 316 } 317 } 318 319 #endregion 320 AcceptImplementationAsync(CancellationToken cancellationToken)321 protected override async ValueTask<TTransport> AcceptImplementationAsync(CancellationToken cancellationToken) 322 { 323 try 324 { 325 EnsurePipeInstances(); 326 327 // fill the list and wait for any task to be completed 328 var tasks = new List<Task>(); 329 for (var i = 0; i < _streams.Count; ++i) 330 { 331 if (_streams[i].Task == null) 332 { 333 var pair = _streams[i]; 334 pair.Task = Task.Run(async () => await pair.Stream.WaitForConnectionAsync(cancellationToken), cancellationToken); 335 _streams[i] = pair; 336 } 337 338 tasks.Add(_streams[i].Task); 339 } 340 341 // there must be an exact mapping between task index and stream index 342 Debug.Assert(_streams.Count == tasks.Count); 343 var index = Task.WaitAny(tasks.ToArray(), cancellationToken); 344 345 var trans = new ServerTransport(_streams[index].Stream, Configuration); 346 _streams.RemoveAt(index); // pass stream ownership to ServerTransport 347 348 return trans; 349 } 350 catch (TTransportException) 351 { 352 Close(); 353 throw; 354 } 355 catch (TaskCanceledException) 356 { 357 Close(); 358 throw; // let it bubble up 359 } 360 catch (Exception e) 361 { 362 Close(); 363 throw new TTransportException(TTransportException.ExceptionType.NotOpen, e.Message); 364 } 365 } 366 367 private class ServerTransport : TEndpointTransport 368 { 369 private NamedPipeServerStream PipeStream; 370 ServerTransport(NamedPipeServerStream stream, TConfiguration config)371 public ServerTransport(NamedPipeServerStream stream, TConfiguration config) 372 : base(config) 373 { 374 PipeStream = stream; 375 } 376 377 public override bool IsOpen => PipeStream != null && PipeStream.IsConnected; 378 OpenAsync(CancellationToken cancellationToken)379 public override Task OpenAsync(CancellationToken cancellationToken) 380 { 381 cancellationToken.ThrowIfCancellationRequested(); 382 return Task.CompletedTask; 383 } 384 Close()385 public override void Close() 386 { 387 if (PipeStream != null) 388 { 389 if (PipeStream.IsConnected) 390 PipeStream.Disconnect(); 391 PipeStream.Dispose(); 392 PipeStream = null; 393 } 394 } 395 ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)396 public override async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken) 397 { 398 if (PipeStream == null) 399 { 400 throw new TTransportException(TTransportException.ExceptionType.NotOpen); 401 } 402 403 CheckReadBytesAvailable(length); 404 #if NETSTANDARD2_0 405 var numBytes = await PipeStream.ReadAsync(buffer, offset, length, cancellationToken); 406 #else 407 var numBytes = await PipeStream.ReadAsync(buffer.AsMemory(offset, length), cancellationToken); 408 #endif 409 CountConsumedMessageBytes(numBytes); 410 return numBytes; 411 } 412 WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)413 public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken) 414 { 415 if (PipeStream == null) 416 { 417 throw new TTransportException(TTransportException.ExceptionType.NotOpen); 418 } 419 420 // if necessary, send the data in chunks 421 // there's a system limit around 0x10000 bytes that we hit otherwise 422 // MSDN: "Pipe write operations across a network are limited to 65,535 bytes per write. For more information regarding pipes, see the Remarks section." 423 var nBytes = Math.Min(15 * 4096, length); // 16 would exceed the limit 424 while (nBytes > 0) 425 { 426 #if NET5_0_OR_GREATER 427 await PipeStream.WriteAsync(buffer.AsMemory(offset, nBytes), cancellationToken); 428 #else 429 await PipeStream.WriteAsync(buffer, offset, nBytes, cancellationToken); 430 #endif 431 offset += nBytes; 432 length -= nBytes; 433 nBytes = Math.Min(nBytes, length); 434 } 435 } 436 FlushAsync(CancellationToken cancellationToken)437 public override async Task FlushAsync(CancellationToken cancellationToken) 438 { 439 await PipeStream.FlushAsync(cancellationToken); 440 ResetConsumedMessageSize(); 441 } 442 Dispose(bool disposing)443 protected override void Dispose(bool disposing) 444 { 445 if (disposing) 446 { 447 if (PipeStream != null) 448 { 449 if (PipeStream.IsConnected) 450 PipeStream.Disconnect(); 451 PipeStream.Dispose(); 452 PipeStream = null; 453 } 454 } 455 } 456 } 457 } 458 } 459