1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 package org.apache.thrift;
20 
21 import java.util.Collections;
22 import java.util.Map;
23 import org.apache.thrift.async.AsyncMethodCallback;
24 import org.apache.thrift.protocol.*;
25 import org.apache.thrift.server.AbstractNonblockingServer.*;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 
29 public class TBaseAsyncProcessor<I> implements TAsyncProcessor, TProcessor {
30   protected final Logger LOGGER = LoggerFactory.getLogger(getClass().getName());
31 
32   final I iface;
33   final Map<String, AsyncProcessFunction<I, ? extends TBase, ?>> processMap;
34 
TBaseAsyncProcessor( I iface, Map<String, AsyncProcessFunction<I, ? extends TBase, ?>> processMap)35   public TBaseAsyncProcessor(
36       I iface, Map<String, AsyncProcessFunction<I, ? extends TBase, ?>> processMap) {
37     this.iface = iface;
38     this.processMap = processMap;
39   }
40 
getProcessMapView()41   public Map<String, AsyncProcessFunction<I, ? extends TBase, ?>> getProcessMapView() {
42     return Collections.unmodifiableMap(processMap);
43   }
44 
process(final AsyncFrameBuffer fb)45   public void process(final AsyncFrameBuffer fb) throws TException {
46 
47     final TProtocol in = fb.getInputProtocol();
48     final TProtocol out = fb.getOutputProtocol();
49 
50     // Find processing function
51     final TMessage msg = in.readMessageBegin();
52     AsyncProcessFunction fn = processMap.get(msg.name);
53     if (fn == null) {
54       TProtocolUtil.skip(in, TType.STRUCT);
55       in.readMessageEnd();
56 
57       TApplicationException x =
58           new TApplicationException(
59               TApplicationException.UNKNOWN_METHOD, "Invalid method name: '" + msg.name + "'");
60       LOGGER.debug("Invalid method name", x);
61 
62       // this means it is a two-way request, so we can send a reply
63       if (msg.type == TMessageType.CALL) {
64         out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));
65         x.write(out);
66         out.writeMessageEnd();
67         out.getTransport().flush();
68       }
69       fb.responseReady();
70       return;
71     }
72 
73     // Get Args
74     TBase args = fn.getEmptyArgsInstance();
75 
76     try {
77       args.read(in);
78     } catch (TProtocolException e) {
79       in.readMessageEnd();
80 
81       TApplicationException x =
82           new TApplicationException(TApplicationException.PROTOCOL_ERROR, e.getMessage());
83       LOGGER.debug("Could not retrieve function arguments", x);
84 
85       if (!fn.isOneway()) {
86         out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));
87         x.write(out);
88         out.writeMessageEnd();
89         out.getTransport().flush();
90       }
91       fb.responseReady();
92       return;
93     }
94     in.readMessageEnd();
95 
96     if (fn.isOneway()) {
97       fb.responseReady();
98     }
99 
100     // start off processing function
101     AsyncMethodCallback resultHandler = fn.getResultHandler(fb, msg.seqid);
102     try {
103       fn.start(iface, args, resultHandler);
104     } catch (Exception e) {
105       LOGGER.debug("Exception handling function", e);
106       resultHandler.onError(e);
107     }
108     return;
109   }
110 
111   @Override
process(TProtocol in, TProtocol out)112   public void process(TProtocol in, TProtocol out) throws TException {}
113 }
114