include/boost/corosio/native/detail/reactor/reactor_scheduler.hpp

71.1% Lines (241/339) 78.3% List of functions (36/46)
reactor_scheduler.hpp
f(x) Functions (46)
Function Calls Lines Blocks
boost::corosio::detail::reactor_find_context(boost::corosio::detail::reactor_scheduler const*) :78 741980x 100.0% 86.0% boost::corosio::detail::reactor_flush_private_work(boost::corosio::detail::reactor_scheduler_context*, std::atomic<long>&) :90 0 0.0% 0.0% boost::corosio::detail::reactor_drain_private_queue(boost::corosio::detail::reactor_scheduler_context*, std::atomic<long>&, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) :107 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler::inline_budget_initial() const :255 487x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::is_single_threaded() const :261 64x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::configure_single_threaded(bool) :272 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler::reactor_scheduler() :283 591x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::task_op::operator()() :328 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler::task_op::destroy() :329 0 0.0% 0.0% boost::corosio::detail::reactor_thread_context_guard::reactor_thread_context_guard(boost::corosio::detail::reactor_scheduler const*) :375 487x 100.0% 100.0% boost::corosio::detail::reactor_thread_context_guard::~reactor_thread_context_guard() :383 487x 66.7% 80.0% boost::corosio::detail::reactor_scheduler_context::reactor_scheduler_context(boost::corosio::detail::reactor_scheduler const*, boost::corosio::detail::reactor_scheduler_context*) :395 487x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::configure_reactor(unsigned int, unsigned int, unsigned int, unsigned int) :409 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler::reset_inline_budget() const :437 95177x 50.0% 45.0% boost::corosio::detail::reactor_scheduler::try_consume_inline_budget() const :463 394006x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::post(std::__n4861::coroutine_handle<void>) const :477 2167x 100.0% 84.0% boost::corosio::detail::reactor_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::post_handler(std::__n4861::coroutine_handle<void>) :483 2167x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::~post_handler() :484 4334x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::operator()() :486 2158x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::destroy() :495 9x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::post(boost::corosio::detail::scheduler_op*) const :520 96469x 100.0% 87.0% boost::corosio::detail::reactor_scheduler::running_in_this_thread() const :537 1337x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::stop() :543 448x 100.0% 82.0% boost::corosio::detail::reactor_scheduler::stopped() const :555 62x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::restart() :561 117x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::run() :567 418x 100.0% 76.0% boost::corosio::detail::reactor_scheduler::run_one() :592 2x 75.0% 64.0% boost::corosio::detail::reactor_scheduler::wait_one(long) :606 102x 100.0% 70.0% boost::corosio::detail::reactor_scheduler::poll() :620 6x 100.0% 76.0% boost::corosio::detail::reactor_scheduler::poll_one() :645 4x 100.0% 70.0% boost::corosio::detail::reactor_scheduler::work_started() :659 26348x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::work_finished() :665 37087x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::compensating_work_started() const :672 152824x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::drain_thread_queue(boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&, long) const :680 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler::post_deferred_completions(boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) const :693 16059x 30.0% 35.0% boost::corosio::detail::reactor_scheduler::shutdown_drain() :710 591x 100.0% 88.0% boost::corosio::detail::reactor_scheduler::signal_all(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&) const :727 997x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::maybe_unlock_and_signal_one(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&) const :734 2333x 57.1% 50.0% boost::corosio::detail::reactor_scheduler::unlock_and_signal_one(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&) const :748 315595x 85.7% 80.0% boost::corosio::detail::reactor_scheduler::clear_signal() const :760 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler::wait_for_signal(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&) const :766 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler::wait_for_signal_for(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&, long) const :778 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler::wake_one_thread_and_unlock(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&) const :790 2333x 87.5% 92.0% boost::corosio::detail::reactor_scheduler::work_cleanup::~work_cleanup() :808 267502x 92.3% 92.0% boost::corosio::detail::reactor_scheduler::task_cleanup::~task_cleanup() :832 188981x 83.3% 86.0% boost::corosio::detail::reactor_scheduler::do_one(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&, long, boost::corosio::detail::reactor_scheduler_context*) :853 267934x 68.9% 55.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
12
13 #include <boost/corosio/detail/config.hpp>
14 #include <boost/capy/ex/execution_context.hpp>
15
16 #include <boost/corosio/detail/scheduler.hpp>
17 #include <boost/corosio/detail/scheduler_op.hpp>
18 #include <boost/corosio/detail/thread_local_ptr.hpp>
19
20 #include <atomic>
21 #include <chrono>
22 #include <coroutine>
23 #include <cstddef>
24 #include <cstdint>
25 #include <limits>
26 #include <memory>
27 #include <stdexcept>
28
29 #include <boost/corosio/detail/conditionally_enabled_mutex.hpp>
30 #include <boost/corosio/detail/conditionally_enabled_event.hpp>
31
32 namespace boost::corosio::detail {
33
34 // Forward declarations
35 class reactor_scheduler;
36 class timer_service;
37
38 /** Per-thread state for a reactor scheduler.
39
40 Each thread running a scheduler's event loop has one of these
41 on a thread-local stack. It holds a private work queue and
42 inline completion budget for speculative I/O fast paths.
43 */
44 struct BOOST_COROSIO_SYMBOL_VISIBLE reactor_scheduler_context
45 {
46 /// Scheduler this context belongs to.
47 reactor_scheduler const* key;
48
49 /// Next context frame on this thread's stack.
50 reactor_scheduler_context* next;
51
52 /// Private work queue for reduced contention.
53 op_queue private_queue;
54
55 /// Unflushed work count for the private queue.
56 std::int64_t private_outstanding_work;
57
58 /// Remaining inline completions allowed this cycle.
59 int inline_budget;
60
61 /// Maximum inline budget (adaptive, 2-16).
62 int inline_budget_max;
63
64 /// True if no other thread absorbed queued work last cycle.
65 bool unassisted;
66
67 /// Construct a context frame linked to @a n.
68 reactor_scheduler_context(
69 reactor_scheduler const* k,
70 reactor_scheduler_context* n);
71 };
72
73 /// Thread-local context stack for reactor schedulers.
74 inline thread_local_ptr<reactor_scheduler_context> reactor_context_stack;
75
76 /// Find the context frame for a scheduler on this thread.
77 inline reactor_scheduler_context*
78 741980x reactor_find_context(reactor_scheduler const* self) noexcept
79 {
80 741980x for (auto* c = reactor_context_stack.get(); c != nullptr; c = c->next)
81 {
82 738908x if (c->key == self)
83 738908x return c;
84 }
85 3072x return nullptr;
86 }
87
88 /// Flush private work count to global counter.
89 inline void
90 reactor_flush_private_work(
91 reactor_scheduler_context* ctx,
92 std::atomic<std::int64_t>& outstanding_work) noexcept
93 {
94 if (ctx && ctx->private_outstanding_work > 0)
95 {
96 outstanding_work.fetch_add(
97 ctx->private_outstanding_work, std::memory_order_relaxed);
98 ctx->private_outstanding_work = 0;
99 }
100 }
101
102 /** Drain private queue to global queue, flushing work count first.
103
104 @return True if any ops were drained.
105 */
106 inline bool
107 reactor_drain_private_queue(
108 reactor_scheduler_context* ctx,
109 std::atomic<std::int64_t>& outstanding_work,
110 op_queue& completed_ops) noexcept
111 {
112 if (!ctx || ctx->private_queue.empty())
113 return false;
114
115 reactor_flush_private_work(ctx, outstanding_work);
116 completed_ops.splice(ctx->private_queue);
117 return true;
118 }
119
120 /** Non-template base for reactor-backed scheduler implementations.
121
122 Provides the complete threading model shared by epoll, kqueue,
123 and select schedulers: signal state machine, inline completion
124 budget, work counting, run/poll methods, and the do_one event
125 loop.
126
127 Derived classes provide platform-specific hooks by overriding:
128 - `run_task(lock, ctx)` to run the reactor poll
129 - `interrupt_reactor()` to wake a blocked reactor
130
131 De-templated from the original CRTP design to eliminate
132 duplicate instantiations when multiple backends are compiled
133 into the same binary. Virtual dispatch for run_task (called
134 once per reactor cycle, before a blocking syscall) has
135 negligible overhead.
136
137 @par Thread Safety
138 All public member functions are thread-safe.
139 */
140 class reactor_scheduler
141 : public scheduler
142 , public capy::execution_context::service
143 {
144 public:
145 using key_type = scheduler;
146 using context_type = reactor_scheduler_context;
147 using mutex_type = conditionally_enabled_mutex;
148 using lock_type = mutex_type::scoped_lock;
149 using event_type = conditionally_enabled_event;
150
151 /// Epoll and kqueue do not need write-direction notification.
152 static constexpr bool needs_write_notification = false;
153
154 /// Post a coroutine for deferred execution.
155 void post(std::coroutine_handle<> h) const override;
156
157 /// Post a scheduler operation for deferred execution.
158 void post(scheduler_op* h) const override;
159
160 /// Return true if called from a thread running this scheduler.
161 bool running_in_this_thread() const noexcept override;
162
163 /// Request the scheduler to stop dispatching handlers.
164 void stop() override;
165
166 /// Return true if the scheduler has been stopped.
167 bool stopped() const noexcept override;
168
169 /// Reset the stopped state so `run()` can resume.
170 void restart() override;
171
172 /// Run the event loop until no work remains.
173 std::size_t run() override;
174
175 /// Run until one handler completes or no work remains.
176 std::size_t run_one() override;
177
178 /// Run until one handler completes or @a usec elapses.
179 std::size_t wait_one(long usec) override;
180
181 /// Run ready handlers without blocking.
182 std::size_t poll() override;
183
184 /// Run at most one ready handler without blocking.
185 std::size_t poll_one() override;
186
187 /// Increment the outstanding work count.
188 void work_started() noexcept override;
189
190 /// Decrement the outstanding work count, stopping on zero.
191 void work_finished() noexcept override;
192
193 /** Reset the thread's inline completion budget.
194
195 Called at the start of each posted completion handler to
196 grant a fresh budget for speculative inline completions.
197 */
198 void reset_inline_budget() const noexcept;
199
200 /** Consume one unit of inline budget if available.
201
202 @return True if budget was available and consumed.
203 */
204 bool try_consume_inline_budget() const noexcept;
205
206 /** Offset a forthcoming work_finished from work_cleanup.
207
208 Called by descriptor_state when all I/O returned EAGAIN and
209 no handler will be executed. Must be called from a scheduler
210 thread.
211 */
212 void compensating_work_started() const noexcept;
213
214 /** Drain work from thread context's private queue to global queue.
215
216 Flushes private work count to the global counter, then
217 transfers the queue under mutex protection.
218
219 @param queue The private queue to drain.
220 @param count Private work count to flush before draining.
221 */
222 void drain_thread_queue(op_queue& queue, std::int64_t count) const;
223
224 /** Post completed operations for deferred invocation.
225
226 If called from a thread running this scheduler, operations
227 go to the thread's private queue (fast path). Otherwise,
228 operations are added to the global queue under mutex and a
229 waiter is signaled.
230
231 @par Preconditions
232 work_started() must have been called for each operation.
233
234 @param ops Queue of operations to post.
235 */
236 void post_deferred_completions(op_queue& ops) const;
237
238 /** Apply runtime configuration to the scheduler.
239
240 Called by `io_context` after construction. Values that do
241 not apply to this backend are silently ignored.
242
243 @param max_events Event buffer size for epoll/kqueue.
244 @param budget_init Starting inline completion budget.
245 @param budget_max Hard ceiling on adaptive budget ramp-up.
246 @param unassisted Budget when single-threaded.
247 */
248 virtual void configure_reactor(
249 unsigned max_events,
250 unsigned budget_init,
251 unsigned budget_max,
252 unsigned unassisted);
253
254 /// Return the configured initial inline budget.
255 487x unsigned inline_budget_initial() const noexcept
256 {
257 487x return inline_budget_initial_;
258 }
259
260 /// Return true if single-threaded (lockless) mode is active.
261 64x bool is_single_threaded() const noexcept
262 {
263 64x return single_threaded_;
264 }
265
266 /** Enable or disable single-threaded (lockless) mode.
267
268 When enabled, all scheduler mutex and condition variable
269 operations become no-ops. Cross-thread post() is
270 undefined behavior.
271 */
272 void configure_single_threaded(bool v) noexcept
273 {
274 single_threaded_ = v;
275 mutex_.set_enabled(!v);
276 cond_.set_enabled(!v);
277 }
278
279 protected:
280 timer_service* timer_svc_ = nullptr;
281 bool single_threaded_ = false;
282
283 591x reactor_scheduler() = default;
284
285 /** Drain completed_ops during shutdown.
286
287 Pops all operations from the global queue and destroys them,
288 skipping the task sentinel. Signals all waiting threads.
289 Derived classes call this from their shutdown() override
290 before performing platform-specific cleanup.
291 */
292 void shutdown_drain();
293
294 /// RAII guard that re-inserts the task sentinel after `run_task`.
295 struct task_cleanup
296 {
297 reactor_scheduler const* sched;
298 lock_type* lock;
299 context_type* ctx;
300 ~task_cleanup();
301 };
302
303 mutable mutex_type mutex_{true};
304 mutable event_type cond_{true};
305 mutable op_queue completed_ops_;
306 mutable std::atomic<std::int64_t> outstanding_work_{0};
307 std::atomic<bool> stopped_{false};
308 mutable std::atomic<bool> task_running_{false};
309 mutable bool task_interrupted_ = false;
310
311 // Runtime-configurable reactor tuning parameters.
312 // Defaults match the library's built-in values.
313 unsigned max_events_per_poll_ = 128;
314 unsigned inline_budget_initial_ = 2;
315 unsigned inline_budget_max_ = 16;
316 unsigned unassisted_budget_ = 4;
317
318 /// Bit 0 of `state_`: set when the condvar should be signaled.
319 static constexpr std::size_t signaled_bit = 1;
320
321 /// Increment per waiting thread in `state_`.
322 static constexpr std::size_t waiter_increment = 2;
323 mutable std::size_t state_ = 0;
324
325 /// Sentinel op that triggers a reactor poll when dequeued.
326 struct task_op final : scheduler_op
327 {
328 void operator()() override {}
329 void destroy() override {}
330 };
331 task_op task_op_;
332
333 /// Run the platform-specific reactor poll.
334 virtual void
335 run_task(lock_type& lock, context_type* ctx,
336 long timeout_us) = 0;
337
338 /// Wake a blocked reactor (e.g. write to eventfd or pipe).
339 virtual void interrupt_reactor() const = 0;
340
341 private:
342 struct work_cleanup
343 {
344 reactor_scheduler* sched;
345 lock_type* lock;
346 context_type* ctx;
347 ~work_cleanup();
348 };
349
350 std::size_t do_one(
351 lock_type& lock, long timeout_us, context_type* ctx);
352
353 void signal_all(lock_type& lock) const;
354 bool maybe_unlock_and_signal_one(lock_type& lock) const;
355 bool unlock_and_signal_one(lock_type& lock) const;
356 void clear_signal() const;
357 void wait_for_signal(lock_type& lock) const;
358 void wait_for_signal_for(
359 lock_type& lock, long timeout_us) const;
360 void wake_one_thread_and_unlock(lock_type& lock) const;
361 };
362
363 /** RAII guard that pushes/pops a scheduler context frame.
364
365 On construction, pushes a new context frame onto the
366 thread-local stack. On destruction, drains any remaining
367 private queue items to the global queue and pops the frame.
368 */
369 struct reactor_thread_context_guard
370 {
371 /// The context frame managed by this guard.
372 reactor_scheduler_context frame_;
373
374 /// Construct the guard, pushing a frame for @a sched.
375 487x explicit reactor_thread_context_guard(
376 reactor_scheduler const* sched) noexcept
377 487x : frame_(sched, reactor_context_stack.get())
378 {
379 487x reactor_context_stack.set(&frame_);
380 487x }
381
382 /// Destroy the guard, draining private work and popping the frame.
383 487x ~reactor_thread_context_guard() noexcept
384 {
385 487x if (!frame_.private_queue.empty())
386 frame_.key->drain_thread_queue(
387 frame_.private_queue, frame_.private_outstanding_work);
388 487x reactor_context_stack.set(frame_.next);
389 487x }
390 };
391
392 // ---- Inline implementations ------------------------------------------------
393
394 inline
395 487x reactor_scheduler_context::reactor_scheduler_context(
396 reactor_scheduler const* k,
397 487x reactor_scheduler_context* n)
398 487x : key(k)
399 487x , next(n)
400 487x , private_outstanding_work(0)
401 487x , inline_budget(0)
402 487x , inline_budget_max(
403 487x static_cast<int>(k->inline_budget_initial()))
404 487x , unassisted(false)
405 {
406 487x }
407
408 inline void
409 reactor_scheduler::configure_reactor(
410 unsigned max_events,
411 unsigned budget_init,
412 unsigned budget_max,
413 unsigned unassisted)
414 {
415 if (max_events < 1 ||
416 max_events > static_cast<unsigned>(std::numeric_limits<int>::max()))
417 throw std::out_of_range(
418 "max_events_per_poll must be in [1, INT_MAX]");
419 if (budget_max < 1 ||
420 budget_max > static_cast<unsigned>(std::numeric_limits<int>::max()))
421 throw std::out_of_range(
422 "inline_budget_max must be in [1, INT_MAX]");
423
424 // Clamp initial and unassisted to budget_max.
425 if (budget_init > budget_max)
426 budget_init = budget_max;
427 if (unassisted > budget_max)
428 unassisted = budget_max;
429
430 max_events_per_poll_ = max_events;
431 inline_budget_initial_ = budget_init;
432 inline_budget_max_ = budget_max;
433 unassisted_budget_ = unassisted;
434 }
435
436 inline void
437 95177x reactor_scheduler::reset_inline_budget() const noexcept
438 {
439 95177x if (auto* ctx = reactor_find_context(this))
440 {
441 // Cap when no other thread absorbed queued work
442 95177x if (ctx->unassisted)
443 {
444 95177x ctx->inline_budget_max =
445 95177x static_cast<int>(unassisted_budget_);
446 95177x ctx->inline_budget =
447 95177x static_cast<int>(unassisted_budget_);
448 95177x return;
449 }
450 // Ramp up when previous cycle fully consumed budget
451 if (ctx->inline_budget == 0)
452 ctx->inline_budget_max = (std::min)(
453 ctx->inline_budget_max * 2,
454 static_cast<int>(inline_budget_max_));
455 else if (ctx->inline_budget < ctx->inline_budget_max)
456 ctx->inline_budget_max =
457 static_cast<int>(inline_budget_initial_);
458 ctx->inline_budget = ctx->inline_budget_max;
459 }
460 }
461
462 inline bool
463 394006x reactor_scheduler::try_consume_inline_budget() const noexcept
464 {
465 394006x if (auto* ctx = reactor_find_context(this))
466 {
467 394006x if (ctx->inline_budget > 0)
468 {
469 315203x --ctx->inline_budget;
470 315203x return true;
471 }
472 }
473 78803x return false;
474 }
475
476 inline void
477 2167x reactor_scheduler::post(std::coroutine_handle<> h) const
478 {
479 struct post_handler final : scheduler_op
480 {
481 std::coroutine_handle<> h_;
482
483 2167x explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
484 4334x ~post_handler() override = default;
485
486 2158x void operator()() override
487 {
488 2158x auto saved = h_;
489 2158x delete this;
490 // Ensure stores from the posting thread are visible
491 std::atomic_thread_fence(std::memory_order_acquire);
492 2158x saved.resume();
493 2158x }
494
495 9x void destroy() override
496 {
497 9x auto saved = h_;
498 9x delete this;
499 9x saved.destroy();
500 9x }
501 };
502
503 2167x auto ph = std::make_unique<post_handler>(h);
504
505 2167x if (auto* ctx = reactor_find_context(this))
506 {
507 6x ++ctx->private_outstanding_work;
508 6x ctx->private_queue.push(ph.release());
509 6x return;
510 }
511
512 2161x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
513
514 2161x lock_type lock(mutex_);
515 2161x completed_ops_.push(ph.release());
516 2161x wake_one_thread_and_unlock(lock);
517 2167x }
518
519 inline void
520 96469x reactor_scheduler::post(scheduler_op* h) const
521 {
522 96469x if (auto* ctx = reactor_find_context(this))
523 {
524 96297x ++ctx->private_outstanding_work;
525 96297x ctx->private_queue.push(h);
526 96297x return;
527 }
528
529 172x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
530
531 172x lock_type lock(mutex_);
532 172x completed_ops_.push(h);
533 172x wake_one_thread_and_unlock(lock);
534 172x }
535
536 inline bool
537 1337x reactor_scheduler::running_in_this_thread() const noexcept
538 {
539 1337x return reactor_find_context(this) != nullptr;
540 }
541
542 inline void
543 448x reactor_scheduler::stop()
544 {
545 448x lock_type lock(mutex_);
546 448x if (!stopped_.load(std::memory_order_acquire))
547 {
548 406x stopped_.store(true, std::memory_order_release);
549 406x signal_all(lock);
550 406x interrupt_reactor();
551 }
552 448x }
553
554 inline bool
555 62x reactor_scheduler::stopped() const noexcept
556 {
557 62x return stopped_.load(std::memory_order_acquire);
558 }
559
560 inline void
561 117x reactor_scheduler::restart()
562 {
563 117x stopped_.store(false, std::memory_order_release);
564 117x }
565
566 inline std::size_t
567 418x reactor_scheduler::run()
568 {
569 836x if (outstanding_work_.load(std::memory_order_acquire) == 0)
570 {
571 32x stop();
572 32x return 0;
573 }
574
575 386x reactor_thread_context_guard ctx(this);
576 386x lock_type lock(mutex_);
577
578 386x std::size_t n = 0;
579 for (;;)
580 {
581 267827x if (!do_one(lock, -1, &ctx.frame_))
582 386x break;
583 267441x if (n != (std::numeric_limits<std::size_t>::max)())
584 267441x ++n;
585 267441x if (!lock.owns_lock())
586 179583x lock.lock();
587 }
588 386x return n;
589 386x }
590
591 inline std::size_t
592 2x reactor_scheduler::run_one()
593 {
594 4x if (outstanding_work_.load(std::memory_order_acquire) == 0)
595 {
596 stop();
597 return 0;
598 }
599
600 2x reactor_thread_context_guard ctx(this);
601 2x lock_type lock(mutex_);
602 2x return do_one(lock, -1, &ctx.frame_);
603 2x }
604
605 inline std::size_t
606 102x reactor_scheduler::wait_one(long usec)
607 {
608 204x if (outstanding_work_.load(std::memory_order_acquire) == 0)
609 {
610 10x stop();
611 10x return 0;
612 }
613
614 92x reactor_thread_context_guard ctx(this);
615 92x lock_type lock(mutex_);
616 92x return do_one(lock, usec, &ctx.frame_);
617 92x }
618
619 inline std::size_t
620 6x reactor_scheduler::poll()
621 {
622 12x if (outstanding_work_.load(std::memory_order_acquire) == 0)
623 {
624 1x stop();
625 1x return 0;
626 }
627
628 5x reactor_thread_context_guard ctx(this);
629 5x lock_type lock(mutex_);
630
631 5x std::size_t n = 0;
632 for (;;)
633 {
634 11x if (!do_one(lock, 0, &ctx.frame_))
635 5x break;
636 6x if (n != (std::numeric_limits<std::size_t>::max)())
637 6x ++n;
638 6x if (!lock.owns_lock())
639 6x lock.lock();
640 }
641 5x return n;
642 5x }
643
644 inline std::size_t
645 4x reactor_scheduler::poll_one()
646 {
647 8x if (outstanding_work_.load(std::memory_order_acquire) == 0)
648 {
649 2x stop();
650 2x return 0;
651 }
652
653 2x reactor_thread_context_guard ctx(this);
654 2x lock_type lock(mutex_);
655 2x return do_one(lock, 0, &ctx.frame_);
656 2x }
657
658 inline void
659 26348x reactor_scheduler::work_started() noexcept
660 {
661 26348x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
662 26348x }
663
664 inline void
665 37087x reactor_scheduler::work_finished() noexcept
666 {
667 74174x if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
668 398x stop();
669 37087x }
670
671 inline void
672 152824x reactor_scheduler::compensating_work_started() const noexcept
673 {
674 152824x auto* ctx = reactor_find_context(this);
675 152824x if (ctx)
676 152824x ++ctx->private_outstanding_work;
677 152824x }
678
679 inline void
680 reactor_scheduler::drain_thread_queue(
681 op_queue& queue, std::int64_t count) const
682 {
683 if (count > 0)
684 outstanding_work_.fetch_add(count, std::memory_order_relaxed);
685
686 lock_type lock(mutex_);
687 completed_ops_.splice(queue);
688 if (count > 0)
689 maybe_unlock_and_signal_one(lock);
690 }
691
692 inline void
693 16059x reactor_scheduler::post_deferred_completions(op_queue& ops) const
694 {
695 16059x if (ops.empty())
696 16059x return;
697
698 if (auto* ctx = reactor_find_context(this))
699 {
700 ctx->private_queue.splice(ops);
701 return;
702 }
703
704 lock_type lock(mutex_);
705 completed_ops_.splice(ops);
706 wake_one_thread_and_unlock(lock);
707 }
708
709 inline void
710 591x reactor_scheduler::shutdown_drain()
711 {
712 591x lock_type lock(mutex_);
713
714 1290x while (auto* h = completed_ops_.pop())
715 {
716 699x if (h == &task_op_)
717 591x continue;
718 108x lock.unlock();
719 108x h->destroy();
720 108x lock.lock();
721 699x }
722
723 591x signal_all(lock);
724 591x }
725
726 inline void
727 997x reactor_scheduler::signal_all(lock_type&) const
728 {
729 997x state_ |= signaled_bit;
730 997x cond_.notify_all();
731 997x }
732
733 inline bool
734 2333x reactor_scheduler::maybe_unlock_and_signal_one(
735 lock_type& lock) const
736 {
737 2333x state_ |= signaled_bit;
738 2333x if (state_ > signaled_bit)
739 {
740 lock.unlock();
741 cond_.notify_one();
742 return true;
743 }
744 2333x return false;
745 }
746
747 inline bool
748 315595x reactor_scheduler::unlock_and_signal_one(
749 lock_type& lock) const
750 {
751 315595x state_ |= signaled_bit;
752 315595x bool have_waiters = state_ > signaled_bit;
753 315595x lock.unlock();
754 315595x if (have_waiters)
755 cond_.notify_one();
756 315595x return have_waiters;
757 }
758
759 inline void
760 reactor_scheduler::clear_signal() const
761 {
762 state_ &= ~signaled_bit;
763 }
764
765 inline void
766 reactor_scheduler::wait_for_signal(
767 lock_type& lock) const
768 {
769 while ((state_ & signaled_bit) == 0)
770 {
771 state_ += waiter_increment;
772 cond_.wait(lock);
773 state_ -= waiter_increment;
774 }
775 }
776
777 inline void
778 reactor_scheduler::wait_for_signal_for(
779 lock_type& lock, long timeout_us) const
780 {
781 if ((state_ & signaled_bit) == 0)
782 {
783 state_ += waiter_increment;
784 cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
785 state_ -= waiter_increment;
786 }
787 }
788
789 inline void
790 2333x reactor_scheduler::wake_one_thread_and_unlock(
791 lock_type& lock) const
792 {
793 2333x if (maybe_unlock_and_signal_one(lock))
794 return;
795
796 2333x if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
797 {
798 56x task_interrupted_ = true;
799 56x lock.unlock();
800 56x interrupt_reactor();
801 }
802 else
803 {
804 2277x lock.unlock();
805 }
806 }
807
808 267502x inline reactor_scheduler::work_cleanup::~work_cleanup()
809 {
810 267502x if (ctx)
811 {
812 267502x std::int64_t produced = ctx->private_outstanding_work;
813 267502x if (produced > 1)
814 15x sched->outstanding_work_.fetch_add(
815 produced - 1, std::memory_order_relaxed);
816 267487x else if (produced < 1)
817 26798x sched->work_finished();
818 267502x ctx->private_outstanding_work = 0;
819
820 267502x if (!ctx->private_queue.empty())
821 {
822 87880x lock->lock();
823 87880x sched->completed_ops_.splice(ctx->private_queue);
824 }
825 }
826 else
827 {
828 sched->work_finished();
829 }
830 267502x }
831
832 377962x inline reactor_scheduler::task_cleanup::~task_cleanup()
833 {
834 188981x if (!ctx)
835 return;
836
837 188981x if (ctx->private_outstanding_work > 0)
838 {
839 8398x sched->outstanding_work_.fetch_add(
840 8398x ctx->private_outstanding_work, std::memory_order_relaxed);
841 8398x ctx->private_outstanding_work = 0;
842 }
843
844 188981x if (!ctx->private_queue.empty())
845 {
846 8398x if (!lock->owns_lock())
847 lock->lock();
848 8398x sched->completed_ops_.splice(ctx->private_queue);
849 }
850 188981x }
851
852 inline std::size_t
853 267934x reactor_scheduler::do_one(
854 lock_type& lock, long timeout_us, context_type* ctx)
855 {
856 for (;;)
857 {
858 456874x if (stopped_.load(std::memory_order_acquire))
859 387x return 0;
860
861 456487x scheduler_op* op = completed_ops_.pop();
862
863 // Handle reactor sentinel — time to poll for I/O
864 456487x if (op == &task_op_)
865 {
866 bool more_handlers =
867 188985x !completed_ops_.empty() || (ctx && !ctx->private_queue.empty());
868
869 329877x if (!more_handlers &&
870 281784x (outstanding_work_.load(std::memory_order_acquire) == 0 ||
871 timeout_us == 0))
872 {
873 4x completed_ops_.push(&task_op_);
874 4x return 0;
875 }
876
877 188981x long task_timeout_us = more_handlers ? 0 : timeout_us;
878 188981x task_interrupted_ = task_timeout_us == 0;
879 188981x task_running_.store(true, std::memory_order_release);
880
881 188981x if (more_handlers)
882 48093x unlock_and_signal_one(lock);
883
884 try
885 {
886 188981x run_task(lock, ctx, task_timeout_us);
887 }
888 catch (...)
889 {
890 task_running_.store(false, std::memory_order_relaxed);
891 throw;
892 }
893
894 188981x task_running_.store(false, std::memory_order_relaxed);
895 188981x completed_ops_.push(&task_op_);
896 188981x if (timeout_us > 0)
897 41x return 0;
898 188940x continue;
899 188940x }
900
901 // Handle operation
902 267502x if (op != nullptr)
903 {
904 267502x bool more = !completed_ops_.empty();
905
906 267502x if (more)
907 267502x ctx->unassisted = !unlock_and_signal_one(lock);
908 else
909 {
910 ctx->unassisted = false;
911 lock.unlock();
912 }
913
914 267502x work_cleanup on_exit{this, &lock, ctx};
915 (void)on_exit;
916
917 267502x (*op)();
918 267502x return 1;
919 267502x }
920
921 // Try private queue before blocking
922 if (reactor_drain_private_queue(ctx, outstanding_work_, completed_ops_))
923 continue;
924
925 if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
926 timeout_us == 0)
927 return 0;
928
929 clear_signal();
930 if (timeout_us < 0)
931 wait_for_signal(lock);
932 else
933 wait_for_signal_for(lock, timeout_us);
934 188940x }
935 }
936
937 } // namespace boost::corosio::detail
938
939 #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
940