1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 */ 19 20 #ifndef _THRIFT_CONCURRENCY_THREADMANAGER_H_ 21 #define _THRIFT_CONCURRENCY_THREADMANAGER_H_ 1 22 23 #include <functional> 24 #include <memory> 25 #include <thrift/concurrency/ThreadFactory.h> 26 27 namespace apache { 28 namespace thrift { 29 namespace concurrency { 30 31 /** 32 * Thread Pool Manager and related classes 33 * 34 * @version $Id:$ 35 */ 36 class ThreadManager; 37 38 /** 39 * ThreadManager class 40 * 41 * This class manages a pool of threads. It uses a ThreadFactory to create 42 * threads. It never actually creates or destroys worker threads, rather 43 * it maintains statistics on number of idle threads, number of active threads, 44 * task backlog, and average wait and service times and informs the PoolPolicy 45 * object bound to instances of this manager of interesting transitions. It is 46 * then up the PoolPolicy object to decide if the thread pool size needs to be 47 * adjusted and call this object addWorker and removeWorker methods to make 48 * changes. 49 * 50 * This design allows different policy implementations to use this code to 51 * handle basic worker thread management and worker task execution and focus on 52 * policy issues. The simplest policy, StaticPolicy, does nothing other than 53 * create a fixed number of threads. 54 */ 55 class ThreadManager { 56 57 protected: 58 ThreadManager() = default; 59 60 public: 61 typedef std::function<void(std::shared_ptr<Runnable>)> ExpireCallback; 62 63 virtual ~ThreadManager() = default; 64 65 /** 66 * Starts the thread manager. Verifies all attributes have been properly 67 * initialized, then allocates necessary resources to begin operation 68 */ 69 virtual void start() = 0; 70 71 /** 72 * Stops the thread manager. Aborts all remaining unprocessed task, shuts 73 * down all created worker threads, and releases all allocated resources. 74 * This method blocks for all worker threads to complete, thus it can 75 * potentially block forever if a worker thread is running a task that 76 * won't terminate. 77 * 78 * Worker threads will be joined depending on the threadFactory's detached 79 * disposition. 80 */ 81 virtual void stop() = 0; 82 83 enum STATE { UNINITIALIZED, STARTING, STARTED, JOINING, STOPPING, STOPPED }; 84 85 virtual STATE state() const = 0; 86 87 /** 88 * \returns the current thread factory 89 */ 90 virtual std::shared_ptr<ThreadFactory> threadFactory() const = 0; 91 92 /** 93 * Set the thread factory. 94 * \throws InvalidArgumentException if the new thread factory has a different 95 * detached disposition than the one replacing it 96 */ 97 virtual void threadFactory(std::shared_ptr<ThreadFactory> value) = 0; 98 99 /** 100 * Adds worker thread(s). 101 */ 102 virtual void addWorker(size_t value = 1) = 0; 103 104 /** 105 * Removes worker thread(s). 106 * Threads are joined if the thread factory detached disposition allows it. 107 * Blocks until the number of worker threads reaches the new limit. 108 * \param[in] value the number to remove 109 * \throws InvalidArgumentException if the value is greater than the number 110 * of workers 111 */ 112 virtual void removeWorker(size_t value = 1) = 0; 113 114 /** 115 * Gets the current number of idle worker threads 116 */ 117 virtual size_t idleWorkerCount() const = 0; 118 119 /** 120 * Gets the current number of total worker threads 121 */ 122 virtual size_t workerCount() const = 0; 123 124 /** 125 * Gets the current number of pending tasks 126 */ 127 virtual size_t pendingTaskCount() const = 0; 128 129 /** 130 * Gets the current number of pending and executing tasks 131 */ 132 virtual size_t totalTaskCount() const = 0; 133 134 /** 135 * Gets the maximum pending task count. 0 indicates no maximum 136 */ 137 virtual size_t pendingTaskCountMax() const = 0; 138 139 /** 140 * Gets the number of tasks which have been expired without being run 141 * since start() was called. 142 */ 143 virtual size_t expiredTaskCount() const = 0; 144 145 /** 146 * Adds a task to be executed at some time in the future by a worker thread. 147 * 148 * This method will block if pendingTaskCountMax() in not zero and pendingTaskCount() 149 * is greater than or equalt to pendingTaskCountMax(). If this method is called in the 150 * context of a ThreadManager worker thread it will throw a 151 * TooManyPendingTasksException 152 * 153 * @param task The task to queue for execution 154 * 155 * @param timeout Time to wait in milliseconds to add a task when a pending-task-count 156 * is specified. Specific cases: 157 * timeout = 0 : Wait forever to queue task. 158 * timeout = -1 : Return immediately if pending task count exceeds specified max 159 * @param expiration when nonzero, the number of milliseconds the task is valid 160 * to be run; if exceeded, the task will be dropped off the queue and not run. 161 * 162 * @throws TooManyPendingTasksException Pending task count exceeds max pending task count 163 */ 164 virtual void add(std::shared_ptr<Runnable> task, 165 int64_t timeout = 0LL, 166 int64_t expiration = 0LL) = 0; 167 168 /** 169 * Removes a pending task 170 */ 171 virtual void remove(std::shared_ptr<Runnable> task) = 0; 172 173 /** 174 * Remove the next pending task which would be run. 175 * 176 * @return the task removed. 177 */ 178 virtual std::shared_ptr<Runnable> removeNextPending() = 0; 179 180 /** 181 * Remove tasks from front of task queue that have expired. 182 */ 183 virtual void removeExpiredTasks() = 0; 184 185 /** 186 * Set a callback to be called when a task is expired and not run. 187 * 188 * @param expireCallback a function called with the shared_ptr<Runnable> for 189 * the expired task. 190 */ 191 virtual void setExpireCallback(ExpireCallback expireCallback) = 0; 192 193 static std::shared_ptr<ThreadManager> newThreadManager(); 194 195 /** 196 * Creates a simple thread manager the uses count number of worker threads and has 197 * a pendingTaskCountMax maximum pending tasks. The default, 0, specified no limit 198 * on pending tasks 199 */ 200 static std::shared_ptr<ThreadManager> newSimpleThreadManager(size_t count = 4, 201 size_t pendingTaskCountMax = 0); 202 203 class Task; 204 205 class Worker; 206 207 class Impl; 208 }; 209 } 210 } 211 } // apache::thrift::concurrency 212 213 #endif // #ifndef _THRIFT_CONCURRENCY_THREADMANAGER_H_ 214