TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : // Copyright (c) 2026 Steve Gerbino
4 : // Copyright (c) 2026 Michael Vandeberg
5 : //
6 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
7 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
8 : //
9 : // Official repository: https://github.com/cppalliance/corosio
10 : //
11 :
12 : #ifndef BOOST_COROSIO_IO_CONTEXT_HPP
13 : #define BOOST_COROSIO_IO_CONTEXT_HPP
14 :
15 : #include <boost/corosio/detail/config.hpp>
16 : #include <boost/corosio/detail/continuation_op.hpp>
17 : #include <boost/corosio/detail/platform.hpp>
18 : #include <boost/corosio/detail/scheduler.hpp>
19 : #include <boost/capy/continuation.hpp>
20 : #include <boost/capy/ex/execution_context.hpp>
21 :
22 : #include <chrono>
23 : #include <coroutine>
24 : #include <cstddef>
25 : #include <limits>
26 : #include <thread>
27 :
28 : namespace boost::corosio {
29 :
/** Runtime tuning options for @ref io_context.

    All fields have defaults that match the library's built-in
    values, so constructing a default `io_context_options` produces
    identical behavior to an unconfigured context.

    Options that apply only to a specific backend family are
    silently ignored when the active backend does not support them.

    @par Example
    @code
    io_context_options opts;
    opts.max_events_per_poll = 256; // larger batch per syscall
    opts.inline_budget_max   = 32;  // more speculative completions
    opts.thread_pool_size    = 4;   // more file-I/O workers

    io_context ioc(opts);
    @endcode

    @see io_context, native_io_context
*/
struct io_context_options
{
    /** Maximum events fetched per reactor poll call.

        Controls the buffer size passed to `epoll_wait()` or
        `kevent()`. Larger values reduce syscall frequency under
        high load; smaller values improve fairness between
        connections. Ignored on IOCP and select backends.
    */
    unsigned max_events_per_poll = 128;

    /** Starting inline completion budget per handler chain.

        After a posted handler executes, the reactor grants this
        many speculative inline completions before forcing a
        re-queue. Applies to reactor backends only.

        @note Constructing an `io_context` with `concurrency_hint > 1`
        and all three budget fields (`inline_budget_initial`,
        `inline_budget_max`, `unassisted_budget`) at their defaults
        overrides them to disable inline completion
        (post-everything mode), since multi-thread workloads benefit
        from cross-thread work-stealing. Setting any budget field to
        a non-default value disables the override.
    */
    unsigned inline_budget_initial = 2;

    /** Hard ceiling on adaptive inline budget ramp-up.

        The budget doubles each cycle it is fully consumed, up to
        this limit. Applies to reactor backends only.
    */
    unsigned inline_budget_max = 16;

    /** Inline budget when no other thread assists the reactor.

        When only one thread is running the event loop, this
        value caps the inline budget to preserve fairness.
        Applies to reactor backends only.
    */
    unsigned unassisted_budget = 4;

    /** Maximum `GetQueuedCompletionStatus` timeout in milliseconds.

        Bounds how long the IOCP scheduler blocks between timer
        rechecks. Lower values improve timer responsiveness at the
        cost of more syscalls. Applies to IOCP only.
    */
    unsigned gqcs_timeout_ms = 500;

    /** Thread pool size for blocking I/O (file I/O, DNS resolution).

        Sets the number of worker threads in the shared thread pool
        used by POSIX file services and DNS resolution. Must be at
        least 1. Applies to POSIX backends only; ignored on IOCP
        where file I/O uses native overlapped I/O.
    */
    unsigned thread_pool_size = 1;

    /** Enable single-threaded mode (disable scheduler locking).

        When true, the scheduler skips all mutex lock/unlock and
        condition variable operations on the hot path. This
        eliminates synchronization overhead when only one thread
        calls `run()`.

        @par Restrictions
        - Only one thread may call `run()` (or any run variant).
        - Posting work from another thread is undefined behavior.
        - DNS resolution returns `operation_not_supported`.
        - POSIX file I/O returns `operation_not_supported`.
        - Signal sets should not be shared across contexts.

        @note Constructing an `io_context` with `concurrency_hint == 1`
        automatically enables single-threaded mode regardless of
        this field's value, matching asio's convention. To opt out,
        pass `concurrency_hint > 1`.
    */
    bool single_threaded = false;

    /** Enable IORING_SETUP_SQPOLL on the io_uring backend.

        With SQPOLL, the kernel forks a thread that busy-polls the
        submission ring; submission becomes a userspace-only memory
        store, eliminating the io_uring_enter syscall on the submit
        path. Most useful for sustained traffic. The idle thread
        parks after `sq_thread_idle_ms` of no activity.

        Independent of `single_threaded`. Default: off.

        Ignored on non-io_uring backends.
    */
    bool enable_sqpoll = false;

    /** SQ-poll idle timeout in milliseconds.

        After this many ms of no submissions, the kernel polling
        thread sleeps; the next submit re-wakes it via SQ_WAKEUP.
        0 means use the kernel default (1ms). Recommended for bursty
        workloads: 100-1000ms (avoids park/unpark thrash).

        Ignored unless `enable_sqpoll` is true. Ignored on
        non-io_uring backends.
    */
    unsigned sq_thread_idle_ms = 0;

    /** Pin the SQ-poll kernel thread to this CPU.

        -1 means do not pin (kernel scheduler picks). Pinning off
        the dispatch core is recommended on latency-sensitive
        deployments to avoid cache contention.

        Ignored unless `enable_sqpoll` is true. Ignored on
        non-io_uring backends.
    */
    int sq_thread_cpu = -1;
};
167 :
// Forward declarations only: `io_context` stores a pointer to
// `timer_service` and befriends `timer_service_access`, so neither
// type needs to be complete in this header.
namespace detail {
class timer_service;
struct timer_service_access;
} // namespace detail
172 :
173 : /** An I/O context for running asynchronous operations.
174 :
175 : The io_context provides an execution environment for async
176 : operations. It maintains a queue of pending work items and
177 : processes them when `run()` is called.
178 :
179 : The default and unsigned constructors select the platform's
180 : native backend:
181 : - Windows: IOCP
182 : - Linux: epoll
183 : - BSD/macOS: kqueue
184 : - Other POSIX: select
185 :
186 : The template constructor accepts a backend tag value to
187 : choose a specific backend at compile time:
188 :
189 : @par Example
190 : @code
191 : io_context ioc; // platform default
192 : io_context ioc2(corosio::epoll); // explicit backend
193 : @endcode
194 :
195 : @par Thread Safety
196 : Distinct objects: Safe.@n
197 : Shared objects: Safe, if using a concurrency hint greater
198 : than 1.
199 :
200 : @see epoll_t, select_t, kqueue_t, iocp_t
201 : */
202 : class BOOST_COROSIO_DECL io_context : public capy::execution_context
203 : {
204 : friend struct detail::timer_service_access;
205 :
206 : /// Pre-create services that depend on options (before construct).
207 : void apply_options_pre_(io_context_options const& opts);
208 :
209 : /// Apply runtime tuning to the scheduler (after construct).
210 : void apply_options_post_(
211 : io_context_options const& opts,
212 : unsigned concurrency_hint);
213 :
214 : /// Switch the scheduler to single-threaded (lockless) mode.
215 : void configure_single_threaded_();
216 :
217 : protected:
218 : detail::timer_service* timer_svc_ = nullptr;
219 : detail::scheduler* sched_;
220 :
221 : public:
222 : /** The executor type for this context. */
223 : class executor_type;
224 :
225 : /** Construct with default concurrency and platform backend.
226 :
227 : Uses `std::thread::hardware_concurrency()` clamped to a minimum
228 : of 2 as the concurrency hint, so the default constructor never
229 : silently engages single-threaded mode (see
230 : @ref io_context_options::single_threaded). Pass an explicit
231 : `concurrency_hint == 1` to opt into single-threaded mode.
232 : */
233 : io_context();
234 :
235 : /** Construct with a concurrency hint and platform backend.
236 :
237 : @param concurrency_hint Hint for the number of threads
238 : that will call `run()`.
239 : */
240 : explicit io_context(unsigned concurrency_hint);
241 :
242 : /** Construct with runtime tuning options and platform backend.
243 :
244 : @param opts Runtime options controlling scheduler and
245 : service behavior.
246 : @param concurrency_hint Hint for the number of threads
247 : that will call `run()`.
248 : */
249 : explicit io_context(
250 : io_context_options const& opts,
251 : unsigned concurrency_hint = std::thread::hardware_concurrency());
252 :
253 : /** Construct with an explicit backend tag.
254 :
255 : @param backend The backend tag value selecting the I/O
256 : multiplexer (e.g. `corosio::epoll`).
257 : @param concurrency_hint Hint for the number of threads
258 : that will call `run()`.
259 : */
260 : template<class Backend>
261 : requires requires { Backend::construct; }
262 HIT 848 : explicit io_context(
263 : Backend backend,
264 : unsigned concurrency_hint = std::thread::hardware_concurrency())
265 : : capy::execution_context(this)
266 848 : , sched_(nullptr)
267 : {
268 : (void)backend;
269 848 : sched_ = &Backend::construct(*this, concurrency_hint);
270 848 : if (concurrency_hint == 1)
271 2 : configure_single_threaded_();
272 848 : }
273 :
274 : /** Construct with an explicit backend tag and runtime options.
275 :
276 : @param backend The backend tag value selecting the I/O
277 : multiplexer (e.g. `corosio::epoll`).
278 : @param opts Runtime options controlling scheduler and
279 : service behavior.
280 : @param concurrency_hint Hint for the number of threads
281 : that will call `run()`.
282 : */
283 : template<class Backend>
284 : requires requires { Backend::construct; }
285 8 : explicit io_context(
286 : Backend backend,
287 : io_context_options const& opts,
288 : unsigned concurrency_hint = std::thread::hardware_concurrency())
289 : : capy::execution_context(this)
290 8 : , sched_(nullptr)
291 : {
292 : (void)backend;
293 8 : apply_options_pre_(opts);
294 8 : sched_ = &Backend::construct(*this, concurrency_hint);
295 8 : apply_options_post_(opts, concurrency_hint);
296 8 : }
297 :
298 : ~io_context();
299 :
300 : io_context(io_context const&) = delete;
301 : io_context& operator=(io_context const&) = delete;
302 :
303 : /** Return an executor for this context.
304 :
305 : The returned executor can be used to dispatch coroutines
306 : and post work items to this context.
307 :
308 : @return An executor associated with this context.
309 : */
310 : executor_type get_executor() const noexcept;
311 :
312 : /** Signal the context to stop processing.
313 :
314 : This causes `run()` to return as soon as possible. Any pending
315 : work items remain queued.
316 : */
317 5 : void stop()
318 : {
319 5 : sched_->stop();
320 5 : }
321 :
322 : /** Return whether the context has been stopped.
323 :
324 : @return `true` if `stop()` has been called and `restart()`
325 : has not been called since.
326 : */
327 34 : bool stopped() const noexcept
328 : {
329 34 : return sched_->stopped();
330 : }
331 :
332 : /** Restart the context after being stopped.
333 :
334 : This function must be called before `run()` can be called
335 : again after `stop()` has been called.
336 : */
337 156 : void restart()
338 : {
339 156 : sched_->restart();
340 156 : }
341 :
342 : /** Process all pending work items.
343 :
344 : This function blocks until all pending work items have been
345 : executed or `stop()` is called. The context is stopped
346 : when there is no more outstanding work.
347 :
348 : @note The context must be restarted with `restart()` before
349 : calling this function again after it returns.
350 :
351 : @return The number of handlers executed.
352 : */
353 667 : std::size_t run()
354 : {
355 667 : return sched_->run();
356 : }
357 :
358 : /** Process at most one pending work item.
359 :
360 : This function blocks until one work item has been executed
361 : or `stop()` is called. The context is stopped when there
362 : is no more outstanding work.
363 :
364 : @note The context must be restarted with `restart()` before
365 : calling this function again after it returns.
366 :
367 : @return The number of handlers executed (0 or 1).
368 : */
369 2 : std::size_t run_one()
370 : {
371 2 : return sched_->run_one();
372 : }
373 :
374 : /** Process work items for the specified duration.
375 :
376 : This function blocks until work items have been executed for
377 : the specified duration, or `stop()` is called. The context
378 : is stopped when there is no more outstanding work.
379 :
380 : @note The context must be restarted with `restart()` before
381 : calling this function again after it returns.
382 :
383 : @param rel_time The duration for which to process work.
384 :
385 : @return The number of handlers executed.
386 : */
387 : template<class Rep, class Period>
388 5 : std::size_t run_for(std::chrono::duration<Rep, Period> const& rel_time)
389 : {
390 5 : return run_until(std::chrono::steady_clock::now() + rel_time);
391 : }
392 :
393 : /** Process work items until the specified time.
394 :
395 : This function blocks until the specified time is reached
396 : or `stop()` is called. The context is stopped when there
397 : is no more outstanding work.
398 :
399 : @note The context must be restarted with `restart()` before
400 : calling this function again after it returns.
401 :
402 : @param abs_time The time point until which to process work.
403 :
404 : @return The number of handlers executed.
405 : */
406 : template<class Clock, class Duration>
407 : std::size_t
408 5 : run_until(std::chrono::time_point<Clock, Duration> const& abs_time)
409 : {
410 5 : std::size_t n = 0;
411 14 : while (run_one_until(abs_time))
412 9 : if (n != (std::numeric_limits<std::size_t>::max)())
413 9 : ++n;
414 5 : return n;
415 : }
416 :
417 : /** Process at most one work item for the specified duration.
418 :
419 : This function blocks until one work item has been executed,
420 : the specified duration has elapsed, or `stop()` is called.
421 : The context is stopped when there is no more outstanding work.
422 :
423 : @note The context must be restarted with `restart()` before
424 : calling this function again after it returns.
425 :
426 : @param rel_time The duration for which the call may block.
427 :
428 : @return The number of handlers executed (0 or 1).
429 : */
430 : template<class Rep, class Period>
431 3 : std::size_t run_one_for(std::chrono::duration<Rep, Period> const& rel_time)
432 : {
433 3 : return run_one_until(std::chrono::steady_clock::now() + rel_time);
434 : }
435 :
436 : /** Process at most one work item until the specified time.
437 :
438 : This function blocks until one work item has been executed,
439 : the specified time is reached, or `stop()` is called.
440 : The context is stopped when there is no more outstanding work.
441 :
442 : @note The context must be restarted with `restart()` before
443 : calling this function again after it returns.
444 :
445 : @param abs_time The time point until which the call may block.
446 :
447 : @return The number of handlers executed (0 or 1).
448 : */
449 : template<class Clock, class Duration>
450 : std::size_t
451 21 : run_one_until(std::chrono::time_point<Clock, Duration> const& abs_time)
452 : {
453 21 : typename Clock::time_point now = Clock::now();
454 4 : for (;;)
455 : {
456 25 : auto rel_time = abs_time - now;
457 : using rel_type = decltype(rel_time);
458 25 : if (rel_time < rel_type::zero())
459 2 : rel_time = rel_type::zero();
460 23 : else if (rel_time > std::chrono::seconds(1))
461 11 : rel_time = std::chrono::seconds(1);
462 :
463 25 : std::size_t s = sched_->wait_one(
464 : static_cast<long>(
465 25 : std::chrono::duration_cast<std::chrono::microseconds>(
466 : rel_time)
467 25 : .count()));
468 :
469 25 : if (s || stopped())
470 21 : return s;
471 :
472 6 : now = Clock::now();
473 6 : if (now >= abs_time)
474 2 : return 0;
475 : }
476 : }
477 :
478 : /** Process all ready work items without blocking.
479 :
480 : This function executes all work items that are ready to run
481 : without blocking for more work. The context is stopped
482 : when there is no more outstanding work.
483 :
484 : @note The context must be restarted with `restart()` before
485 : calling this function again after it returns.
486 :
487 : @return The number of handlers executed.
488 : */
489 24 : std::size_t poll()
490 : {
491 24 : return sched_->poll();
492 : }
493 :
494 : /** Process at most one ready work item without blocking.
495 :
496 : This function executes at most one work item that is ready
497 : to run without blocking for more work. The context is
498 : stopped when there is no more outstanding work.
499 :
500 : @note The context must be restarted with `restart()` before
501 : calling this function again after it returns.
502 :
503 : @return The number of handlers executed (0 or 1).
504 : */
505 4 : std::size_t poll_one()
506 : {
507 4 : return sched_->poll_one();
508 : }
509 : };
510 :
511 : /** An executor for dispatching work to an I/O context.
512 :
513 : The executor provides the interface for posting work items and
514 : dispatching coroutines to the associated context. It satisfies
515 : the `capy::Executor` concept.
516 :
517 : Executors are lightweight handles that can be copied and compared
518 : for equality. Two executors compare equal if they refer to the
519 : same context.
520 :
521 : @par Thread Safety
522 : Distinct objects: Safe.@n
523 : Shared objects: Safe.
524 : */
525 : class io_context::executor_type
526 : {
527 : io_context* ctx_ = nullptr;
528 :
529 : public:
530 : /** Default constructor.
531 :
532 : Constructs an executor not associated with any context.
533 : */
534 : executor_type() = default;
535 :
536 : /** Construct an executor from a context.
537 :
538 : @param ctx The context to associate with this executor.
539 : */
540 937 : explicit executor_type(io_context& ctx) noexcept : ctx_(&ctx) {}
541 :
542 : /** Return a reference to the associated execution context.
543 :
544 : @return Reference to the context.
545 : */
546 1728 : io_context& context() const noexcept
547 : {
548 1728 : return *ctx_;
549 : }
550 :
551 : /** Check if the current thread is running this executor's context.
552 :
553 : @return `true` if `run()` is being called on this thread.
554 : */
555 1756 : bool running_in_this_thread() const noexcept
556 : {
557 1756 : return ctx_->sched_->running_in_this_thread();
558 : }
559 :
560 : /** Informs the executor that work is beginning.
561 :
562 : Must be paired with `on_work_finished()`.
563 : */
564 1920 : void on_work_started() const noexcept
565 : {
566 1920 : ctx_->sched_->work_started();
567 1920 : }
568 :
569 : /** Informs the executor that work has completed.
570 :
571 : @par Preconditions
572 : A preceding call to `on_work_started()` on an equal executor.
573 : */
574 1887 : void on_work_finished() const noexcept
575 : {
576 1887 : ctx_->sched_->work_finished();
577 1887 : }
578 :
579 : /** Dispatch a continuation.
580 :
581 : Returns a handle for symmetric transfer. If called from
582 : within `run()`, returns `c.h`. Otherwise posts the
583 : enclosing continuation_op as a scheduler_op for later
584 : execution and returns `std::noop_coroutine()`.
585 :
586 : @param c The continuation to dispatch. Must be the `cont`
587 : member of a `detail::continuation_op`.
588 :
589 : @return A handle for symmetric transfer or `std::noop_coroutine()`.
590 : */
591 1754 : std::coroutine_handle<> dispatch(capy::continuation& c) const
592 : {
593 1754 : if (running_in_this_thread())
594 613 : return c.h;
595 1141 : post(c);
596 1141 : return std::noop_coroutine();
597 : }
598 :
599 : /** Post a continuation for deferred execution.
600 :
601 : If the continuation is backed by a continuation_op
602 : (tagged), posts it directly as a scheduler_op — zero
603 : heap allocation. Otherwise falls back to the
604 : heap-allocating post(coroutine_handle<>) path.
605 : */
606 5311 : void post(capy::continuation& c) const
607 : {
608 5311 : auto* op = detail::continuation_op::try_from_continuation(c);
609 5311 : if (op)
610 4167 : ctx_->sched_->post(op);
611 : else
612 1144 : ctx_->sched_->post(c.h);
613 5311 : }
614 :
615 : /** Post a bare coroutine handle for deferred execution.
616 :
617 : Heap-allocates a scheduler_op to wrap the handle. Prefer
618 : posting through a continuation_op-backed continuation when
619 : the continuation has suitable lifetime.
620 :
621 : @param h The coroutine handle to post.
622 : */
623 1441 : void post(std::coroutine_handle<> h) const
624 : {
625 1441 : ctx_->sched_->post(h);
626 1441 : }
627 :
628 : /** Compare two executors for equality.
629 :
630 : @return `true` if both executors refer to the same context.
631 : */
632 1 : bool operator==(executor_type const& other) const noexcept
633 : {
634 1 : return ctx_ == other.ctx_;
635 : }
636 :
637 : /** Compare two executors for inequality.
638 :
639 : @return `true` if the executors refer to different contexts.
640 : */
641 : bool operator!=(executor_type const& other) const noexcept
642 : {
643 : return ctx_ != other.ctx_;
644 : }
645 : };
646 :
647 : inline io_context::executor_type
648 937 : io_context::get_executor() const noexcept
649 : {
650 937 : return executor_type(const_cast<io_context&>(*this));
651 : }
652 :
653 : } // namespace boost::corosio
654 :
655 : #endif // BOOST_COROSIO_IO_CONTEXT_HPP
|