// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

using System;
using System.Buffers.Binary;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Thrift.Protocol.Entities;
using Thrift.Protocol.Utilities;
using Thrift.Transport;


namespace Thrift.Protocol
{
    /// <summary>
    /// Protocol implementation that encodes integers as fixed-width big-endian values
    /// and reads strings as an I32 byte count followed by UTF-8 bytes.
    /// </summary>
    /// <remarks>
    /// All primitive reads and writes go through one shared scratch buffer held by the
    /// instance, so overlapping operations on the same instance would overwrite each
    /// other's data. Every await uses <c>ConfigureAwait(false)</c>; continuations are not
    /// marshalled back to the caller's synchronization context.
    /// </remarks>
    // ReSharper disable once InconsistentNaming
    public class TBinaryProtocol : TProtocol
    {
        /// <summary>Mask selecting the version bits (upper 16 bits) of a message header word.</summary>
        protected const uint VersionMask = 0xffff0000;

        /// <summary>Version marker written into / expected in the upper 16 bits of a versioned header.</summary>
        protected const uint Version1 = 0x80010000;

        /// <summary>When true, <see cref="ReadMessageBeginAsync"/> rejects headers that carry no version word.</summary>
        protected bool StrictRead;

        /// <summary>When true, <see cref="WriteMessageBeginAsync"/> emits the versioned header layout.</summary>
        protected bool StrictWrite;

        // minimize memory allocations by means of a preallocated bytes buffer
        // The value of 128 is arbitrarily chosen, the required minimum size must be sizeof(long)
        private readonly byte[] PreAllocatedBuffer = new byte[128];

        /// <summary>
        /// Creates a protocol over <paramref name="trans"/> with non-strict reads and strict writes.
        /// </summary>
        /// <param name="trans">Transport passed to the base protocol.</param>
        public TBinaryProtocol(TTransport trans)
            : this(trans, false, true)
        {
        }

        /// <summary>
        /// Creates a protocol over <paramref name="trans"/> with explicit strictness settings.
        /// </summary>
        /// <param name="trans">Transport passed to the base protocol.</param>
        /// <param name="strictRead">Value stored in <see cref="StrictRead"/>.</param>
        /// <param name="strictWrite">Value stored in <see cref="StrictWrite"/>.</param>
        public TBinaryProtocol(TTransport trans, bool strictRead, bool strictWrite)
            : base(trans)
        {
            StrictRead = strictRead;
            StrictWrite = strictWrite;
        }

        /// <summary>
        /// Writes a message header. With <see cref="StrictWrite"/> the layout is
        /// (Version1 | type) as I32, name, sequence id; otherwise name, type byte, sequence id.
        /// </summary>
        /// <param name="message">Header values to write.</param>
        /// <param name="cancellationToken">Checked on entry and passed to every nested write.</param>
        public override async Task WriteMessageBeginAsync(TMessage message, CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();

            if (StrictWrite)
            {
                // The message type occupies the low bits of the version word.
                var version = Version1 | (uint) message.Type;
                await WriteI32Async((int) version, cancellationToken).ConfigureAwait(false);
                await WriteStringAsync(message.Name, cancellationToken).ConfigureAwait(false);
                await WriteI32Async(message.SeqID, cancellationToken).ConfigureAwait(false);
            }
            else
            {
                await WriteStringAsync(message.Name, cancellationToken).ConfigureAwait(false);
                await WriteByteAsync((sbyte) message.Type, cancellationToken).ConfigureAwait(false);
                await WriteI32Async(message.SeqID, cancellationToken).ConfigureAwait(false);
            }
        }

        /// <summary>Writes nothing; only honours cancellation.</summary>
        public override Task WriteMessageEndAsync(CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();
            return Task.CompletedTask;
        }

        /// <summary>Writes nothing; only honours cancellation.</summary>
        public override Task WriteStructBeginAsync(TStruct @struct, CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();
            return Task.CompletedTask;
        }

