1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 */ 19 package org.apache.thrift; 20 21 import java.util.Collections; 22 import java.util.Map; 23 import org.apache.thrift.async.AsyncMethodCallback; 24 import org.apache.thrift.protocol.*; 25 import org.apache.thrift.server.AbstractNonblockingServer.*; 26 import org.slf4j.Logger; 27 import org.slf4j.LoggerFactory; 28 29 public class TBaseAsyncProcessor<I> implements TAsyncProcessor, TProcessor { 30 protected final Logger LOGGER = LoggerFactory.getLogger(getClass().getName()); 31 32 final I iface; 33 final Map<String, AsyncProcessFunction<I, ? extends TBase, ?>> processMap; 34 TBaseAsyncProcessor( I iface, Map<String, AsyncProcessFunction<I, ? extends TBase, ?>> processMap)35 public TBaseAsyncProcessor( 36 I iface, Map<String, AsyncProcessFunction<I, ? extends TBase, ?>> processMap) { 37 this.iface = iface; 38 this.processMap = processMap; 39 } 40 getProcessMapView()41 public Map<String, AsyncProcessFunction<I, ? extends TBase, ?>> getProcessMapView() { 42 return Collections.unmodifiableMap(processMap); 43 } 44 process(final AsyncFrameBuffer fb)45 public void process(final AsyncFrameBuffer fb) throws TException { 46 47 final TProtocol in = fb.getInputProtocol(); 48 final TProtocol out = fb.getOutputProtocol(); 49 50 // Find processing function 51 final TMessage msg = in.readMessageBegin(); 52 AsyncProcessFunction fn = processMap.get(msg.name); 53 if (fn == null) { 54 TProtocolUtil.skip(in, TType.STRUCT); 55 in.readMessageEnd(); 56 57 TApplicationException x = 58 new TApplicationException( 59 TApplicationException.UNKNOWN_METHOD, "Invalid method name: '" + msg.name + "'"); 60 LOGGER.debug("Invalid method name", x); 61 62 // this means it is a two-way request, so we can send a reply 63 if (msg.type == TMessageType.CALL) { 64 out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid)); 65 x.write(out); 66 out.writeMessageEnd(); 67 out.getTransport().flush(); 68 } 69 fb.responseReady(); 70 return; 71 } 72 73 // Get Args 74 TBase args = fn.getEmptyArgsInstance(); 75 76 try { 77 args.read(in); 78 } catch (TProtocolException e) { 79 in.readMessageEnd(); 80 81 TApplicationException x = 82 new TApplicationException(TApplicationException.PROTOCOL_ERROR, e.getMessage()); 83 LOGGER.debug("Could not retrieve function arguments", x); 84 85 if (!fn.isOneway()) { 86 out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid)); 87 x.write(out); 88 out.writeMessageEnd(); 89 out.getTransport().flush(); 90 } 91 fb.responseReady(); 92 return; 93 } 94 in.readMessageEnd(); 95 96 if (fn.isOneway()) { 97 fb.responseReady(); 98 } 99 100 // start off processing function 101 AsyncMethodCallback resultHandler = fn.getResultHandler(fb, msg.seqid); 102 try { 103 fn.start(iface, args, resultHandler); 104 } catch (Exception e) { 105 LOGGER.debug("Exception handling function", e); 106 resultHandler.onError(e); 107 } 108 return; 109 } 110 111 @Override process(TProtocol in, TProtocol out)112 public void process(TProtocol in, TProtocol out) throws TException {} 113 } 114