1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #ifndef _THRIFT_CONCURRENCY_THREADMANAGER_H_
21 #define _THRIFT_CONCURRENCY_THREADMANAGER_H_ 1
22 
23 #include <functional>
24 #include <memory>
25 #include <thrift/concurrency/ThreadFactory.h>
26 
27 namespace apache {
28 namespace thrift {
29 namespace concurrency {
30 
31 /**
32  * Thread Pool Manager and related classes
33  *
34  * @version $Id:$
35  */
36 class ThreadManager;
37 
38 /**
39  * ThreadManager class
40  *
41  * This class manages a pool of threads. It uses a ThreadFactory to create
42  * threads. It never actually creates or destroys worker threads, rather
43  * it maintains statistics on number of idle threads, number of active threads,
44  * task backlog, and average wait and service times and informs the PoolPolicy
45  * object bound to instances of this manager of interesting transitions. It is
 * then up to the PoolPolicy object to decide if the thread pool size needs to
 * be adjusted and to call this object's addWorker and removeWorker methods to
 * make changes.
49  *
50  * This design allows different policy implementations to use this code to
51  * handle basic worker thread management and worker task execution and focus on
52  * policy issues. The simplest policy, StaticPolicy, does nothing other than
53  * create a fixed number of threads.
54  */
55 class ThreadManager {
56 
57 protected:
58   ThreadManager() = default;
59 
60 public:
61   typedef std::function<void(std::shared_ptr<Runnable>)> ExpireCallback;
62 
63   virtual ~ThreadManager() = default;
64 
65   /**
66    * Starts the thread manager. Verifies all attributes have been properly
67    * initialized, then allocates necessary resources to begin operation
68    */
69   virtual void start() = 0;
70 
71   /**
72    * Stops the thread manager. Aborts all remaining unprocessed task, shuts
73    * down all created worker threads, and releases all allocated resources.
74    * This method blocks for all worker threads to complete, thus it can
75    * potentially block forever if a worker thread is running a task that
76    * won't terminate.
77    *
78    * Worker threads will be joined depending on the threadFactory's detached
79    * disposition.
80    */
81   virtual void stop() = 0;
82 
83   enum STATE { UNINITIALIZED, STARTING, STARTED, JOINING, STOPPING, STOPPED };
84 
85   virtual STATE state() const = 0;
86 
87   /**
88    * \returns the current thread factory
89    */
90   virtual std::shared_ptr<ThreadFactory> threadFactory() const = 0;
91 
92   /**
93    * Set the thread factory.
94    * \throws InvalidArgumentException if the new thread factory has a different
95    *                                  detached disposition than the one replacing it
96    */
97   virtual void threadFactory(std::shared_ptr<ThreadFactory> value) = 0;
98 
99   /**
100    * Adds worker thread(s).
101    */
102   virtual void addWorker(size_t value = 1) = 0;
103 
104   /**
105    * Removes worker thread(s).
106    * Threads are joined if the thread factory detached disposition allows it.
107    * Blocks until the number of worker threads reaches the new limit.
108    * \param[in]  value  the number to remove
109    * \throws InvalidArgumentException if the value is greater than the number
110    *                                  of workers
111    */
112   virtual void removeWorker(size_t value = 1) = 0;
113 
114   /**
115    * Gets the current number of idle worker threads
116    */
117   virtual size_t idleWorkerCount() const = 0;
118 
119   /**
120    * Gets the current number of total worker threads
121    */
122   virtual size_t workerCount() const = 0;
123 
124   /**
125    * Gets the current number of pending tasks
126    */
127   virtual size_t pendingTaskCount() const = 0;
128 
129   /**
130    * Gets the current number of pending and executing tasks
131    */
132   virtual size_t totalTaskCount() const = 0;
133 
134   /**
135    * Gets the maximum pending task count.  0 indicates no maximum
136    */
137   virtual size_t pendingTaskCountMax() const = 0;
138 
139   /**
140    * Gets the number of tasks which have been expired without being run
141    * since start() was called.
142    */
143   virtual size_t expiredTaskCount() const = 0;
144 
145   /**
146    * Adds a task to be executed at some time in the future by a worker thread.
147    *
148    * This method will block if pendingTaskCountMax() in not zero and pendingTaskCount()
149    * is greater than or equalt to pendingTaskCountMax().  If this method is called in the
150    * context of a ThreadManager worker thread it will throw a
151    * TooManyPendingTasksException
152    *
153    * @param task  The task to queue for execution
154    *
155    * @param timeout Time to wait in milliseconds to add a task when a pending-task-count
156    * is specified. Specific cases:
157    * timeout = 0  : Wait forever to queue task.
158    * timeout = -1 : Return immediately if pending task count exceeds specified max
159    * @param expiration when nonzero, the number of milliseconds the task is valid
160    * to be run; if exceeded, the task will be dropped off the queue and not run.
161    *
162    * @throws TooManyPendingTasksException Pending task count exceeds max pending task count
163    */
164   virtual void add(std::shared_ptr<Runnable> task,
165                    int64_t timeout = 0LL,
166                    int64_t expiration = 0LL) = 0;
167 
168   /**
169    * Removes a pending task
170    */
171   virtual void remove(std::shared_ptr<Runnable> task) = 0;
172 
173   /**
174    * Remove the next pending task which would be run.
175    *
176    * @return the task removed.
177    */
178   virtual std::shared_ptr<Runnable> removeNextPending() = 0;
179 
180   /**
181    * Remove tasks from front of task queue that have expired.
182    */
183   virtual void removeExpiredTasks() = 0;
184 
185   /**
186    * Set a callback to be called when a task is expired and not run.
187    *
188    * @param expireCallback a function called with the shared_ptr<Runnable> for
189    * the expired task.
190    */
191   virtual void setExpireCallback(ExpireCallback expireCallback) = 0;
192 
193   static std::shared_ptr<ThreadManager> newThreadManager();
194 
195   /**
196    * Creates a simple thread manager the uses count number of worker threads and has
197    * a pendingTaskCountMax maximum pending tasks. The default, 0, specified no limit
198    * on pending tasks
199    */
200   static std::shared_ptr<ThreadManager> newSimpleThreadManager(size_t count = 4,
201                                                                  size_t pendingTaskCountMax = 0);
202 
203   class Task;
204 
205   class Worker;
206 
207   class Impl;
208 };
209 }
210 }
211 } // apache::thrift::concurrency
212 
213 #endif // #ifndef _THRIFT_CONCURRENCY_THREADMANAGER_H_
214