1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 */ 19 20 #include <thrift/thrift-config.h> 21 #include <thrift/concurrency/Thread.h> 22 #include <thrift/concurrency/ThreadFactory.h> 23 #include <thrift/concurrency/Monitor.h> 24 #include <thrift/concurrency/Mutex.h> 25 26 #include <assert.h> 27 #include <iostream> 28 #include <vector> 29 30 namespace apache { 31 namespace thrift { 32 namespace concurrency { 33 namespace test { 34 35 using std::shared_ptr; 36 using namespace apache::thrift::concurrency; 37 38 /** 39 * ThreadManagerTests class 40 * 41 * @version $Id:$ 42 */ 43 class ThreadFactoryTests { 44 45 public: 46 /** 47 * Reap N threads 48 */ 49 class ReapNTask : public Runnable { 50 51 public: ReapNTask(Monitor & monitor,int & activeCount)52 ReapNTask(Monitor& monitor, int& activeCount) : _monitor(monitor), _count(activeCount) {} 53 run()54 void run() override { 55 Synchronized s(_monitor); 56 57 if (--_count == 0) { 58 _monitor.notify(); 59 } 60 } 61 62 Monitor& _monitor; 63 int& _count; 64 }; 65 66 bool reapNThreads(int loop = 1, int count = 10) { 67 68 ThreadFactory threadFactory = ThreadFactory(); 69 shared_ptr<Monitor> monitor(new Monitor); 70 71 for (int lix = 0; lix < loop; lix++) { 72 73 int activeCount = 0; 74 75 std::vector<shared_ptr<Thread> > threads; 76 int tix; 77 78 for (tix = 0; tix < count; tix++) { 79 try { 80 ++activeCount; 81 threads.push_back( 82 threadFactory.newThread(shared_ptr<Runnable>(new ReapNTask(*monitor, activeCount)))); catch(SystemResourceException & e)83 } catch (SystemResourceException& e) { 84 std::cout << "\t\t\tfailed to create " << lix* count + tix << " thread " << e.what() 85 << std::endl; 86 throw; 87 } 88 } 89 90 tix = 0; 91 for (std::vector<shared_ptr<Thread> >::const_iterator thread = threads.begin(); 92 thread != threads.end(); 93 tix++, ++thread) { 94 95 try { 96 (*thread)->start(); catch(SystemResourceException & e)97 } catch (SystemResourceException& e) { 98 std::cout << "\t\t\tfailed to start " << lix* count + tix << " thread " << e.what() 99 << std::endl; 100 throw; 101 } 102 } 103 104 { 105 Synchronized s(*monitor); 106 while (activeCount > 0) { 107 monitor->wait(1000); 108 } 109 } 110 111 std::cout << "\t\t\treaped " << lix* count << " threads" << std::endl; 112 } 113 114 std::cout << "\t\t\tSuccess!" << std::endl; 115 return true; 116 } 117 118 class SynchStartTask : public Runnable { 119 120 public: 121 enum STATE { UNINITIALIZED, STARTING, STARTED, STOPPING, STOPPED }; 122 SynchStartTask(Monitor & monitor,volatile STATE & state)123 SynchStartTask(Monitor& monitor, volatile STATE& state) : _monitor(monitor), _state(state) {} 124 run()125 void run() override { 126 { 127 Synchronized s(_monitor); 128 if (_state == SynchStartTask::STARTING) { 129 _state = SynchStartTask::STARTED; 130 _monitor.notify(); 131 } 132 } 133 134 { 135 Synchronized s(_monitor); 136 while (_state == SynchStartTask::STARTED) { 137 _monitor.wait(); 138 } 139 140 if (_state == SynchStartTask::STOPPING) { 141 _state = SynchStartTask::STOPPED; 142 _monitor.notifyAll(); 143 } 144 } 145 } 146 147 private: 148 Monitor& _monitor; 149 volatile STATE& _state; 150 }; 151 synchStartTest()152 bool synchStartTest() { 153 154 Monitor monitor; 155 156 SynchStartTask::STATE state = SynchStartTask::UNINITIALIZED; 157 158 shared_ptr<SynchStartTask> task 159 = shared_ptr<SynchStartTask>(new SynchStartTask(monitor, state)); 160 161 ThreadFactory threadFactory = ThreadFactory(); 162 163 shared_ptr<Thread> thread = threadFactory.newThread(task); 164 165 if (state == SynchStartTask::UNINITIALIZED) { 166 167 state = SynchStartTask::STARTING; 168 169 thread->start(); 170 } 171 172 { 173 Synchronized s(monitor); 174 while (state == SynchStartTask::STARTING) { 175 monitor.wait(); 176 } 177 } 178 179 assert(state != SynchStartTask::STARTING); 180 181 { 182 Synchronized s(monitor); 183 184 try { 185 monitor.wait(100); 186 } catch (TimedOutException&) { 187 } 188 189 if (state == SynchStartTask::STARTED) { 190 191 state = SynchStartTask::STOPPING; 192 193 monitor.notify(); 194 } 195 196 while (state == SynchStartTask::STOPPING) { 197 monitor.wait(); 198 } 199 } 200 201 assert(state == SynchStartTask::STOPPED); 202 203 bool success = true; 204 205 std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "!" << std::endl; 206 207 return true; 208 } 209 210 /** 211 * The only guarantee a monitor timeout can give you is that 212 * it will take "at least" as long as the timeout, no less. 213 * There is absolutely no guarantee around regaining execution 214 * near the timeout. On a busy system (like inside a third party 215 * CI environment) it could take quite a bit longer than the 216 * requested timeout, and that's ok. 217 */ 218 219 bool monitorTimeoutTest(int64_t count = 1000, int64_t timeout = 2) { 220 221 Monitor monitor; 222 223 int64_t startTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count(); 224 225 for (int64_t ix = 0; ix < count; ix++) { 226 { 227 Synchronized s(monitor); 228 try { 229 monitor.wait(timeout); catch(TimedOutException &)230 } catch (TimedOutException&) { 231 } 232 } 233 } 234 235 int64_t endTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count(); 236 237 bool success = (endTime - startTime) >= (count * timeout); 238 239 std::cout << "\t\t\t" << (success ? "Success" : "Failure") 240 << ": minimum required time to elapse " << count * timeout 241 << "ms; actual elapsed time " << endTime - startTime << "ms" 242 << std::endl; 243 244 return success; 245 } 246 247 class FloodTask : public Runnable { 248 public: FloodTask(const size_t id,Monitor & mon)249 FloodTask(const size_t id, Monitor& mon) : _id(id), _mon(mon) {} ~FloodTask()250 ~FloodTask() override { 251 if (_id % 10000 == 0) { 252 Synchronized sync(_mon); 253 std::cout << "\t\tthread " << _id << " done" << std::endl; 254 } 255 } 256 run()257 void run() override { 258 if (_id % 10000 == 0) { 259 Synchronized sync(_mon); 260 std::cout << "\t\tthread " << _id << " started" << std::endl; 261 } 262 } 263 const size_t _id; 264 Monitor& _mon; 265 }; 266 foo(ThreadFactory * tf)267 void foo(ThreadFactory* tf) { (void)tf; } 268 269 bool floodNTest(size_t loop = 1, size_t count = 100000) { 270 271 bool success = false; 272 Monitor mon; 273 274 for (size_t lix = 0; lix < loop; lix++) { 275 276 ThreadFactory threadFactory = ThreadFactory(); 277 threadFactory.setDetached(true); 278 279 for (size_t tix = 0; tix < count; tix++) { 280 281 try { 282 283 shared_ptr<FloodTask> task(new FloodTask(lix * count + tix, mon)); 284 shared_ptr<Thread> thread = threadFactory.newThread(task); 285 thread->start(); 286 catch(TException & e)287 } catch (TException& e) { 288 289 std::cout << "\t\t\tfailed to start " << lix* count + tix << " thread " << e.what() 290 << std::endl; 291 292 return success; 293 } 294 } 295 296 Synchronized sync(mon); 297 std::cout << "\t\t\tflooded " << (lix + 1) * count << " threads" << std::endl; 298 success = true; 299 } 300 301 return success; 302 } 303 }; 304 305 } 306 } 307 } 308 } // apache::thrift::concurrency::test 309