1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 #ifndef _THRIFT_TCONCURRENTCLIENTSYNCINFO_H_
20 #define _THRIFT_TCONCURRENTCLIENTSYNCINFO_H_ 1
21 
22 #include <thrift/protocol/TProtocol.h>
23 #include <thrift/concurrency/Mutex.h>
24 #include <thrift/concurrency/Monitor.h>
25 #include <memory>
26 #include <vector>
27 #include <string>
28 #include <map>
29 
30 namespace apache {
31 namespace thrift {
32 namespace async {
33 
class TConcurrentClientSyncInfo;

/**
 * Scoped helper bound to the send side of a TConcurrentClientSyncInfo.
 *
 * Only the declaration is visible in this header; what the constructor,
 * commit() and the destructor do is defined in the implementation file.
 * NOTE(review): presumably the destructor undoes whatever the constructor
 * set up, with commit() marking the send as successful -- confirm against
 * the .cpp.
 *
 * The sentry keeps a reference to the sync info and has a user-declared
 * destructor. Copying is therefore disabled: a copy would make that
 * destructor run twice against the same sync info.
 */
class TConcurrentSendSentry {
public:
  /**
   * @param sync sync info this sentry is bound to. It is stored as a
   *             reference, so it presumably must be non-null and must
   *             outlive the sentry -- TODO confirm in the .cpp.
   */
  explicit TConcurrentSendSentry(TConcurrentClientSyncInfo* sync);
  ~TConcurrentSendSentry();

  // One sentry per scope: duplicating it is never meaningful.
  TConcurrentSendSentry(const TConcurrentSendSentry&) = delete;
  TConcurrentSendSentry& operator=(const TConcurrentSendSentry&) = delete;

  void commit();

private:
  TConcurrentClientSyncInfo& sync_; // bound sync info (never reseated)
  bool committed_;                  // presumably set by commit() -- TODO confirm
};
47 
48 class TConcurrentRecvSentry {
49 public:
50   TConcurrentRecvSentry(TConcurrentClientSyncInfo* sync, int32_t seqid);
51   ~TConcurrentRecvSentry();
52 
53   void commit();
54 
55 private:
56   TConcurrentClientSyncInfo& sync_;
57   int32_t seqid_;
58   bool committed_;
59 };
60 
61 class TConcurrentClientSyncInfo {
62 private: // typedefs
63   typedef std::shared_ptr< ::apache::thrift::concurrency::Monitor> MonitorPtr;
64   typedef std::map<int32_t, MonitorPtr> MonitorMap;
65 
66 public:
67   TConcurrentClientSyncInfo();
68 
69   int32_t generateSeqId();
70 
71   bool getPending(std::string& fname,
72                   ::apache::thrift::protocol::TMessageType& mtype,
73                   int32_t& rseqid); /* requires readMutex_ */
74 
75   void updatePending(const std::string& fname,
76                      ::apache::thrift::protocol::TMessageType mtype,
77                      int32_t rseqid); /* requires readMutex_ */
78 
79   void waitForWork(int32_t seqid); /* requires readMutex_ */
80 
getReadMutex()81   ::apache::thrift::concurrency::Mutex& getReadMutex() { return readMutex_; }
getWriteMutex()82   ::apache::thrift::concurrency::Mutex& getWriteMutex() { return writeMutex_; }
83 
84 private: // constants
85   enum { MONITOR_CACHE_SIZE = 10 };
86 
87 private: // functions
88   MonitorPtr newMonitor_(
89       const ::apache::thrift::concurrency::Guard& seqidGuard); /* requires seqidMutex_ */
90   void deleteMonitor_(const ::apache::thrift::concurrency::Guard& seqidGuard, MonitorPtr& m);
91       /*noexcept*/ /* requires seqidMutex_ */
92   void wakeupAnyone_(
93       const ::apache::thrift::concurrency::Guard& seqidGuard);           /* requires seqidMutex_ */
94   void markBad_(const ::apache::thrift::concurrency::Guard& seqidGuard); /* requires seqidMutex_ */
95   void throwBadSeqId_();
96   void throwDeadConnection_();
97 
98 private: // data members
99   volatile bool stop_;
100 
101   ::apache::thrift::concurrency::Mutex seqidMutex_;
102   // begin seqidMutex_ protected members
103   int32_t nextseqid_;
104   MonitorMap seqidToMonitorMap_;
105   std::vector<MonitorPtr> freeMonitors_;
106   // end seqidMutex_ protected members
107 
108   ::apache::thrift::concurrency::Mutex writeMutex_;
109 
110   ::apache::thrift::concurrency::Mutex readMutex_;
111   // begin readMutex_ protected members
112   bool recvPending_;
113   bool wakeupSomeone_;
114   int32_t seqidPending_;
115   std::string fnamePending_;
116   ::apache::thrift::protocol::TMessageType mtypePending_;
117   // end readMutex_ protected members
118 
119   friend class TConcurrentSendSentry;
120   friend class TConcurrentRecvSentry;
121 };
122 }
123 }
124 } // apache::thrift::async
125 
126 #endif // _THRIFT_TCONCURRENTCLIENTSYNCINFO_H_
127