// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 using System;
19 using System.Buffers.Binary;
20 using System.Text;
21 using System.Threading;
22 using System.Threading.Tasks;
23 using Thrift.Protocol.Entities;
24 using Thrift.Protocol.Utilities;
25 using Thrift.Transport;
26 
27 
28 namespace Thrift.Protocol
29 {
30     // ReSharper disable once InconsistentNaming
31     public class TBinaryProtocol : TProtocol
32     {
33         protected const uint VersionMask = 0xffff0000;
34         protected const uint Version1 = 0x80010000;
35 
36         protected bool StrictRead;
37         protected bool StrictWrite;
38 
39         // minimize memory allocations by means of an preallocated bytes buffer
40         // The value of 128 is arbitrarily chosen, the required minimum size must be sizeof(long)
41         private readonly byte[] PreAllocatedBuffer = new byte[128];
42 
TBinaryProtocol(TTransport trans)43         public TBinaryProtocol(TTransport trans)
44             : this(trans, false, true)
45         {
46         }
47 
TBinaryProtocol(TTransport trans, bool strictRead, bool strictWrite)48         public TBinaryProtocol(TTransport trans, bool strictRead, bool strictWrite)
49             : base(trans)
50         {
51             StrictRead = strictRead;
52             StrictWrite = strictWrite;
53         }
54 
WriteMessageBeginAsync(TMessage message, CancellationToken cancellationToken)55         public override async Task WriteMessageBeginAsync(TMessage message, CancellationToken cancellationToken)
56         {
57             cancellationToken.ThrowIfCancellationRequested();
58 
59             if (StrictWrite)
60             {
61                 var version = Version1 | (uint) message.Type;
62                 await WriteI32Async((int) version, cancellationToken);
63                 await WriteStringAsync(message.Name, cancellationToken);
64                 await WriteI32Async(message.SeqID, cancellationToken);
65             }
66             else
67             {
68                 await WriteStringAsync(message.Name, cancellationToken);
69                 await WriteByteAsync((sbyte) message.Type, cancellationToken);
70                 await WriteI32Async(message.SeqID, cancellationToken);
71             }
72         }
73 
WriteMessageEndAsync(CancellationToken cancellationToken)74         public override Task WriteMessageEndAsync(CancellationToken cancellationToken)
75         {
76             cancellationToken.ThrowIfCancellationRequested();
77             return Task.CompletedTask;
78         }
79 
80         public override Task WriteStructBeginAsync(TStruct @struct, CancellationToken cancellationToken)
81         {
82             cancellationToken.ThrowIfCancellationRequested();
83             return Task.CompletedTask;
84         }
85 
WriteStructEndAsync(CancellationToken cancellationToken)86         public override Task WriteStructEndAsync(CancellationToken cancellationToken)
87         {
88             cancellationToken.ThrowIfCancellationRequested();
89             return Task.CompletedTask;
90         }
91 
WriteFieldBeginAsync(TField field, CancellationToken cancellationToken)92         public override async Task WriteFieldBeginAsync(TField field, CancellationToken cancellationToken)
93         {
94             cancellationToken.ThrowIfCancellationRequested();
95             await WriteByteAsync((sbyte) field.Type, cancellationToken);
96             await WriteI16Async(field.ID, cancellationToken);
97         }
98 
WriteFieldEndAsync(CancellationToken cancellationToken)99         public override Task WriteFieldEndAsync(CancellationToken cancellationToken)
100         {
101             cancellationToken.ThrowIfCancellationRequested();
102             return Task.CompletedTask;
103         }
104 
WriteFieldStopAsync(CancellationToken cancellationToken)105         public override async Task WriteFieldStopAsync(CancellationToken cancellationToken)
106         {
107             cancellationToken.ThrowIfCancellationRequested();
108 
109             await WriteByteAsync((sbyte) TType.Stop, cancellationToken);
110         }
111 
WriteMapBeginAsync(TMap map, CancellationToken cancellationToken)112         public override async Task WriteMapBeginAsync(TMap map, CancellationToken cancellationToken)
113         {
114             cancellationToken.ThrowIfCancellationRequested();
115 
116             PreAllocatedBuffer[0] = (byte)map.KeyType;
117             PreAllocatedBuffer[1] = (byte)map.ValueType;
118             await Trans.WriteAsync(PreAllocatedBuffer, 0, 2, cancellationToken);
119 
120             await WriteI32Async(map.Count, cancellationToken);
121         }
122 
WriteMapEndAsync(CancellationToken cancellationToken)123         public override Task WriteMapEndAsync(CancellationToken cancellationToken)
124         {
125             cancellationToken.ThrowIfCancellationRequested();
126             return Task.CompletedTask;
127         }
128 
WriteListBeginAsync(TList list, CancellationToken cancellationToken)129         public override async Task WriteListBeginAsync(TList list, CancellationToken cancellationToken)
130         {
131             cancellationToken.ThrowIfCancellationRequested();
132             await WriteByteAsync((sbyte) list.ElementType, cancellationToken);
133             await WriteI32Async(list.Count, cancellationToken);
134         }
135 
WriteListEndAsync(CancellationToken cancellationToken)136         public override Task WriteListEndAsync(CancellationToken cancellationToken)
137         {
138             cancellationToken.ThrowIfCancellationRequested();
139             return Task.CompletedTask;
140         }
141 
WriteSetBeginAsync(TSet set, CancellationToken cancellationToken)142         public override async Task WriteSetBeginAsync(TSet set, CancellationToken cancellationToken)
143         {
144             cancellationToken.ThrowIfCancellationRequested();
145             await WriteByteAsync((sbyte) set.ElementType, cancellationToken);
146             await WriteI32Async(set.Count, cancellationToken);
147         }
148 
WriteSetEndAsync(CancellationToken cancellationToken)149         public override Task WriteSetEndAsync(CancellationToken cancellationToken)
150         {
151             cancellationToken.ThrowIfCancellationRequested();
152             return Task.CompletedTask;
153         }
154 
WriteBoolAsync(bool b, CancellationToken cancellationToken)155         public override async Task WriteBoolAsync(bool b, CancellationToken cancellationToken)
156         {
157             cancellationToken.ThrowIfCancellationRequested();
158             await WriteByteAsync(b ? (sbyte) 1 : (sbyte) 0, cancellationToken);
159         }
160 
WriteByteAsync(sbyte b, CancellationToken cancellationToken)161         public override async Task WriteByteAsync(sbyte b, CancellationToken cancellationToken)
162         {
163             cancellationToken.ThrowIfCancellationRequested();
164 
165             PreAllocatedBuffer[0] = (byte)b;
166 
167             await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
168         }
WriteI16Async(short i16, CancellationToken cancellationToken)169         public override async Task WriteI16Async(short i16, CancellationToken cancellationToken)
170         {
171             cancellationToken.ThrowIfCancellationRequested();
172 
173             BinaryPrimitives.WriteInt16BigEndian(PreAllocatedBuffer, i16);
174 
175             await Trans.WriteAsync(PreAllocatedBuffer, 0, 2, cancellationToken);
176         }
177 
WriteI32Async(int i32, CancellationToken cancellationToken)178         public override async Task WriteI32Async(int i32, CancellationToken cancellationToken)
179         {
180             cancellationToken.ThrowIfCancellationRequested();
181 
182             BinaryPrimitives.WriteInt32BigEndian(PreAllocatedBuffer, i32);
183 
184             await Trans.WriteAsync(PreAllocatedBuffer, 0, 4, cancellationToken);
185         }
186 
187 
WriteI64Async(long i64, CancellationToken cancellationToken)188         public override async Task WriteI64Async(long i64, CancellationToken cancellationToken)
189         {
190             cancellationToken.ThrowIfCancellationRequested();
191 
192             BinaryPrimitives.WriteInt64BigEndian(PreAllocatedBuffer, i64);
193 
194             await Trans.WriteAsync(PreAllocatedBuffer, 0, 8, cancellationToken);
195         }
196 
WriteDoubleAsync(double d, CancellationToken cancellationToken)197         public override async Task WriteDoubleAsync(double d, CancellationToken cancellationToken)
198         {
199             cancellationToken.ThrowIfCancellationRequested();
200 
201             await WriteI64Async(BitConverter.DoubleToInt64Bits(d), cancellationToken);
202         }
203 
204 
WriteBinaryAsync(byte[] bytes, CancellationToken cancellationToken)205         public override async Task WriteBinaryAsync(byte[] bytes, CancellationToken cancellationToken)
206         {
207             cancellationToken.ThrowIfCancellationRequested();
208 
209             await WriteI32Async(bytes.Length, cancellationToken);
210             await Trans.WriteAsync(bytes, 0, bytes.Length, cancellationToken);
211         }
212 
WriteUuidAsync(Guid uuid, CancellationToken cancellationToken)213         public override async Task WriteUuidAsync(Guid uuid, CancellationToken cancellationToken)
214         {
215             cancellationToken.ThrowIfCancellationRequested();
216 
217             var bytes = uuid.SwapByteOrder().ToByteArray();
218             await Trans.WriteAsync(bytes, 0, bytes.Length, cancellationToken);
219         }
220 
ReadMessageBeginAsync(CancellationToken cancellationToken)221         public override async ValueTask<TMessage> ReadMessageBeginAsync(CancellationToken cancellationToken)
222         {
223             cancellationToken.ThrowIfCancellationRequested();
224 
225             var message = new TMessage();
226             var size = await ReadI32Async(cancellationToken);
227             if (size < 0)
228             {
229                 var version = (uint) size & VersionMask;
230                 if (version != Version1)
231                 {
232                     throw new TProtocolException(TProtocolException.BAD_VERSION,
233                         $"Bad version in ReadMessageBegin: {version}");
234                 }
235                 message.Type = (TMessageType) (size & 0x000000ff);
236                 message.Name = await ReadStringAsync(cancellationToken);
237                 message.SeqID = await ReadI32Async(cancellationToken);
238             }
239             else
240             {
241                 if (StrictRead)
242                 {
243                     throw new TProtocolException(TProtocolException.BAD_VERSION,
244                         "Missing version in ReadMessageBegin, old client?");
245                 }
246                 message.Name = (size > 0) ? await ReadStringBodyAsync(size, cancellationToken) : string.Empty;
247                 message.Type = (TMessageType) await ReadByteAsync(cancellationToken);
248                 message.SeqID = await ReadI32Async(cancellationToken);
249             }
250             return message;
251         }
252 
ReadMessageEndAsync(CancellationToken cancellationToken)253         public override Task ReadMessageEndAsync(CancellationToken cancellationToken)
254         {
255             cancellationToken.ThrowIfCancellationRequested();
256             Transport.ResetConsumedMessageSize();
257             return Task.CompletedTask;
258         }
259 
ReadStructBeginAsync(CancellationToken cancellationToken)260         public override ValueTask<TStruct> ReadStructBeginAsync(CancellationToken cancellationToken)
261         {
262             cancellationToken.ThrowIfCancellationRequested();
263             return new ValueTask<TStruct>(AnonymousStruct);
264         }
265 
ReadStructEndAsync(CancellationToken cancellationToken)266         public override Task ReadStructEndAsync(CancellationToken cancellationToken)
267         {
268             cancellationToken.ThrowIfCancellationRequested();
269             return Task.CompletedTask;
270         }
271 
ReadFieldBeginAsync(CancellationToken cancellationToken)272         public override async ValueTask<TField> ReadFieldBeginAsync(CancellationToken cancellationToken)
273         {
274             cancellationToken.ThrowIfCancellationRequested();
275 
276             var type = (TType)await ReadByteAsync(cancellationToken);
277             if (type == TType.Stop)
278             {
279                 return StopField;
280             }
281 
282             return new TField {
283                 Type = type,
284                 ID = await ReadI16Async(cancellationToken)
285             };
286         }
287 
ReadFieldEndAsync(CancellationToken cancellationToken)288         public override Task ReadFieldEndAsync(CancellationToken cancellationToken)
289         {
290             cancellationToken.ThrowIfCancellationRequested();
291             return Task.CompletedTask;
292         }
293 
ReadMapBeginAsync(CancellationToken cancellationToken)294         public override async ValueTask<TMap> ReadMapBeginAsync(CancellationToken cancellationToken)
295         {
296             cancellationToken.ThrowIfCancellationRequested();
297 
298             var map = new TMap
299             {
300                 KeyType = (TType) await ReadByteAsync(cancellationToken),
301                 ValueType = (TType) await ReadByteAsync(cancellationToken),
302                 Count = await ReadI32Async(cancellationToken)
303             };
304             CheckReadBytesAvailable(map);
305             return map;
306         }
307 
ReadMapEndAsync(CancellationToken cancellationToken)308         public override Task ReadMapEndAsync(CancellationToken cancellationToken)
309         {
310             cancellationToken.ThrowIfCancellationRequested();
311             return Task.CompletedTask;
312         }
313 
ReadListBeginAsync(CancellationToken cancellationToken)314         public override async ValueTask<TList> ReadListBeginAsync(CancellationToken cancellationToken)
315         {
316             cancellationToken.ThrowIfCancellationRequested();
317 
318             var list = new TList
319             {
320                 ElementType = (TType) await ReadByteAsync(cancellationToken),
321                 Count = await ReadI32Async(cancellationToken)
322             };
323             CheckReadBytesAvailable(list);
324             return list;
325         }
326 
ReadListEndAsync(CancellationToken cancellationToken)327         public override Task ReadListEndAsync(CancellationToken cancellationToken)
328         {
329             cancellationToken.ThrowIfCancellationRequested();
330             return Task.CompletedTask;
331         }
332 
ReadSetBeginAsync(CancellationToken cancellationToken)333         public override async ValueTask<TSet> ReadSetBeginAsync(CancellationToken cancellationToken)
334         {
335             cancellationToken.ThrowIfCancellationRequested();
336 
337             var set = new TSet
338             {
339                 ElementType = (TType) await ReadByteAsync(cancellationToken),
340                 Count = await ReadI32Async(cancellationToken)
341             };
342             CheckReadBytesAvailable(set);
343             return set;
344         }
345 
ReadSetEndAsync(CancellationToken cancellationToken)346         public override Task ReadSetEndAsync(CancellationToken cancellationToken)
347         {
348             cancellationToken.ThrowIfCancellationRequested();
349             return Task.CompletedTask;
350         }
351 
ReadBoolAsync(CancellationToken cancellationToken)352         public override async ValueTask<bool> ReadBoolAsync(CancellationToken cancellationToken)
353         {
354             cancellationToken.ThrowIfCancellationRequested();
355 
356             return await ReadByteAsync(cancellationToken) == 1;
357         }
358 
ReadByteAsync(CancellationToken cancellationToken)359         public override async ValueTask<sbyte> ReadByteAsync(CancellationToken cancellationToken)
360         {
361             cancellationToken.ThrowIfCancellationRequested();
362 
363             await Trans.ReadAllAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
364             return (sbyte)PreAllocatedBuffer[0];
365         }
366 
ReadI16Async(CancellationToken cancellationToken)367         public override async ValueTask<short> ReadI16Async(CancellationToken cancellationToken)
368         {
369             cancellationToken.ThrowIfCancellationRequested();
370 
371             await Trans.ReadAllAsync(PreAllocatedBuffer, 0, 2, cancellationToken);
372             var result = BinaryPrimitives.ReadInt16BigEndian(PreAllocatedBuffer);
373             return result;
374         }
375 
ReadI32Async(CancellationToken cancellationToken)376         public override async ValueTask<int> ReadI32Async(CancellationToken cancellationToken)
377         {
378             cancellationToken.ThrowIfCancellationRequested();
379 
380             await Trans.ReadAllAsync(PreAllocatedBuffer, 0, 4, cancellationToken);
381 
382             var result = BinaryPrimitives.ReadInt32BigEndian(PreAllocatedBuffer);
383 
384             return result;
385         }
386 
ReadI64Async(CancellationToken cancellationToken)387         public override async ValueTask<long> ReadI64Async(CancellationToken cancellationToken)
388         {
389             cancellationToken.ThrowIfCancellationRequested();
390 
391             await Trans.ReadAllAsync(PreAllocatedBuffer, 0, 8, cancellationToken);
392             return BinaryPrimitives.ReadInt64BigEndian(PreAllocatedBuffer);
393         }
394 
ReadDoubleAsync(CancellationToken cancellationToken)395         public override async ValueTask<double> ReadDoubleAsync(CancellationToken cancellationToken)
396         {
397             cancellationToken.ThrowIfCancellationRequested();
398 
399             var d = await ReadI64Async(cancellationToken);
400             return BitConverter.Int64BitsToDouble(d);
401         }
402 
ReadBinaryAsync(CancellationToken cancellationToken)403         public override async ValueTask<byte[]> ReadBinaryAsync(CancellationToken cancellationToken)
404         {
405             cancellationToken.ThrowIfCancellationRequested();
406 
407             var size = await ReadI32Async(cancellationToken);
408             Transport.CheckReadBytesAvailable(size);
409             var buf = new byte[size];
410             await Trans.ReadAllAsync(buf, 0, size, cancellationToken);
411             return buf;
412         }
413 
ReadUuidAsync(CancellationToken cancellationToken)414         public override async ValueTask<Guid> ReadUuidAsync(CancellationToken cancellationToken)
415         {
416             cancellationToken.ThrowIfCancellationRequested();
417 
418             Transport.CheckReadBytesAvailable(16);  // = sizeof(uuid)
419             var buf = new byte[16];
420             await Trans.ReadAllAsync(buf, 0, 16, cancellationToken);
421             return new Guid(buf).SwapByteOrder();
422         }
423 
ReadStringAsync(CancellationToken cancellationToken)424         public override async ValueTask<string> ReadStringAsync(CancellationToken cancellationToken)
425         {
426             cancellationToken.ThrowIfCancellationRequested();
427 
428             var size = await ReadI32Async(cancellationToken);
429             return size > 0 ? await ReadStringBodyAsync(size, cancellationToken) : string.Empty;
430         }
431 
ReadStringBodyAsync(int size, CancellationToken cancellationToken)432         private async ValueTask<string> ReadStringBodyAsync(int size, CancellationToken cancellationToken)
433         {
434             cancellationToken.ThrowIfCancellationRequested();
435 
436             if (size <= PreAllocatedBuffer.Length)
437             {
438                 await Trans.ReadAllAsync(PreAllocatedBuffer, 0, size, cancellationToken);
439                 return Encoding.UTF8.GetString(PreAllocatedBuffer, 0, size);
440             }
441 
442             Transport.CheckReadBytesAvailable(size);
443             var buf = new byte[size];
444             await Trans.ReadAllAsync(buf, 0, size, cancellationToken);
445             return Encoding.UTF8.GetString(buf, 0, buf.Length);
446         }
447 
448         // Return the minimum number of bytes a type will consume on the wire
GetMinSerializedSize(TType type)449         public override int GetMinSerializedSize(TType type)
450         {
451             switch (type)
452             {
453                 case TType.Stop: return 0;
454                 case TType.Void: return 0;
455                 case TType.Bool: return sizeof(byte);
456                 case TType.Byte: return sizeof(byte);
457                 case TType.Double: return sizeof(double);
458                 case TType.I16: return sizeof(short);
459                 case TType.I32: return sizeof(int);
460                 case TType.I64: return sizeof(long);
461                 case TType.String: return sizeof(int);  // string length
462                 case TType.Struct: return 0;  // empty struct
463                 case TType.Map: return sizeof(int);  // element count
464                 case TType.Set: return sizeof(int);  // element count
465                 case TType.List: return sizeof(int);  // element count
466                 case TType.Uuid: return 16;  // uuid bytes
467                 default: throw new TProtocolException(TProtocolException.NOT_IMPLEMENTED, "unrecognized type code");
468             }
469         }
470 
471         public class Factory : TProtocolFactory
472         {
473             protected bool StrictRead;
474             protected bool StrictWrite;
475 
Factory()476             public Factory()
477                 : this(false, true)
478             {
479             }
480 
Factory(bool strictRead, bool strictWrite)481             public Factory(bool strictRead, bool strictWrite)
482             {
483                 StrictRead = strictRead;
484                 StrictWrite = strictWrite;
485             }
486 
GetProtocol(TTransport trans)487             public override TProtocol GetProtocol(TTransport trans)
488             {
489                 return new TBinaryProtocol(trans, StrictRead, StrictWrite);
490             }
491         }
492     }
493 }
494