        /// <summary>Writes nothing; only honours cancellation.</summary>
        public override Task WriteStructEndAsync(CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();
            return Task.CompletedTask;
        }

        /// <summary>Writes a field header: one type byte followed by the field id as I16.</summary>
        /// <param name="field">Field whose type and id are written.</param>
        /// <param name="cancellationToken">Checked on entry and passed to every nested write.</param>
        public override async Task WriteFieldBeginAsync(TField field, CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();
            await WriteByteAsync((sbyte) field.Type, cancellationToken).ConfigureAwait(false);
            await WriteI16Async(field.ID, cancellationToken).ConfigureAwait(false);
        }

        /// <summary>Writes nothing; only honours cancellation.</summary>
        public override Task WriteFieldEndAsync(CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();
            return Task.CompletedTask;
        }

        /// <summary>Writes the single <see cref="TType.Stop"/> byte.</summary>
        public override async Task WriteFieldStopAsync(CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();

            await WriteByteAsync((sbyte) TType.Stop, cancellationToken).ConfigureAwait(false);
        }

        /// <summary>Writes a map header: key type byte, value type byte, element count as I32.</summary>
        /// <param name="map">Map descriptor to write.</param>
        /// <param name="cancellationToken">Checked on entry and passed to every nested write.</param>
        public override async Task WriteMapBeginAsync(TMap map, CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();

            // Both type bytes are sent in a single transport write.
            PreAllocatedBuffer[0] = (byte)map.KeyType;
            PreAllocatedBuffer[1] = (byte)map.ValueType;
            await Trans.WriteAsync(PreAllocatedBuffer, 0, 2, cancellationToken).ConfigureAwait(false);

            await WriteI32Async(map.Count, cancellationToken).ConfigureAwait(false);
        }

        /// <summary>Writes nothing; only honours cancellation.</summary>
        public override Task WriteMapEndAsync(CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();
            return Task.CompletedTask;
        }

        /// <summary>Writes a list header: element type byte followed by the element count as I32.</summary>
        /// <param name="list">List descriptor to write.</param>
        /// <param name="cancellationToken">Checked on entry and passed to every nested write.</param>
        public override async Task WriteListBeginAsync(TList list, CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();
            await WriteByteAsync((sbyte) list.ElementType, cancellationToken).ConfigureAwait(false);
            await WriteI32Async(list.Count, cancellationToken).ConfigureAwait(false);
        }

        /// <summary>Writes nothing; only honours cancellation.</summary>
        public override Task WriteListEndAsync(CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();
            return Task.CompletedTask;
        }

        /// <summary>Writes a set header: element type byte followed by the element count as I32.</summary>
        /// <param name="set">Set descriptor to write.</param>
        /// <param name="cancellationToken">Checked on entry and passed to every nested write.</param>
        public override async Task WriteSetBeginAsync(TSet set, CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();
            await WriteByteAsync((sbyte) set.ElementType, cancellationToken).ConfigureAwait(false);
            await WriteI32Async(set.Count, cancellationToken).ConfigureAwait(false);
        }

        /// <summary>Writes nothing; only honours cancellation.</summary>
        public override Task WriteSetEndAsync(CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();
            return Task.CompletedTask;
        }

        /// <summary>Writes a boolean as one byte: 1 for true, 0 for false.</summary>
        public override async Task WriteBoolAsync(bool b, CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();
            await WriteByteAsync(b ? (sbyte) 1 : (sbyte) 0, cancellationToken).ConfigureAwait(false);
        }

        /// <summary>Writes a single byte to the transport.</summary>
        public override async Task WriteByteAsync(sbyte b, CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();

            PreAllocatedBuffer[0] = (byte)b;

            await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken).ConfigureAwait(false);
        }

