1 // Licensed to the Apache Software Foundation(ASF) under one 2 // or more contributor license agreements.See the NOTICE file 3 // distributed with this work for additional information 4 // regarding copyright ownership.The ASF licenses this file 5 // to you under the Apache License, Version 2.0 (the 6 // "License"); you may not use this file except in compliance 7 // with the License. You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, 12 // software distributed under the License is distributed on an 13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 14 // KIND, either express or implied. See the License for the 15 // specific language governing permissions and limitations 16 // under the License. 17 18 using System; 19 using System.Buffers; 20 using System.Buffers.Binary; 21 using System.Collections.Generic; 22 using System.Diagnostics; 23 using System.Text; 24 using System.Threading; 25 using System.Threading.Tasks; 26 using Thrift.Protocol.Entities; 27 using Thrift.Protocol.Utilities; 28 using Thrift.Transport; 29 30 31 namespace Thrift.Protocol 32 { 33 34 // ReSharper disable once InconsistentNaming 35 public class TCompactProtocol : TProtocol 36 { 37 private const byte ProtocolId = 0x82; 38 private const byte Version = 1; 39 private const byte VersionMask = 0x1f; // 0001 1111 40 private const byte TypeMask = 0xE0; // 1110 0000 41 private const byte TypeBits = 0x07; // 0000 0111 42 private const int TypeShiftAmount = 5; 43 44 private const byte NoTypeOverride = 0xFF; 45 46 // ReSharper disable once InconsistentNaming 47 private static readonly byte[] TTypeToCompactType = new byte[17]; 48 private static readonly TType[] CompactTypeToTType = new TType[14]; 49 50 /// <summary> 51 /// Used to keep track of the last field for the current and previous structs, so we can do the delta stuff. 52 /// </summary> 53 private readonly Stack<short> _lastField = new Stack<short>(15); 54 55 /// <summary> 56 /// If we encounter a boolean field begin, save the TField here so it can have the value incorporated. 57 /// </summary> 58 private TField? _booleanField; 59 60 /// <summary> 61 /// If we Read a field header, and it's a boolean field, save the boolean value here so that ReadBool can use it. 62 /// </summary> 63 private bool? _boolValue; 64 65 private short _lastFieldId; 66 67 // minimize memory allocations by means of an preallocated bytes buffer 68 // The value of 128 is arbitrarily chosen, the required minimum size must be sizeof(long) 69 private readonly byte[] PreAllocatedBuffer = new byte[128]; 70 71 private struct VarInt 72 { 73 public byte[] bytes; 74 public int count; 75 } 76 77 // minimize memory allocations by means of an preallocated VarInt buffer 78 private VarInt PreAllocatedVarInt = new VarInt() 79 { 80 bytes = new byte[10], // see Int64ToVarInt() 81 count = 0 82 }; 83 84 85 86 TCompactProtocol(TTransport trans)87 public TCompactProtocol(TTransport trans) 88 : base(trans) 89 { 90 TTypeToCompactType[(int)TType.Stop] = Types.Stop; 91 TTypeToCompactType[(int)TType.Bool] = Types.BooleanTrue; 92 TTypeToCompactType[(int)TType.Byte] = Types.Byte; 93 TTypeToCompactType[(int)TType.I16] = Types.I16; 94 TTypeToCompactType[(int)TType.I32] = Types.I32; 95 TTypeToCompactType[(int)TType.I64] = Types.I64; 96 TTypeToCompactType[(int)TType.Double] = Types.Double; 97 TTypeToCompactType[(int)TType.String] = Types.Binary; 98 TTypeToCompactType[(int)TType.List] = Types.List; 99 TTypeToCompactType[(int)TType.Set] = Types.Set; 100 TTypeToCompactType[(int)TType.Map] = Types.Map; 101 TTypeToCompactType[(int)TType.Struct] = Types.Struct; 102 TTypeToCompactType[(int)TType.Uuid] = Types.Uuid; 103 104 CompactTypeToTType[Types.Stop] = TType.Stop; 105 CompactTypeToTType[Types.BooleanTrue] = TType.Bool; 106 CompactTypeToTType[Types.BooleanFalse] = TType.Bool; 107 CompactTypeToTType[Types.Byte] = TType.Byte; 108 CompactTypeToTType[Types.I16] = TType.I16; 109 CompactTypeToTType[Types.I32] = TType.I32; 110 CompactTypeToTType[Types.I64] = TType.I64; 111 CompactTypeToTType[Types.Double] = TType.Double; 112 CompactTypeToTType[Types.Binary] = TType.String; 113 CompactTypeToTType[Types.List] = TType.List; 114 CompactTypeToTType[Types.Set] = TType.Set; 115 CompactTypeToTType[Types.Map] = TType.Map; 116 CompactTypeToTType[Types.Struct] = TType.Struct; 117 CompactTypeToTType[Types.Uuid] = TType.Uuid; 118 } 119 Reset()120 public void Reset() 121 { 122 _lastField.Clear(); 123 _lastFieldId = 0; 124 } 125 WriteMessageBeginAsync(TMessage message, CancellationToken cancellationToken)126 public override async Task WriteMessageBeginAsync(TMessage message, CancellationToken cancellationToken) 127 { 128 PreAllocatedBuffer[0] = ProtocolId; 129 PreAllocatedBuffer[1] = (byte)((Version & VersionMask) | (((uint)message.Type << TypeShiftAmount) & TypeMask)); 130 await Trans.WriteAsync(PreAllocatedBuffer, 0, 2, cancellationToken); 131 132 Int32ToVarInt((uint) message.SeqID, ref PreAllocatedVarInt); 133 await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken); 134 135 await WriteStringAsync(message.Name, cancellationToken); 136 } 137 WriteMessageEndAsync(CancellationToken cancellationToken)138 public override Task WriteMessageEndAsync(CancellationToken cancellationToken) 139 { 140 cancellationToken.ThrowIfCancellationRequested(); 141 return Task.CompletedTask; 142 } 143 144 /// <summary> 145 /// Write a struct begin. This doesn't actually put anything on the wire. We 146 /// use it as an opportunity to put special placeholder markers on the field 147 /// stack so we can get the field id deltas correct. 148 /// </summary> 149 public override Task WriteStructBeginAsync(TStruct @struct, CancellationToken cancellationToken) 150 { 151 cancellationToken.ThrowIfCancellationRequested(); 152 153 _lastField.Push(_lastFieldId); 154 _lastFieldId = 0; 155 156 return Task.CompletedTask; 157 } 158 WriteStructEndAsync(CancellationToken cancellationToken)159 public override Task WriteStructEndAsync(CancellationToken cancellationToken) 160 { 161 cancellationToken.ThrowIfCancellationRequested(); 162 163 _lastFieldId = _lastField.Pop(); 164 165 return Task.CompletedTask; 166 } 167 WriteFieldBeginInternalAsync(TField field, byte fieldType, CancellationToken cancellationToken)168 private async Task WriteFieldBeginInternalAsync(TField field, byte fieldType, CancellationToken cancellationToken) 169 { 170 // if there's a exType override passed in, use that. Otherwise ask GetCompactType(). 171 if (fieldType == NoTypeOverride) 172 fieldType = GetCompactType(field.Type); 173 174 175 // check if we can use delta encoding for the field id 176 if (field.ID > _lastFieldId) 177 { 178 var delta = field.ID - _lastFieldId; 179 if (delta <= 15) 180 { 181 // Write them together 182 PreAllocatedBuffer[0] = (byte)((delta << 4) | fieldType); 183 await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken); 184 _lastFieldId = field.ID; 185 return; 186 } 187 } 188 189 // Write them separate 190 PreAllocatedBuffer[0] = fieldType; 191 await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken); 192 await WriteI16Async(field.ID, cancellationToken); 193 _lastFieldId = field.ID; 194 } 195 WriteFieldBeginAsync(TField field, CancellationToken cancellationToken)196 public override async Task WriteFieldBeginAsync(TField field, CancellationToken cancellationToken) 197 { 198 if (field.Type == TType.Bool) 199 { 200 _booleanField = field; 201 } 202 else 203 { 204 await WriteFieldBeginInternalAsync(field, NoTypeOverride, cancellationToken); 205 } 206 } 207 WriteFieldEndAsync(CancellationToken cancellationToken)208 public override Task WriteFieldEndAsync(CancellationToken cancellationToken) 209 { 210 cancellationToken.ThrowIfCancellationRequested(); 211 return Task.CompletedTask; 212 } 213 WriteFieldStopAsync(CancellationToken cancellationToken)214 public override async Task WriteFieldStopAsync(CancellationToken cancellationToken) 215 { 216 cancellationToken.ThrowIfCancellationRequested(); 217 218 PreAllocatedBuffer[0] = Types.Stop; 219 await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken); 220 } 221 WriteCollectionBeginAsync(TType elemType, int size, CancellationToken cancellationToken)222 protected async Task WriteCollectionBeginAsync(TType elemType, int size, CancellationToken cancellationToken) 223 { 224 cancellationToken.ThrowIfCancellationRequested(); 225 226 /* 227 Abstract method for writing the start of lists and sets. List and sets on 228 the wire differ only by the exType indicator. 229 */ 230 231 if (size <= 14) 232 { 233 PreAllocatedBuffer[0] = (byte)((size << 4) | GetCompactType(elemType)); 234 await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken); 235 } 236 else 237 { 238 PreAllocatedBuffer[0] = (byte)(0xf0 | GetCompactType(elemType)); 239 await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken); 240 241 Int32ToVarInt((uint) size, ref PreAllocatedVarInt); 242 await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken); 243 } 244 } 245 WriteListBeginAsync(TList list, CancellationToken cancellationToken)246 public override async Task WriteListBeginAsync(TList list, CancellationToken cancellationToken) 247 { 248 await WriteCollectionBeginAsync(list.ElementType, list.Count, cancellationToken); 249 } 250 WriteListEndAsync(CancellationToken cancellationToken)251 public override Task WriteListEndAsync(CancellationToken cancellationToken) 252 { 253 cancellationToken.ThrowIfCancellationRequested(); 254 return Task.CompletedTask; 255 } 256 WriteSetBeginAsync(TSet set, CancellationToken cancellationToken)257 public override async Task WriteSetBeginAsync(TSet set, CancellationToken cancellationToken) 258 { 259 cancellationToken.ThrowIfCancellationRequested(); 260 261 await WriteCollectionBeginAsync(set.ElementType, set.Count, cancellationToken); 262 } 263 WriteSetEndAsync(CancellationToken cancellationToken)264 public override Task WriteSetEndAsync(CancellationToken cancellationToken) 265 { 266 cancellationToken.ThrowIfCancellationRequested(); 267 return Task.CompletedTask; 268 } 269 WriteBoolAsync(bool b, CancellationToken cancellationToken)270 public override async Task WriteBoolAsync(bool b, CancellationToken cancellationToken) 271 { 272 cancellationToken.ThrowIfCancellationRequested(); 273 274 /* 275 Write a boolean value. Potentially, this could be a boolean field, in 276 which case the field header info isn't written yet. If so, decide what the 277 right exType header is for the value and then Write the field header. 278 Otherwise, Write a single byte. 279 */ 280 281 if (_booleanField != null) 282 { 283 // we haven't written the field header yet 284 var type = b ? Types.BooleanTrue : Types.BooleanFalse; 285 await WriteFieldBeginInternalAsync(_booleanField.Value, type, cancellationToken); 286 _booleanField = null; 287 } 288 else 289 { 290 // we're not part of a field, so just write the value. 291 PreAllocatedBuffer[0] = b ? Types.BooleanTrue : Types.BooleanFalse; 292 await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken); 293 } 294 } 295 WriteByteAsync(sbyte b, CancellationToken cancellationToken)296 public override async Task WriteByteAsync(sbyte b, CancellationToken cancellationToken) 297 { 298 cancellationToken.ThrowIfCancellationRequested(); 299 300 PreAllocatedBuffer[0] = (byte)b; 301 await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken); 302 } 303 WriteI16Async(short i16, CancellationToken cancellationToken)304 public override async Task WriteI16Async(short i16, CancellationToken cancellationToken) 305 { 306 cancellationToken.ThrowIfCancellationRequested(); 307 308 Int32ToVarInt(IntToZigzag(i16), ref PreAllocatedVarInt); 309 await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken); 310 } 311 Int32ToVarInt(uint n, ref VarInt varint)312 private static void Int32ToVarInt(uint n, ref VarInt varint) 313 { 314 // Write an i32 as a varint. Results in 1 - 5 bytes on the wire. 315 varint.count = 0; 316 Debug.Assert(varint.bytes.Length >= 5); 317 318 while (true) 319 { 320 if ((n & ~0x7F) == 0) 321 { 322 varint.bytes[varint.count++] = (byte)n; 323 break; 324 } 325 326 varint.bytes[varint.count++] = (byte)((n & 0x7F) | 0x80); 327 n >>= 7; 328 } 329 } 330 WriteI32Async(int i32, CancellationToken cancellationToken)331 public override async Task WriteI32Async(int i32, CancellationToken cancellationToken) 332 { 333 cancellationToken.ThrowIfCancellationRequested(); 334 335 Int32ToVarInt(IntToZigzag(i32), ref PreAllocatedVarInt); 336 await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken); 337 } 338 Int64ToVarInt(ulong n, ref VarInt varint)339 static private void Int64ToVarInt(ulong n, ref VarInt varint) 340 { 341 // Write an i64 as a varint. Results in 1-10 bytes on the wire. 342 varint.count = 0; 343 Debug.Assert(varint.bytes.Length >= 10); 344 345 while (true) 346 { 347 if ((n & ~(ulong)0x7FL) == 0) 348 { 349 varint.bytes[varint.count++] = (byte)n; 350 break; 351 } 352 varint.bytes[varint.count++] = (byte)((n & 0x7F) | 0x80); 353 n >>= 7; 354 } 355 } 356 WriteI64Async(long i64, CancellationToken cancellationToken)357 public override async Task WriteI64Async(long i64, CancellationToken cancellationToken) 358 { 359 cancellationToken.ThrowIfCancellationRequested(); 360 361 Int64ToVarInt(LongToZigzag(i64), ref PreAllocatedVarInt); 362 await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken); 363 } 364 WriteDoubleAsync(double d, CancellationToken cancellationToken)365 public override async Task WriteDoubleAsync(double d, CancellationToken cancellationToken) 366 { 367 cancellationToken.ThrowIfCancellationRequested(); 368 369 BinaryPrimitives.WriteInt64LittleEndian(PreAllocatedBuffer, BitConverter.DoubleToInt64Bits(d)); 370 await Trans.WriteAsync(PreAllocatedBuffer, 0, 8, cancellationToken); 371 } 372 WriteStringAsync(string str, CancellationToken cancellationToken)373 public override async Task WriteStringAsync(string str, CancellationToken cancellationToken) 374 { 375 cancellationToken.ThrowIfCancellationRequested(); 376 377 var buf = ArrayPool<byte>.Shared.Rent(Encoding.UTF8.GetByteCount(str)); 378 try 379 { 380 var numberOfBytes = Encoding.UTF8.GetBytes(str, 0, str.Length, buf, 0); 381 382 Int32ToVarInt((uint)numberOfBytes, ref PreAllocatedVarInt); 383 await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken); 384 await Trans.WriteAsync(buf, 0, numberOfBytes, cancellationToken); 385 } 386 finally 387 { 388 ArrayPool<byte>.Shared.Return(buf); 389 } 390 } 391 WriteBinaryAsync(byte[] bytes, CancellationToken cancellationToken)392 public override async Task WriteBinaryAsync(byte[] bytes, CancellationToken cancellationToken) 393 { 394 cancellationToken.ThrowIfCancellationRequested(); 395 396 Int32ToVarInt((uint) bytes.Length, ref PreAllocatedVarInt); 397 await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken); 398 await Trans.WriteAsync(bytes, 0, bytes.Length, cancellationToken); 399 } 400 WriteUuidAsync(Guid uuid, CancellationToken cancellationToken)401 public override async Task WriteUuidAsync(Guid uuid, CancellationToken cancellationToken) 402 { 403 cancellationToken.ThrowIfCancellationRequested(); 404 405 var bytes = uuid.SwapByteOrder().ToByteArray(); 406 await Trans.WriteAsync(bytes, 0, bytes.Length, cancellationToken); 407 } 408 WriteMapBeginAsync(TMap map, CancellationToken cancellationToken)409 public override async Task WriteMapBeginAsync(TMap map, CancellationToken cancellationToken) 410 { 411 cancellationToken.ThrowIfCancellationRequested(); 412 413 if (map.Count == 0) 414 { 415 PreAllocatedBuffer[0] = 0; 416 await Trans.WriteAsync( PreAllocatedBuffer, 0, 1, cancellationToken); 417 } 418 else 419 { 420 Int32ToVarInt((uint) map.Count, ref PreAllocatedVarInt); 421 await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken); 422 423 PreAllocatedBuffer[0] = (byte)((GetCompactType(map.KeyType) << 4) | GetCompactType(map.ValueType)); 424 await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken); 425 } 426 } 427 WriteMapEndAsync(CancellationToken cancellationToken)428 public override Task WriteMapEndAsync(CancellationToken cancellationToken) 429 { 430 cancellationToken.ThrowIfCancellationRequested(); 431 return Task.CompletedTask; 432 } 433 ReadMessageBeginAsync(CancellationToken cancellationToken)434 public override async ValueTask<TMessage> ReadMessageBeginAsync(CancellationToken cancellationToken) 435 { 436 cancellationToken.ThrowIfCancellationRequested(); 437 438 var protocolId = (byte) await ReadByteAsync(cancellationToken); 439 if (protocolId != ProtocolId) 440 { 441 throw new TProtocolException($"Expected protocol id {ProtocolId:X} but got {protocolId:X}"); 442 } 443 444 var versionAndType = (byte) await ReadByteAsync(cancellationToken); 445 var version = (byte) (versionAndType & VersionMask); 446 447 if (version != Version) 448 { 449 throw new TProtocolException($"Expected version {Version} but got {version}"); 450 } 451 452 var type = (byte) ((versionAndType >> TypeShiftAmount) & TypeBits); 453 var seqid = (int) await ReadVarInt32Async(cancellationToken); 454 var messageName = await ReadStringAsync(cancellationToken); 455 456 return new TMessage(messageName, (TMessageType) type, seqid); 457 } 458 ReadMessageEndAsync(CancellationToken cancellationToken)459 public override Task ReadMessageEndAsync(CancellationToken cancellationToken) 460 { 461 cancellationToken.ThrowIfCancellationRequested(); 462 Transport.ResetConsumedMessageSize(); 463 return Task.CompletedTask; 464 } 465 ReadStructBeginAsync(CancellationToken cancellationToken)466 public override ValueTask<TStruct> ReadStructBeginAsync(CancellationToken cancellationToken) 467 { 468 cancellationToken.ThrowIfCancellationRequested(); 469 470 _lastField.Push(_lastFieldId); 471 _lastFieldId = 0; 472 473 return new ValueTask<TStruct>(AnonymousStruct); 474 } 475 ReadStructEndAsync(CancellationToken cancellationToken)476 public override Task ReadStructEndAsync(CancellationToken cancellationToken) 477 { 478 cancellationToken.ThrowIfCancellationRequested(); 479 480 /* 481 Doesn't actually consume any wire data, just removes the last field for 482 this struct from the field stack. 483 */ 484 485 // consume the last field we Read off the wire. 486 _lastFieldId = _lastField.Pop(); 487 488 return Task.CompletedTask; 489 } 490 ReadFieldBeginAsync(CancellationToken cancellationToken)491 public override async ValueTask<TField> ReadFieldBeginAsync(CancellationToken cancellationToken) 492 { 493 // Read a field header off the wire. 494 var type = (byte) await ReadByteAsync(cancellationToken); 495 496 // if it's a stop, then we can return immediately, as the struct is over. 497 if (type == Types.Stop) 498 { 499 return StopField; 500 } 501 502 503 // mask off the 4 MSB of the exType header. it could contain a field id delta. 504 var modifier = (short) ((type & 0xf0) >> 4); 505 var compactType = (byte)(type & 0x0f); 506 507 short fieldId; 508 if (modifier == 0) 509 { 510 fieldId = await ReadI16Async(cancellationToken); 511 } 512 else 513 { 514 fieldId = (short) (_lastFieldId + modifier); 515 } 516 517 var ttype = GetTType(compactType); 518 var field = new TField(string.Empty, ttype, fieldId); 519 520 // if this happens to be a boolean field, the value is encoded in the exType 521 if( ttype == TType.Bool) 522 { 523 _boolValue = (compactType == Types.BooleanTrue); 524 } 525 526 // push the new field onto the field stack so we can keep the deltas going. 527 _lastFieldId = field.ID; 528 return field; 529 } 530 ReadFieldEndAsync(CancellationToken cancellationToken)531 public override Task ReadFieldEndAsync(CancellationToken cancellationToken) 532 { 533 cancellationToken.ThrowIfCancellationRequested(); 534 return Task.CompletedTask; 535 } 536 ReadMapBeginAsync(CancellationToken cancellationToken)537 public override async ValueTask<TMap> ReadMapBeginAsync(CancellationToken cancellationToken) 538 { 539 cancellationToken.ThrowIfCancellationRequested(); 540 541 /* 542 Read a map header off the wire. If the size is zero, skip Reading the key 543 and value exType. This means that 0-length maps will yield TMaps without the 544 "correct" types. 545 */ 546 547 var size = (int) await ReadVarInt32Async(cancellationToken); 548 var keyAndValueType = size == 0 ? (byte) 0 : (byte) await ReadByteAsync(cancellationToken); 549 var map = new TMap(GetTType((byte) (keyAndValueType >> 4)), GetTType((byte) (keyAndValueType & 0xf)), size); 550 CheckReadBytesAvailable(map); 551 return map; 552 } 553 ReadMapEndAsync(CancellationToken cancellationToken)554 public override Task ReadMapEndAsync(CancellationToken cancellationToken) 555 { 556 cancellationToken.ThrowIfCancellationRequested(); 557 return Task.CompletedTask; 558 } 559 ReadSetBeginAsync(CancellationToken cancellationToken)560 public override async ValueTask<TSet> ReadSetBeginAsync(CancellationToken cancellationToken) 561 { 562 /* 563 Read a set header off the wire. If the set size is 0-14, the size will 564 be packed into the element exType header. If it's a longer set, the 4 MSB 565 of the element exType header will be 0xF, and a varint will follow with the 566 true size. 567 */ 568 569 return new TSet(await ReadListBeginAsync(cancellationToken)); 570 } 571 ReadBoolAsync(CancellationToken cancellationToken)572 public override ValueTask<bool> ReadBoolAsync(CancellationToken cancellationToken) 573 { 574 /* 575 Read a boolean off the wire. If this is a boolean field, the value should 576 already have been Read during ReadFieldBegin, so we'll just consume the 577 pre-stored value. Otherwise, Read a byte. 578 */ 579 580 if (_boolValue != null) 581 { 582 var result = _boolValue.Value; 583 _boolValue = null; 584 return new ValueTask<bool>(result); 585 } 586 587 return InternalCall(); 588 589 async ValueTask<bool> InternalCall() 590 { 591 var data = await ReadByteAsync(cancellationToken); 592 return (data == Types.BooleanTrue); 593 } 594 } 595 596 ReadByteAsync(CancellationToken cancellationToken)597 public override async ValueTask<sbyte> ReadByteAsync(CancellationToken cancellationToken) 598 { 599 // Read a single byte off the wire. Nothing interesting here. 600 await Trans.ReadAllAsync(PreAllocatedBuffer, 0, 1, cancellationToken); 601 return (sbyte)PreAllocatedBuffer[0]; 602 } 603 ReadI16Async(CancellationToken cancellationToken)604 public override async ValueTask<short> ReadI16Async(CancellationToken cancellationToken) 605 { 606 cancellationToken.ThrowIfCancellationRequested(); 607 608 return (short) ZigzagToInt(await ReadVarInt32Async(cancellationToken)); 609 } 610 ReadI32Async(CancellationToken cancellationToken)611 public override async ValueTask<int> ReadI32Async(CancellationToken cancellationToken) 612 { 613 cancellationToken.ThrowIfCancellationRequested(); 614 615 return ZigzagToInt(await ReadVarInt32Async(cancellationToken)); 616 } 617 ReadI64Async(CancellationToken cancellationToken)618 public override async ValueTask<long> ReadI64Async(CancellationToken cancellationToken) 619 { 620 cancellationToken.ThrowIfCancellationRequested(); 621 622 return ZigzagToLong(await ReadVarInt64Async(cancellationToken)); 623 } 624 ReadDoubleAsync(CancellationToken cancellationToken)625 public override async ValueTask<double> ReadDoubleAsync(CancellationToken cancellationToken) 626 { 627 cancellationToken.ThrowIfCancellationRequested(); 628 629 await Trans.ReadAllAsync(PreAllocatedBuffer, 0, 8, cancellationToken); 630 631 return BitConverter.Int64BitsToDouble(BinaryPrimitives.ReadInt64LittleEndian(PreAllocatedBuffer)); 632 } 633 ReadStringAsync(CancellationToken cancellationToken)634 public override async ValueTask<string> ReadStringAsync(CancellationToken cancellationToken) 635 { 636 // read length 637 var length = (int) await ReadVarInt32Async(cancellationToken); 638 if (length == 0) 639 { 640 return string.Empty; 641 } 642 643 // read and decode data 644 if (length < PreAllocatedBuffer.Length) 645 { 646 await Trans.ReadAllAsync(PreAllocatedBuffer, 0, length, cancellationToken); 647 return Encoding.UTF8.GetString(PreAllocatedBuffer, 0, length); 648 } 649 650 Transport.CheckReadBytesAvailable(length); 651 652 var buf = ArrayPool<byte>.Shared.Rent(length); 653 try 654 { 655 await Trans.ReadAllAsync(buf, 0, length, cancellationToken); 656 return Encoding.UTF8.GetString(buf, 0, length); 657 } 658 finally 659 { 660 ArrayPool<byte>.Shared.Return(buf); 661 } 662 } 663 ReadBinaryAsync(CancellationToken cancellationToken)664 public override async ValueTask<byte[]> ReadBinaryAsync(CancellationToken cancellationToken) 665 { 666 // read length 667 var length = (int) await ReadVarInt32Async(cancellationToken); 668 if (length == 0) 669 { 670 return Array.Empty<byte>(); 671 } 672 673 // read data 674 Transport.CheckReadBytesAvailable(length); 675 var buf = new byte[length]; 676 await Trans.ReadAllAsync(buf, 0, length, cancellationToken); 677 return buf; 678 } 679 ReadUuidAsync(CancellationToken cancellationToken)680 public override async ValueTask<Guid> ReadUuidAsync(CancellationToken cancellationToken) 681 { 682 cancellationToken.ThrowIfCancellationRequested(); 683 684 Transport.CheckReadBytesAvailable(16); // = sizeof(uuid) 685 var buf = new byte[16]; 686 await Trans.ReadAllAsync(buf, 0, 16, cancellationToken); 687 return new Guid(buf).SwapByteOrder(); 688 } 689 ReadListBeginAsync(CancellationToken cancellationToken)690 public override async ValueTask<TList> ReadListBeginAsync(CancellationToken cancellationToken) 691 { 692 cancellationToken.ThrowIfCancellationRequested(); 693 694 /* 695 Read a list header off the wire. If the list size is 0-14, the size will 696 be packed into the element exType header. If it's a longer list, the 4 MSB 697 of the element exType header will be 0xF, and a varint will follow with the 698 true size. 699 */ 700 701 var sizeAndType = (byte) await ReadByteAsync(cancellationToken); 702 var size = (sizeAndType >> 4) & 0x0f; 703 if (size == 15) 704 { 705 size = (int) await ReadVarInt32Async(cancellationToken); 706 } 707 708 var type = GetTType(sizeAndType); 709 var list = new TList(type, size); 710 CheckReadBytesAvailable(list); 711 return list; 712 } 713 ReadListEndAsync(CancellationToken cancellationToken)714 public override Task ReadListEndAsync(CancellationToken cancellationToken) 715 { 716 cancellationToken.ThrowIfCancellationRequested(); 717 return Task.CompletedTask; 718 } 719 ReadSetEndAsync(CancellationToken cancellationToken)720 public override Task ReadSetEndAsync(CancellationToken cancellationToken) 721 { 722 cancellationToken.ThrowIfCancellationRequested(); 723 return Task.CompletedTask; 724 } 725 GetCompactType(TType ttype)726 private static byte GetCompactType(TType ttype) 727 { 728 // Given a TType value, find the appropriate TCompactProtocol.Types constant. 729 return TTypeToCompactType[(int) ttype]; 730 } 731 732 ReadVarInt32Async(CancellationToken cancellationToken)733 private async ValueTask<uint> ReadVarInt32Async(CancellationToken cancellationToken) 734 { 735 cancellationToken.ThrowIfCancellationRequested(); 736 737 /* 738 Read an i32 from the wire as a varint. The MSB of each byte is set 739 if there is another byte to follow. This can Read up to 5 bytes. 740 */ 741 742 uint result = 0; 743 var shift = 0; 744 745 while (true) 746 { 747 var b = (byte) await ReadByteAsync(cancellationToken); 748 result |= (uint) (b & 0x7f) << shift; 749 if ((b & 0x80) != 0x80) 750 { 751 break; 752 } 753 shift += 7; 754 } 755 756 return result; 757 } 758 ReadVarInt64Async(CancellationToken cancellationToken)759 private async ValueTask<ulong> ReadVarInt64Async(CancellationToken cancellationToken) 760 { 761 cancellationToken.ThrowIfCancellationRequested(); 762 763 /* 764 Read an i64 from the wire as a proper varint. The MSB of each byte is set 765 if there is another byte to follow. This can Read up to 10 bytes. 766 */ 767 768 var shift = 0; 769 ulong result = 0; 770 while (true) 771 { 772 var b = (byte) await ReadByteAsync(cancellationToken); 773 result |= (ulong) (b & 0x7f) << shift; 774 if ((b & 0x80) != 0x80) 775 { 776 break; 777 } 778 shift += 7; 779 } 780 781 return result; 782 } 783 ZigzagToInt(uint n)784 private static int ZigzagToInt(uint n) 785 { 786 return (int) (n >> 1) ^ -(int) (n & 1); 787 } 788 ZigzagToLong(ulong n)789 private static long ZigzagToLong(ulong n) 790 { 791 return (long) (n >> 1) ^ -(long) (n & 1); 792 } 793 GetTType(byte type)794 private static TType GetTType(byte type) 795 { 796 // Given a TCompactProtocol.Types constant, convert it to its corresponding TType value. 797 return CompactTypeToTType[type & 0x0f]; 798 } 799 LongToZigzag(long n)800 private static ulong LongToZigzag(long n) 801 { 802 // Convert l into a zigzag long. This allows negative numbers to be represented compactly as a varint 803 return (ulong) (n << 1) ^ (ulong) (n >> 63); 804 } 805 IntToZigzag(int n)806 private static uint IntToZigzag(int n) 807 { 808 // Convert n into a zigzag int. This allows negative numbers to be represented compactly as a varint 809 return (uint) (n << 1) ^ (uint) (n >> 31); 810 } 811 812 // Return the minimum number of bytes a type will consume on the wire GetMinSerializedSize(TType type)813 public override int GetMinSerializedSize(TType type) 814 { 815 switch (type) 816 { 817 case TType.Stop: return 0; 818 case TType.Void: return 0; 819 case TType.Bool: return sizeof(byte); 820 case TType.Double: return 8; // uses fixedLongToBytes() which always writes 8 bytes 821 case TType.Byte: return sizeof(byte); 822 case TType.I16: return sizeof(byte); // zigzag 823 case TType.I32: return sizeof(byte); // zigzag 824 case TType.I64: return sizeof(byte); // zigzag 825 case TType.String: return sizeof(byte); // string length 826 case TType.Struct: return 0; // empty struct 827 case TType.Map: return sizeof(byte); // element count 828 case TType.Set: return sizeof(byte); // element count 829 case TType.List: return sizeof(byte); // element count 830 case TType.Uuid: return 16; // uuid bytes 831 default: throw new TProtocolException(TProtocolException.NOT_IMPLEMENTED, "unrecognized type code"); 832 } 833 } 834 835 public class Factory : TProtocolFactory 836 { GetProtocol(TTransport trans)837 public override TProtocol GetProtocol(TTransport trans) 838 { 839 return new TCompactProtocol(trans); 840 } 841 } 842 843 /// <summary> 844 /// All of the on-wire exType codes. 845 /// </summary> 846 private static class Types 847 { 848 public const byte Stop = 0x00; 849 public const byte BooleanTrue = 0x01; 850 public const byte BooleanFalse = 0x02; 851 public const byte Byte = 0x03; 852 public const byte I16 = 0x04; 853 public const byte I32 = 0x05; 854 public const byte I64 = 0x06; 855 public const byte Double = 0x07; 856 public const byte Binary = 0x08; 857 public const byte List = 0x09; 858 public const byte Set = 0x0A; 859 public const byte Map = 0x0B; 860 public const byte Struct = 0x0C; 861 public const byte Uuid = 0x0D; 862 } 863 } 864 } 865