1 // Licensed to the Apache Software Foundation(ASF) under one
2 // or more contributor license agreements.See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 using System;
19 using System.Buffers;
20 using System.Buffers.Binary;
21 using System.Collections.Generic;
22 using System.Diagnostics;
23 using System.Text;
24 using System.Threading;
25 using System.Threading.Tasks;
26 using Thrift.Protocol.Entities;
27 using Thrift.Protocol.Utilities;
28 using Thrift.Transport;
29 
30 
31 namespace Thrift.Protocol
32 {
33 
34     // ReSharper disable once InconsistentNaming
35     public class TCompactProtocol : TProtocol
36     {
37         private const byte ProtocolId = 0x82;
38         private const byte Version = 1;
39         private const byte VersionMask = 0x1f; // 0001 1111
40         private const byte TypeMask = 0xE0; // 1110 0000
41         private const byte TypeBits = 0x07; // 0000 0111
42         private const int TypeShiftAmount = 5;
43 
44         private const byte NoTypeOverride = 0xFF;
45 
46         // ReSharper disable once InconsistentNaming
47         private static readonly byte[] TTypeToCompactType = new byte[17];
48         private static readonly TType[] CompactTypeToTType = new TType[14];
49 
50         /// <summary>
51         ///     Used to keep track of the last field for the current and previous structs, so we can do the delta stuff.
52         /// </summary>
53         private readonly Stack<short> _lastField = new Stack<short>(15);
54 
55         /// <summary>
56         ///     If we encounter a boolean field begin, save the TField here so it can have the value incorporated.
57         /// </summary>
58         private TField? _booleanField;
59 
60         /// <summary>
61         ///     If we Read a field header, and it's a boolean field, save the boolean value here so that ReadBool can use it.
62         /// </summary>
63         private bool? _boolValue;
64 
65         private short _lastFieldId;
66 
67         // Minimize memory allocations by means of a preallocated byte buffer.
68         // The value of 128 is arbitrarily chosen; the required minimum size is sizeof(long).
69         private readonly byte[] PreAllocatedBuffer = new byte[128];
70 
/// <summary>
///     Scratch storage for one varint-encoded value: a backing array plus the number of
///     leading bytes of that array that are currently in use.
/// </summary>
private struct VarInt
{
    /// <summary>Backing array; the encoders fill it starting at index 0.</summary>
    public byte[] bytes;

    /// <summary>Number of bytes of <see cref="bytes"/> written by the most recent encode.</summary>
    public int count;
}
76 
77         // Minimize memory allocations by means of a preallocated VarInt buffer.
78         private VarInt PreAllocatedVarInt = new VarInt()
79         {
80             bytes = new byte[10], // see Int64ToVarInt()
81             count = 0
82         };
83 
84 
85 
86 
/// <summary>
///     Creates a compact protocol instance on top of <paramref name="trans"/> and fills the two
///     type-translation tables.
/// </summary>
/// <param name="trans">Transport handed to the base protocol.</param>
public TCompactProtocol(TTransport trans)
    : base(trans)
{
    // NOTE: both tables are static, yet they are rewritten (always with the same values)
    // every time an instance is constructed.

    // TType -> compact wire type. Bool maps to BooleanTrue here.
    var toCompact = TTypeToCompactType;
    toCompact[(int)TType.Stop] = Types.Stop;
    toCompact[(int)TType.Bool] = Types.BooleanTrue;
    toCompact[(int)TType.Byte] = Types.Byte;
    toCompact[(int)TType.I16] = Types.I16;
    toCompact[(int)TType.I32] = Types.I32;
    toCompact[(int)TType.I64] = Types.I64;
    toCompact[(int)TType.Double] = Types.Double;
    toCompact[(int)TType.String] = Types.Binary;
    toCompact[(int)TType.List] = Types.List;
    toCompact[(int)TType.Set] = Types.Set;
    toCompact[(int)TType.Map] = Types.Map;
    toCompact[(int)TType.Struct] = Types.Struct;
    toCompact[(int)TType.Uuid] = Types.Uuid;

    // Compact wire type -> TType. Both boolean wire types map back to Bool.
    var toTType = CompactTypeToTType;
    toTType[Types.Stop] = TType.Stop;
    toTType[Types.BooleanTrue] = TType.Bool;
    toTType[Types.BooleanFalse] = TType.Bool;
    toTType[Types.Byte] = TType.Byte;
    toTType[Types.I16] = TType.I16;
    toTType[Types.I32] = TType.I32;
    toTType[Types.I64] = TType.I64;
    toTType[Types.Double] = TType.Double;
    toTType[Types.Binary] = TType.String;
    toTType[Types.List] = TType.List;
    toTType[Types.Set] = TType.Set;
    toTType[Types.Map] = TType.Map;
    toTType[Types.Struct] = TType.Struct;
    toTType[Types.Uuid] = TType.Uuid;
}
119 
/// <summary>
///     Returns the protocol's bookkeeping to its initial state: empties the field-id stack,
///     zeroes the running field id, and discards any boolean state left pending by an
///     interrupted field read or write.
/// </summary>
public void Reset()
{
    _lastField.Clear();
    _lastFieldId = 0;

    // A boolean field header deferred by WriteFieldBeginAsync, or a boolean value captured by
    // ReadFieldBeginAsync, must not leak into whatever is processed after the reset.
    _booleanField = null;
    _boolValue = null;
}
125 
/// <summary>
///     Writes the message header: the protocol id byte, one byte combining the version (low five
///     bits) with the message type (top three bits), the sequence id as a varint, and finally
///     the message name as a string.
/// </summary>
/// <param name="message">Message whose type, sequence id and name are written.</param>
/// <param name="cancellationToken">Token forwarded to the transport and nested writes.</param>
public override async Task WriteMessageBeginAsync(TMessage message, CancellationToken cancellationToken)
{
    var header = PreAllocatedBuffer;
    header[0] = ProtocolId;
    header[1] = (byte)((Version & VersionMask) | (((uint)message.Type << TypeShiftAmount) & TypeMask));
    await Trans.WriteAsync(header, 0, 2, cancellationToken);

    // sequence id travels as a plain (non-zigzag) varint
    Int32ToVarInt((uint)message.SeqID, ref PreAllocatedVarInt);
    var seqIdBytes = PreAllocatedVarInt.bytes;
    var seqIdLength = PreAllocatedVarInt.count;
    await Trans.WriteAsync(seqIdBytes, 0, seqIdLength, cancellationToken);

    await WriteStringAsync(message.Name, cancellationToken);
}
137 
/// <summary>
///     Ends a message. Nothing is written; the call only honours a pending cancellation.
/// </summary>
/// <param name="cancellationToken">Token checked before returning.</param>
public override Task WriteMessageEndAsync(CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    // no trailer on the wire
    return Task.CompletedTask;
}
143 
/// <summary>
///     Opens a struct for writing. No bytes are emitted: the running field id of the enclosing
///     scope is saved on the field stack and then reset to zero, so that field-id deltas inside
///     the new struct start over.
/// </summary>
/// <param name="struct">Struct being started (not inspected here).</param>
/// <param name="cancellationToken">Token checked before any state is touched.</param>
public override Task WriteStructBeginAsync(TStruct @struct, CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    var enclosingFieldId = _lastFieldId;
    _lastField.Push(enclosingFieldId);
    _lastFieldId = 0;

    return Task.CompletedTask;
}
158 
/// <summary>
///     Closes a struct. Nothing is written; the running field id saved by the matching
///     struct-begin call is popped off the field stack and restored.
/// </summary>
/// <param name="cancellationToken">Token checked before any state is touched.</param>
public override Task WriteStructEndAsync(CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    var enclosingFieldId = _lastField.Pop();
    _lastFieldId = enclosingFieldId;

    return Task.CompletedTask;
}
167 
/// <summary>
///     Writes a field header. When the field id exceeds the previous one by 1..15, the delta and
///     the type share a single byte; otherwise the type byte is followed by the field id written
///     as an i16. The running field id is updated once the header has been written.
/// </summary>
/// <param name="field">Field whose id (and, without an override, type) is written.</param>
/// <param name="fieldType">
///     Compact type to put in the header, or <c>NoTypeOverride</c> to derive it from
///     <paramref name="field"/>.
/// </param>
/// <param name="cancellationToken">Token forwarded to the transport.</param>
private async Task WriteFieldBeginInternalAsync(TField field, byte fieldType, CancellationToken cancellationToken)
{
    // an explicit override wins; otherwise translate the field's TType
    var wireType = fieldType == NoTypeOverride ? GetCompactType(field.Type) : fieldType;

    // short - short is computed as int, so this cannot overflow
    var delta = field.ID - _lastFieldId;

    if (delta > 0 && delta <= 15)
    {
        // short form: delta in the high nibble, type in the low nibble
        PreAllocatedBuffer[0] = (byte)((delta << 4) | wireType);
        await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
    }
    else
    {
        // long form: type byte, then the absolute field id
        PreAllocatedBuffer[0] = wireType;
        await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
        await WriteI16Async(field.ID, cancellationToken);
    }

    _lastFieldId = field.ID;
}
195 
/// <summary>
///     Starts a field. Non-boolean fields get their header written immediately. A boolean field
///     is only remembered, so that <see cref="WriteBoolAsync"/> can write the header with the
///     value folded into the type.
/// </summary>
/// <param name="field">Field being started.</param>
/// <param name="cancellationToken">Token forwarded to the header write.</param>
public override async Task WriteFieldBeginAsync(TField field, CancellationToken cancellationToken)
{
    if (field.Type != TType.Bool)
    {
        await WriteFieldBeginInternalAsync(field, NoTypeOverride, cancellationToken);
        return;
    }

    // header is deferred until the boolean value is known
    _booleanField = field;
}
207 
/// <summary>
///     Ends a field. Nothing is written; the call only honours a pending cancellation.
/// </summary>
/// <param name="cancellationToken">Token checked before returning.</param>
public override Task WriteFieldEndAsync(CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    // fields carry no terminator of their own
    return Task.CompletedTask;
}
213 
/// <summary>
///     Writes the single stop byte (<c>Types.Stop</c>) that terminates a struct's field list.
/// </summary>
/// <param name="cancellationToken">Token checked first and forwarded to the transport.</param>
public override async Task WriteFieldStopAsync(CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    var scratch = PreAllocatedBuffer;
    scratch[0] = Types.Stop;
    await Trans.WriteAsync(scratch, 0, 1, cancellationToken);
}
221 
/// <summary>
///     Writes the header shared by lists and sets. Sizes up to 14 are packed into the high nibble
///     of the element-type byte; larger sizes write <c>0xF</c> in the high nibble and follow the
///     byte with the size as a varint.
/// </summary>
/// <param name="elemType">Element type, translated to its compact type for the low nibble.</param>
/// <param name="size">Number of elements.</param>
/// <param name="cancellationToken">Token checked first and forwarded to the transport.</param>
protected async Task WriteCollectionBeginAsync(TType elemType, int size, CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    var elementType = GetCompactType(elemType);

    if (size > 14)
    {
        // long form: marker nibble + type, then the real size
        PreAllocatedBuffer[0] = (byte)(0xf0 | elementType);
        await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);

        Int32ToVarInt((uint)size, ref PreAllocatedVarInt);
        await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken);
        return;
    }

    // short form: size and type share one byte
    PreAllocatedBuffer[0] = (byte)((size << 4) | elementType);
    await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
}
245 
/// <summary>
///     Writes a list header from the list's element type and count.
/// </summary>
/// <param name="list">List descriptor supplying element type and count.</param>
/// <param name="cancellationToken">Token forwarded to the collection-header write.</param>
public override async Task WriteListBeginAsync(TList list, CancellationToken cancellationToken)
{
    var elementType = list.ElementType;
    var count = list.Count;
    await WriteCollectionBeginAsync(elementType, count, cancellationToken);
}
250 
/// <summary>
///     Ends a list. Nothing is written; the call only honours a pending cancellation.
/// </summary>
/// <param name="cancellationToken">Token checked before returning.</param>
public override Task WriteListEndAsync(CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    // lists carry no terminator
    return Task.CompletedTask;
}
256 
/// <summary>
///     Writes a set header from the set's element type and count.
/// </summary>
/// <param name="set">Set descriptor supplying element type and count.</param>
/// <param name="cancellationToken">Token checked first and forwarded to the header write.</param>
public override async Task WriteSetBeginAsync(TSet set, CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    var elementType = set.ElementType;
    var count = set.Count;
    await WriteCollectionBeginAsync(elementType, count, cancellationToken);
}
263 
/// <summary>
///     Ends a set. Nothing is written; the call only honours a pending cancellation.
/// </summary>
/// <param name="cancellationToken">Token checked before returning.</param>
public override Task WriteSetEndAsync(CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    // sets carry no terminator
    return Task.CompletedTask;
}
269 
/// <summary>
///     Writes a boolean. If a boolean field header is still pending, the header is written now
///     with the value encoded as the field's type and the pending field is cleared. Otherwise a
///     single byte holding <c>Types.BooleanTrue</c> or <c>Types.BooleanFalse</c> is written.
/// </summary>
/// <param name="b">Value to write.</param>
/// <param name="cancellationToken">Token checked first and forwarded to the writes.</param>
public override async Task WriteBoolAsync(bool b, CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    var encoded = b ? Types.BooleanTrue : Types.BooleanFalse;

    if (!_booleanField.HasValue)
    {
        // standalone value (not a struct field): one byte on the wire
        PreAllocatedBuffer[0] = encoded;
        await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
        return;
    }

    // the field header was held back; emit it with the value as its type
    await WriteFieldBeginInternalAsync(_booleanField.Value, encoded, cancellationToken);
    _booleanField = null;
}
295 
/// <summary>
///     Writes a single byte: the signed value reinterpreted as an unsigned byte.
/// </summary>
/// <param name="b">Value to write.</param>
/// <param name="cancellationToken">Token checked first and forwarded to the transport.</param>
public override async Task WriteByteAsync(sbyte b, CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    var scratch = PreAllocatedBuffer;
    scratch[0] = (byte)b;
    await Trans.WriteAsync(scratch, 0, 1, cancellationToken);
}
303 
/// <summary>
///     Writes a 16-bit integer: the value goes through <c>IntToZigzag</c> and the result is
///     written as a varint.
/// </summary>
/// <param name="i16">Value to write.</param>
/// <param name="cancellationToken">Token checked first and forwarded to the transport.</param>
public override async Task WriteI16Async(short i16, CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    var zigzag = IntToZigzag(i16);
    Int32ToVarInt(zigzag, ref PreAllocatedVarInt);

    var encoded = PreAllocatedVarInt.bytes;
    var encodedLength = PreAllocatedVarInt.count;
    await Trans.WriteAsync(encoded, 0, encodedLength, cancellationToken);
}
311 
/// <summary>
///     Encodes <paramref name="n"/> as a varint into <paramref name="varint"/>: seven payload
///     bits per byte, least-significant group first, with the high bit set on every byte except
///     the last. Produces 1 to 5 bytes.
/// </summary>
/// <param name="n">Value to encode.</param>
/// <param name="varint">Destination; its count is reset and then reflects the encoded length.</param>
private static void Int32ToVarInt(uint n, ref VarInt varint)
{
    varint.count = 0;
    Debug.Assert(varint.bytes.Length >= 5);

    var remaining = n;

    // emit continuation bytes while more than seven significant bits remain
    while ((remaining & ~0x7F) != 0)
    {
        varint.bytes[varint.count++] = (byte)((remaining & 0x7F) | 0x80);
        remaining >>= 7;
    }

    // final byte: high bit clear
    varint.bytes[varint.count++] = (byte)remaining;
}
330 
/// <summary>
///     Writes a 32-bit integer: the value goes through <c>IntToZigzag</c> and the result is
///     written as a varint.
/// </summary>
/// <param name="i32">Value to write.</param>
/// <param name="cancellationToken">Token checked first and forwarded to the transport.</param>
public override async Task WriteI32Async(int i32, CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    var zigzag = IntToZigzag(i32);
    Int32ToVarInt(zigzag, ref PreAllocatedVarInt);

    var encoded = PreAllocatedVarInt.bytes;
    var encodedLength = PreAllocatedVarInt.count;
    await Trans.WriteAsync(encoded, 0, encodedLength, cancellationToken);
}
338 
/// <summary>
///     Encodes <paramref name="n"/> as a varint into <paramref name="varint"/>: seven payload
///     bits per byte, least-significant group first, with the high bit set on every byte except
///     the last. Produces 1 to 10 bytes.
/// </summary>
/// <param name="n">Value to encode.</param>
/// <param name="varint">Destination; its count is reset and then reflects the encoded length.</param>
private static void Int64ToVarInt(ulong n, ref VarInt varint)
{
    varint.count = 0;
    Debug.Assert(varint.bytes.Length >= 10);

    while (true)
    {
        if ((n & ~0x7FUL) == 0)
        {
            // last group: at most seven significant bits, high bit clear
            varint.bytes[varint.count++] = (byte)n;
            break;
        }

        // more groups follow: low seven bits plus the continuation flag
        varint.bytes[varint.count++] = (byte)((n & 0x7F) | 0x80);
        n >>= 7;
    }
}
356 
/// <summary>
///     Writes a 64-bit integer: the value goes through <c>LongToZigzag</c> and the result is
///     written as a varint.
/// </summary>
/// <param name="i64">Value to write.</param>
/// <param name="cancellationToken">Token checked first and forwarded to the transport.</param>
public override async Task WriteI64Async(long i64, CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    var zigzag = LongToZigzag(i64);
    Int64ToVarInt(zigzag, ref PreAllocatedVarInt);

    var encoded = PreAllocatedVarInt.bytes;
    var encodedLength = PreAllocatedVarInt.count;
    await Trans.WriteAsync(encoded, 0, encodedLength, cancellationToken);
}
364 
/// <summary>
///     Writes a double as eight bytes: its 64-bit pattern in little-endian order.
/// </summary>
/// <param name="d">Value to write.</param>
/// <param name="cancellationToken">Token checked first and forwarded to the transport.</param>
public override async Task WriteDoubleAsync(double d, CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    var bits = BitConverter.DoubleToInt64Bits(d);
    BinaryPrimitives.WriteInt64LittleEndian(PreAllocatedBuffer, bits);

    await Trans.WriteAsync(PreAllocatedBuffer, 0, 8, cancellationToken);
}
372 
/// <summary>
///     Writes a string as its UTF-8 byte length (varint) followed by the UTF-8 bytes. The bytes
///     are staged in a buffer rented from the shared array pool, which is returned even when a
///     write fails.
/// </summary>
/// <param name="str">Text to write.</param>
/// <param name="cancellationToken">Token checked first and forwarded to the transport.</param>
public override async Task WriteStringAsync(string str, CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    var pool = ArrayPool<byte>.Shared;
    var rented = pool.Rent(Encoding.UTF8.GetByteCount(str));
    try
    {
        // the rented array may be larger than requested; only the encoded prefix is sent
        var numberOfBytes = Encoding.UTF8.GetBytes(str, 0, str.Length, rented, 0);

        Int32ToVarInt((uint)numberOfBytes, ref PreAllocatedVarInt);
        await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken);

        await Trans.WriteAsync(rented, 0, numberOfBytes, cancellationToken);
    }
    finally
    {
        pool.Return(rented);
    }
}
391 
/// <summary>
///     Writes a binary blob as its length (varint) followed by the raw bytes.
/// </summary>
/// <param name="bytes">Payload to write.</param>
/// <param name="cancellationToken">Token checked first and forwarded to the transport.</param>
public override async Task WriteBinaryAsync(byte[] bytes, CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    // length prefix
    Int32ToVarInt((uint)bytes.Length, ref PreAllocatedVarInt);
    var prefix = PreAllocatedVarInt.bytes;
    var prefixLength = PreAllocatedVarInt.count;
    await Trans.WriteAsync(prefix, 0, prefixLength, cancellationToken);

    // payload
    await Trans.WriteAsync(bytes, 0, bytes.Length, cancellationToken);
}
400 
/// <summary>
///     Writes a UUID: the value is passed through <c>SwapByteOrder()</c>, converted to a byte
///     array, and that array is written in full.
/// </summary>
/// <param name="uuid">Value to write.</param>
/// <param name="cancellationToken">Token checked first and forwarded to the transport.</param>
public override async Task WriteUuidAsync(Guid uuid, CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    var swapped = uuid.SwapByteOrder();
    var payload = swapped.ToByteArray();
    await Trans.WriteAsync(payload, 0, payload.Length, cancellationToken);
}
408 
/// <summary>
///     Writes a map header. An empty map is a single zero byte. Otherwise the count is written as
///     a varint, followed by one byte with the key type in the high nibble and the value type in
///     the low nibble.
/// </summary>
/// <param name="map">Map descriptor supplying count, key type and value type.</param>
/// <param name="cancellationToken">Token checked first and forwarded to the transport.</param>
public override async Task WriteMapBeginAsync(TMap map, CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    if (map.Count != 0)
    {
        Int32ToVarInt((uint)map.Count, ref PreAllocatedVarInt);
        await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken);

        // types are only sent for non-empty maps
        var keyNibble = GetCompactType(map.KeyType) << 4;
        var valueNibble = GetCompactType(map.ValueType);
        PreAllocatedBuffer[0] = (byte)(keyNibble | valueNibble);
        await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
        return;
    }

    // empty map: size byte only, no type byte
    PreAllocatedBuffer[0] = 0;
    await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
}
427 
/// <summary>
///     Ends a map. Nothing is written; the call only honours a pending cancellation.
/// </summary>
/// <param name="cancellationToken">Token checked before returning.</param>
public override Task WriteMapEndAsync(CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    // maps carry no terminator
    return Task.CompletedTask;
}
433 
/// <summary>
///     Reads a message header: verifies the protocol id byte and the version held in the low bits
///     of the second byte, extracts the message type from that byte's top bits, then reads the
///     sequence id (varint) and the message name.
/// </summary>
/// <param name="cancellationToken">Token checked first and forwarded to the nested reads.</param>
/// <returns>The decoded message descriptor.</returns>
/// <exception cref="TProtocolException">The protocol id or the version does not match.</exception>
public override async ValueTask<TMessage> ReadMessageBeginAsync(CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    var protocolId = (byte)await ReadByteAsync(cancellationToken);
    if (protocolId != ProtocolId)
    {
        throw new TProtocolException($"Expected protocol id {ProtocolId:X} but got {protocolId:X}");
    }

    var versionAndType = (byte)await ReadByteAsync(cancellationToken);

    var version = (byte)(versionAndType & VersionMask);
    if (version != Version)
    {
        throw new TProtocolException($"Expected version {Version} but got {version}");
    }

    var messageType = (TMessageType)(byte)((versionAndType >> TypeShiftAmount) & TypeBits);
    var sequenceId = (int)await ReadVarInt32Async(cancellationToken);
    var name = await ReadStringAsync(cancellationToken);

    return new TMessage(name, messageType, sequenceId);
}
458 
/// <summary>
///     Ends reading a message. No wire data is consumed; the transport's
///     <c>ResetConsumedMessageSize()</c> is invoked.
/// </summary>
/// <param name="cancellationToken">Token checked before the transport is touched.</param>
public override Task ReadMessageEndAsync(CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    Transport.ResetConsumedMessageSize();

    return Task.CompletedTask;
}
465 
/// <summary>
///     Opens a struct for reading. No wire data is consumed: the running field id is saved on the
///     field stack and reset to zero, and <c>AnonymousStruct</c> is returned.
/// </summary>
/// <param name="cancellationToken">Token checked before any state is touched.</param>
/// <returns>An already-completed task holding <c>AnonymousStruct</c>.</returns>
public override ValueTask<TStruct> ReadStructBeginAsync(CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    var enclosingFieldId = _lastFieldId;
    _lastField.Push(enclosingFieldId);
    _lastFieldId = 0;

    return new ValueTask<TStruct>(AnonymousStruct);
}
475 
/// <summary>
///     Closes a struct being read. No wire data is consumed; the running field id saved by the
///     matching struct-begin call is popped off the field stack and restored.
/// </summary>
/// <param name="cancellationToken">Token checked before any state is touched.</param>
public override Task ReadStructEndAsync(CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    // back to the enclosing struct's last-seen field id
    var enclosingFieldId = _lastField.Pop();
    _lastFieldId = enclosingFieldId;

    return Task.CompletedTask;
}
490 
/// <summary>
///     Reads a field header. A stop byte yields <c>StopField</c>. Otherwise the low nibble is the
///     compact type and the high nibble is either a field-id delta (non-zero) or a marker (zero)
///     that the field id follows as an i16. For boolean fields the value is taken from the type
///     and stashed for the next <see cref="ReadBoolAsync"/> call.
/// </summary>
/// <param name="cancellationToken">Token forwarded to the nested reads.</param>
/// <returns>The decoded field descriptor (with an empty name), or <c>StopField</c>.</returns>
public override async ValueTask<TField> ReadFieldBeginAsync(CancellationToken cancellationToken)
{
    var header = (byte)await ReadByteAsync(cancellationToken);

    // end of the struct's field list
    if (header == Types.Stop)
    {
        return StopField;
    }

    var delta = (short)((header & 0xf0) >> 4);
    var compactType = (byte)(header & 0x0f);

    // zero delta means the absolute id follows; otherwise add the delta to the previous id
    var fieldId = delta == 0
        ? await ReadI16Async(cancellationToken)
        : (short)(_lastFieldId + delta);

    var ttype = GetTType(compactType);
    var field = new TField(string.Empty, ttype, fieldId);

    if (ttype == TType.Bool)
    {
        // the boolean's value is carried by the type nibble itself
        _boolValue = compactType == Types.BooleanTrue;
    }

    // remember this id so the next header's delta can be resolved
    _lastFieldId = field.ID;
    return field;
}
530 
/// <summary>
///     Ends reading a field. No wire data is consumed; the call only honours a pending
///     cancellation.
/// </summary>
/// <param name="cancellationToken">Token checked before returning.</param>
public override Task ReadFieldEndAsync(CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    // fields carry no terminator of their own
    return Task.CompletedTask;
}
536 
/// <summary>
///     Reads a map header: the size as a varint and, only when the size is non-zero, one byte
///     with the key type in the high nibble and the value type in the low nibble. For an empty
///     map both nibbles are taken as zero, so the returned types are whatever compact type 0
///     translates to. The descriptor is passed to <c>CheckReadBytesAvailable</c> before it is
///     returned.
/// </summary>
/// <param name="cancellationToken">Token checked first and forwarded to the nested reads.</param>
/// <returns>The decoded map descriptor.</returns>
public override async ValueTask<TMap> ReadMapBeginAsync(CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    var size = (int)await ReadVarInt32Async(cancellationToken);

    byte keyAndValueType = 0;
    if (size != 0)
    {
        keyAndValueType = (byte)await ReadByteAsync(cancellationToken);
    }

    var keyType = GetTType((byte)(keyAndValueType >> 4));
    var valueType = GetTType((byte)(keyAndValueType & 0xf));

    var map = new TMap(keyType, valueType, size);
    CheckReadBytesAvailable(map);
    return map;
}
553 
/// <summary>
///     Ends reading a map. No wire data is consumed; the call only honours a pending
///     cancellation.
/// </summary>
/// <param name="cancellationToken">Token checked before returning.</param>
public override Task ReadMapEndAsync(CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    // maps carry no terminator
    return Task.CompletedTask;
}
559 
/// <summary>
///     Reads a set header. The header is read with <see cref="ReadListBeginAsync"/> and the
///     resulting list descriptor is wrapped in a <c>TSet</c>.
/// </summary>
/// <param name="cancellationToken">Token forwarded to the list-header read.</param>
/// <returns>The decoded set descriptor.</returns>
public override async ValueTask<TSet> ReadSetBeginAsync(CancellationToken cancellationToken)
{
    var listHeader = await ReadListBeginAsync(cancellationToken);
    return new TSet(listHeader);
}
571 
/// <summary>
///     Reads a boolean. A value stashed by <see cref="ReadFieldBeginAsync"/> is returned (and
///     cleared) without touching the wire; otherwise one byte is read and compared against
///     <c>Types.BooleanTrue</c>.
/// </summary>
/// <param name="cancellationToken">Token forwarded to the byte read, when one is needed.</param>
/// <returns>The boolean value.</returns>
public override ValueTask<bool> ReadBoolAsync(CancellationToken cancellationToken)
{
    if (!_boolValue.HasValue)
    {
        return ReadFromWireAsync();
    }

    // consume the value captured from the field header; completes synchronously
    var pending = _boolValue.Value;
    _boolValue = null;
    return new ValueTask<bool>(pending);

    async ValueTask<bool> ReadFromWireAsync()
    {
        var raw = await ReadByteAsync(cancellationToken);
        return raw == Types.BooleanTrue;
    }
}
595 
596 
/// <summary>
///     Reads exactly one byte into the scratch buffer and returns it reinterpreted as signed.
/// </summary>
/// <param name="cancellationToken">Token forwarded to the transport.</param>
/// <returns>The byte read, as an <see cref="sbyte"/>.</returns>
public override async ValueTask<sbyte> ReadByteAsync(CancellationToken cancellationToken)
{
    var scratch = PreAllocatedBuffer;
    await Trans.ReadAllAsync(scratch, 0, 1, cancellationToken);
    return (sbyte)scratch[0];
}
603 
/// <summary>
///     Reads a 16-bit integer: a varint is read, passed through <c>ZigzagToInt</c>, and the
///     result is narrowed to <see cref="short"/>.
/// </summary>
/// <param name="cancellationToken">Token checked first and forwarded to the varint read.</param>
/// <returns>The decoded value.</returns>
public override async ValueTask<short> ReadI16Async(CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    var raw = await ReadVarInt32Async(cancellationToken);
    return (short)ZigzagToInt(raw);
}
610 
/// <summary>
///     Reads a 32-bit integer: a varint is read and passed through <c>ZigzagToInt</c>.
/// </summary>
/// <param name="cancellationToken">Token checked first and forwarded to the varint read.</param>
/// <returns>The decoded value.</returns>
public override async ValueTask<int> ReadI32Async(CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    var raw = await ReadVarInt32Async(cancellationToken);
    return ZigzagToInt(raw);
}
617 
/// <summary>
///     Reads a 64-bit integer: a 64-bit varint is read and passed through <c>ZigzagToLong</c>.
/// </summary>
/// <param name="cancellationToken">Token checked first and forwarded to the varint read.</param>
/// <returns>The decoded value.</returns>
public override async ValueTask<long> ReadI64Async(CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    var raw = await ReadVarInt64Async(cancellationToken);
    return ZigzagToLong(raw);
}
624 
/// <summary>
///     Reads a double: eight bytes are read into the scratch buffer, interpreted as a
///     little-endian 64-bit pattern and converted to <see cref="double"/>.
/// </summary>
/// <param name="cancellationToken">Token checked first and forwarded to the transport.</param>
/// <returns>The decoded value.</returns>
public override async ValueTask<double> ReadDoubleAsync(CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    await Trans.ReadAllAsync(PreAllocatedBuffer, 0, 8, cancellationToken);

    var bits = BinaryPrimitives.ReadInt64LittleEndian(PreAllocatedBuffer);
    return BitConverter.Int64BitsToDouble(bits);
}
633 
/// <summary>
///     Reads a string: a varint byte length followed by that many UTF-8 bytes. Lengths smaller
///     than the scratch buffer are read into it directly; longer payloads are first checked with
///     the transport's <c>CheckReadBytesAvailable</c> and then read into a buffer rented from the
///     shared array pool.
/// </summary>
/// <param name="cancellationToken">Token forwarded to the nested reads.</param>
/// <returns>The decoded text, or <see cref="string.Empty"/> for a zero length.</returns>
/// <exception cref="TProtocolException">The length on the wire decodes to a negative value.</exception>
public override async ValueTask<string> ReadStringAsync(CancellationToken cancellationToken)
{
    // read length
    var length = (int) await ReadVarInt32Async(cancellationToken);
    if (length == 0)
    {
        return string.Empty;
    }

    // The varint is unsigned; anything above int.MaxValue turns negative in the cast above.
    // Without this guard such a value would satisfy the "fits the scratch buffer" test below
    // and reach the transport as a negative read count.
    if (length < 0)
    {
        throw new TProtocolException($"Negative string length: {length}");
    }

    // read and decode data
    if (length < PreAllocatedBuffer.Length)
    {
        await Trans.ReadAllAsync(PreAllocatedBuffer, 0, length, cancellationToken);
        return Encoding.UTF8.GetString(PreAllocatedBuffer, 0, length);
    }

    Transport.CheckReadBytesAvailable(length);

    var buf = ArrayPool<byte>.Shared.Rent(length);
    try
    {
        // the rented array may be larger than requested; only the first 'length' bytes are used
        await Trans.ReadAllAsync(buf, 0, length, cancellationToken);
        return Encoding.UTF8.GetString(buf, 0, length);
    }
    finally
    {
        ArrayPool<byte>.Shared.Return(buf);
    }
}
663 
/// <summary>
///     Reads a binary blob: a varint length followed by that many bytes. A zero length yields an
///     empty array; otherwise the length is checked with the transport's
///     <c>CheckReadBytesAvailable</c> before a new array is allocated and filled.
/// </summary>
/// <param name="cancellationToken">Token forwarded to the nested reads.</param>
/// <returns>A newly allocated array with the payload, or an empty array.</returns>
public override async ValueTask<byte[]> ReadBinaryAsync(CancellationToken cancellationToken)
{
    var length = (int)await ReadVarInt32Async(cancellationToken);
    if (length == 0)
    {
        return Array.Empty<byte>();
    }

    // validate against the transport before allocating
    Transport.CheckReadBytesAvailable(length);

    var payload = new byte[length];
    await Trans.ReadAllAsync(payload, 0, length, cancellationToken);
    return payload;
}
679 
/// <summary>
///     Reads a UUID: 16 bytes are read into a fresh array, turned into a <see cref="Guid"/> and
///     passed through <c>SwapByteOrder()</c>.
/// </summary>
/// <param name="cancellationToken">Token checked first and forwarded to the transport.</param>
/// <returns>The decoded value.</returns>
public override async ValueTask<Guid> ReadUuidAsync(CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    Transport.CheckReadBytesAvailable(16);  // = sizeof(uuid)

    var raw = new byte[16];
    await Trans.ReadAllAsync(raw, 0, 16, cancellationToken);

    var wireOrder = new Guid(raw);
    return wireOrder.SwapByteOrder();
}
689 
/// <summary>
///     Reads a list header. The high nibble of the first byte holds the size when it is below 15;
///     a value of 15 means the real size follows as a varint. The low nibble is the compact
///     element type. The descriptor is passed to <c>CheckReadBytesAvailable</c> before it is
///     returned.
/// </summary>
/// <param name="cancellationToken">Token checked first and forwarded to the nested reads.</param>
/// <returns>The decoded list descriptor.</returns>
public override async ValueTask<TList> ReadListBeginAsync(CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    var sizeAndType = (byte) await ReadByteAsync(cancellationToken);
    var size = (sizeAndType >> 4) & 0x0f;
    if (size == 15)
    {
        size = (int) await ReadVarInt32Async(cancellationToken);
    }

    // Hand GetTType only the type nibble, as the field and map header readers do, instead of
    // relying on it to strip the size bits from the combined byte.
    var type = GetTType((byte) (sizeAndType & 0x0f));
    var list = new TList(type, size);
    CheckReadBytesAvailable(list);
    return list;
}
713 
ReadListEndAsync(CancellationToken cancellationToken)714         public override Task ReadListEndAsync(CancellationToken cancellationToken)
715         {
716             cancellationToken.ThrowIfCancellationRequested();
717             return Task.CompletedTask;
718         }
719 
ReadSetEndAsync(CancellationToken cancellationToken)720         public override Task ReadSetEndAsync(CancellationToken cancellationToken)
721         {
722             cancellationToken.ThrowIfCancellationRequested();
723             return Task.CompletedTask;
724         }
725 
GetCompactType(TType ttype)726         private static byte GetCompactType(TType ttype)
727         {
728             // Given a TType value, find the appropriate TCompactProtocol.Types constant.
729             return TTypeToCompactType[(int) ttype];
730         }
731 
732 
ReadVarInt32Async(CancellationToken cancellationToken)733         private async ValueTask<uint> ReadVarInt32Async(CancellationToken cancellationToken)
734         {
735             cancellationToken.ThrowIfCancellationRequested();
736 
737             /*
738             Read an i32 from the wire as a varint. The MSB of each byte is set
739             if there is another byte to follow. This can Read up to 5 bytes.
740             */
741 
742             uint result = 0;
743             var shift = 0;
744 
745             while (true)
746             {
747                 var b = (byte) await ReadByteAsync(cancellationToken);
748                 result |= (uint) (b & 0x7f) << shift;
749                 if ((b & 0x80) != 0x80)
750                 {
751                     break;
752                 }
753                 shift += 7;
754             }
755 
756             return result;
757         }
758 
ReadVarInt64Async(CancellationToken cancellationToken)759         private async ValueTask<ulong> ReadVarInt64Async(CancellationToken cancellationToken)
760         {
761             cancellationToken.ThrowIfCancellationRequested();
762 
763             /*
764             Read an i64 from the wire as a proper varint. The MSB of each byte is set
765             if there is another byte to follow. This can Read up to 10 bytes.
766             */
767 
768             var shift = 0;
769             ulong result = 0;
770             while (true)
771             {
772                 var b = (byte) await ReadByteAsync(cancellationToken);
773                 result |= (ulong) (b & 0x7f) << shift;
774                 if ((b & 0x80) != 0x80)
775                 {
776                     break;
777                 }
778                 shift += 7;
779             }
780 
781             return result;
782         }
783 
ZigzagToInt(uint n)784         private static int ZigzagToInt(uint n)
785         {
786             return (int) (n >> 1) ^ -(int) (n & 1);
787         }
788 
ZigzagToLong(ulong n)789         private static long ZigzagToLong(ulong n)
790         {
791             return (long) (n >> 1) ^ -(long) (n & 1);
792         }
793 
GetTType(byte type)794         private static TType GetTType(byte type)
795         {
796             // Given a TCompactProtocol.Types constant, convert it to its corresponding TType value.
797             return CompactTypeToTType[type & 0x0f];
798         }
799 
LongToZigzag(long n)800         private static ulong LongToZigzag(long n)
801         {
802             // Convert l into a zigzag long. This allows negative numbers to be represented compactly as a varint
803             return (ulong) (n << 1) ^ (ulong) (n >> 63);
804         }
805 
IntToZigzag(int n)806         private static uint IntToZigzag(int n)
807         {
808             // Convert n into a zigzag int. This allows negative numbers to be represented compactly as a varint
809             return (uint) (n << 1) ^ (uint) (n >> 31);
810         }
811 
812         // Return the minimum number of bytes a type will consume on the wire
GetMinSerializedSize(TType type)813         public override int GetMinSerializedSize(TType type)
814         {
815             switch (type)
816             {
817                 case TType.Stop: return 0;
818                 case TType.Void: return 0;
819                 case TType.Bool: return sizeof(byte);
820                 case TType.Double: return 8;  // uses fixedLongToBytes() which always writes 8 bytes
821                 case TType.Byte: return sizeof(byte);
822                 case TType.I16: return sizeof(byte);  // zigzag
823                 case TType.I32: return sizeof(byte);  // zigzag
824                 case TType.I64: return sizeof(byte);  // zigzag
825                 case TType.String: return sizeof(byte);  // string length
826                 case TType.Struct: return 0;             // empty struct
827                 case TType.Map: return sizeof(byte);  // element count
828                 case TType.Set: return sizeof(byte);  // element count
829                 case TType.List: return sizeof(byte);  // element count
830                 case TType.Uuid: return 16;  // uuid bytes
831                 default: throw new TProtocolException(TProtocolException.NOT_IMPLEMENTED, "unrecognized type code");
832             }
833         }
834 
835         public class Factory : TProtocolFactory
836         {
GetProtocol(TTransport trans)837             public override TProtocol GetProtocol(TTransport trans)
838             {
839                 return new TCompactProtocol(trans);
840             }
841         }
842 
843         /// <summary>
844         ///     All of the on-wire exType codes.
845         /// </summary>
846         private static class Types
847         {
848             public const byte Stop = 0x00;
849             public const byte BooleanTrue = 0x01;
850             public const byte BooleanFalse = 0x02;
851             public const byte Byte = 0x03;
852             public const byte I16 = 0x04;
853             public const byte I32 = 0x05;
854             public const byte I64 = 0x06;
855             public const byte Double = 0x07;
856             public const byte Binary = 0x08;
857             public const byte List = 0x09;
858             public const byte Set = 0x0A;
859             public const byte Map = 0x0B;
860             public const byte Struct = 0x0C;
861             public const byte Uuid = 0x0D;
862         }
863     }
864 }
865