        /// <summary>Writes a 16-bit integer as 2 big-endian bytes.</summary>
        public override async Task WriteI16Async(short i16, CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();

            BinaryPrimitives.WriteInt16BigEndian(PreAllocatedBuffer, i16);

            await Trans.WriteAsync(PreAllocatedBuffer, 0, 2, cancellationToken).ConfigureAwait(false);
        }

        /// <summary>Writes a 32-bit integer as 4 big-endian bytes.</summary>
        public override async Task WriteI32Async(int i32, CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();

            BinaryPrimitives.WriteInt32BigEndian(PreAllocatedBuffer, i32);

            await Trans.WriteAsync(PreAllocatedBuffer, 0, 4, cancellationToken).ConfigureAwait(false);
        }

        /// <summary>Writes a 64-bit integer as 8 big-endian bytes.</summary>
        public override async Task WriteI64Async(long i64, CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();

            BinaryPrimitives.WriteInt64BigEndian(PreAllocatedBuffer, i64);

            await Trans.WriteAsync(PreAllocatedBuffer, 0, 8, cancellationToken).ConfigureAwait(false);
        }

        /// <summary>Writes a double by reinterpreting its bits as an I64 and writing that.</summary>
        public override async Task WriteDoubleAsync(double d, CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();

            await WriteI64Async(BitConverter.DoubleToInt64Bits(d), cancellationToken).ConfigureAwait(false);
        }

        /// <summary>Writes a byte array as its length (I32) followed by the raw bytes.</summary>
        /// <param name="bytes">Payload to write; must not be null.</param>
        /// <param name="cancellationToken">Checked on entry and passed to every nested write.</param>
        /// <exception cref="ArgumentNullException"><paramref name="bytes"/> is null.</exception>
        public override async Task WriteBinaryAsync(byte[] bytes, CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();

            // Fail with a descriptive exception before anything reaches the transport.
            if (bytes == null)
            {
                throw new ArgumentNullException(nameof(bytes));
            }

            await WriteI32Async(bytes.Length, cancellationToken).ConfigureAwait(false);
            await Trans.WriteAsync(bytes, 0, bytes.Length, cancellationToken).ConfigureAwait(false);
        }

        /// <summary>Writes the bytes of <paramref name="uuid"/> after applying <c>SwapByteOrder()</c>.</summary>
        public override async Task WriteUuidAsync(Guid uuid, CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();

            var bytes = uuid.SwapByteOrder().ToByteArray();
            await Trans.WriteAsync(bytes, 0, bytes.Length, cancellationToken).ConfigureAwait(false);
        }

        /// <summary>
        /// Reads a message header. A negative leading I32 is treated as a versioned header;
        /// a non-negative one is treated as the length of the message name (unversioned layout).
        /// </summary>
        /// <param name="cancellationToken">Checked on entry and passed to every nested read.</param>
        /// <returns>The decoded message header.</returns>
        /// <exception cref="TProtocolException">
        /// The version bits differ from <see cref="Version1"/>, or the header is unversioned
        /// while <see cref="StrictRead"/> is set.
        /// </exception>
        public override async ValueTask<TMessage> ReadMessageBeginAsync(CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();

            var message = new TMessage();
            var size = await ReadI32Async(cancellationToken).ConfigureAwait(false);
            if (size < 0)
            {
                // Version1 has its top bit set, so a versioned header word reads as a negative I32.
                var version = (uint) size & VersionMask;
                if (version != Version1)
                {
                    throw new TProtocolException(TProtocolException.BAD_VERSION,
                        $"Bad version in ReadMessageBegin: {version}");
                }
                message.Type = (TMessageType) (size & 0x000000ff);
                message.Name = await ReadStringAsync(cancellationToken).ConfigureAwait(false);
                message.SeqID = await ReadI32Async(cancellationToken).ConfigureAwait(false);
            }
            else
            {
                if (StrictRead)
                {
                    throw new TProtocolException(TProtocolException.BAD_VERSION,
                        "Missing version in ReadMessageBegin, old client?");
                }
                // The I32 already consumed is the name length in the unversioned layout.
                message.Name = (size > 0)
                    ? await ReadStringBodyAsync(size, cancellationToken).ConfigureAwait(false)
                    : string.Empty;
                message.Type = (TMessageType) await ReadByteAsync(cancellationToken).ConfigureAwait(false);
                message.SeqID = await ReadI32Async(cancellationToken).ConfigureAwait(false);
            }
            return message;
        }

