src/corosio/src/io_context.cpp

98.8% Lines (81/82) 100.0% List of functions (13/13)
io_context.cpp
f(x) Functions (13)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 // Copyright (c) 2026 Michael Vandeberg
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/corosio
9 //
10
11 #include <boost/corosio/io_context.hpp>
12 #include <boost/corosio/backend.hpp>
13 #include <boost/corosio/detail/thread_pool.hpp>
14
15 #include <algorithm>
16 #include <stdexcept>
17 #include <thread>
18
19 #if BOOST_COROSIO_HAS_EPOLL
20 #include <boost/corosio/native/detail/epoll/epoll_types.hpp>
21 #endif
22
23 #if BOOST_COROSIO_HAS_SELECT
24 #include <boost/corosio/native/detail/select/select_types.hpp>
25 #endif
26
27 #if BOOST_COROSIO_HAS_KQUEUE
28 #include <boost/corosio/native/detail/kqueue/kqueue_types.hpp>
29 #endif
30
31 #if BOOST_COROSIO_HAS_IO_URING
32 #include <boost/corosio/native/detail/io_uring/io_uring_acceptor_ops.hpp>
33 #include <boost/corosio/native/detail/io_uring/io_uring_buffer.hpp>
34 #include <boost/corosio/native/detail/io_uring/io_uring_dgram_ops.hpp>
35 #include <boost/corosio/native/detail/io_uring/io_uring_multishot_acceptor.hpp>
36 #include <boost/corosio/native/detail/io_uring/io_uring_random_access_file.hpp>
37 #include <boost/corosio/native/detail/io_uring/io_uring_scheduler.hpp>
38 #include <boost/corosio/native/detail/io_uring/io_uring_stream_file.hpp>
39 #include <boost/corosio/native/detail/io_uring/io_uring_types.hpp>
40 #endif
41
42 #if BOOST_COROSIO_HAS_IOCP
43 #include <boost/corosio/native/detail/iocp/win_scheduler.hpp>
44 #include <boost/corosio/native/detail/iocp/win_tcp_acceptor_service.hpp>
45 #include <boost/corosio/native/detail/iocp/win_udp_service.hpp>
46 #include <boost/corosio/native/detail/iocp/win_local_stream_acceptor_service.hpp>
47 #include <boost/corosio/native/detail/iocp/win_signals.hpp>
48 #include <boost/corosio/native/detail/iocp/win_file_service.hpp>
49 #include <boost/corosio/native/detail/iocp/win_random_access_file_service.hpp>
50 #endif
51
52 namespace boost::corosio {
53
54 #if BOOST_COROSIO_HAS_EPOLL
55 detail::scheduler&
56 593x epoll_t::construct(capy::execution_context& ctx, unsigned concurrency_hint)
57 {
58 1186x auto& sched = ctx.make_service<detail::epoll_scheduler>(
59 593x static_cast<int>(concurrency_hint));
60
61 593x ctx.make_service<detail::epoll_tcp_service>();
62 593x ctx.make_service<detail::epoll_tcp_acceptor_service>();
63 593x ctx.make_service<detail::epoll_udp_service>();
64 593x ctx.make_service<detail::epoll_local_stream_service>();
65 593x ctx.make_service<detail::epoll_local_stream_acceptor_service>();
66 593x ctx.make_service<detail::epoll_local_datagram_service>();
67
68 593x return sched;
69 }
70 #endif
71
72 #if BOOST_COROSIO_HAS_SELECT
73 detail::scheduler&
74 428x select_t::construct(capy::execution_context& ctx, unsigned concurrency_hint)
75 {
76 856x auto& sched = ctx.make_service<detail::select_scheduler>(
77 428x static_cast<int>(concurrency_hint));
78
79 428x ctx.make_service<detail::select_tcp_service>();
80 428x ctx.make_service<detail::select_tcp_acceptor_service>();
81 428x ctx.make_service<detail::select_udp_service>();
82 428x ctx.make_service<detail::select_local_stream_service>();
83 428x ctx.make_service<detail::select_local_stream_acceptor_service>();
84 428x ctx.make_service<detail::select_local_datagram_service>();
85
86 428x return sched;
87 }
88 #endif
89
90 #if BOOST_COROSIO_HAS_KQUEUE
91 detail::scheduler&
92 kqueue_t::construct(capy::execution_context& ctx, unsigned concurrency_hint)
93 {
94 auto& sched = ctx.make_service<detail::kqueue_scheduler>(
95 static_cast<int>(concurrency_hint));
96
97 ctx.make_service<detail::kqueue_tcp_service>();
98 ctx.make_service<detail::kqueue_tcp_acceptor_service>();
99 ctx.make_service<detail::kqueue_udp_service>();
100 ctx.make_service<detail::kqueue_local_stream_service>();
101 ctx.make_service<detail::kqueue_local_stream_acceptor_service>();
102 ctx.make_service<detail::kqueue_local_datagram_service>();
103
104 return sched;
105 }
106 #endif
107
108 #if BOOST_COROSIO_HAS_IOCP
109 detail::scheduler&
110 iocp_t::construct(capy::execution_context& ctx, unsigned concurrency_hint)
111 {
112 auto& sched = ctx.make_service<detail::win_scheduler>(
113 static_cast<int>(concurrency_hint));
114
115 auto& tcp_svc = ctx.make_service<detail::win_tcp_service>();
116 ctx.make_service<detail::win_tcp_acceptor_service>(tcp_svc);
117 ctx.make_service<detail::win_udp_service>();
118 auto& local_svc =
119 ctx.make_service<detail::win_local_stream_service>(tcp_svc);
120 ctx.make_service<detail::win_local_stream_acceptor_service>(local_svc);
121 ctx.make_service<detail::win_signals>();
122 ctx.make_service<detail::win_file_service>();
123 ctx.make_service<detail::win_random_access_file_service>();
124
125 return sched;
126 }
127 #endif
128
129 #if BOOST_COROSIO_HAS_IO_URING
130 detail::scheduler&
131 io_uring_t::construct(capy::execution_context& ctx, unsigned concurrency_hint)
132 {
133 auto& sched = ctx.make_service<detail::io_uring_scheduler>(
134 static_cast<int>(concurrency_hint));
135
136 ctx.make_service<detail::io_uring_tcp_service>();
137 ctx.make_service<detail::io_uring_tcp_acceptor_service>();
138 ctx.make_service<detail::io_uring_local_stream_service>();
139 ctx.make_service<detail::io_uring_local_stream_acceptor_service>();
140 ctx.make_service<detail::io_uring_udp_service>();
141 ctx.make_service<detail::io_uring_local_datagram_service>();
142 ctx.make_service<detail::io_uring_stream_file_service>(sched);
143 ctx.make_service<detail::io_uring_random_access_file_service>(sched);
144
145 return sched;
146 }
147 #endif
148
149 namespace {
150
151 // Pre-create services that must exist before construct() runs.
152 void
153 12x pre_create_services(
154 capy::execution_context& ctx,
155 io_context_options const& opts)
156 {
157 #if BOOST_COROSIO_POSIX
158 12x if (opts.thread_pool_size < 1)
159 throw std::invalid_argument(
160 "thread_pool_size must be at least 1");
161 // Pre-create the shared thread pool with the configured size.
162 // This must happen before construct() because the scheduler
163 // constructor creates file and resolver services that call
164 // get_or_create_pool(), which would create a 1-thread pool.
165 12x if (opts.thread_pool_size != 1)
166 1x ctx.make_service<detail::thread_pool>(opts.thread_pool_size);
167 #endif
168
169 (void)ctx;
170 (void)opts;
171 12x }
172
173 // Apply runtime tuning to the scheduler after construction.
174 //
175 // Concurrency-hint heuristic for budget defaults: when the io_context is
176 // constructed with concurrency_hint > 1 AND the user has not customized
177 // the budget settings (i.e. they remain at the struct defaults), we
178 // disable the inline-completion fast path. Multi-thread workloads
179 // benefit from "always-post" because cross-thread work-stealing wins
180 // over chained dispatch on the originating thread. Single-thread (or
181 // any custom budget) keeps the user/library setting unchanged.
182 void
183 12x apply_scheduler_options(
184 detail::scheduler& sched,
185 io_context_options const& opts,
186 unsigned concurrency_hint)
187 {
188 #if BOOST_COROSIO_HAS_EPOLL || BOOST_COROSIO_HAS_KQUEUE || BOOST_COROSIO_HAS_SELECT
189 // dynamic_cast — when io_uring is also linked, the runtime probe may
190 // have selected io_uring_scheduler instead of a reactor_scheduler.
191 12x if (auto* reactor =
192 12x dynamic_cast<detail::reactor_scheduler*>(&sched))
193 {
194 // Detect "user kept the defaults" by comparing all three to the
195 // io_context_options-defined struct defaults.
196 12x io_context_options defaults;
197 12x bool budget_at_defaults =
198 16x opts.inline_budget_initial == defaults.inline_budget_initial &&
199 16x opts.inline_budget_max == defaults.inline_budget_max &&
200 4x opts.unassisted_budget == defaults.unassisted_budget;
201
202 12x unsigned init = opts.inline_budget_initial;
203 12x unsigned max = opts.inline_budget_max;
204 12x unsigned ua = opts.unassisted_budget;
205
206 12x if (budget_at_defaults && concurrency_hint > 1)
207 {
208 // Multi-thread default: disable budget (post-everything).
209 3x init = 0;
210 3x max = 0;
211 3x ua = 0;
212 }
213
214 12x reactor->configure_reactor(
215 12x opts.max_events_per_poll,
216 init,
217 max,
218 ua);
219 10x if (opts.single_threaded)
220 1x reactor->configure_single_threaded(true);
221 }
222 #endif
223
224 #if BOOST_COROSIO_HAS_IO_URING
225 if (auto* uring_sched =
226 dynamic_cast<detail::io_uring_scheduler*>(&sched))
227 {
228 if (opts.single_threaded)
229 uring_sched->configure_single_threaded(true);
230 if (opts.enable_sqpoll)
231 uring_sched->configure_sqpoll(
232 true, opts.sq_thread_idle_ms, opts.sq_thread_cpu);
233 }
234 #endif
235
236 #if BOOST_COROSIO_HAS_IOCP
237 auto& iocp_sched = static_cast<detail::win_scheduler&>(sched);
238 iocp_sched.configure_iocp(opts.gqcs_timeout_ms);
239 if (opts.single_threaded)
240 iocp_sched.configure_single_threaded(true);
241 #endif
242
243 (void)sched;
244 (void)opts;
245 10x }
246
247 detail::scheduler&
248 165x construct_default(capy::execution_context& ctx, unsigned concurrency_hint)
249 {
250 #if BOOST_COROSIO_HAS_IOCP
251 return iocp_t::construct(ctx, concurrency_hint);
252 #elif BOOST_COROSIO_HAS_EPOLL
253 165x return epoll_t::construct(ctx, concurrency_hint);
254 #elif BOOST_COROSIO_HAS_KQUEUE
255 return kqueue_t::construct(ctx, concurrency_hint);
256 #elif BOOST_COROSIO_HAS_SELECT
257 return select_t::construct(ctx, concurrency_hint);
258 #endif
259 }
260
261 // Tie concurrency_hint == 1 to single_threaded (asio precedent).
262 io_context_options
263 12x normalize_options(io_context_options opts, unsigned concurrency_hint)
264 {
265 12x if (concurrency_hint == 1)
266 1x opts.single_threaded = true;
267 12x return opts;
268 }
269
270 } // anonymous namespace
271
272 157x io_context::io_context()
273 157x : io_context(std::max(2u, std::thread::hardware_concurrency()))
274 {
275 157x }
276
277 161x io_context::io_context(unsigned concurrency_hint)
278 : capy::execution_context(this)
279 161x , sched_(&construct_default(*this, concurrency_hint))
280 {
281 161x if (concurrency_hint == 1)
282 4x configure_single_threaded_();
283 161x }
284
285 4x io_context::io_context(
286 io_context_options const& opts_in,
287 4x unsigned concurrency_hint)
288 : capy::execution_context(this)
289 4x , sched_(nullptr)
290 {
291 4x auto opts = normalize_options(opts_in, concurrency_hint);
292 4x pre_create_services(*this, opts);
293 4x sched_ = &construct_default(*this, concurrency_hint);
294 4x apply_scheduler_options(*sched_, opts, concurrency_hint);
295 4x }
296
297 void
298 8x io_context::apply_options_pre_(io_context_options const& opts)
299 {
300 8x pre_create_services(*this, opts);
301 8x }
302
303 void
304 8x io_context::apply_options_post_(
305 io_context_options const& opts_in,
306 unsigned concurrency_hint)
307 {
308 8x auto opts = normalize_options(opts_in, concurrency_hint);
309 8x apply_scheduler_options(*sched_, opts, concurrency_hint);
310 6x }
311
312 void
313 6x io_context::configure_single_threaded_()
314 {
315 // Dispatched through the scheduler base's virtual override; avoids
316 // unsafe downcasts when the active backend is io_uring rather than
317 // reactor (on Linux both BOOST_COROSIO_HAS_EPOLL and the io_uring
318 // backend may be enabled simultaneously).
319 6x sched_->configure_single_threaded(true);
320 6x }
321
322 1019x io_context::~io_context()
323 {
324 1019x shutdown();
325 1019x destroy();
326 1019x }
327
328 } // namespace boost::corosio
329