89.58% Lines (43/48) 100.00% Functions (7/7)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2026 Steve Gerbino 2   // Copyright (c) 2026 Steve Gerbino
3   // 3   //
4   // Distributed under the Boost Software License, Version 1.0. (See accompanying 4   // Distributed under the Boost Software License, Version 1.0. (See accompanying
5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6   // 6   //
7   // Official repository: https://github.com/cppalliance/corosio 7   // Official repository: https://github.com/cppalliance/corosio
8   // 8   //
9   9  
10   #ifndef BOOST_COROSIO_DETAIL_THREAD_POOL_HPP 10   #ifndef BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
11   #define BOOST_COROSIO_DETAIL_THREAD_POOL_HPP 11   #define BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
12   12  
13   #include <boost/corosio/detail/config.hpp> 13   #include <boost/corosio/detail/config.hpp>
14   #include <boost/corosio/detail/intrusive.hpp> 14   #include <boost/corosio/detail/intrusive.hpp>
15   #include <boost/capy/ex/execution_context.hpp> 15   #include <boost/capy/ex/execution_context.hpp>
16   #include <boost/capy/test/thread_name.hpp> 16   #include <boost/capy/test/thread_name.hpp>
17   17  
18   #include <condition_variable> 18   #include <condition_variable>
19   #include <cstdio> 19   #include <cstdio>
20   #include <mutex> 20   #include <mutex>
21   #include <stdexcept> 21   #include <stdexcept>
22   #include <thread> 22   #include <thread>
23   #include <vector> 23   #include <vector>
24   24  
25   namespace boost::corosio::detail { 25   namespace boost::corosio::detail {
26   26  
27   /** Base class for thread pool work items. 27   /** Base class for thread pool work items.
28   28  
29   Derive from this to create work that can be posted to a 29   Derive from this to create work that can be posted to a
30   @ref thread_pool. Uses static function pointer dispatch, 30   @ref thread_pool. Uses static function pointer dispatch,
31   consistent with the IOCP `op` pattern. 31   consistent with the IOCP `op` pattern.
32   32  
33   @par Example 33   @par Example
34   @code 34   @code
35   struct my_work : pool_work_item 35   struct my_work : pool_work_item
36   { 36   {
37   int* result; 37   int* result;
38   static void execute( pool_work_item* w ) noexcept 38   static void execute( pool_work_item* w ) noexcept
39   { 39   {
40   auto* self = static_cast<my_work*>( w ); 40   auto* self = static_cast<my_work*>( w );
41   *self->result = 42; 41   *self->result = 42;
42   } 42   }
43   }; 43   };
44   44  
45   my_work w; 45   my_work w;
46   w.func_ = &my_work::execute; 46   w.func_ = &my_work::execute;
47   w.result = &r; 47   w.result = &r;
48   pool.post( &w ); 48   pool.post( &w );
49   @endcode 49   @endcode
50   */ 50   */
51   struct pool_work_item : intrusive_queue<pool_work_item>::node 51   struct pool_work_item : intrusive_queue<pool_work_item>::node
52   { 52   {
53   /// Static dispatch function signature. 53   /// Static dispatch function signature.
54   using func_type = void (*)(pool_work_item*) noexcept; 54   using func_type = void (*)(pool_work_item*) noexcept;
55   55  
56   /// Completion handler invoked by the worker thread. 56   /// Completion handler invoked by the worker thread.
57   func_type func_ = nullptr; 57   func_type func_ = nullptr;
58   }; 58   };
59   59  
60   /** Shared thread pool for dispatching blocking operations. 60   /** Shared thread pool for dispatching blocking operations.
61   61  
62   Provides a fixed pool of reusable worker threads for operations 62   Provides a fixed pool of reusable worker threads for operations
63   that cannot be integrated with async I/O (e.g. blocking DNS 63   that cannot be integrated with async I/O (e.g. blocking DNS
64   calls). Registered as an `execution_context::service` so it 64   calls). Registered as an `execution_context::service` so it
65   is a singleton per io_context. 65   is a singleton per io_context.
66   66  
67   Threads are created eagerly in the constructor. The default 67   Threads are created eagerly in the constructor. The default
68   thread count is 1. 68   thread count is 1.
69   69  
70   @par Thread Safety 70   @par Thread Safety
71   All public member functions are thread-safe. 71   All public member functions are thread-safe.
72   72  
73   @par Shutdown 73   @par Shutdown
74   Sets a shutdown flag, notifies all threads, and joins them. 74   Sets a shutdown flag, notifies all threads, and joins them.
75   In-flight blocking calls complete naturally before the thread 75   In-flight blocking calls complete naturally before the thread
76   exits. 76   exits.
77   */ 77   */
78   class thread_pool final : public capy::execution_context::service 78   class thread_pool final : public capy::execution_context::service
79   { 79   {
80   std::mutex mutex_; 80   std::mutex mutex_;
81   std::condition_variable cv_; 81   std::condition_variable cv_;
82   intrusive_queue<pool_work_item> work_queue_; 82   intrusive_queue<pool_work_item> work_queue_;
83   std::vector<std::thread> threads_; 83   std::vector<std::thread> threads_;
84   bool shutdown_ = false; 84   bool shutdown_ = false;
85   85  
86   void worker_loop(unsigned index); 86   void worker_loop(unsigned index);
87   87  
88   public: 88   public:
89   using key_type = thread_pool; 89   using key_type = thread_pool;
90   90  
91   /** Construct the thread pool service. 91   /** Construct the thread pool service.
92   92  
93   Eagerly creates all worker threads. 93   Eagerly creates all worker threads.
94   94  
95   @par Exception Safety 95   @par Exception Safety
96   Strong guarantee. If thread creation fails, all 96   Strong guarantee. If thread creation fails, all
97   already-created threads are shut down and joined 97   already-created threads are shut down and joined
98   before the exception propagates. 98   before the exception propagates.
99   99  
100   @param ctx Reference to the owning execution_context. 100   @param ctx Reference to the owning execution_context.
101   @param num_threads Number of worker threads. Must be 101   @param num_threads Number of worker threads. Must be
102   at least 1. 102   at least 1.
103   103  
104   @throws std::logic_error If `num_threads` is 0. 104   @throws std::logic_error If `num_threads` is 0.
105   */ 105   */
HITCBC 106   1023 explicit thread_pool(capy::execution_context& ctx, unsigned num_threads = 1) 106   1023 explicit thread_pool(capy::execution_context& ctx, unsigned num_threads = 1)
HITCBC 107   1023 { 107   1023 {
108   (void)ctx; 108   (void)ctx;
HITCBC 109   1023 if (!num_threads) 109   1023 if (!num_threads)
HITCBC 110   1 throw std::logic_error("thread_pool requires at least 1 thread"); 110   1 throw std::logic_error("thread_pool requires at least 1 thread");
HITCBC 111   1022 threads_.reserve(num_threads); 111   1022 threads_.reserve(num_threads);
112   try 112   try
113   { 113   {
HITCBC 114   2050 for (unsigned i = 0; i < num_threads; ++i) 114   2050 for (unsigned i = 0; i < num_threads; ++i)
HITCBC 115   2056 threads_.emplace_back([this, i] { worker_loop(i + 1); }); 115   2056 threads_.emplace_back([this, i] { worker_loop(i + 1); });
116   } 116   }
MISUBC 117   catch (...) 117   catch (...)
118   { 118   {
MISUBC 119   shutdown(); 119   shutdown();
MISUBC 120   throw; 120   throw;
MISUBC 121   } 121   }
HITCBC 122   1025 } 122   1025 }
123   123  
HITCBC 124   2043 ~thread_pool() override = default; 124   2043 ~thread_pool() override = default;
125   125  
126   thread_pool(thread_pool const&) = delete; 126   thread_pool(thread_pool const&) = delete;
127   thread_pool& operator=(thread_pool const&) = delete; 127   thread_pool& operator=(thread_pool const&) = delete;
128   128  
129   /** Enqueue a work item for execution on the thread pool. 129   /** Enqueue a work item for execution on the thread pool.
130   130  
131   Zero-allocation: the caller owns the work item's storage. 131   Zero-allocation: the caller owns the work item's storage.
132   132  
133   @param w The work item to execute. Must remain valid until 133   @param w The work item to execute. Must remain valid until
134   its `func_` has been called. 134   its `func_` has been called.
135   135  
136   @return `true` if the item was enqueued, `false` if the 136   @return `true` if the item was enqueued, `false` if the
137   pool has already shut down. 137   pool has already shut down.
138   */ 138   */
139   bool post(pool_work_item* w) noexcept; 139   bool post(pool_work_item* w) noexcept;
140   140  
141   /** Shut down the thread pool. 141   /** Shut down the thread pool.
142   142  
143   Signals all threads to exit after draining any 143   Signals all threads to exit after draining any
144   remaining queued work, then joins them. 144   remaining queued work, then joins them.
145   */ 145   */
146   void shutdown() override; 146   void shutdown() override;
147   }; 147   };
148   148  
149   inline void 149   inline void
HITCBC 150   1028 thread_pool::worker_loop(unsigned index) 150   1028 thread_pool::worker_loop(unsigned index)
151   { 151   {
152   // Name format chosen to fit Linux's 15-char pthread limit: 152   // Name format chosen to fit Linux's 15-char pthread limit:
153   // "tpool-svc-" (10) + up to 4 digit index leaves "tpool-svc-9999". 153   // "tpool-svc-" (10) + up to 4 digit index leaves "tpool-svc-9999".
154   char name[16]; 154   char name[16];
HITCBC 155   1028 std::snprintf(name, sizeof(name), "tpool-svc-%u", index); 155   1028 std::snprintf(name, sizeof(name), "tpool-svc-%u", index);
HITCBC 156   1028 capy::set_current_thread_name(name); 156   1028 capy::set_current_thread_name(name);
157   157  
158   for (;;) 158   for (;;)
159   { 159   {
160   pool_work_item* w; 160   pool_work_item* w;
161   { 161   {
HITCBC 162   1245 std::unique_lock<std::mutex> lock(mutex_); 162   1245 std::unique_lock<std::mutex> lock(mutex_);
HITCBC 163   1245 cv_.wait( 163   1245 cv_.wait(
HITCBC 164   1809 lock, [this] { return shutdown_ || !work_queue_.empty(); }); 164   2006 lock, [this] { return shutdown_ || !work_queue_.empty(); });
165   165  
HITCBC 166   1245 w = work_queue_.pop(); 166   1245 w = work_queue_.pop();
HITCBC 167   1245 if (!w) 167   1245 if (!w)
168   { 168   {
HITCBC 169   1028 if (shutdown_) 169   1028 if (shutdown_)
HITCBC 170   2056 return; 170   2056 return;
MISUBC 171   continue; 171   continue;
172   } 172   }
HITCBC 173   1245 } 173   1245 }
HITCBC 174   217 w->func_(w); 174   217 w->func_(w);
HITCBC 175   217 } 175   217 }
176   } 176   }
177   177  
178   inline bool 178   inline bool
HITCBC 179   218 thread_pool::post(pool_work_item* w) noexcept 179   218 thread_pool::post(pool_work_item* w) noexcept
180   { 180   {
181   { 181   {
HITCBC 182   218 std::lock_guard<std::mutex> lock(mutex_); 182   218 std::lock_guard<std::mutex> lock(mutex_);
HITCBC 183   218 if (shutdown_) 183   218 if (shutdown_)
HITCBC 184   1 return false; 184   1 return false;
HITCBC 185   217 work_queue_.push(w); 185   217 work_queue_.push(w);
HITCBC 186   218 } 186   218 }
HITCBC 187   217 cv_.notify_one(); 187   217 cv_.notify_one();
HITCBC 188   217 return true; 188   217 return true;
189   } 189   }
190   190  
191   inline void 191   inline void
HITCBC 192   1026 thread_pool::shutdown() 192   1026 thread_pool::shutdown()
193   { 193   {
194   { 194   {
HITCBC 195   1026 std::lock_guard<std::mutex> lock(mutex_); 195   1026 std::lock_guard<std::mutex> lock(mutex_);
HITCBC 196   1026 shutdown_ = true; 196   1026 shutdown_ = true;
HITCBC 197   1026 } 197   1026 }
HITCBC 198   1026 cv_.notify_all(); 198   1026 cv_.notify_all();
199   199  
HITCBC 200   2054 for (auto& t : threads_) 200   2054 for (auto& t : threads_)
201   { 201   {
HITCBC 202   1028 if (t.joinable()) 202   1028 if (t.joinable())
HITCBC 203   1028 t.join(); 203   1028 t.join();
204   } 204   }
HITCBC 205   1026 threads_.clear(); 205   1026 threads_.clear();
206   206  
207   { 207   {
HITCBC 208   1026 std::lock_guard<std::mutex> lock(mutex_); 208   1026 std::lock_guard<std::mutex> lock(mutex_);
HITCBC 209   1026 while (work_queue_.pop()) 209   1026 while (work_queue_.pop())
210   ; 210   ;
HITCBC 211   1026 } 211   1026 }
HITCBC 212   1026 } 212   1026 }
213   213  
214   } // namespace boost::corosio::detail 214   } // namespace boost::corosio::detail
215   215  
216   #endif // BOOST_COROSIO_DETAIL_THREAD_POOL_HPP 216   #endif // BOOST_COROSIO_DETAIL_THREAD_POOL_HPP