        /// <summary>Reads nothing; calls <c>Transport.ResetConsumedMessageSize()</c>.</summary>
        public override Task ReadMessageEndAsync(CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();
            Transport.ResetConsumedMessageSize();
            return Task.CompletedTask;
        }

        /// <summary>Reads nothing; returns <c>AnonymousStruct</c>.</summary>
        public override ValueTask<TStruct> ReadStructBeginAsync(CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();
            return new ValueTask<TStruct>(AnonymousStruct);
        }

        /// <summary>Reads nothing; only honours cancellation.</summary>
        public override Task ReadStructEndAsync(CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();
            return Task.CompletedTask;
        }

        /// <summary>
        /// Reads a field header: a type byte and, unless the type is <see cref="TType.Stop"/>,
        /// the field id as I16.
        /// </summary>
        /// <returns><c>StopField</c> for a stop marker, otherwise the decoded field header.</returns>
        public override async ValueTask<TField> ReadFieldBeginAsync(CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();

            var type = (TType) await ReadByteAsync(cancellationToken).ConfigureAwait(false);
            if (type == TType.Stop)
            {
                // A stop marker carries no field id.
                return StopField;
            }

            return new TField {
                Type = type,
                ID = await ReadI16Async(cancellationToken).ConfigureAwait(false)
            };
        }

        /// <summary>Reads nothing; only honours cancellation.</summary>
        public override Task ReadFieldEndAsync(CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();
            return Task.CompletedTask;
        }

        /// <summary>
        /// Reads a map header (key type byte, value type byte, I32 count) and passes it to
        /// <c>CheckReadBytesAvailable</c> before returning it.
        /// </summary>
        public override async ValueTask<TMap> ReadMapBeginAsync(CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();

            // Initializer order matches the wire order: key type, value type, count.
            var map = new TMap
            {
                KeyType = (TType) await ReadByteAsync(cancellationToken).ConfigureAwait(false),
                ValueType = (TType) await ReadByteAsync(cancellationToken).ConfigureAwait(false),
                Count = await ReadI32Async(cancellationToken).ConfigureAwait(false)
            };
            CheckReadBytesAvailable(map);
            return map;
        }

        /// <summary>Reads nothing; only honours cancellation.</summary>
        public override Task ReadMapEndAsync(CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();
            return Task.CompletedTask;
        }

        /// <summary>
        /// Reads a list header (element type byte, I32 count) and passes it to
        /// <c>CheckReadBytesAvailable</c> before returning it.
        /// </summary>
        public override async ValueTask<TList> ReadListBeginAsync(CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();

            var list = new TList
            {
                ElementType = (TType) await ReadByteAsync(cancellationToken).ConfigureAwait(false),
                Count = await ReadI32Async(cancellationToken).ConfigureAwait(false)
            };
            CheckReadBytesAvailable(list);
            return list;
        }

        /// <summary>Reads nothing; only honours cancellation.</summary>
        public override Task ReadListEndAsync(CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();
            return Task.CompletedTask;
        }

        /// <summary>
        /// Reads a set header (element type byte, I32 count) and passes it to
        /// <c>CheckReadBytesAvailable</c> before returning it.
        /// </summary>
        public override async ValueTask<TSet> ReadSetBeginAsync(CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();

            var set = new TSet
            {
                ElementType = (TType) await ReadByteAsync(cancellationToken).ConfigureAwait(false),
                Count = await ReadI32Async(cancellationToken).ConfigureAwait(false)
            };
            CheckReadBytesAvailable(set);
            return set;
        }

