1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 */ 19 20 package org.apache.thrift.protocol; 21 22 import java.util.ArrayList; 23 import java.util.HashMap; 24 import java.util.HashSet; 25 import java.util.List; 26 import java.util.Map; 27 import java.util.Set; 28 import java.util.function.IntFunction; 29 import org.apache.thrift.TException; 30 import org.apache.thrift.partial.TFieldData; 31 import org.apache.thrift.scheme.IScheme; 32 import org.apache.thrift.scheme.StandardScheme; 33 import org.apache.thrift.transport.TTransport; 34 35 /** Protocol interface definition. */ 36 public abstract class TProtocol implements TWriteProtocol, TReadProtocol { 37 38 /** Prevent direct instantiation */ 39 @SuppressWarnings("unused") TProtocol()40 private TProtocol() {} 41 42 /** Transport */ 43 protected TTransport trans_; 44 45 /** Constructor */ TProtocol(TTransport trans)46 protected TProtocol(TTransport trans) { 47 trans_ = trans; 48 } 49 50 /** Transport accessor */ getTransport()51 public TTransport getTransport() { 52 return trans_; 53 } 54 checkReadBytesAvailable(TMap map)55 protected void checkReadBytesAvailable(TMap map) throws TException { 56 long elemSize = getMinSerializedSize(map.keyType) + getMinSerializedSize(map.valueType); 57 trans_.checkReadBytesAvailable(map.size * elemSize); 58 } 59 checkReadBytesAvailable(TList list)60 protected void checkReadBytesAvailable(TList list) throws TException { 61 long size = list.getSize(); 62 trans_.checkReadBytesAvailable(size * getMinSerializedSize(list.elemType)); 63 } 64 checkReadBytesAvailable(TSet set)65 protected void checkReadBytesAvailable(TSet set) throws TException { 66 long size = set.getSize(); 67 trans_.checkReadBytesAvailable(size * getMinSerializedSize(set.elemType)); 68 } 69 70 /** 71 * Return min serialized size in bytes 72 * 73 * @param type Returns the minimum amount of bytes needed to store the smallest possible instance 74 * of TType. 75 * @return min serialized size 76 * @throws TException when error happens 77 */ getMinSerializedSize(byte type)78 public abstract int getMinSerializedSize(byte type) throws TException; 79 80 public interface WriteCallback<T> { call(T e)81 void call(T e) throws TException; 82 } 83 84 public interface ReadCallback<T, R> { accept(T t)85 R accept(T t) throws TException; 86 } 87 88 public interface ReadCollectionCallback<R> { call()89 R call() throws TException; 90 } 91 92 public interface ReadMapEntryCallback<K, V> { getKey()93 K getKey() throws TException; 94 getValue()95 V getValue() throws TException; 96 } 97 writeSet(byte elementType, Set<T> set, WriteCallback<T> callback)98 public final <T> void writeSet(byte elementType, Set<T> set, WriteCallback<T> callback) 99 throws TException { 100 writeSetBegin(new TSet(elementType, set.size())); 101 for (T t : set) { 102 callback.call(t); 103 } 104 writeSetEnd(); 105 } 106 writeList(byte elementType, List<T> list, WriteCallback<T> callback)107 public final <T> void writeList(byte elementType, List<T> list, WriteCallback<T> callback) 108 throws TException { 109 writeListBegin(new TList(elementType, list.size())); 110 for (T t : list) { 111 callback.call(t); 112 } 113 writeListEnd(); 114 } 115 writeMap( byte keyType, byte valueType, Map<K, V> map, WriteCallback<Map.Entry<K, V>> callback)116 public final <K, V> void writeMap( 117 byte keyType, byte valueType, Map<K, V> map, WriteCallback<Map.Entry<K, V>> callback) 118 throws TException { 119 writeMapBegin(new TMap(keyType, valueType, map.size())); 120 for (Map.Entry<K, V> entry : map.entrySet()) { 121 callback.call(entry); 122 } 123 writeMapEnd(); 124 } 125 writeField(TField field, WriteCallback<Void> callback)126 public final void writeField(TField field, WriteCallback<Void> callback) throws TException { 127 writeFieldBegin(field); 128 callback.call(null); 129 writeFieldEnd(); 130 } 131 writeStruct(TStruct struct, WriteCallback<Void> callback)132 public final void writeStruct(TStruct struct, WriteCallback<Void> callback) throws TException { 133 writeStructBegin(struct); 134 callback.call(null); 135 writeStructEnd(); 136 } 137 writeMessage(TMessage message, WriteCallback<Void> callback)138 public final void writeMessage(TMessage message, WriteCallback<Void> callback) throws TException { 139 writeMessageBegin(message); 140 callback.call(null); 141 writeMessageEnd(); 142 } 143 144 /** 145 * read a message by delegating to a callback, handles {@link #readMessageBegin() begin} and 146 * {@link #readMessageEnd() end} automatically. 147 * 148 * @param callback callback for actual reading 149 * @param <T> result message type 150 * @return the message read 151 * @throws TException when any sub-operation failed 152 */ readMessage(ReadCallback<TMessage, T> callback)153 public final <T> T readMessage(ReadCallback<TMessage, T> callback) throws TException { 154 TMessage tMessage = readMessageBegin(); 155 T t = callback.accept(tMessage); 156 readMessageEnd(); 157 return t; 158 } 159 160 /** 161 * read a struct by delegating to a callback, handles {@link #readStructBegin() begin} and {@link 162 * #readStructEnd() end} automatically. 163 * 164 * @param callback callback for actual reading 165 * @param <T> result struct type 166 * @return the struct read 167 * @throws TException when any sub-operation failed 168 */ readStruct(ReadCallback<TStruct, T> callback)169 public final <T> T readStruct(ReadCallback<TStruct, T> callback) throws TException { 170 TStruct tStruct = readStructBegin(); 171 T t = callback.accept(tStruct); 172 readStructEnd(); 173 return t; 174 } 175 176 /** 177 * read a field by delegating to a callback, handles {@link #readFieldBegin() begin} and {@link 178 * #readFieldEnd() end} automatically, and returns whether the {@link TType#STOP stop signal} was 179 * encountered. Because the value is not returned, you (the compiler generated code in most cases) 180 * are expected to set the field yourself within the callback. 181 * 182 * @param callback callback for reading a field 183 * @param <T> result field type 184 * @return true if a stop signal was encountered, false otherwise 185 * @throws Exception when any sub-operation failed 186 */ readField(ReadCallback<TField, T> callback)187 public final <T> boolean readField(ReadCallback<TField, T> callback) throws Exception { 188 TField tField = readFieldBegin(); 189 if (tField.type == org.apache.thrift.protocol.TType.STOP) { 190 return true; 191 } 192 callback.accept(tField); 193 readFieldEnd(); 194 return false; 195 } 196 197 /** 198 * read a {@link Map} of elements by delegating to the callback, handles {@link #readMapBegin() 199 * begin} and {@link #readMapEnd() end} automatically. 200 * 201 * @param callback callback for reading the map 202 * @param <T> result map type 203 * @return the map read 204 * @throws TException when any sub-operation fails 205 */ readMap(ReadCallback<TMap, T> callback)206 public final <T extends Map<?, ?>> T readMap(ReadCallback<TMap, T> callback) throws TException { 207 TMap tMap = readMapBegin(); 208 T t = callback.accept(tMap); 209 readMapEnd(); 210 return t; 211 } 212 213 /** 214 * read a {@link Map} of elements by delegating key and value reading to the callback, handles 215 * {@link #readMapBegin() begin} and {@link #readMapEnd() end} automatically. 216 * 217 * @param callback callback for reading keys and values, calls to {@link 218 * ReadMapEntryCallback#getKey()} and {@link ReadMapEntryCallback#getValue()} will be in 219 * alternating orders, i.e. k1, v1, k2, v2, .., k_n, v_n 220 * @param <K> key type 221 * @param <V> value type 222 * @return the map read 223 * @throws TException when any sub-operation fails 224 */ readMap(ReadMapEntryCallback<K, V> callback)225 public final <K, V> Map<K, V> readMap(ReadMapEntryCallback<K, V> callback) throws TException { 226 return readMap(callback, HashMap::new); 227 } 228 229 /** 230 * read a {@link Map} of elements by delegating key and value reading to the callback, handles 231 * {@link #readMapBegin() begin} and {@link #readMapEnd() end} automatically, with a specialized 232 * map creator given the size hint. 233 * 234 * @param callback callback for reading keys and values, calls to {@link 235 * ReadMapEntryCallback#getKey()} and {@link ReadMapEntryCallback#getValue()} will be in 236 * alternating orders, i.e. k1, v1, k2, v2, .., k_n, v_n 237 * @param mapCreator map creator given the size hint 238 * @param <K> key type 239 * @param <V> value type 240 * @return the map read 241 * @throws TException when any sub-operation fails 242 */ readMap( ReadMapEntryCallback<K, V> callback, IntFunction<Map<K, V>> mapCreator)243 public final <K, V> Map<K, V> readMap( 244 ReadMapEntryCallback<K, V> callback, IntFunction<Map<K, V>> mapCreator) throws TException { 245 return readMap( 246 tMap -> { 247 Map<K, V> map = mapCreator.apply(tMap.size); 248 for (int i = 0; i < tMap.size; i += 1) { 249 map.put(callback.getKey(), callback.getValue()); 250 } 251 return map; 252 }); 253 } 254 255 /** 256 * read a {@link List} by delegating to the callback, handles {@link #readListBegin() begin} and 257 * {@link #readListEnd() end} automatically. 258 * 259 * @param callback callback for reading the list 260 * @param <T> result list type 261 * @return the list read 262 * @throws TException when any sub-operation fails 263 */ readList(ReadCallback<TList, T> callback)264 public final <T extends List<?>> T readList(ReadCallback<TList, T> callback) throws TException { 265 TList tList = readListBegin(); 266 T t = callback.accept(tList); 267 readListEnd(); 268 return t; 269 } 270 271 /** 272 * read a {@link List} by delegating element reading to the callback, handles {@link 273 * #readListBegin() begin} and {@link #readListEnd() end} automatically. 274 * 275 * @param callback callback for reading one element 276 * @param <T> element type 277 * @return list of elements read 278 * @throws TException when any sub-operation fails 279 */ readList(ReadCollectionCallback<T> callback)280 public final <T> List<T> readList(ReadCollectionCallback<T> callback) throws TException { 281 return readList(callback, ArrayList::new); 282 } 283 284 /** 285 * read a {@link List} by delegating element reading to the callback, handles {@link 286 * #readListBegin() begin} and {@link #readListEnd() end} automatically, with a specialized list 287 * creator given the size hint. 288 * 289 * @param callback callback for reading one element 290 * @param listCreator list creator given size hint 291 * @param <T> element type 292 * @return list of elements read 293 * @throws TException when any sub-operation fails 294 */ readList( ReadCollectionCallback<T> callback, IntFunction<List<T>> listCreator)295 public final <T> List<T> readList( 296 ReadCollectionCallback<T> callback, IntFunction<List<T>> listCreator) throws TException { 297 return readList( 298 tList -> { 299 List<T> list = listCreator.apply(tList.size); 300 for (int i = 0; i < tList.size; i += 1) { 301 list.add(callback.call()); 302 } 303 return list; 304 }); 305 } 306 307 /** 308 * read a {@link Set} of elements by delegating to the callback, handles {@link #readSetBegin() 309 * begin} and {@link #readSetEnd() end} automatically 310 * 311 * @param callback callback for reading the set 312 * @param <T> result set type 313 * @return the set read 314 * @throws TException when any sub-operation fails 315 */ 316 public final <T extends Set<?>> T readSet(ReadCallback<TSet, T> callback) throws TException { 317 TSet tSet = readSetBegin(); 318 T t = callback.accept(tSet); 319 readSetEnd(); 320 return t; 321 } 322 323 /** 324 * read a {@link Set} of elements by delegating element reading to the callback, handles {@link 325 * #readSetBegin() begin} and {@link #readSetEnd() end} automatically 326 * 327 * @param callback callback for reading one element 328 * @param <T> element type 329 * @return set of elements read 330 * @throws TException when any sub-operation fails 331 */ 332 public final <T> Set<T> readSet(ReadCollectionCallback<T> callback) throws TException { 333 return readSet(callback, HashSet::new); 334 } 335 336 /** 337 * read a {@link Set} of elements by delegating element reading to the callback, handles {@link 338 * #readSetBegin() begin} and {@link #readSetEnd() end} automatically, with a specialized set 339 * creator given the size hint. 340 * 341 * @param callback callback for reading one elment 342 * @param setCreator set creator given size hint 343 * @param <T> element type 344 * @return set of elements read 345 * @throws TException when any sub-operation fails 346 */ 347 public final <T> Set<T> readSet( 348 ReadCollectionCallback<T> callback, IntFunction<Set<T>> setCreator) throws TException { 349 return readSet( 350 tSet -> { 351 Set<T> set = setCreator.apply(tSet.size); 352 for (int i = 0; i < tSet.size; i += 1) { 353 set.add(callback.call()); 354 } 355 return set; 356 }); 357 } 358 359 /** 360 * Reset any internal state back to a blank slate. This method only needs to be implemented for 361 * stateful protocols. 362 */ 363 public void reset() {} 364 365 /** Scheme accessor */ 366 public Class<? extends IScheme> getScheme() { 367 return StandardScheme.class; 368 } 369 370 // ----------------------------------------------------------------- 371 // Additional methods to improve performance. 372 373 public int readFieldBeginData() throws TException { 374 // Derived classes should provide a more efficient version of this 375 // method if allowed by the encoding used by that protocol. 376 TField tfield = this.readFieldBegin(); 377 return TFieldData.encode(tfield.type, tfield.id); 378 } 379 380 public void skip(byte fieldType) throws TException { 381 this.skip(fieldType, Integer.MAX_VALUE); 382 } 383 384 public void skip(byte fieldType, int maxDepth) throws TException { 385 if (maxDepth <= 0) { 386 throw new TException("Maximum skip depth exceeded"); 387 } 388 389 switch (fieldType) { 390 case TType.BOOL: 391 this.skipBool(); 392 break; 393 394 case TType.BYTE: 395 this.skipByte(); 396 break; 397 398 case TType.I16: 399 this.skipI16(); 400 break; 401 402 case TType.I32: 403 this.skipI32(); 404 break; 405 406 case TType.I64: 407 this.skipI64(); 408 break; 409 410 case TType.DOUBLE: 411 this.skipDouble(); 412 break; 413 414 case TType.STRING: 415 this.skipBinary(); 416 break; 417 418 case TType.STRUCT: 419 this.readStructBegin(); 420 while (true) { 421 int tfieldData = this.readFieldBeginData(); 422 byte tfieldType = TFieldData.getType(tfieldData); 423 if (tfieldType == TType.STOP) { 424 break; 425 } 426 this.skip(tfieldType, maxDepth - 1); 427 this.readFieldEnd(); 428 } 429 this.readStructEnd(); 430 break; 431 432 case TType.MAP: 433 TMap map = this.readMapBegin(); 434 for (int i = 0; i < map.size; i++) { 435 this.skip(map.keyType, maxDepth - 1); 436 this.skip(map.valueType, maxDepth - 1); 437 } 438 this.readMapEnd(); 439 break; 440 441 case TType.SET: 442 TSet set = this.readSetBegin(); 443 for (int i = 0; i < set.size; i++) { 444 this.skip(set.elemType, maxDepth - 1); 445 } 446 this.readSetEnd(); 447 break; 448 449 case TType.LIST: 450 TList list = this.readListBegin(); 451 for (int i = 0; i < list.size; i++) { 452 this.skip(list.elemType, maxDepth - 1); 453 } 454 this.readListEnd(); 455 break; 456 457 default: 458 throw new TProtocolException( 459 TProtocolException.INVALID_DATA, "Unrecognized type " + fieldType); 460 } 461 } 462 463 /** 464 * The default implementation of all skip() methods calls the corresponding read() method. 465 * Protocols that derive from this class are strongly encouraged to provide a more efficient 466 * alternative. 467 */ 468 protected void skipBool() throws TException { 469 this.readBool(); 470 } 471 472 protected void skipByte() throws TException { 473 this.readByte(); 474 } 475 476 protected void skipI16() throws TException { 477 this.readI16(); 478 } 479 480 protected void skipI32() throws TException { 481 this.readI32(); 482 } 483 484 protected void skipI64() throws TException { 485 this.readI64(); 486 } 487 488 protected void skipDouble() throws TException { 489 this.readDouble(); 490 } 491 492 protected void skipBinary() throws TException { 493 this.readBinary(); 494 } 495 496 static final int MAX_SKIPPED_BYTES = 256; 497 protected byte[] skippedBytes = new byte[MAX_SKIPPED_BYTES]; 498 499 protected void skipBytes(int numBytes) throws TException { 500 if (numBytes <= MAX_SKIPPED_BYTES) { 501 if (this.getTransport().getBytesRemainingInBuffer() >= numBytes) { 502 this.getTransport().consumeBuffer(numBytes); 503 } else { 504 this.getTransport().readAll(skippedBytes, 0, numBytes); 505 } 506 } else { 507 int remaining = numBytes; 508 while (remaining > 0) { 509 skipBytes(Math.min(remaining, MAX_SKIPPED_BYTES)); 510 remaining -= MAX_SKIPPED_BYTES; 511 } 512 } 513 } 514 } 515