1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 */ 19 #ifndef _THRIFT_TCONCURRENTCLIENTSYNCINFO_H_ 20 #define _THRIFT_TCONCURRENTCLIENTSYNCINFO_H_ 1 21 22 #include <thrift/protocol/TProtocol.h> 23 #include <thrift/concurrency/Mutex.h> 24 #include <thrift/concurrency/Monitor.h> 25 #include <memory> 26 #include <vector> 27 #include <string> 28 #include <map> 29 30 namespace apache { 31 namespace thrift { 32 namespace async { 33 34 class TConcurrentClientSyncInfo; 35 36 class TConcurrentSendSentry { 37 public: 38 explicit TConcurrentSendSentry(TConcurrentClientSyncInfo* sync); 39 ~TConcurrentSendSentry(); 40 41 void commit(); 42 43 private: 44 TConcurrentClientSyncInfo& sync_; 45 bool committed_; 46 }; 47 48 class TConcurrentRecvSentry { 49 public: 50 TConcurrentRecvSentry(TConcurrentClientSyncInfo* sync, int32_t seqid); 51 ~TConcurrentRecvSentry(); 52 53 void commit(); 54 55 private: 56 TConcurrentClientSyncInfo& sync_; 57 int32_t seqid_; 58 bool committed_; 59 }; 60 61 class TConcurrentClientSyncInfo { 62 private: // typedefs 63 typedef std::shared_ptr< ::apache::thrift::concurrency::Monitor> MonitorPtr; 64 typedef std::map<int32_t, MonitorPtr> MonitorMap; 65 66 public: 67 TConcurrentClientSyncInfo(); 68 69 int32_t generateSeqId(); 70 71 bool getPending(std::string& fname, 72 ::apache::thrift::protocol::TMessageType& mtype, 73 int32_t& rseqid); /* requires readMutex_ */ 74 75 void updatePending(const std::string& fname, 76 ::apache::thrift::protocol::TMessageType mtype, 77 int32_t rseqid); /* requires readMutex_ */ 78 79 void waitForWork(int32_t seqid); /* requires readMutex_ */ 80 getReadMutex()81 ::apache::thrift::concurrency::Mutex& getReadMutex() { return readMutex_; } getWriteMutex()82 ::apache::thrift::concurrency::Mutex& getWriteMutex() { return writeMutex_; } 83 84 private: // constants 85 enum { MONITOR_CACHE_SIZE = 10 }; 86 87 private: // functions 88 MonitorPtr newMonitor_( 89 const ::apache::thrift::concurrency::Guard& seqidGuard); /* requires seqidMutex_ */ 90 void deleteMonitor_(const ::apache::thrift::concurrency::Guard& seqidGuard, MonitorPtr& m); 91 /*noexcept*/ /* requires seqidMutex_ */ 92 void wakeupAnyone_( 93 const ::apache::thrift::concurrency::Guard& seqidGuard); /* requires seqidMutex_ */ 94 void markBad_(const ::apache::thrift::concurrency::Guard& seqidGuard); /* requires seqidMutex_ */ 95 void throwBadSeqId_(); 96 void throwDeadConnection_(); 97 98 private: // data members 99 volatile bool stop_; 100 101 ::apache::thrift::concurrency::Mutex seqidMutex_; 102 // begin seqidMutex_ protected members 103 int32_t nextseqid_; 104 MonitorMap seqidToMonitorMap_; 105 std::vector<MonitorPtr> freeMonitors_; 106 // end seqidMutex_ protected members 107 108 ::apache::thrift::concurrency::Mutex writeMutex_; 109 110 ::apache::thrift::concurrency::Mutex readMutex_; 111 // begin readMutex_ protected members 112 bool recvPending_; 113 bool wakeupSomeone_; 114 int32_t seqidPending_; 115 std::string fnamePending_; 116 ::apache::thrift::protocol::TMessageType mtypePending_; 117 // end readMutex_ protected members 118 119 friend class TConcurrentSendSentry; 120 friend class TConcurrentRecvSentry; 121 }; 122 } 123 } 124 } // apache::thrift::async 125 126 #endif // _THRIFT_TCONCURRENTCLIENTSYNCINFO_H_ 127