        /// <summary>Reads nothing; only honours cancellation.</summary>
        public override Task ReadSetEndAsync(CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();
            return Task.CompletedTask;
        }

        /// <summary>Reads one byte and returns true only when it equals 1.</summary>
        public override async ValueTask<bool> ReadBoolAsync(CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();

            return await ReadByteAsync(cancellationToken).ConfigureAwait(false) == 1;
        }

        /// <summary>Reads a single byte from the transport.</summary>
        public override async ValueTask<sbyte> ReadByteAsync(CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();

            await Trans.ReadAllAsync(PreAllocatedBuffer, 0, 1, cancellationToken).ConfigureAwait(false);
            return (sbyte)PreAllocatedBuffer[0];
        }

        /// <summary>Reads 2 bytes and decodes them as a big-endian 16-bit integer.</summary>
        public override async ValueTask<short> ReadI16Async(CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();

            await Trans.ReadAllAsync(PreAllocatedBuffer, 0, 2, cancellationToken).ConfigureAwait(false);
            var result = BinaryPrimitives.ReadInt16BigEndian(PreAllocatedBuffer);
            return result;
        }

        /// <summary>Reads 4 bytes and decodes them as a big-endian 32-bit integer.</summary>
        public override async ValueTask<int> ReadI32Async(CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();

            await Trans.ReadAllAsync(PreAllocatedBuffer, 0, 4, cancellationToken).ConfigureAwait(false);

            var result = BinaryPrimitives.ReadInt32BigEndian(PreAllocatedBuffer);

            return result;
        }

        /// <summary>Reads 8 bytes and decodes them as a big-endian 64-bit integer.</summary>
        public override async ValueTask<long> ReadI64Async(CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();

            await Trans.ReadAllAsync(PreAllocatedBuffer, 0, 8, cancellationToken).ConfigureAwait(false);
            return BinaryPrimitives.ReadInt64BigEndian(PreAllocatedBuffer);
        }

        /// <summary>Reads an I64 and reinterprets its bits as a double.</summary>
        public override async ValueTask<double> ReadDoubleAsync(CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();

            var d = await ReadI64Async(cancellationToken).ConfigureAwait(false);
            return BitConverter.Int64BitsToDouble(d);
        }

        /// <summary>Reads a byte array: an I32 length followed by that many raw bytes.</summary>
        /// <param name="cancellationToken">Checked on entry and passed to every nested read.</param>
        /// <returns>A newly allocated array holding the payload.</returns>
        /// <exception cref="TProtocolException">
        /// The length read from the wire is negative and was not already rejected by
        /// <c>Transport.CheckReadBytesAvailable</c>.
        /// </exception>
        public override async ValueTask<byte[]> ReadBinaryAsync(CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();

            var size = await ReadI32Async(cancellationToken).ConfigureAwait(false);
            Transport.CheckReadBytesAvailable(size);

            // The length comes from the wire. Placed after the transport check so that any
            // exception the transport already raises is unchanged; this only replaces the
            // runtime failure that allocating a negative-length array would otherwise cause.
            if (size < 0)
            {
                throw new TProtocolException(TProtocolException.NEGATIVE_SIZE,
                    $"Negative size in ReadBinary: {size}");
            }

            var buf = new byte[size];
            await Trans.ReadAllAsync(buf, 0, size, cancellationToken).ConfigureAwait(false);
            return buf;
        }

        /// <summary>Reads 16 bytes, builds a <see cref="Guid"/> from them and applies <c>SwapByteOrder()</c>.</summary>
        public override async ValueTask<Guid> ReadUuidAsync(CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();

            Transport.CheckReadBytesAvailable(16);  // = sizeof(uuid)
            var buf = new byte[16];
            await Trans.ReadAllAsync(buf, 0, 16, cancellationToken).ConfigureAwait(false);
            return new Guid(buf).SwapByteOrder();
        }

