// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 using System;
19 using System.Diagnostics;
20 using System.IO;
21 using System.Threading;
22 using System.Threading.Tasks;
23 
24 namespace Thrift.Transport
25 {
26     //TODO: think about client info
27     // ReSharper disable once InconsistentNaming
28     public abstract class TTransport : IDisposable
29     {
30         //TODO: think how to avoid peek byte
31         private readonly byte[] _peekBuffer = new byte[1];
32         private bool _hasPeekByte;
33 
34         public abstract bool IsOpen { get; }
35         public abstract TConfiguration Configuration { get; }
UpdateKnownMessageSize(long size)36         public abstract void UpdateKnownMessageSize(long size);
CheckReadBytesAvailable(long numBytes)37         public abstract void CheckReadBytesAvailable(long numBytes);
ResetConsumedMessageSize(long newSize = -1)38         public abstract void ResetConsumedMessageSize(long newSize = -1);
Dispose()39         public void Dispose()
40         {
41             Dispose(true);
42             GC.SuppressFinalize(this);
43         }
44 
PeekAsync(CancellationToken cancellationToken)45         public async ValueTask<bool> PeekAsync(CancellationToken cancellationToken)
46         {
47             //If we already have a byte read but not consumed, do nothing.
48             if (_hasPeekByte)
49             {
50                 return true;
51             }
52 
53             //If transport closed we can't peek.
54             if (!IsOpen)
55             {
56                 return false;
57             }
58 
59             //Try to read one byte. If succeeds we will need to store it for the next read.
60             try
61             {
62                 var bytes = await ReadAsync(_peekBuffer, 0, 1, cancellationToken);
63                 if (bytes == 0)
64                 {
65                     return false;
66                 }
67             }
68             catch (IOException)
69             {
70                 return false;
71             }
72 
73             _hasPeekByte = true;
74             return true;
75         }
76 
77 
OpenAsync(CancellationToken cancellationToken = default)78         public abstract Task OpenAsync(CancellationToken cancellationToken = default);
79 
Close()80         public abstract void Close();
81 
ValidateBufferArgs(byte[] buffer, int offset, int length)82         protected static void ValidateBufferArgs(byte[] buffer, int offset, int length)
83         {
84             if (buffer == null)
85             {
86                 throw new ArgumentNullException(nameof(buffer));
87             }
88 
89 #if DEBUG // let it fail with OutOfRange in RELEASE mode
90             if (offset < 0)
91             {
92                 throw new ArgumentOutOfRangeException(nameof(offset), "Buffer offset must be >= 0");
93             }
94 
95             if (length < 0)
96             {
97                 throw new ArgumentOutOfRangeException(nameof(length), "Buffer length must be >= 0");
98             }
99 
100             if (offset + length > buffer.Length)
101             {
102                 throw new ArgumentOutOfRangeException(nameof(buffer), "Not enough data");
103             }
104 #endif
105         }
106 
107 
ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)108         public abstract ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken);
109 
ReadAllAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)110         public virtual async ValueTask<int> ReadAllAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
111         {
112             cancellationToken.ThrowIfCancellationRequested();
113 
114             ValidateBufferArgs(buffer, offset, length);
115             if (length <= 0)
116                 return 0;
117 
118             // If we previously peeked a byte, we need to use that first.
119             var totalBytes = 0;
120             if (_hasPeekByte)
121             {
122                 buffer[offset++] = _peekBuffer[0];
123                 _hasPeekByte = false;
124                 if (1 == length)
125                 {
126                     return 1; // we're done
127                 }
128                 ++totalBytes;
129             }
130 
131             var remaining = length - totalBytes;
132             Debug.Assert(remaining > 0);  // any other possible cases should have been handled already
133             while (true)
134             {
135                 var numBytes = await ReadAsync(buffer, offset, remaining, cancellationToken);
136                 totalBytes += numBytes;
137                 if (totalBytes >= length)
138                 {
139                     return totalBytes; // we're done
140                 }
141 
142                 if (numBytes <= 0)
143                 {
144                     throw new TTransportException(TTransportException.ExceptionType.EndOfFile,
145                         "Cannot read, Remote side has closed");
146                 }
147 
148                 remaining -= numBytes;
149                 offset += numBytes;
150             }
151         }
152 
WriteAsync(byte[] buffer, CancellationToken cancellationToken)153         public virtual async Task WriteAsync(byte[] buffer, CancellationToken cancellationToken)
154         {
155             await WriteAsync(buffer, 0, buffer.Length, CancellationToken.None);
156         }
157 
WriteAsync(byte[] buffer, int offset, int length)158         public virtual async Task WriteAsync(byte[] buffer, int offset, int length)
159         {
160             await WriteAsync(buffer, offset, length, CancellationToken.None);
161         }
162 
WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)163         public abstract Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken);
164 
165 
FlushAsync(CancellationToken cancellationToken)166         public abstract Task FlushAsync(CancellationToken cancellationToken);
167 
Dispose(bool disposing)168         protected abstract void Dispose(bool disposing);
169     }
170 }
171