1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #include <thrift/thrift-config.h>
21 #include <thrift/concurrency/Thread.h>
22 #include <thrift/concurrency/ThreadFactory.h>
23 #include <thrift/concurrency/Monitor.h>
24 #include <thrift/concurrency/Mutex.h>
25 
26 #include <assert.h>
27 #include <iostream>
28 #include <vector>
29 
30 namespace apache {
31 namespace thrift {
32 namespace concurrency {
33 namespace test {
34 
35 using std::shared_ptr;
36 using namespace apache::thrift::concurrency;
37 
/**
 * ThreadFactoryTests class
 *
 * @version $Id:$
 */
43 class ThreadFactoryTests {
44 
45 public:
46   /**
47    * Reap N threads
48    */
49   class ReapNTask : public Runnable {
50 
51   public:
ReapNTask(Monitor & monitor,int & activeCount)52     ReapNTask(Monitor& monitor, int& activeCount) : _monitor(monitor), _count(activeCount) {}
53 
run()54     void run() override {
55       Synchronized s(_monitor);
56 
57       if (--_count == 0) {
58         _monitor.notify();
59       }
60     }
61 
62     Monitor& _monitor;
63     int& _count;
64   };
65 
66   bool reapNThreads(int loop = 1, int count = 10) {
67 
68     ThreadFactory threadFactory = ThreadFactory();
69     shared_ptr<Monitor> monitor(new Monitor);
70 
71     for (int lix = 0; lix < loop; lix++) {
72 
73       int activeCount = 0;
74 
75       std::vector<shared_ptr<Thread> > threads;
76       int tix;
77 
78       for (tix = 0; tix < count; tix++) {
79         try {
80           ++activeCount;
81           threads.push_back(
82               threadFactory.newThread(shared_ptr<Runnable>(new ReapNTask(*monitor, activeCount))));
catch(SystemResourceException & e)83         } catch (SystemResourceException& e) {
84           std::cout << "\t\t\tfailed to create " << lix* count + tix << " thread " << e.what()
85                     << std::endl;
86           throw;
87         }
88       }
89 
90       tix = 0;
91       for (std::vector<shared_ptr<Thread> >::const_iterator thread = threads.begin();
92            thread != threads.end();
93            tix++, ++thread) {
94 
95         try {
96           (*thread)->start();
catch(SystemResourceException & e)97         } catch (SystemResourceException& e) {
98           std::cout << "\t\t\tfailed to start  " << lix* count + tix << " thread " << e.what()
99                     << std::endl;
100           throw;
101         }
102       }
103 
104       {
105         Synchronized s(*monitor);
106         while (activeCount > 0) {
107           monitor->wait(1000);
108         }
109       }
110 
111       std::cout << "\t\t\treaped " << lix* count << " threads" << std::endl;
112     }
113 
114     std::cout << "\t\t\tSuccess!" << std::endl;
115     return true;
116   }
117 
118   class SynchStartTask : public Runnable {
119 
120   public:
121     enum STATE { UNINITIALIZED, STARTING, STARTED, STOPPING, STOPPED };
122 
SynchStartTask(Monitor & monitor,volatile STATE & state)123     SynchStartTask(Monitor& monitor, volatile STATE& state) : _monitor(monitor), _state(state) {}
124 
run()125     void run() override {
126       {
127         Synchronized s(_monitor);
128         if (_state == SynchStartTask::STARTING) {
129           _state = SynchStartTask::STARTED;
130           _monitor.notify();
131         }
132       }
133 
134       {
135         Synchronized s(_monitor);
136         while (_state == SynchStartTask::STARTED) {
137           _monitor.wait();
138         }
139 
140         if (_state == SynchStartTask::STOPPING) {
141           _state = SynchStartTask::STOPPED;
142           _monitor.notifyAll();
143         }
144       }
145     }
146 
147   private:
148     Monitor& _monitor;
149     volatile STATE& _state;
150   };
151 
synchStartTest()152   bool synchStartTest() {
153 
154     Monitor monitor;
155 
156     SynchStartTask::STATE state = SynchStartTask::UNINITIALIZED;
157 
158     shared_ptr<SynchStartTask> task
159         = shared_ptr<SynchStartTask>(new SynchStartTask(monitor, state));
160 
161     ThreadFactory threadFactory = ThreadFactory();
162 
163     shared_ptr<Thread> thread = threadFactory.newThread(task);
164 
165     if (state == SynchStartTask::UNINITIALIZED) {
166 
167       state = SynchStartTask::STARTING;
168 
169       thread->start();
170     }
171 
172     {
173       Synchronized s(monitor);
174       while (state == SynchStartTask::STARTING) {
175         monitor.wait();
176       }
177     }
178 
179     assert(state != SynchStartTask::STARTING);
180 
181     {
182       Synchronized s(monitor);
183 
184       try {
185         monitor.wait(100);
186       } catch (TimedOutException&) {
187       }
188 
189       if (state == SynchStartTask::STARTED) {
190 
191         state = SynchStartTask::STOPPING;
192 
193         monitor.notify();
194       }
195 
196       while (state == SynchStartTask::STOPPING) {
197         monitor.wait();
198       }
199     }
200 
201     assert(state == SynchStartTask::STOPPED);
202 
203     bool success = true;
204 
205     std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "!" << std::endl;
206 
207     return true;
208   }
209 
210   /**
211    * The only guarantee a monitor timeout can give you is that
212    * it will take "at least" as long as the timeout, no less.
213    * There is absolutely no guarantee around regaining execution
214    * near the timeout.  On a busy system (like inside a third party
215    * CI environment) it could take quite a bit longer than the
216    * requested timeout, and that's ok.
217    */
218 
219   bool monitorTimeoutTest(int64_t count = 1000, int64_t timeout = 2) {
220 
221     Monitor monitor;
222 
223     int64_t startTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
224 
225     for (int64_t ix = 0; ix < count; ix++) {
226       {
227         Synchronized s(monitor);
228         try {
229           monitor.wait(timeout);
catch(TimedOutException &)230         } catch (TimedOutException&) {
231         }
232       }
233     }
234 
235     int64_t endTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
236 
237   bool success = (endTime - startTime) >= (count * timeout);
238 
239     std::cout << "\t\t\t" << (success ? "Success" : "Failure")
240               << ": minimum required time to elapse " << count * timeout
241               << "ms; actual elapsed time " << endTime - startTime << "ms"
242               << std::endl;
243 
244     return success;
245   }
246 
247   class FloodTask : public Runnable {
248   public:
FloodTask(const size_t id,Monitor & mon)249     FloodTask(const size_t id, Monitor& mon) : _id(id), _mon(mon) {}
~FloodTask()250     ~FloodTask() override {
251       if (_id % 10000 == 0) {
252 		Synchronized sync(_mon);
253         std::cout << "\t\tthread " << _id << " done" << std::endl;
254       }
255     }
256 
run()257     void run() override {
258       if (_id % 10000 == 0) {
259 		Synchronized sync(_mon);
260         std::cout << "\t\tthread " << _id << " started" << std::endl;
261       }
262     }
263     const size_t _id;
264     Monitor& _mon;
265   };
266 
foo(ThreadFactory * tf)267   void foo(ThreadFactory* tf) { (void)tf; }
268 
269   bool floodNTest(size_t loop = 1, size_t count = 100000) {
270 
271     bool success = false;
272     Monitor mon;
273 
274     for (size_t lix = 0; lix < loop; lix++) {
275 
276       ThreadFactory threadFactory = ThreadFactory();
277       threadFactory.setDetached(true);
278 
279       for (size_t tix = 0; tix < count; tix++) {
280 
281         try {
282 
283           shared_ptr<FloodTask> task(new FloodTask(lix * count + tix, mon));
284           shared_ptr<Thread> thread = threadFactory.newThread(task);
285           thread->start();
286 
catch(TException & e)287         } catch (TException& e) {
288 
289           std::cout << "\t\t\tfailed to start  " << lix* count + tix << " thread " << e.what()
290                     << std::endl;
291 
292           return success;
293         }
294       }
295 
296       Synchronized sync(mon);
297       std::cout << "\t\t\tflooded " << (lix + 1) * count << " threads" << std::endl;
298       success = true;
299     }
300 
301     return success;
302   }
303 };
304 
305 }
306 }
307 }
308 } // apache::thrift::concurrency::test
309