1 // Licensed to the Apache Software Foundation(ASF) under one 2 // or more contributor license agreements.See the NOTICE file 3 // distributed with this work for additional information 4 // regarding copyright ownership.The ASF licenses this file 5 // to you under the Apache License, Version 2.0 (the 6 // "License"); you may not use this file except in compliance 7 // with the License. You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, 12 // software distributed under the License is distributed on an 13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 14 // KIND, either express or implied. See the License for the 15 // specific language governing permissions and limitations 16 // under the License. 17 18 using System; 19 using System.Diagnostics; 20 using System.IO; 21 using System.Threading; 22 using System.Threading.Tasks; 23 24 25 namespace Thrift.Transport.Client 26 { 27 // ReSharper disable once InconsistentNaming 28 public class TMemoryBufferTransport : TEndpointTransport 29 { 30 private bool IsDisposed; 31 private byte[] Bytes; 32 private int _bytesUsed; 33 TMemoryBufferTransport(TConfiguration config, int initialCapacity = 2048)34 public TMemoryBufferTransport(TConfiguration config, int initialCapacity = 2048) 35 : base(config) 36 { 37 Bytes = new byte[initialCapacity]; 38 } 39 TMemoryBufferTransport(byte[] buf, TConfiguration config)40 public TMemoryBufferTransport(byte[] buf, TConfiguration config) 41 :base(config) 42 { 43 Bytes = (byte[])buf.Clone(); 44 _bytesUsed = Bytes.Length; 45 UpdateKnownMessageSize(_bytesUsed); 46 } 47 48 public int Position { get; set; } 49 50 public int Capacity 51 { 52 get 53 { 54 Debug.Assert(_bytesUsed <= Bytes.Length); 55 return Bytes.Length; 56 } 57 set 58 { 59 Array.Resize(ref Bytes, value); 60 _bytesUsed = value; 61 } 62 } 63 64 public int Length 65 { 66 get { 67 Debug.Assert(_bytesUsed <= Bytes.Length); 68 return _bytesUsed; 69 } 70 set { 71 if ((Bytes.Length < value) || (Bytes.Length > (10 * value))) 72 Array.Resize(ref Bytes, Math.Max(2048, (int)(value * 1.25))); 73 _bytesUsed = value; 74 } 75 } 76 SetLength(int value)77 public void SetLength(int value) 78 { 79 Length = value; 80 Position = Math.Min(Position, value); 81 } 82 83 public override bool IsOpen => true; 84 OpenAsync(CancellationToken cancellationToken)85 public override Task OpenAsync(CancellationToken cancellationToken) 86 { 87 cancellationToken.ThrowIfCancellationRequested(); 88 return Task.CompletedTask; 89 } 90 Close()91 public override void Close() 92 { 93 /* do nothing */ 94 } 95 Seek(int delta, SeekOrigin origin)96 public void Seek(int delta, SeekOrigin origin) 97 { 98 int newPos; 99 switch (origin) 100 { 101 case SeekOrigin.Begin: 102 newPos = delta; 103 break; 104 case SeekOrigin.Current: 105 newPos = Position + delta; 106 break; 107 case SeekOrigin.End: 108 newPos = _bytesUsed + delta; 109 break; 110 default: 111 throw new ArgumentException("Unrecognized value",nameof(origin)); 112 } 113 114 if ((0 > newPos) || (newPos > _bytesUsed)) 115 throw new ArgumentException("Cannot seek outside of the valid range",nameof(origin)); 116 Position = newPos; 117 118 ResetConsumedMessageSize(); 119 CountConsumedMessageBytes(Position); 120 } 121 ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)122 public override ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken) 123 { 124 var count = Math.Min(Length - Position, length); 125 Buffer.BlockCopy(Bytes, Position, buffer, offset, count); 126 Position += count; 127 CountConsumedMessageBytes(count); 128 return new ValueTask<int>(count); 129 } 130 WriteAsync(byte[] buffer, CancellationToken cancellationToken)131 public override Task WriteAsync(byte[] buffer, CancellationToken cancellationToken) 132 { 133 return WriteAsync(buffer, 0, buffer.Length, cancellationToken); 134 } 135 WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)136 public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) 137 { 138 var free = Length - Position; 139 Length = Length + count - free; 140 Buffer.BlockCopy(buffer, offset, Bytes, Position, count); 141 Position += count; 142 return Task.CompletedTask; 143 } 144 FlushAsync(CancellationToken cancellationToken)145 public override Task FlushAsync(CancellationToken cancellationToken) 146 { 147 cancellationToken.ThrowIfCancellationRequested(); 148 ResetConsumedMessageSize(); 149 return Task.CompletedTask; 150 } 151 GetBuffer()152 public byte[] GetBuffer() 153 { 154 var retval = new byte[Length]; 155 Buffer.BlockCopy(Bytes, 0, retval, 0, Length); 156 return retval; 157 } 158 TryGetBuffer(out ArraySegment<byte> bufSegment)159 internal bool TryGetBuffer(out ArraySegment<byte> bufSegment) 160 { 161 bufSegment = new ArraySegment<byte>(Bytes, 0, _bytesUsed); 162 return true; 163 } 164 165 // IDisposable Dispose(bool disposing)166 protected override void Dispose(bool disposing) 167 { 168 if (!IsDisposed) 169 { 170 if (disposing) 171 { 172 // nothing to do 173 } 174 } 175 IsDisposed = true; 176 } 177 } 178 } 179