        /// <summary>
        /// Reads a string: an I32 byte count followed by that many UTF-8 bytes.
        /// A count of zero or less yields <see cref="string.Empty"/> without reading further.
        /// </summary>
        public override async ValueTask<string> ReadStringAsync(CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();

            var size = await ReadI32Async(cancellationToken).ConfigureAwait(false);
            return size > 0
                ? await ReadStringBodyAsync(size, cancellationToken).ConfigureAwait(false)
                : string.Empty;
        }

        /// <summary>
        /// Reads <paramref name="size"/> bytes and decodes them as UTF-8. Callers in this class
        /// only pass positive sizes.
        /// </summary>
        /// <param name="size">Number of bytes to read.</param>
        /// <param name="cancellationToken">Checked on entry and passed to the transport read.</param>
        private async ValueTask<string> ReadStringBodyAsync(int size, CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();

            // Short strings fit in the shared scratch buffer, so nothing is allocated for the bytes.
            if (size <= PreAllocatedBuffer.Length)
            {
                await Trans.ReadAllAsync(PreAllocatedBuffer, 0, size, cancellationToken).ConfigureAwait(false);
                return Encoding.UTF8.GetString(PreAllocatedBuffer, 0, size);
            }

            // Longer strings need a dedicated buffer; ask the transport first, before allocating.
            Transport.CheckReadBytesAvailable(size);
            var buf = new byte[size];
            await Trans.ReadAllAsync(buf, 0, size, cancellationToken).ConfigureAwait(false);
            return Encoding.UTF8.GetString(buf, 0, buf.Length);
        }

        /// <summary>Returns the minimum number of bytes a value of the given type will consume on the wire.</summary>
        /// <param name="type">Type code to look up.</param>
        /// <returns>The minimum serialized size in bytes.</returns>
        /// <exception cref="TProtocolException"><paramref name="type"/> is not one of the handled type codes.</exception>
        public override int GetMinSerializedSize(TType type)
        {
            switch (type)
            {
                case TType.Stop: return 0;
                case TType.Void: return 0;
                case TType.Bool: return sizeof(byte);
                case TType.Byte: return sizeof(byte);
                case TType.Double: return sizeof(double);
                case TType.I16: return sizeof(short);
                case TType.I32: return sizeof(int);
                case TType.I64: return sizeof(long);
                case TType.String: return sizeof(int);  // string length
                case TType.Struct: return 0;  // empty struct
                case TType.Map: return sizeof(int);  // element count
                case TType.Set: return sizeof(int);  // element count
                case TType.List: return sizeof(int);  // element count
                case TType.Uuid: return 16;  // uuid bytes
                default: throw new TProtocolException(TProtocolException.NOT_IMPLEMENTED, "unrecognized type code");
            }
        }

        /// <summary>Factory producing <see cref="TBinaryProtocol"/> instances with fixed strictness settings.</summary>
        public class Factory : TProtocolFactory
        {
            /// <summary>Strict-read setting passed to every protocol this factory creates.</summary>
            protected bool StrictRead;

            /// <summary>Strict-write setting passed to every protocol this factory creates.</summary>
            protected bool StrictWrite;

            /// <summary>Creates a factory with non-strict reads and strict writes.</summary>
            public Factory()
                : this(false, true)
            {
            }

            /// <summary>Creates a factory with explicit strictness settings.</summary>
            /// <param name="strictRead">Value stored in <see cref="StrictRead"/>.</param>
            /// <param name="strictWrite">Value stored in <see cref="StrictWrite"/>.</param>
            public Factory(bool strictRead, bool strictWrite)
            {
                StrictRead = strictRead;
                StrictWrite = strictWrite;
            }

            /// <summary>Creates a <see cref="TBinaryProtocol"/> over <paramref name="trans"/> using this factory's settings.</summary>
            public override TProtocol GetProtocol(TTransport trans)
            {
                return new TBinaryProtocol(trans, StrictRead, StrictWrite);
            }
        }
    }
}