1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #ifndef _THRIFT_CONCURRENCY_THREAD_H_
21 #define _THRIFT_CONCURRENCY_THREAD_H_ 1
22 
23 #include <memory>
24 #include <thread>
25 
26 #include <thrift/concurrency/Monitor.h>
27 
28 namespace apache {
29 namespace thrift {
30 namespace concurrency {
31 
32 class Thread;
33 
34 /**
35  * Minimal runnable class.  More or less analogous to java.lang.Runnable.
36  *
37  * @version $Id:$
38  */
39 class Runnable {
40 
41 public:
42   virtual ~Runnable() = default;
43   virtual void run() = 0;
44 
45   /**
46    * Gets the thread object that is hosting this runnable object  - can return
47    * an empty boost::shared pointer if no references remain on that thread object
48    */
thread()49   virtual std::shared_ptr<Thread> thread() { return thread_.lock(); }
50 
51   /**
52    * Sets the thread that is executing this object.  This is only meant for
53    * use by concrete implementations of Thread.
54    */
thread(std::shared_ptr<Thread> value)55   virtual void thread(std::shared_ptr<Thread> value) { thread_ = value; }
56 
57 private:
58   std::weak_ptr<Thread> thread_;
59 };
60 
61 /**
62  * Minimal thread class. Returned by thread factory bound to a Runnable object
63  * and ready to start execution.  More or less analogous to java.lang.Thread
64  * (minus all the thread group, priority, mode and other baggage, since that
65  * is difficult to abstract across platforms and is left for platform-specific
66  * ThreadFactory implementations to deal with
67  *
68  * @see apache::thrift::concurrency::ThreadFactory)
69  */
70 class Thread : public std::enable_shared_from_this<Thread> {
71 
72 public:
73   typedef std::thread::id id_t;
74   typedef void (*thread_funct_t)(std::shared_ptr<Thread> );
75 
76   enum STATE { uninitialized, starting, started, stopping, stopped };
77 
78   static void threadMain(std::shared_ptr<Thread> thread);
79 
is_current(id_t t)80   static inline bool is_current(id_t t) { return t == std::this_thread::get_id(); }
get_current()81   static inline id_t get_current() { return std::this_thread::get_id(); }
82 
Thread(bool detached,std::shared_ptr<Runnable> runnable)83   Thread(bool detached, std::shared_ptr<Runnable> runnable)
84     : state_(uninitialized), detached_(detached) {
85     this->_runnable = runnable;
86   }
87 
~Thread()88   virtual ~Thread() {
89     if (!detached_ && thread_->joinable()) {
90       try {
91         join();
92       } catch (...) {
93         // We're really hosed.
94       }
95     }
96   }
97 
getState()98   STATE getState() const
99   {
100     Synchronized sync(monitor_);
101     return state_;
102   }
103 
setState(STATE newState)104   void setState(STATE newState)
105   {
106     Synchronized sync(monitor_);
107     state_ = newState;
108 
109     // unblock start() with the knowledge that the thread has actually
110     // started running, which avoids a race in detached threads.
111     if (newState == started) {
112 	  monitor_.notify();
113     }
114   }
115 
116   /**
117    * Starts the thread. Does platform specific thread creation and
118    * configuration then invokes the run method of the Runnable object bound
119    * to this thread.
120    */
start()121   virtual void start() {
122     if (getState() != uninitialized) {
123       return;
124     }
125 
126     std::shared_ptr<Thread> selfRef = shared_from_this();
127     setState(starting);
128 
129     Synchronized sync(monitor_);
130     thread_ = std::unique_ptr<std::thread>(new std::thread(getThreadFunc(), selfRef));
131 
132     if (detached_)
133       thread_->detach();
134 
135     // Wait for the thread to start and get far enough to grab everything
136     // that it needs from the calling context, thus absolving the caller
137     // from being required to hold on to runnable indefinitely.
138     monitor_.wait();
139   }
140 
141   /**
142    * Join this thread. If this thread is joinable, the calling thread blocks
143    * until this thread completes.  If the target thread is not joinable, then
144    * nothing happens.
145    */
join()146   virtual void join() {
147     if (!detached_ && state_ != uninitialized) {
148       thread_->join();
149     }
150   }
151 
152   /**
153    * Gets the thread's platform-specific ID
154    */
getId()155   Thread::id_t getId() const { return thread_.get() ? thread_->get_id() : std::thread::id(); }
156 
157   /**
158    * Gets the runnable object this thread is hosting
159    */
runnable()160   std::shared_ptr<Runnable> runnable() const { return _runnable; }
161 
162 protected:
163 
getThreadFunc()164   virtual thread_funct_t getThreadFunc() const {
165       return threadMain;
166   }
167 
168 private:
169   std::shared_ptr<Runnable> _runnable;
170   std::unique_ptr<std::thread> thread_;
171   Monitor monitor_;
172   STATE state_;
173   bool detached_;
174 };
175 
176 
177 }
178 }
179 } // apache::thrift::concurrency
180 
181 #endif // #ifndef _THRIFT_CONCURRENCY_THREAD_H_
182