1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 #include <thrift/thrift-config.h>
21
22 #include <thrift/concurrency/ThreadManager.h>
23 #include <thrift/concurrency/Exception.h>
24 #include <thrift/concurrency/Monitor.h>
25
26 #include <memory>
27
28 #include <stdexcept>
29 #include <deque>
30 #include <set>
31
32 namespace apache {
33 namespace thrift {
34 namespace concurrency {
35
36 using std::shared_ptr;
37 using std::unique_ptr;
38 using std::dynamic_pointer_cast;
39
40 /**
41 * ThreadManager class
42 *
43 * This class manages a pool of threads. It uses a ThreadFactory to create
44 * threads. It never actually creates or destroys worker threads, rather
45 * it maintains statistics on number of idle threads, number of active threads,
46 * task backlog, and average wait and service times.
47 *
48 * There are three different monitors used for signaling different conditions
49 * however they all share the same mutex_.
50 *
51 * @version $Id:$
52 */
53 class ThreadManager::Impl : public ThreadManager {
54
55 public:
Impl()56 Impl()
57 : workerCount_(0),
58 workerMaxCount_(0),
59 idleCount_(0),
60 pendingTaskCountMax_(0),
61 expiredCount_(0),
62 state_(ThreadManager::UNINITIALIZED),
63 monitor_(&mutex_),
64 maxMonitor_(&mutex_),
65 workerMonitor_(&mutex_) {}
66
~Impl()67 ~Impl() override { stop(); }
68
69 void start() override;
70 void stop() override;
71
state() const72 ThreadManager::STATE state() const override { return state_; }
73
threadFactory() const74 shared_ptr<ThreadFactory> threadFactory() const override {
75 Guard g(mutex_);
76 return threadFactory_;
77 }
78
threadFactory(shared_ptr<ThreadFactory> value)79 void threadFactory(shared_ptr<ThreadFactory> value) override {
80 Guard g(mutex_);
81 if (threadFactory_ && threadFactory_->isDetached() != value->isDetached()) {
82 throw InvalidArgumentException();
83 }
84 threadFactory_ = value;
85 }
86
87 void addWorker(size_t value) override;
88
89 void removeWorker(size_t value) override;
90
idleWorkerCount() const91 size_t idleWorkerCount() const override { return idleCount_; }
92
workerCount() const93 size_t workerCount() const override {
94 Guard g(mutex_);
95 return workerCount_;
96 }
97
pendingTaskCount() const98 size_t pendingTaskCount() const override {
99 Guard g(mutex_);
100 return tasks_.size();
101 }
102
totalTaskCount() const103 size_t totalTaskCount() const override {
104 Guard g(mutex_);
105 return tasks_.size() + workerCount_ - idleCount_;
106 }
107
pendingTaskCountMax() const108 size_t pendingTaskCountMax() const override {
109 Guard g(mutex_);
110 return pendingTaskCountMax_;
111 }
112
expiredTaskCount() const113 size_t expiredTaskCount() const override {
114 Guard g(mutex_);
115 return expiredCount_;
116 }
117
pendingTaskCountMax(const size_t value)118 void pendingTaskCountMax(const size_t value) {
119 Guard g(mutex_);
120 pendingTaskCountMax_ = value;
121 }
122
123 void add(shared_ptr<Runnable> value, int64_t timeout, int64_t expiration) override;
124
125 void remove(shared_ptr<Runnable> task) override;
126
127 shared_ptr<Runnable> removeNextPending() override;
128
removeExpiredTasks()129 void removeExpiredTasks() override {
130 removeExpired(false);
131 }
132
133 void setExpireCallback(ExpireCallback expireCallback) override;
134
135 private:
136 /**
137 * Remove one or more expired tasks.
138 * \param[in] justOne if true, try to remove just one task and return
139 */
140 void removeExpired(bool justOne);
141
142 /**
143 * \returns whether it is acceptable to block, depending on the current thread id
144 */
145 bool canSleep() const;
146
147 /**
148 * Lowers the maximum worker count and blocks until enough worker threads complete
149 * to get to the new maximum worker limit. The caller is responsible for acquiring
150 * a lock on the class mutex_.
151 */
152 void removeWorkersUnderLock(size_t value);
153
154 size_t workerCount_;
155 size_t workerMaxCount_;
156 size_t idleCount_;
157 size_t pendingTaskCountMax_;
158 size_t expiredCount_;
159 ExpireCallback expireCallback_;
160
161 ThreadManager::STATE state_;
162 shared_ptr<ThreadFactory> threadFactory_;
163
164 friend class ThreadManager::Task;
165 typedef std::deque<shared_ptr<Task> > TaskQueue;
166 TaskQueue tasks_;
167 Mutex mutex_;
168 Monitor monitor_;
169 Monitor maxMonitor_;
170 Monitor workerMonitor_; // used to synchronize changes in worker count
171
172 friend class ThreadManager::Worker;
173 std::set<shared_ptr<Thread> > workers_;
174 std::set<shared_ptr<Thread> > deadWorkers_;
175 std::map<const Thread::id_t, shared_ptr<Thread> > idMap_;
176 };
177
178 class ThreadManager::Task : public Runnable {
179
180 public:
181 enum STATE { WAITING, EXECUTING, TIMEDOUT, COMPLETE };
182
Task(shared_ptr<Runnable> runnable,uint64_t expiration=0ULL)183 Task(shared_ptr<Runnable> runnable, uint64_t expiration = 0ULL)
184 : runnable_(runnable),
185 state_(WAITING) {
186 if (expiration != 0ULL) {
187 expireTime_.reset(new std::chrono::steady_clock::time_point(std::chrono::steady_clock::now() + std::chrono::milliseconds(expiration)));
188 }
189 }
190
191 ~Task() override = default;
192
run()193 void run() override {
194 if (state_ == EXECUTING) {
195 runnable_->run();
196 state_ = COMPLETE;
197 }
198 }
199
getRunnable()200 shared_ptr<Runnable> getRunnable() { return runnable_; }
201
getExpireTime() const202 const unique_ptr<std::chrono::steady_clock::time_point> & getExpireTime() const { return expireTime_; }
203
204 private:
205 shared_ptr<Runnable> runnable_;
206 friend class ThreadManager::Worker;
207 STATE state_;
208 unique_ptr<std::chrono::steady_clock::time_point> expireTime_;
209 };
210
211 class ThreadManager::Worker : public Runnable {
212 enum STATE { UNINITIALIZED, STARTING, STARTED, STOPPING, STOPPED };
213
214 public:
Worker(ThreadManager::Impl * manager)215 Worker(ThreadManager::Impl* manager) : manager_(manager), state_(UNINITIALIZED) {}
216
217 ~Worker() override = default;
218
219 private:
isActive() const220 bool isActive() const {
221 return (manager_->workerCount_ <= manager_->workerMaxCount_)
222 || (manager_->state_ == JOINING && !manager_->tasks_.empty());
223 }
224
225 public:
226 /**
227 * Worker entry point
228 *
229 * As long as worker thread is running, pull tasks off the task queue and
230 * execute.
231 */
run()232 void run() override {
233 Guard g(manager_->mutex_);
234
235 /**
236 * This method has three parts; one is to check for and account for
237 * admitting a task which happens under a lock. Then the lock is released
238 * and the task itself is executed. Finally we do some accounting
239 * under lock again when the task completes.
240 */
241
242 /**
243 * Admitting
244 */
245
246 /**
247 * Increment worker semaphore and notify manager if worker count reached
248 * desired max
249 */
250 bool active = manager_->workerCount_ < manager_->workerMaxCount_;
251 if (active) {
252 if (++manager_->workerCount_ == manager_->workerMaxCount_) {
253 manager_->workerMonitor_.notify();
254 }
255 }
256
257 while (active) {
258 /**
259 * While holding manager monitor block for non-empty task queue (Also
260 * check that the thread hasn't been requested to stop). Once the queue
261 * is non-empty, dequeue a task, release monitor, and execute. If the
262 * worker max count has been decremented such that we exceed it, mark
263 * ourself inactive, decrement the worker count and notify the manager
264 * (technically we're notifying the next blocked thread but eventually
265 * the manager will see it.
266 */
267 active = isActive();
268
269 while (active && manager_->tasks_.empty()) {
270 manager_->idleCount_++;
271 manager_->monitor_.wait();
272 active = isActive();
273 manager_->idleCount_--;
274 }
275
276 shared_ptr<ThreadManager::Task> task;
277
278 if (active) {
279 if (!manager_->tasks_.empty()) {
280 task = manager_->tasks_.front();
281 manager_->tasks_.pop_front();
282 if (task->state_ == ThreadManager::Task::WAITING) {
283 // If the state is changed to anything other than EXECUTING or TIMEDOUT here
284 // then the execution loop needs to be changed below.
285 task->state_ =
286 (task->getExpireTime() && *(task->getExpireTime()) < std::chrono::steady_clock::now()) ?
287 ThreadManager::Task::TIMEDOUT :
288 ThreadManager::Task::EXECUTING;
289 }
290 }
291
292 /* If we have a pending task max and we just dropped below it, wakeup any
293 thread that might be blocked on add. */
294 if (manager_->pendingTaskCountMax_ != 0
295 && manager_->tasks_.size() <= manager_->pendingTaskCountMax_ - 1) {
296 manager_->maxMonitor_.notify();
297 }
298 }
299
300 /**
301 * Execution - not holding a lock
302 */
303 if (task) {
304 if (task->state_ == ThreadManager::Task::EXECUTING) {
305
306 // Release the lock so we can run the task without blocking the thread manager
307 manager_->mutex_.unlock();
308
309 try {
310 task->run();
311 } catch (const std::exception& e) {
312 GlobalOutput.printf("[ERROR] task->run() raised an exception: %s", e.what());
313 } catch (...) {
314 GlobalOutput.printf("[ERROR] task->run() raised an unknown exception");
315 }
316
317 // Re-acquire the lock to proceed in the thread manager
318 manager_->mutex_.lock();
319
320 } else if (manager_->expireCallback_) {
321 // The only other state the task could have been in is TIMEDOUT (see above)
322 manager_->mutex_.unlock();
323 manager_->expireCallback_(task->getRunnable());
324 manager_->mutex_.lock();
325 manager_->expiredCount_++;
326 }
327 }
328 }
329
330 /**
331 * Final accounting for the worker thread that is done working
332 */
333 manager_->deadWorkers_.insert(this->thread());
334 if (--manager_->workerCount_ == manager_->workerMaxCount_) {
335 manager_->workerMonitor_.notify();
336 }
337 }
338
339 private:
340 ThreadManager::Impl* manager_;
341 friend class ThreadManager::Impl;
342 STATE state_;
343 };
344
addWorker(size_t value)345 void ThreadManager::Impl::addWorker(size_t value) {
346 std::set<shared_ptr<Thread> > newThreads;
347 for (size_t ix = 0; ix < value; ix++) {
348 shared_ptr<ThreadManager::Worker> worker
349 = std::make_shared<ThreadManager::Worker>(this);
350 newThreads.insert(threadFactory_->newThread(worker));
351 }
352
353 Guard g(mutex_);
354 workerMaxCount_ += value;
355 workers_.insert(newThreads.begin(), newThreads.end());
356
357 for (const auto & newThread : newThreads) {
358 shared_ptr<ThreadManager::Worker> worker
359 = dynamic_pointer_cast<ThreadManager::Worker, Runnable>(newThread->runnable());
360 worker->state_ = ThreadManager::Worker::STARTING;
361 newThread->start();
362 idMap_.insert(std::pair<const Thread::id_t, shared_ptr<Thread> >(newThread->getId(), newThread));
363 }
364
365 while (workerCount_ != workerMaxCount_) {
366 workerMonitor_.wait();
367 }
368 }
369
start()370 void ThreadManager::Impl::start() {
371 Guard g(mutex_);
372 if (state_ == ThreadManager::STOPPED) {
373 return;
374 }
375
376 if (state_ == ThreadManager::UNINITIALIZED) {
377 if (!threadFactory_) {
378 throw InvalidArgumentException();
379 }
380 state_ = ThreadManager::STARTED;
381 monitor_.notifyAll();
382 }
383
384 while (state_ == STARTING) {
385 monitor_.wait();
386 }
387 }
388
stop()389 void ThreadManager::Impl::stop() {
390 Guard g(mutex_);
391 bool doStop = false;
392
393 if (state_ != ThreadManager::STOPPING && state_ != ThreadManager::JOINING
394 && state_ != ThreadManager::STOPPED) {
395 doStop = true;
396 state_ = ThreadManager::JOINING;
397 }
398
399 if (doStop) {
400 removeWorkersUnderLock(workerCount_);
401 }
402
403 state_ = ThreadManager::STOPPED;
404 }
405
removeWorker(size_t value)406 void ThreadManager::Impl::removeWorker(size_t value) {
407 Guard g(mutex_);
408 removeWorkersUnderLock(value);
409 }
410
removeWorkersUnderLock(size_t value)411 void ThreadManager::Impl::removeWorkersUnderLock(size_t value) {
412 if (value > workerMaxCount_) {
413 throw InvalidArgumentException();
414 }
415
416 workerMaxCount_ -= value;
417
418 if (idleCount_ > value) {
419 // There are more idle workers than we need to remove,
420 // so notify enough of them so they can terminate.
421 for (size_t ix = 0; ix < value; ix++) {
422 monitor_.notify();
423 }
424 } else {
425 // There are as many or less idle workers than we need to remove,
426 // so just notify them all so they can terminate.
427 monitor_.notifyAll();
428 }
429
430 while (workerCount_ != workerMaxCount_) {
431 workerMonitor_.wait();
432 }
433
434 for (const auto & deadWorker : deadWorkers_) {
435
436 // when used with a joinable thread factory, we join the threads as we remove them
437 if (!threadFactory_->isDetached()) {
438 deadWorker->join();
439 }
440
441 idMap_.erase(deadWorker->getId());
442 workers_.erase(deadWorker);
443 }
444
445 deadWorkers_.clear();
446 }
447
canSleep() const448 bool ThreadManager::Impl::canSleep() const {
449 const Thread::id_t id = threadFactory_->getCurrentThreadId();
450 return idMap_.find(id) == idMap_.end();
451 }
452
add(shared_ptr<Runnable> value,int64_t timeout,int64_t expiration)453 void ThreadManager::Impl::add(shared_ptr<Runnable> value, int64_t timeout, int64_t expiration) {
454 Guard g(mutex_, timeout);
455
456 if (!g) {
457 throw TimedOutException();
458 }
459
460 if (state_ != ThreadManager::STARTED) {
461 throw IllegalStateException(
462 "ThreadManager::Impl::add ThreadManager "
463 "not started");
464 }
465
466 // if we're at a limit, remove an expired task to see if the limit clears
467 if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
468 removeExpired(true);
469 }
470
471 if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
472 if (canSleep() && timeout >= 0) {
473 while (pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) {
474 // This is thread safe because the mutex is shared between monitors.
475 maxMonitor_.wait(timeout);
476 }
477 } else {
478 throw TooManyPendingTasksException();
479 }
480 }
481
482 tasks_.push_back(std::make_shared<ThreadManager::Task>(value, expiration));
483
484 // If idle thread is available notify it, otherwise all worker threads are
485 // running and will get around to this task in time.
486 if (idleCount_ > 0) {
487 monitor_.notify();
488 }
489 }
490
remove(shared_ptr<Runnable> task)491 void ThreadManager::Impl::remove(shared_ptr<Runnable> task) {
492 Guard g(mutex_);
493 if (state_ != ThreadManager::STARTED) {
494 throw IllegalStateException(
495 "ThreadManager::Impl::remove ThreadManager not "
496 "started");
497 }
498
499 for (auto it = tasks_.begin(); it != tasks_.end(); ++it)
500 {
501 if ((*it)->getRunnable() == task)
502 {
503 tasks_.erase(it);
504 return;
505 }
506 }
507 }
508
removeNextPending()509 std::shared_ptr<Runnable> ThreadManager::Impl::removeNextPending() {
510 Guard g(mutex_);
511 if (state_ != ThreadManager::STARTED) {
512 throw IllegalStateException(
513 "ThreadManager::Impl::removeNextPending "
514 "ThreadManager not started");
515 }
516
517 if (tasks_.empty()) {
518 return std::shared_ptr<Runnable>();
519 }
520
521 shared_ptr<ThreadManager::Task> task = tasks_.front();
522 tasks_.pop_front();
523
524 return task->getRunnable();
525 }
526
removeExpired(bool justOne)527 void ThreadManager::Impl::removeExpired(bool justOne) {
528 // this is always called under a lock
529 if (tasks_.empty()) {
530 return;
531 }
532 auto now = std::chrono::steady_clock::now();
533
534 for (auto it = tasks_.begin(); it != tasks_.end(); )
535 {
536 if ((*it)->getExpireTime() && *((*it)->getExpireTime()) < now) {
537 if (expireCallback_) {
538 expireCallback_((*it)->getRunnable());
539 }
540 it = tasks_.erase(it);
541 ++expiredCount_;
542 if (justOne) {
543 return;
544 }
545 }
546 else
547 {
548 ++it;
549 }
550 }
551 }
552
setExpireCallback(ExpireCallback expireCallback)553 void ThreadManager::Impl::setExpireCallback(ExpireCallback expireCallback) {
554 Guard g(mutex_);
555 expireCallback_ = expireCallback;
556 }
557
558 class SimpleThreadManager : public ThreadManager::Impl {
559
560 public:
SimpleThreadManager(size_t workerCount=4,size_t pendingTaskCountMax=0)561 SimpleThreadManager(size_t workerCount = 4, size_t pendingTaskCountMax = 0)
562 : workerCount_(workerCount), pendingTaskCountMax_(pendingTaskCountMax) {}
563
start()564 void start() override {
565 ThreadManager::Impl::pendingTaskCountMax(pendingTaskCountMax_);
566 ThreadManager::Impl::start();
567 addWorker(workerCount_);
568 }
569
570 private:
571 const size_t workerCount_;
572 const size_t pendingTaskCountMax_;
573 };
574
newThreadManager()575 shared_ptr<ThreadManager> ThreadManager::newThreadManager() {
576 return shared_ptr<ThreadManager>(new ThreadManager::Impl());
577 }
578
newSimpleThreadManager(size_t count,size_t pendingTaskCountMax)579 shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(size_t count,
580 size_t pendingTaskCountMax) {
581 return shared_ptr<ThreadManager>(new SimpleThreadManager(count, pendingTaskCountMax));
582 }
583 }
584 }
585 } // apache::thrift::concurrency
586