1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 #include <thrift/processor/PeekProcessor.h>
21
22 using namespace apache::thrift::transport;
23 using namespace apache::thrift::protocol;
24 using namespace apache::thrift;
25
26 namespace apache {
27 namespace thrift {
28 namespace processor {
29
PeekProcessor()30 PeekProcessor::PeekProcessor() {
31 memoryBuffer_.reset(new TMemoryBuffer());
32 targetTransport_ = memoryBuffer_;
33 }
34 PeekProcessor::~PeekProcessor() = default;
35
initialize(std::shared_ptr<TProcessor> actualProcessor,std::shared_ptr<TProtocolFactory> protocolFactory,std::shared_ptr<TPipedTransportFactory> transportFactory)36 void PeekProcessor::initialize(std::shared_ptr<TProcessor> actualProcessor,
37 std::shared_ptr<TProtocolFactory> protocolFactory,
38 std::shared_ptr<TPipedTransportFactory> transportFactory) {
39 actualProcessor_ = actualProcessor;
40 pipedProtocol_ = protocolFactory->getProtocol(targetTransport_);
41 transportFactory_ = transportFactory;
42 transportFactory_->initializeTargetTransport(targetTransport_);
43 }
44
getPipedTransport(std::shared_ptr<TTransport> in)45 std::shared_ptr<TTransport> PeekProcessor::getPipedTransport(std::shared_ptr<TTransport> in) {
46 return transportFactory_->getTransport(in);
47 }
48
setTargetTransport(std::shared_ptr<TTransport> targetTransport)49 void PeekProcessor::setTargetTransport(std::shared_ptr<TTransport> targetTransport) {
50 targetTransport_ = targetTransport;
51 if (std::dynamic_pointer_cast<TMemoryBuffer>(targetTransport_)) {
52 memoryBuffer_ = std::dynamic_pointer_cast<TMemoryBuffer>(targetTransport);
53 } else if (std::dynamic_pointer_cast<TPipedTransport>(targetTransport_)) {
54 memoryBuffer_ = std::dynamic_pointer_cast<TMemoryBuffer>(
55 std::dynamic_pointer_cast<TPipedTransport>(targetTransport_)->getTargetTransport());
56 }
57
58 if (!memoryBuffer_) {
59 throw TException(
60 "Target transport must be a TMemoryBuffer or a TPipedTransport with TMemoryBuffer");
61 }
62 }
63
process(std::shared_ptr<TProtocol> in,std::shared_ptr<TProtocol> out,void * connectionContext)64 bool PeekProcessor::process(std::shared_ptr<TProtocol> in,
65 std::shared_ptr<TProtocol> out,
66 void* connectionContext) {
67
68 std::string fname;
69 TMessageType mtype;
70 int32_t seqid;
71 in->readMessageBegin(fname, mtype, seqid);
72
73 if (mtype != T_CALL && mtype != T_ONEWAY) {
74 throw TException("Unexpected message type");
75 }
76
77 // Peek at the name
78 peekName(fname);
79
80 TType ftype;
81 int16_t fid;
82 while (true) {
83 in->readFieldBegin(fname, ftype, fid);
84 if (ftype == T_STOP) {
85 break;
86 }
87
88 // Peek at the variable
89 peek(in, ftype, fid);
90 in->readFieldEnd();
91 }
92 in->readMessageEnd();
93 in->getTransport()->readEnd();
94
95 //
96 // All the data is now in memoryBuffer_ and ready to be processed
97 //
98
99 // Let's first take a peek at the full data in memory
100 uint8_t* buffer;
101 uint32_t size;
102 memoryBuffer_->getBuffer(&buffer, &size);
103 peekBuffer(buffer, size);
104
105 // Done peeking at variables
106 peekEnd();
107
108 bool ret = actualProcessor_->process(pipedProtocol_, out, connectionContext);
109 memoryBuffer_->resetBuffer();
110 return ret;
111 }
112
peekName(const std::string & fname)113 void PeekProcessor::peekName(const std::string& fname) {
114 (void)fname;
115 }
116
peekBuffer(uint8_t * buffer,uint32_t size)117 void PeekProcessor::peekBuffer(uint8_t* buffer, uint32_t size) {
118 (void)buffer;
119 (void)size;
120 }
121
peek(std::shared_ptr<TProtocol> in,TType ftype,int16_t fid)122 void PeekProcessor::peek(std::shared_ptr<TProtocol> in, TType ftype, int16_t fid) {
123 (void)fid;
124 in->skip(ftype);
125 }
126
peekEnd()127 void PeekProcessor::peekEnd() {
128 }
129 }
130 }
131 }
132