// Licensed to the Apache Software Foundation(ASF) under one
// or more contributor license agreements.See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

using System;
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace Thrift.Transport
{
    //TODO: think about client info
    // ReSharper disable once InconsistentNaming
    /// <summary>
    /// Abstract base class for transports. Besides the abstract open/close/read/write/flush
    /// primitives that derived classes supply, it implements a one-byte look-ahead
    /// (<see cref="PeekAsync"/>), a read-until-complete helper (<see cref="ReadAllAsync"/>)
    /// and convenience overloads of <c>WriteAsync</c>.
    /// </summary>
    public abstract class TTransport : IDisposable
    {
        //TODO: think how to avoid peek byte
        // Holds the single byte fetched by PeekAsync until ReadAllAsync consumes it.
        private readonly byte[] _peekBuffer = new byte[1];

        // True while _peekBuffer[0] contains a byte that has been read but not yet handed out.
        private bool _hasPeekByte;

        /// <summary>
        /// Gets whether the transport is open. <see cref="PeekAsync"/> returns <c>false</c>
        /// without reading when this is <c>false</c>.
        /// </summary>
        public abstract bool IsOpen { get; }

        /// <summary>
        /// Gets the <see cref="TConfiguration"/> exposed by this transport.
        /// </summary>
        public abstract TConfiguration Configuration { get; }

        // NOTE(review): the three message-size members below are abstract and their exact
        // contract is defined by the derived classes, not by this file. Judging by the names
        // they track how much of a message of known size has been consumed - confirm against
        // the implementations before relying on that.

        /// <summary>Implemented by derived classes; receives a message size.</summary>
        /// <param name="size">The size value passed to the implementation.</param>
        public abstract void UpdateKnownMessageSize(long size);

        /// <summary>Implemented by derived classes; receives a number of bytes to check.</summary>
        /// <param name="numBytes">The byte count passed to the implementation.</param>
        public abstract void CheckReadBytesAvailable(long numBytes);

        /// <summary>Implemented by derived classes; receives an optional new size.</summary>
        /// <param name="newSize">The new size; defaults to <c>-1</c> when omitted.</param>
        public abstract void ResetConsumedMessageSize(long newSize = -1);

        /// <summary>
        /// Disposes the transport by calling <see cref="Dispose(bool)"/> with <c>true</c>
        /// and then suppresses finalization of this instance.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Checks whether at least one more byte can be obtained, reading (and retaining)
        /// a single byte from the transport if none is currently retained.
        /// </summary>
        /// <param name="cancellationToken">Token forwarded to <see cref="ReadAsync"/>.</param>
        /// <returns>
        /// <c>true</c> if a byte is already retained or one was read successfully;
        /// <c>false</c> if the transport is not open, the read returned zero bytes,
        /// or the read threw an <see cref="IOException"/>.
        /// </returns>
        public async ValueTask<bool> PeekAsync(CancellationToken cancellationToken)
        {
            //If we already have a byte read but not consumed, do nothing.
            if (_hasPeekByte)
            {
                return true;
            }

            //If transport closed we can't peek.
            if (!IsOpen)
            {
                return false;
            }

            //Try to read one byte. If succeeds we will need to store it for the next read.
            try
            {
                var bytes = await ReadAsync(_peekBuffer, 0, 1, cancellationToken);
                if (bytes == 0)
                {
                    return false;
                }
            }
            catch (IOException)
            {
                // An I/O failure while peeking is reported as "nothing available" rather than rethrown.
                return false;
            }

            _hasPeekByte = true;
            return true;
        }

        /// <summary>Opens the transport. Implemented by derived classes.</summary>
        /// <param name="cancellationToken">Token passed to the implementation.</param>
        public abstract Task OpenAsync(CancellationToken cancellationToken = default);

        /// <summary>Closes the transport. Implemented by derived classes.</summary>
        public abstract void Close();

        /// <summary>
        /// Validates a buffer/offset/length triple. The null check is always performed;
        /// the range checks are compiled in DEBUG builds only.
        /// </summary>
        /// <param name="buffer">The buffer to validate; must not be <c>null</c>.</param>
        /// <param name="offset">Start index within <paramref name="buffer"/>.</param>
        /// <param name="length">Number of bytes addressed starting at <paramref name="offset"/>.</param>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <c>null</c>.</exception>
        /// <exception cref="ArgumentOutOfRangeException">
        /// (DEBUG builds only) <paramref name="offset"/> or <paramref name="length"/> is negative,
        /// or the addressed range extends past the end of <paramref name="buffer"/>.
        /// </exception>
        protected static void ValidateBufferArgs(byte[] buffer, int offset, int length)
        {
            if (buffer == null)
            {
                throw new ArgumentNullException(nameof(buffer));
            }

#if DEBUG // let it fail with OutOfRange in RELEASE mode
            if (offset < 0)
            {
                throw new ArgumentOutOfRangeException(nameof(offset), "Buffer offset must be >= 0");
            }

            if (length < 0)
            {
                throw new ArgumentOutOfRangeException(nameof(length), "Buffer length must be >= 0");
            }

            // Compare against the remaining space instead of computing offset + length:
            // the sum can wrap around to a negative value for large inputs and would then
            // slip past the check. With offset >= 0 the subtraction cannot overflow.
            if (length > buffer.Length - offset)
            {
                throw new ArgumentOutOfRangeException(nameof(buffer), "Not enough data");
            }
#endif
        }

        /// <summary>
        /// Reads up to <paramref name="length"/> bytes into <paramref name="buffer"/>.
        /// Implemented by derived classes. Callers in this class treat a return value of
        /// zero (or less) as "no more data".
        /// </summary>
        /// <param name="buffer">Destination buffer.</param>
        /// <param name="offset">Index in <paramref name="buffer"/> at which to start storing.</param>
        /// <param name="length">Maximum number of bytes requested.</param>
        /// <param name="cancellationToken">Token passed to the implementation.</param>
        /// <returns>The number of bytes read.</returns>
        public abstract ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken);

        /// <summary>
        /// Reads until <paramref name="length"/> bytes have been stored in
        /// <paramref name="buffer"/>, consuming a previously peeked byte first and then
        /// calling <see cref="ReadAsync"/> repeatedly.
        /// </summary>
        /// <param name="buffer">Destination buffer.</param>
        /// <param name="offset">Index in <paramref name="buffer"/> at which to start storing.</param>
        /// <param name="length">Number of bytes to read; values &lt;= 0 return 0 immediately.</param>
        /// <param name="cancellationToken">
        /// Checked once on entry and forwarded to every <see cref="ReadAsync"/> call.
        /// </param>
        /// <returns>The total number of bytes stored.</returns>
        /// <exception cref="OperationCanceledException">
        /// <paramref name="cancellationToken"/> was already cancelled on entry.
        /// </exception>
        /// <exception cref="TTransportException">
        /// With type <c>EndOfFile</c> when <see cref="ReadAsync"/> returns zero or less
        /// before the requested number of bytes has been collected.
        /// </exception>
        public virtual async ValueTask<int> ReadAllAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();

            ValidateBufferArgs(buffer, offset, length);
            if (length <= 0)
            {
                return 0;
            }

            // If we previously peeked a byte, we need to use that first.
            var totalBytes = 0;
            if (_hasPeekByte)
            {
                buffer[offset++] = _peekBuffer[0];
                _hasPeekByte = false;
                if (1 == length)
                {
                    return 1; // we're done
                }
                ++totalBytes;
            }

            var remaining = length - totalBytes;
            Debug.Assert(remaining > 0); // any other possible cases should have been handled already
            while (true)
            {
                var numBytes = await ReadAsync(buffer, offset, remaining, cancellationToken);
                totalBytes += numBytes;
                if (totalBytes >= length)
                {
                    return totalBytes; // we're done
                }

                // Still short of the requested length and the last read produced nothing.
                if (numBytes <= 0)
                {
                    throw new TTransportException(TTransportException.ExceptionType.EndOfFile,
                        "Cannot read, Remote side has closed");
                }

                remaining -= numBytes;
                offset += numBytes;
            }
        }

        /// <summary>
        /// Writes the entire <paramref name="buffer"/> by delegating to the
        /// offset/length overload with offset 0 and the buffer's full length.
        /// </summary>
        /// <param name="buffer">The bytes to write.</param>
        /// <param name="cancellationToken">Token forwarded to the underlying write.</param>
        public virtual async Task WriteAsync(byte[] buffer, CancellationToken cancellationToken)
        {
            // Forward the caller's token; passing CancellationToken.None here would make
            // this overload impossible to cancel.
            await WriteAsync(buffer, 0, buffer.Length, cancellationToken);
        }

        /// <summary>
        /// Writes a buffer range without cancellation support by delegating to the
        /// token-accepting overload with <see cref="CancellationToken.None"/>.
        /// </summary>
        /// <param name="buffer">The source buffer.</param>
        /// <param name="offset">Index in <paramref name="buffer"/> of the first byte to write.</param>
        /// <param name="length">Number of bytes to write.</param>
        public virtual async Task WriteAsync(byte[] buffer, int offset, int length)
        {
            await WriteAsync(buffer, offset, length, CancellationToken.None);
        }

        /// <summary>Writes a buffer range. Implemented by derived classes.</summary>
        /// <param name="buffer">The source buffer.</param>
        /// <param name="offset">Index in <paramref name="buffer"/> of the first byte to write.</param>
        /// <param name="length">Number of bytes to write.</param>
        /// <param name="cancellationToken">Token passed to the implementation.</param>
        public abstract Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken);

        /// <summary>Flushes the transport. Implemented by derived classes.</summary>
        /// <param name="cancellationToken">Token passed to the implementation.</param>
        public abstract Task FlushAsync(CancellationToken cancellationToken);

        /// <summary>
        /// Releases resources held by the transport. Implemented by derived classes;
        /// <see cref="Dispose()"/> calls it with <c>true</c>.
        /// </summary>
        /// <param name="disposing">
        /// <c>true</c> when invoked from <see cref="Dispose()"/>.
        /// </param>
        protected abstract void Dispose(bool disposing);
    }
}