// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 using System;
19 using System.Diagnostics;
20 using System.IO;
21 using System.Threading;
22 using System.Threading.Tasks;
23 
24 
25 namespace Thrift.Transport.Client
26 {
27     // ReSharper disable once InconsistentNaming
28     public class TMemoryBufferTransport : TEndpointTransport
29     {
30         private bool IsDisposed;
31         private byte[] Bytes;
32         private int _bytesUsed;
33 
TMemoryBufferTransport(TConfiguration config, int initialCapacity = 2048)34         public TMemoryBufferTransport(TConfiguration config, int initialCapacity = 2048)
35             : base(config)
36         {
37             Bytes = new byte[initialCapacity];
38         }
39 
TMemoryBufferTransport(byte[] buf, TConfiguration config)40         public TMemoryBufferTransport(byte[] buf, TConfiguration config)
41             :base(config)
42         {
43             Bytes = (byte[])buf.Clone();
44             _bytesUsed = Bytes.Length;
45             UpdateKnownMessageSize(_bytesUsed);
46         }
47 
48         public int Position { get; set; }
49 
50         public int Capacity
51         {
52             get
53             {
54                 Debug.Assert(_bytesUsed <= Bytes.Length);
55                 return Bytes.Length;
56             }
57             set
58             {
59                 Array.Resize(ref Bytes, value);
60                 _bytesUsed = value;
61             }
62         }
63 
64         public int Length
65         {
66             get {
67                 Debug.Assert(_bytesUsed <= Bytes.Length);
68                 return _bytesUsed;
69             }
70             set {
71                 if ((Bytes.Length < value) || (Bytes.Length > (10 * value)))
72                     Array.Resize(ref Bytes, Math.Max(2048, (int)(value * 1.25)));
73                 _bytesUsed = value;
74             }
75         }
76 
SetLength(int value)77         public void SetLength(int value)
78         {
79             Length = value;
80             Position = Math.Min(Position, value);
81         }
82 
83         public override bool IsOpen => true;
84 
OpenAsync(CancellationToken cancellationToken)85         public override Task OpenAsync(CancellationToken cancellationToken)
86         {
87             cancellationToken.ThrowIfCancellationRequested();
88             return Task.CompletedTask;
89         }
90 
Close()91         public override void Close()
92         {
93             /* do nothing */
94         }
95 
Seek(int delta, SeekOrigin origin)96         public void Seek(int delta, SeekOrigin origin)
97         {
98             int newPos;
99             switch (origin)
100             {
101                 case SeekOrigin.Begin:
102                     newPos = delta;
103                     break;
104                 case SeekOrigin.Current:
105                     newPos = Position + delta;
106                     break;
107                 case SeekOrigin.End:
108                     newPos = _bytesUsed + delta;
109                     break;
110                 default:
111                     throw new ArgumentException("Unrecognized value",nameof(origin));
112             }
113 
114             if ((0 > newPos) || (newPos > _bytesUsed))
115                 throw new ArgumentException("Cannot seek outside of the valid range",nameof(origin));
116             Position = newPos;
117 
118             ResetConsumedMessageSize();
119             CountConsumedMessageBytes(Position);
120         }
121 
ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)122         public override ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
123         {
124             var count = Math.Min(Length - Position, length);
125             Buffer.BlockCopy(Bytes, Position, buffer, offset, count);
126             Position += count;
127             CountConsumedMessageBytes(count);
128             return new ValueTask<int>(count);
129         }
130 
WriteAsync(byte[] buffer, CancellationToken cancellationToken)131         public override Task WriteAsync(byte[] buffer, CancellationToken cancellationToken)
132         {
133             return WriteAsync(buffer, 0, buffer.Length, cancellationToken);
134         }
135 
WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)136         public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
137         {
138             var free = Length - Position;
139             Length = Length + count - free;
140             Buffer.BlockCopy(buffer, offset, Bytes, Position, count);
141             Position += count;
142             return Task.CompletedTask;
143         }
144 
FlushAsync(CancellationToken cancellationToken)145         public override Task FlushAsync(CancellationToken cancellationToken)
146         {
147             cancellationToken.ThrowIfCancellationRequested();
148             ResetConsumedMessageSize();
149             return Task.CompletedTask;
150         }
151 
GetBuffer()152         public byte[] GetBuffer()
153         {
154             var retval = new byte[Length];
155             Buffer.BlockCopy(Bytes, 0, retval, 0, Length);
156             return retval;
157         }
158 
TryGetBuffer(out ArraySegment<byte> bufSegment)159         internal bool TryGetBuffer(out ArraySegment<byte> bufSegment)
160         {
161             bufSegment = new ArraySegment<byte>(Bytes, 0, _bytesUsed);
162             return true;
163         }
164 
165         // IDisposable
Dispose(bool disposing)166         protected override void Dispose(bool disposing)
167         {
168             if (!IsDisposed)
169             {
170                 if (disposing)
171                 {
172                     // nothing to do
173                 }
174             }
175             IsDisposed = true;
176         }
177     }
178 }
179