1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 package org.apache.thrift.protocol;
21 
22 import java.util.ArrayList;
23 import java.util.HashMap;
24 import java.util.HashSet;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.Set;
28 import java.util.function.IntFunction;
29 import org.apache.thrift.TException;
30 import org.apache.thrift.partial.TFieldData;
31 import org.apache.thrift.scheme.IScheme;
32 import org.apache.thrift.scheme.StandardScheme;
33 import org.apache.thrift.transport.TTransport;
34 
35 /** Protocol interface definition. */
36 public abstract class TProtocol implements TWriteProtocol, TReadProtocol {
37 
38   /** Prevent direct instantiation */
39   @SuppressWarnings("unused")
TProtocol()40   private TProtocol() {}
41 
42   /** Transport */
43   protected TTransport trans_;
44 
45   /** Constructor */
TProtocol(TTransport trans)46   protected TProtocol(TTransport trans) {
47     trans_ = trans;
48   }
49 
50   /** Transport accessor */
getTransport()51   public TTransport getTransport() {
52     return trans_;
53   }
54 
checkReadBytesAvailable(TMap map)55   protected void checkReadBytesAvailable(TMap map) throws TException {
56     long elemSize = getMinSerializedSize(map.keyType) + getMinSerializedSize(map.valueType);
57     trans_.checkReadBytesAvailable(map.size * elemSize);
58   }
59 
checkReadBytesAvailable(TList list)60   protected void checkReadBytesAvailable(TList list) throws TException {
61     long size = list.getSize();
62     trans_.checkReadBytesAvailable(size * getMinSerializedSize(list.elemType));
63   }
64 
checkReadBytesAvailable(TSet set)65   protected void checkReadBytesAvailable(TSet set) throws TException {
66     long size = set.getSize();
67     trans_.checkReadBytesAvailable(size * getMinSerializedSize(set.elemType));
68   }
69 
70   /**
71    * Return min serialized size in bytes
72    *
73    * @param type Returns the minimum amount of bytes needed to store the smallest possible instance
74    *     of TType.
75    * @return min serialized size
76    * @throws TException when error happens
77    */
getMinSerializedSize(byte type)78   public abstract int getMinSerializedSize(byte type) throws TException;
79 
80   public interface WriteCallback<T> {
call(T e)81     void call(T e) throws TException;
82   }
83 
84   public interface ReadCallback<T, R> {
accept(T t)85     R accept(T t) throws TException;
86   }
87 
88   public interface ReadCollectionCallback<R> {
call()89     R call() throws TException;
90   }
91 
92   public interface ReadMapEntryCallback<K, V> {
getKey()93     K getKey() throws TException;
94 
getValue()95     V getValue() throws TException;
96   }
97 
writeSet(byte elementType, Set<T> set, WriteCallback<T> callback)98   public final <T> void writeSet(byte elementType, Set<T> set, WriteCallback<T> callback)
99       throws TException {
100     writeSetBegin(new TSet(elementType, set.size()));
101     for (T t : set) {
102       callback.call(t);
103     }
104     writeSetEnd();
105   }
106 
writeList(byte elementType, List<T> list, WriteCallback<T> callback)107   public final <T> void writeList(byte elementType, List<T> list, WriteCallback<T> callback)
108       throws TException {
109     writeListBegin(new TList(elementType, list.size()));
110     for (T t : list) {
111       callback.call(t);
112     }
113     writeListEnd();
114   }
115 
writeMap( byte keyType, byte valueType, Map<K, V> map, WriteCallback<Map.Entry<K, V>> callback)116   public final <K, V> void writeMap(
117       byte keyType, byte valueType, Map<K, V> map, WriteCallback<Map.Entry<K, V>> callback)
118       throws TException {
119     writeMapBegin(new TMap(keyType, valueType, map.size()));
120     for (Map.Entry<K, V> entry : map.entrySet()) {
121       callback.call(entry);
122     }
123     writeMapEnd();
124   }
125 
writeField(TField field, WriteCallback<Void> callback)126   public final void writeField(TField field, WriteCallback<Void> callback) throws TException {
127     writeFieldBegin(field);
128     callback.call(null);
129     writeFieldEnd();
130   }
131 
writeStruct(TStruct struct, WriteCallback<Void> callback)132   public final void writeStruct(TStruct struct, WriteCallback<Void> callback) throws TException {
133     writeStructBegin(struct);
134     callback.call(null);
135     writeStructEnd();
136   }
137 
writeMessage(TMessage message, WriteCallback<Void> callback)138   public final void writeMessage(TMessage message, WriteCallback<Void> callback) throws TException {
139     writeMessageBegin(message);
140     callback.call(null);
141     writeMessageEnd();
142   }
143 
144   /**
145    * read a message by delegating to a callback, handles {@link #readMessageBegin() begin} and
146    * {@link #readMessageEnd() end} automatically.
147    *
148    * @param callback callback for actual reading
149    * @param <T> result message type
150    * @return the message read
151    * @throws TException when any sub-operation failed
152    */
readMessage(ReadCallback<TMessage, T> callback)153   public final <T> T readMessage(ReadCallback<TMessage, T> callback) throws TException {
154     TMessage tMessage = readMessageBegin();
155     T t = callback.accept(tMessage);
156     readMessageEnd();
157     return t;
158   }
159 
160   /**
161    * read a struct by delegating to a callback, handles {@link #readStructBegin() begin} and {@link
162    * #readStructEnd() end} automatically.
163    *
164    * @param callback callback for actual reading
165    * @param <T> result struct type
166    * @return the struct read
167    * @throws TException when any sub-operation failed
168    */
readStruct(ReadCallback<TStruct, T> callback)169   public final <T> T readStruct(ReadCallback<TStruct, T> callback) throws TException {
170     TStruct tStruct = readStructBegin();
171     T t = callback.accept(tStruct);
172     readStructEnd();
173     return t;
174   }
175 
176   /**
177    * read a field by delegating to a callback, handles {@link #readFieldBegin() begin} and {@link
178    * #readFieldEnd() end} automatically, and returns whether the {@link TType#STOP stop signal} was
179    * encountered. Because the value is not returned, you (the compiler generated code in most cases)
180    * are expected to set the field yourself within the callback.
181    *
182    * @param callback callback for reading a field
183    * @param <T> result field type
184    * @return true if a stop signal was encountered, false otherwise
185    * @throws Exception when any sub-operation failed
186    */
readField(ReadCallback<TField, T> callback)187   public final <T> boolean readField(ReadCallback<TField, T> callback) throws Exception {
188     TField tField = readFieldBegin();
189     if (tField.type == org.apache.thrift.protocol.TType.STOP) {
190       return true;
191     }
192     callback.accept(tField);
193     readFieldEnd();
194     return false;
195   }
196 
197   /**
198    * read a {@link Map} of elements by delegating to the callback, handles {@link #readMapBegin()
199    * begin} and {@link #readMapEnd() end} automatically.
200    *
201    * @param callback callback for reading the map
202    * @param <T> result map type
203    * @return the map read
204    * @throws TException when any sub-operation fails
205    */
readMap(ReadCallback<TMap, T> callback)206   public final <T extends Map<?, ?>> T readMap(ReadCallback<TMap, T> callback) throws TException {
207     TMap tMap = readMapBegin();
208     T t = callback.accept(tMap);
209     readMapEnd();
210     return t;
211   }
212 
213   /**
214    * read a {@link Map} of elements by delegating key and value reading to the callback, handles
215    * {@link #readMapBegin() begin} and {@link #readMapEnd() end} automatically.
216    *
217    * @param callback callback for reading keys and values, calls to {@link
218    *     ReadMapEntryCallback#getKey()} and {@link ReadMapEntryCallback#getValue()} will be in
219    *     alternating orders, i.e. k1, v1, k2, v2, .., k_n, v_n
220    * @param <K> key type
221    * @param <V> value type
222    * @return the map read
223    * @throws TException when any sub-operation fails
224    */
readMap(ReadMapEntryCallback<K, V> callback)225   public final <K, V> Map<K, V> readMap(ReadMapEntryCallback<K, V> callback) throws TException {
226     return readMap(callback, HashMap::new);
227   }
228 
229   /**
230    * read a {@link Map} of elements by delegating key and value reading to the callback, handles
231    * {@link #readMapBegin() begin} and {@link #readMapEnd() end} automatically, with a specialized
232    * map creator given the size hint.
233    *
234    * @param callback callback for reading keys and values, calls to {@link
235    *     ReadMapEntryCallback#getKey()} and {@link ReadMapEntryCallback#getValue()} will be in
236    *     alternating orders, i.e. k1, v1, k2, v2, .., k_n, v_n
237    * @param mapCreator map creator given the size hint
238    * @param <K> key type
239    * @param <V> value type
240    * @return the map read
241    * @throws TException when any sub-operation fails
242    */
readMap( ReadMapEntryCallback<K, V> callback, IntFunction<Map<K, V>> mapCreator)243   public final <K, V> Map<K, V> readMap(
244       ReadMapEntryCallback<K, V> callback, IntFunction<Map<K, V>> mapCreator) throws TException {
245     return readMap(
246         tMap -> {
247           Map<K, V> map = mapCreator.apply(tMap.size);
248           for (int i = 0; i < tMap.size; i += 1) {
249             map.put(callback.getKey(), callback.getValue());
250           }
251           return map;
252         });
253   }
254 
255   /**
256    * read a {@link List} by delegating to the callback, handles {@link #readListBegin() begin} and
257    * {@link #readListEnd() end} automatically.
258    *
259    * @param callback callback for reading the list
260    * @param <T> result list type
261    * @return the list read
262    * @throws TException when any sub-operation fails
263    */
readList(ReadCallback<TList, T> callback)264   public final <T extends List<?>> T readList(ReadCallback<TList, T> callback) throws TException {
265     TList tList = readListBegin();
266     T t = callback.accept(tList);
267     readListEnd();
268     return t;
269   }
270 
271   /**
272    * read a {@link List} by delegating element reading to the callback, handles {@link
273    * #readListBegin() begin} and {@link #readListEnd() end} automatically.
274    *
275    * @param callback callback for reading one element
276    * @param <T> element type
277    * @return list of elements read
278    * @throws TException when any sub-operation fails
279    */
readList(ReadCollectionCallback<T> callback)280   public final <T> List<T> readList(ReadCollectionCallback<T> callback) throws TException {
281     return readList(callback, ArrayList::new);
282   }
283 
284   /**
285    * read a {@link List} by delegating element reading to the callback, handles {@link
286    * #readListBegin() begin} and {@link #readListEnd() end} automatically, with a specialized list
287    * creator given the size hint.
288    *
289    * @param callback callback for reading one element
290    * @param listCreator list creator given size hint
291    * @param <T> element type
292    * @return list of elements read
293    * @throws TException when any sub-operation fails
294    */
readList( ReadCollectionCallback<T> callback, IntFunction<List<T>> listCreator)295   public final <T> List<T> readList(
296       ReadCollectionCallback<T> callback, IntFunction<List<T>> listCreator) throws TException {
297     return readList(
298         tList -> {
299           List<T> list = listCreator.apply(tList.size);
300           for (int i = 0; i < tList.size; i += 1) {
301             list.add(callback.call());
302           }
303           return list;
304         });
305   }
306 
307   /**
308    * read a {@link Set} of elements by delegating to the callback, handles {@link #readSetBegin()
309    * begin} and {@link #readSetEnd() end} automatically
310    *
311    * @param callback callback for reading the set
312    * @param <T> result set type
313    * @return the set read
314    * @throws TException when any sub-operation fails
315    */
316   public final <T extends Set<?>> T readSet(ReadCallback<TSet, T> callback) throws TException {
317     TSet tSet = readSetBegin();
318     T t = callback.accept(tSet);
319     readSetEnd();
320     return t;
321   }
322 
323   /**
324    * read a {@link Set} of elements by delegating element reading to the callback, handles {@link
325    * #readSetBegin() begin} and {@link #readSetEnd() end} automatically
326    *
327    * @param callback callback for reading one element
328    * @param <T> element type
329    * @return set of elements read
330    * @throws TException when any sub-operation fails
331    */
332   public final <T> Set<T> readSet(ReadCollectionCallback<T> callback) throws TException {
333     return readSet(callback, HashSet::new);
334   }
335 
336   /**
337    * read a {@link Set} of elements by delegating element reading to the callback, handles {@link
338    * #readSetBegin() begin} and {@link #readSetEnd() end} automatically, with a specialized set
339    * creator given the size hint.
340    *
341    * @param callback callback for reading one elment
342    * @param setCreator set creator given size hint
343    * @param <T> element type
344    * @return set of elements read
345    * @throws TException when any sub-operation fails
346    */
347   public final <T> Set<T> readSet(
348       ReadCollectionCallback<T> callback, IntFunction<Set<T>> setCreator) throws TException {
349     return readSet(
350         tSet -> {
351           Set<T> set = setCreator.apply(tSet.size);
352           for (int i = 0; i < tSet.size; i += 1) {
353             set.add(callback.call());
354           }
355           return set;
356         });
357   }
358 
359   /**
360    * Reset any internal state back to a blank slate. This method only needs to be implemented for
361    * stateful protocols.
362    */
363   public void reset() {}
364 
365   /** Scheme accessor */
366   public Class<? extends IScheme> getScheme() {
367     return StandardScheme.class;
368   }
369 
370   // -----------------------------------------------------------------
371   // Additional methods to improve performance.
372 
373   public int readFieldBeginData() throws TException {
374     // Derived classes should provide a more efficient version of this
375     // method if allowed by the encoding used by that protocol.
376     TField tfield = this.readFieldBegin();
377     return TFieldData.encode(tfield.type, tfield.id);
378   }
379 
380   public void skip(byte fieldType) throws TException {
381     this.skip(fieldType, Integer.MAX_VALUE);
382   }
383 
384   public void skip(byte fieldType, int maxDepth) throws TException {
385     if (maxDepth <= 0) {
386       throw new TException("Maximum skip depth exceeded");
387     }
388 
389     switch (fieldType) {
390       case TType.BOOL:
391         this.skipBool();
392         break;
393 
394       case TType.BYTE:
395         this.skipByte();
396         break;
397 
398       case TType.I16:
399         this.skipI16();
400         break;
401 
402       case TType.I32:
403         this.skipI32();
404         break;
405 
406       case TType.I64:
407         this.skipI64();
408         break;
409 
410       case TType.DOUBLE:
411         this.skipDouble();
412         break;
413 
414       case TType.STRING:
415         this.skipBinary();
416         break;
417 
418       case TType.STRUCT:
419         this.readStructBegin();
420         while (true) {
421           int tfieldData = this.readFieldBeginData();
422           byte tfieldType = TFieldData.getType(tfieldData);
423           if (tfieldType == TType.STOP) {
424             break;
425           }
426           this.skip(tfieldType, maxDepth - 1);
427           this.readFieldEnd();
428         }
429         this.readStructEnd();
430         break;
431 
432       case TType.MAP:
433         TMap map = this.readMapBegin();
434         for (int i = 0; i < map.size; i++) {
435           this.skip(map.keyType, maxDepth - 1);
436           this.skip(map.valueType, maxDepth - 1);
437         }
438         this.readMapEnd();
439         break;
440 
441       case TType.SET:
442         TSet set = this.readSetBegin();
443         for (int i = 0; i < set.size; i++) {
444           this.skip(set.elemType, maxDepth - 1);
445         }
446         this.readSetEnd();
447         break;
448 
449       case TType.LIST:
450         TList list = this.readListBegin();
451         for (int i = 0; i < list.size; i++) {
452           this.skip(list.elemType, maxDepth - 1);
453         }
454         this.readListEnd();
455         break;
456 
457       default:
458         throw new TProtocolException(
459             TProtocolException.INVALID_DATA, "Unrecognized type " + fieldType);
460     }
461   }
462 
463   /**
464    * The default implementation of all skip() methods calls the corresponding read() method.
465    * Protocols that derive from this class are strongly encouraged to provide a more efficient
466    * alternative.
467    */
468   protected void skipBool() throws TException {
469     this.readBool();
470   }
471 
472   protected void skipByte() throws TException {
473     this.readByte();
474   }
475 
476   protected void skipI16() throws TException {
477     this.readI16();
478   }
479 
480   protected void skipI32() throws TException {
481     this.readI32();
482   }
483 
484   protected void skipI64() throws TException {
485     this.readI64();
486   }
487 
488   protected void skipDouble() throws TException {
489     this.readDouble();
490   }
491 
492   protected void skipBinary() throws TException {
493     this.readBinary();
494   }
495 
496   static final int MAX_SKIPPED_BYTES = 256;
497   protected byte[] skippedBytes = new byte[MAX_SKIPPED_BYTES];
498 
499   protected void skipBytes(int numBytes) throws TException {
500     if (numBytes <= MAX_SKIPPED_BYTES) {
501       if (this.getTransport().getBytesRemainingInBuffer() >= numBytes) {
502         this.getTransport().consumeBuffer(numBytes);
503       } else {
504         this.getTransport().readAll(skippedBytes, 0, numBytes);
505       }
506     } else {
507       int remaining = numBytes;
508       while (remaining > 0) {
509         skipBytes(Math.min(remaining, MAX_SKIPPED_BYTES));
510         remaining -= MAX_SKIPPED_BYTES;
511       }
512     }
513   }
514 }
515