include/boost/corosio/native/detail/reactor/reactor_scheduler.hpp

71.1% Lines (241/339) 78.3% List of functions (36/46)
reactor_scheduler.hpp
f(x) Functions (46)
Function Calls Lines Blocks
boost::corosio::detail::reactor_find_context(boost::corosio::detail::reactor_scheduler const*) :78 730331x 100.0% 86.0% boost::corosio::detail::reactor_flush_private_work(boost::corosio::detail::reactor_scheduler_context*, std::atomic<long>&) :90 0 0.0% 0.0% boost::corosio::detail::reactor_drain_private_queue(boost::corosio::detail::reactor_scheduler_context*, std::atomic<long>&, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) :107 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler::inline_budget_initial() const :255 482x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::is_single_threaded() const :261 64x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::configure_single_threaded(bool) :272 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler::reactor_scheduler() :283 587x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::task_op::operator()() :328 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler::task_op::destroy() :329 0 0.0% 0.0% boost::corosio::detail::reactor_thread_context_guard::reactor_thread_context_guard(boost::corosio::detail::reactor_scheduler const*) :375 482x 100.0% 100.0% boost::corosio::detail::reactor_thread_context_guard::~reactor_thread_context_guard() :383 482x 66.7% 80.0% boost::corosio::detail::reactor_scheduler_context::reactor_scheduler_context(boost::corosio::detail::reactor_scheduler const*, boost::corosio::detail::reactor_scheduler_context*) :395 482x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::configure_reactor(unsigned int, unsigned int, unsigned int, unsigned int) :409 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler::reset_inline_budget() const :437 90344x 50.0% 45.0% boost::corosio::detail::reactor_scheduler::try_consume_inline_budget() const :463 399312x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::post(std::__n4861::coroutine_handle<void>) const :477 2155x 100.0% 84.0% 
boost::corosio::detail::reactor_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::post_handler(std::__n4861::coroutine_handle<void>) :483 2155x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::~post_handler() :484 4310x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::operator()() :486 2146x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::destroy() :495 9x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::post(boost::corosio::detail::scheduler_op*) const :520 91632x 100.0% 87.0% boost::corosio::detail::reactor_scheduler::running_in_this_thread() const :537 1322x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::stop() :543 441x 100.0% 82.0% boost::corosio::detail::reactor_scheduler::stopped() const :555 62x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::restart() :561 111x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::run() :567 412x 100.0% 76.0% boost::corosio::detail::reactor_scheduler::run_one() :592 2x 75.0% 64.0% boost::corosio::detail::reactor_scheduler::wait_one(long) :606 102x 100.0% 70.0% boost::corosio::detail::reactor_scheduler::poll() :620 6x 100.0% 76.0% boost::corosio::detail::reactor_scheduler::poll_one() :645 4x 100.0% 70.0% boost::corosio::detail::reactor_scheduler::work_started() :659 17504x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::work_finished() :665 25287x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::compensating_work_started() const :672 145566x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::drain_thread_queue(boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&, long) const :680 0 0.0% 0.0% 
boost::corosio::detail::reactor_scheduler::post_deferred_completions(boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) const :693 10175x 30.0% 35.0% boost::corosio::detail::reactor_scheduler::shutdown_drain() :710 587x 100.0% 88.0% boost::corosio::detail::reactor_scheduler::signal_all(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&) const :727 987x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::maybe_unlock_and_signal_one(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&) const :734 2321x 57.1% 50.0% boost::corosio::detail::reactor_scheduler::unlock_and_signal_one(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&) const :748 296180x 85.7% 80.0% boost::corosio::detail::reactor_scheduler::clear_signal() const :760 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler::wait_for_signal(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&) const :766 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler::wait_for_signal_for(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&, long) const :778 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler::wake_one_thread_and_unlock(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&) const :790 2321x 87.5% 92.0% boost::corosio::detail::reactor_scheduler::work_cleanup::~work_cleanup() :808 249511x 92.3% 92.0% boost::corosio::detail::reactor_scheduler::task_cleanup::~task_cleanup() :832 170525x 83.3% 86.0% boost::corosio::detail::reactor_scheduler::do_one(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&, long, boost::corosio::detail::reactor_scheduler_context*) :853 249938x 68.9% 55.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
12
13 #include <boost/corosio/detail/config.hpp>
14 #include <boost/capy/ex/execution_context.hpp>
15
16 #include <boost/corosio/detail/scheduler.hpp>
17 #include <boost/corosio/detail/scheduler_op.hpp>
18 #include <boost/corosio/detail/thread_local_ptr.hpp>
19
20 #include <atomic>
21 #include <chrono>
22 #include <coroutine>
23 #include <cstddef>
24 #include <cstdint>
25 #include <limits>
26 #include <memory>
27 #include <stdexcept>
28
29 #include <boost/corosio/detail/conditionally_enabled_mutex.hpp>
30 #include <boost/corosio/detail/conditionally_enabled_event.hpp>
31
32 namespace boost::corosio::detail {
33
34 // Forward declarations
35 class reactor_scheduler;
36 class timer_service;
37
/** Per-thread state for a reactor scheduler.

    Each thread running a scheduler's event loop has one of these
    on a thread-local stack. It holds a private work queue and
    inline completion budget for speculative I/O fast paths.
*/
struct BOOST_COROSIO_SYMBOL_VISIBLE reactor_scheduler_context
{
    /// Scheduler this context belongs to.
    reactor_scheduler const* key;

    /// Next context frame on this thread's stack (nullptr at the bottom).
    reactor_scheduler_context* next;

    /// Private work queue for reduced contention.
    op_queue private_queue;

    /// Unflushed work count for the private queue. Flushed into the
    /// scheduler's global atomic counter by the drain/flush helpers.
    std::int64_t private_outstanding_work;

    /// Remaining inline completions allowed this cycle.
    int inline_budget;

    /// Maximum inline budget (adaptive; see reset_inline_budget()).
    int inline_budget_max;

    /// True if no other thread absorbed queued work last cycle.
    bool unassisted;

    /// Construct a context frame linked to @a n.
    reactor_scheduler_context(
        reactor_scheduler const* k,
        reactor_scheduler_context* n);
};
72
73 /// Thread-local context stack for reactor schedulers.
74 inline thread_local_ptr<reactor_scheduler_context> reactor_context_stack;
75
76 /// Find the context frame for a scheduler on this thread.
77 inline reactor_scheduler_context*
78 730331x reactor_find_context(reactor_scheduler const* self) noexcept
79 {
80 730331x for (auto* c = reactor_context_stack.get(); c != nullptr; c = c->next)
81 {
82 727283x if (c->key == self)
83 727283x return c;
84 }
85 3048x return nullptr;
86 }
87
88 /// Flush private work count to global counter.
89 inline void
90 reactor_flush_private_work(
91 reactor_scheduler_context* ctx,
92 std::atomic<std::int64_t>& outstanding_work) noexcept
93 {
94 if (ctx && ctx->private_outstanding_work > 0)
95 {
96 outstanding_work.fetch_add(
97 ctx->private_outstanding_work, std::memory_order_relaxed);
98 ctx->private_outstanding_work = 0;
99 }
100 }
101
102 /** Drain private queue to global queue, flushing work count first.
103
104 @return True if any ops were drained.
105 */
106 inline bool
107 reactor_drain_private_queue(
108 reactor_scheduler_context* ctx,
109 std::atomic<std::int64_t>& outstanding_work,
110 op_queue& completed_ops) noexcept
111 {
112 if (!ctx || ctx->private_queue.empty())
113 return false;
114
115 reactor_flush_private_work(ctx, outstanding_work);
116 completed_ops.splice(ctx->private_queue);
117 return true;
118 }
119
/** Non-template base for reactor-backed scheduler implementations.

    Provides the complete threading model shared by epoll, kqueue,
    and select schedulers: signal state machine, inline completion
    budget, work counting, run/poll methods, and the do_one event
    loop.

    Derived classes provide platform-specific hooks by overriding:
    - `run_task(lock, ctx)` to run the reactor poll
    - `interrupt_reactor()` to wake a blocked reactor

    De-templated from the original CRTP design to eliminate
    duplicate instantiations when multiple backends are compiled
    into the same binary. Virtual dispatch for run_task (called
    once per reactor cycle, before a blocking syscall) has
    negligible overhead.

    @par Thread Safety
    All public member functions are thread-safe.
*/
class reactor_scheduler
    : public scheduler
    , public capy::execution_context::service
{
public:
    using key_type = scheduler;
    using context_type = reactor_scheduler_context;
    using mutex_type = conditionally_enabled_mutex;
    using lock_type = mutex_type::scoped_lock;
    using event_type = conditionally_enabled_event;

    /// Epoll and kqueue do not need write-direction notification.
    static constexpr bool needs_write_notification = false;

    /// Post a coroutine for deferred execution.
    void post(std::coroutine_handle<> h) const override;

    /// Post a scheduler operation for deferred execution.
    void post(scheduler_op* h) const override;

    /// Return true if called from a thread running this scheduler.
    bool running_in_this_thread() const noexcept override;

    /// Request the scheduler to stop dispatching handlers.
    void stop() override;

    /// Return true if the scheduler has been stopped.
    bool stopped() const noexcept override;

    /// Reset the stopped state so `run()` can resume.
    void restart() override;

    /// Run the event loop until no work remains.
    std::size_t run() override;

    /// Run until one handler completes or no work remains.
    std::size_t run_one() override;

    /// Run until one handler completes or @a usec elapses.
    std::size_t wait_one(long usec) override;

    /// Run ready handlers without blocking.
    std::size_t poll() override;

    /// Run at most one ready handler without blocking.
    std::size_t poll_one() override;

    /// Increment the outstanding work count.
    void work_started() noexcept override;

    /// Decrement the outstanding work count, stopping on zero.
    void work_finished() noexcept override;

    /** Reset the thread's inline completion budget.

        Called at the start of each posted completion handler to
        grant a fresh budget for speculative inline completions.
    */
    void reset_inline_budget() const noexcept;

    /** Consume one unit of inline budget if available.

        @return True if budget was available and consumed.
    */
    bool try_consume_inline_budget() const noexcept;

    /** Offset a forthcoming work_finished from work_cleanup.

        Called by descriptor_state when all I/O returned EAGAIN and
        no handler will be executed. Must be called from a scheduler
        thread.
    */
    void compensating_work_started() const noexcept;

    /** Drain work from thread context's private queue to global queue.

        Flushes private work count to the global counter, then
        transfers the queue under mutex protection.

        @param queue The private queue to drain.
        @param count Private work count to flush before draining.
    */
    void drain_thread_queue(op_queue& queue, std::int64_t count) const;

    /** Post completed operations for deferred invocation.

        If called from a thread running this scheduler, operations
        go to the thread's private queue (fast path). Otherwise,
        operations are added to the global queue under mutex and a
        waiter is signaled.

        @par Preconditions
        work_started() must have been called for each operation.

        @param ops Queue of operations to post.
    */
    void post_deferred_completions(op_queue& ops) const;

    /** Apply runtime configuration to the scheduler.

        Called by `io_context` after construction. Values that do
        not apply to this backend are silently ignored.

        @param max_events Event buffer size for epoll/kqueue.
        @param budget_init Starting inline completion budget.
        @param budget_max Hard ceiling on adaptive budget ramp-up.
        @param unassisted Budget when single-threaded.

        @throws std::out_of_range if max_events or budget_max is
        outside [1, INT_MAX].
    */
    virtual void configure_reactor(
        unsigned max_events,
        unsigned budget_init,
        unsigned budget_max,
        unsigned unassisted);

    /// Return the configured initial inline budget.
    unsigned inline_budget_initial() const noexcept
    {
        return inline_budget_initial_;
    }

    /// Return true if single-threaded (lockless) mode is active.
    bool is_single_threaded() const noexcept
    {
        return single_threaded_;
    }

    /** Enable or disable single-threaded (lockless) mode.

        When enabled, all scheduler mutex and condition variable
        operations become no-ops. Cross-thread post() is
        undefined behavior.
    */
    void configure_single_threaded(bool v) noexcept
    {
        single_threaded_ = v;
        mutex_.set_enabled(!v);
        cond_.set_enabled(!v);
    }

protected:
    timer_service* timer_svc_ = nullptr;
    bool single_threaded_ = false;

    reactor_scheduler() = default;

    /** Drain completed_ops during shutdown.

        Pops all operations from the global queue and destroys them,
        skipping the task sentinel. Signals all waiting threads.
        Derived classes call this from their shutdown() override
        before performing platform-specific cleanup.
    */
    void shutdown_drain();

    /// RAII guard that re-inserts the task sentinel after `run_task`.
    struct task_cleanup
    {
        reactor_scheduler const* sched;
        lock_type* lock;
        context_type* ctx;
        ~task_cleanup();
    };

    // Shared state guarded by mutex_ (mutable so that const post()
    // overloads can enqueue work).
    mutable mutex_type mutex_{true};
    mutable event_type cond_{true};
    mutable op_queue completed_ops_;
    mutable std::atomic<std::int64_t> outstanding_work_{0};
    std::atomic<bool> stopped_{false};
    /// True while a thread is inside run_task() (the reactor poll).
    mutable std::atomic<bool> task_running_{false};
    /// True once the running reactor has been asked to wake early;
    /// avoids redundant interrupt_reactor() calls. Guarded by mutex_.
    mutable bool task_interrupted_ = false;

    // Runtime-configurable reactor tuning parameters.
    // Defaults match the library's built-in values.
    unsigned max_events_per_poll_ = 128;
    unsigned inline_budget_initial_ = 2;
    unsigned inline_budget_max_ = 16;
    unsigned unassisted_budget_ = 4;

    /// Bit 0 of `state_`: set when the condvar should be signaled.
    static constexpr std::size_t signaled_bit = 1;

    /// Increment per waiting thread in `state_`.
    static constexpr std::size_t waiter_increment = 2;
    /// Packed signal state: bit 0 = latched signal, remaining bits
    /// count waiting threads (in units of waiter_increment).
    /// Guarded by mutex_.
    mutable std::size_t state_ = 0;

    /// Sentinel op that triggers a reactor poll when dequeued.
    /// Lives inside the scheduler; never heap-allocated or destroyed.
    struct task_op final : scheduler_op
    {
        void operator()() override {}
        void destroy() override {}
    };
    task_op task_op_;

    /// Run the platform-specific reactor poll.
    virtual void
    run_task(lock_type& lock, context_type* ctx,
        long timeout_us) = 0;

    /// Wake a blocked reactor (e.g. write to eventfd or pipe).
    virtual void interrupt_reactor() const = 0;

private:
    /// RAII guard run after each handler: settles the handler's net
    /// work count and publishes its privately queued ops.
    struct work_cleanup
    {
        reactor_scheduler* sched;
        lock_type* lock;
        context_type* ctx;
        ~work_cleanup();
    };

    std::size_t do_one(
        lock_type& lock, long timeout_us, context_type* ctx);

    void signal_all(lock_type& lock) const;
    bool maybe_unlock_and_signal_one(lock_type& lock) const;
    bool unlock_and_signal_one(lock_type& lock) const;
    void clear_signal() const;
    void wait_for_signal(lock_type& lock) const;
    void wait_for_signal_for(
        lock_type& lock, long timeout_us) const;
    void wake_one_thread_and_unlock(lock_type& lock) const;
};
362
363 /** RAII guard that pushes/pops a scheduler context frame.
364
365 On construction, pushes a new context frame onto the
366 thread-local stack. On destruction, drains any remaining
367 private queue items to the global queue and pops the frame.
368 */
369 struct reactor_thread_context_guard
370 {
371 /// The context frame managed by this guard.
372 reactor_scheduler_context frame_;
373
374 /// Construct the guard, pushing a frame for @a sched.
375 482x explicit reactor_thread_context_guard(
376 reactor_scheduler const* sched) noexcept
377 482x : frame_(sched, reactor_context_stack.get())
378 {
379 482x reactor_context_stack.set(&frame_);
380 482x }
381
382 /// Destroy the guard, draining private work and popping the frame.
383 482x ~reactor_thread_context_guard() noexcept
384 {
385 482x if (!frame_.private_queue.empty())
386 frame_.key->drain_thread_queue(
387 frame_.private_queue, frame_.private_outstanding_work);
388 482x reactor_context_stack.set(frame_.next);
389 482x }
390 };
391
392 // ---- Inline implementations ------------------------------------------------
393
// Initialize a frame for scheduler @a k, linked above the previous
// top-of-stack frame @a n. The adaptive ceiling starts at the
// scheduler's configured initial budget; the spendable budget
// itself starts at zero.
inline
reactor_scheduler_context::reactor_scheduler_context(
    reactor_scheduler const* k,
    reactor_scheduler_context* n)
    : key(k)
    , next(n)
    , private_outstanding_work(0)
    , inline_budget(0)
    , inline_budget_max(
        static_cast<int>(k->inline_budget_initial()))
    , unassisted(false)
{
}
407
408 inline void
409 reactor_scheduler::configure_reactor(
410 unsigned max_events,
411 unsigned budget_init,
412 unsigned budget_max,
413 unsigned unassisted)
414 {
415 if (max_events < 1 ||
416 max_events > static_cast<unsigned>(std::numeric_limits<int>::max()))
417 throw std::out_of_range(
418 "max_events_per_poll must be in [1, INT_MAX]");
419 if (budget_max < 1 ||
420 budget_max > static_cast<unsigned>(std::numeric_limits<int>::max()))
421 throw std::out_of_range(
422 "inline_budget_max must be in [1, INT_MAX]");
423
424 // Clamp initial and unassisted to budget_max.
425 if (budget_init > budget_max)
426 budget_init = budget_max;
427 if (unassisted > budget_max)
428 unassisted = budget_max;
429
430 max_events_per_poll_ = max_events;
431 inline_budget_initial_ = budget_init;
432 inline_budget_max_ = budget_max;
433 unassisted_budget_ = unassisted;
434 }
435
436 inline void
437 90344x reactor_scheduler::reset_inline_budget() const noexcept
438 {
439 90344x if (auto* ctx = reactor_find_context(this))
440 {
441 // Cap when no other thread absorbed queued work
442 90344x if (ctx->unassisted)
443 {
444 90344x ctx->inline_budget_max =
445 90344x static_cast<int>(unassisted_budget_);
446 90344x ctx->inline_budget =
447 90344x static_cast<int>(unassisted_budget_);
448 90344x return;
449 }
450 // Ramp up when previous cycle fully consumed budget
451 if (ctx->inline_budget == 0)
452 ctx->inline_budget_max = (std::min)(
453 ctx->inline_budget_max * 2,
454 static_cast<int>(inline_budget_max_));
455 else if (ctx->inline_budget < ctx->inline_budget_max)
456 ctx->inline_budget_max =
457 static_cast<int>(inline_budget_initial_);
458 ctx->inline_budget = ctx->inline_budget_max;
459 }
460 }
461
462 inline bool
463 399312x reactor_scheduler::try_consume_inline_budget() const noexcept
464 {
465 399312x if (auto* ctx = reactor_find_context(this))
466 {
467 399312x if (ctx->inline_budget > 0)
468 {
469 319457x --ctx->inline_budget;
470 319457x return true;
471 }
472 }
473 79855x return false;
474 }
475
// Wrap a coroutine handle in a heap-allocated scheduler_op and
// enqueue it: private queue when posting from a scheduler thread,
// global queue + wakeup otherwise.
inline void
reactor_scheduler::post(std::coroutine_handle<> h) const
{
    // Self-deleting adapter that owns the handle until it is either
    // invoked (resume) or discarded at shutdown (destroy).
    struct post_handler final : scheduler_op
    {
        std::coroutine_handle<> h_;

        explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
        ~post_handler() override = default;

        void operator()() override
        {
            // Copy the handle out, then free this op before resuming.
            auto saved = h_;
            delete this;
            // Ensure stores from the posting thread are visible
            std::atomic_thread_fence(std::memory_order_acquire);
            saved.resume();
        }

        void destroy() override
        {
            // Shutdown path: free this op, then destroy the
            // still-suspended coroutine frame.
            auto saved = h_;
            delete this;
            saved.destroy();
        }
    };

    // unique_ptr keeps the allocation exception-safe until it is
    // handed to a queue.
    auto ph = std::make_unique<post_handler>(h);

    // Fast path: a scheduler thread enqueues privately, no mutex.
    if (auto* ctx = reactor_find_context(this))
    {
        ++ctx->private_outstanding_work;
        ctx->private_queue.push(ph.release());
        return;
    }

    // Cross-thread post: count the work, publish under the mutex,
    // and wake one waiting thread.
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);

    lock_type lock(mutex_);
    completed_ops_.push(ph.release());
    wake_one_thread_and_unlock(lock);
}
518
519 inline void
520 91632x reactor_scheduler::post(scheduler_op* h) const
521 {
522 91632x if (auto* ctx = reactor_find_context(this))
523 {
524 91460x ++ctx->private_outstanding_work;
525 91460x ctx->private_queue.push(h);
526 91460x return;
527 }
528
529 172x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
530
531 172x lock_type lock(mutex_);
532 172x completed_ops_.push(h);
533 172x wake_one_thread_and_unlock(lock);
534 172x }
535
536 inline bool
537 1322x reactor_scheduler::running_in_this_thread() const noexcept
538 {
539 1322x return reactor_find_context(this) != nullptr;
540 }
541
542 inline void
543 441x reactor_scheduler::stop()
544 {
545 441x lock_type lock(mutex_);
546 441x if (!stopped_.load(std::memory_order_acquire))
547 {
548 400x stopped_.store(true, std::memory_order_release);
549 400x signal_all(lock);
550 400x interrupt_reactor();
551 }
552 441x }
553
554 inline bool
555 62x reactor_scheduler::stopped() const noexcept
556 {
557 62x return stopped_.load(std::memory_order_acquire);
558 }
559
560 inline void
561 111x reactor_scheduler::restart()
562 {
563 111x stopped_.store(false, std::memory_order_release);
564 111x }
565
566 inline std::size_t
567 412x reactor_scheduler::run()
568 {
569 824x if (outstanding_work_.load(std::memory_order_acquire) == 0)
570 {
571 31x stop();
572 31x return 0;
573 }
574
575 381x reactor_thread_context_guard ctx(this);
576 381x lock_type lock(mutex_);
577
578 381x std::size_t n = 0;
579 for (;;)
580 {
581 249831x if (!do_one(lock, -1, &ctx.frame_))
582 381x break;
583 249450x if (n != (std::numeric_limits<std::size_t>::max)())
584 249450x ++n;
585 249450x if (!lock.owns_lock())
586 163485x lock.lock();
587 }
588 381x return n;
589 381x }
590
591 inline std::size_t
592 2x reactor_scheduler::run_one()
593 {
594 4x if (outstanding_work_.load(std::memory_order_acquire) == 0)
595 {
596 stop();
597 return 0;
598 }
599
600 2x reactor_thread_context_guard ctx(this);
601 2x lock_type lock(mutex_);
602 2x return do_one(lock, -1, &ctx.frame_);
603 2x }
604
605 inline std::size_t
606 102x reactor_scheduler::wait_one(long usec)
607 {
608 204x if (outstanding_work_.load(std::memory_order_acquire) == 0)
609 {
610 10x stop();
611 10x return 0;
612 }
613
614 92x reactor_thread_context_guard ctx(this);
615 92x lock_type lock(mutex_);
616 92x return do_one(lock, usec, &ctx.frame_);
617 92x }
618
619 inline std::size_t
620 6x reactor_scheduler::poll()
621 {
622 12x if (outstanding_work_.load(std::memory_order_acquire) == 0)
623 {
624 1x stop();
625 1x return 0;
626 }
627
628 5x reactor_thread_context_guard ctx(this);
629 5x lock_type lock(mutex_);
630
631 5x std::size_t n = 0;
632 for (;;)
633 {
634 11x if (!do_one(lock, 0, &ctx.frame_))
635 5x break;
636 6x if (n != (std::numeric_limits<std::size_t>::max)())
637 6x ++n;
638 6x if (!lock.owns_lock())
639 6x lock.lock();
640 }
641 5x return n;
642 5x }
643
644 inline std::size_t
645 4x reactor_scheduler::poll_one()
646 {
647 8x if (outstanding_work_.load(std::memory_order_acquire) == 0)
648 {
649 2x stop();
650 2x return 0;
651 }
652
653 2x reactor_thread_context_guard ctx(this);
654 2x lock_type lock(mutex_);
655 2x return do_one(lock, 0, &ctx.frame_);
656 2x }
657
658 inline void
659 17504x reactor_scheduler::work_started() noexcept
660 {
661 17504x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
662 17504x }
663
664 inline void
665 25287x reactor_scheduler::work_finished() noexcept
666 {
667 50574x if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
668 392x stop();
669 25287x }
670
671 inline void
672 145566x reactor_scheduler::compensating_work_started() const noexcept
673 {
674 145566x auto* ctx = reactor_find_context(this);
675 145566x if (ctx)
676 145566x ++ctx->private_outstanding_work;
677 145566x }
678
679 inline void
680 reactor_scheduler::drain_thread_queue(
681 op_queue& queue, std::int64_t count) const
682 {
683 if (count > 0)
684 outstanding_work_.fetch_add(count, std::memory_order_relaxed);
685
686 lock_type lock(mutex_);
687 completed_ops_.splice(queue);
688 if (count > 0)
689 maybe_unlock_and_signal_one(lock);
690 }
691
692 inline void
693 10175x reactor_scheduler::post_deferred_completions(op_queue& ops) const
694 {
695 10175x if (ops.empty())
696 10175x return;
697
698 if (auto* ctx = reactor_find_context(this))
699 {
700 ctx->private_queue.splice(ops);
701 return;
702 }
703
704 lock_type lock(mutex_);
705 completed_ops_.splice(ops);
706 wake_one_thread_and_unlock(lock);
707 }
708
709 inline void
710 587x reactor_scheduler::shutdown_drain()
711 {
712 587x lock_type lock(mutex_);
713
714 1279x while (auto* h = completed_ops_.pop())
715 {
716 692x if (h == &task_op_)
717 587x continue;
718 105x lock.unlock();
719 105x h->destroy();
720 105x lock.lock();
721 692x }
722
723 587x signal_all(lock);
724 587x }
725
726 inline void
727 987x reactor_scheduler::signal_all(lock_type&) const
728 {
729 987x state_ |= signaled_bit;
730 987x cond_.notify_all();
731 987x }
732
733 inline bool
734 2321x reactor_scheduler::maybe_unlock_and_signal_one(
735 lock_type& lock) const
736 {
737 2321x state_ |= signaled_bit;
738 2321x if (state_ > signaled_bit)
739 {
740 lock.unlock();
741 cond_.notify_one();
742 return true;
743 }
744 2321x return false;
745 }
746
747 inline bool
748 296180x reactor_scheduler::unlock_and_signal_one(
749 lock_type& lock) const
750 {
751 296180x state_ |= signaled_bit;
752 296180x bool have_waiters = state_ > signaled_bit;
753 296180x lock.unlock();
754 296180x if (have_waiters)
755 cond_.notify_one();
756 296180x return have_waiters;
757 }
758
759 inline void
760 reactor_scheduler::clear_signal() const
761 {
762 state_ &= ~signaled_bit;
763 }
764
765 inline void
766 reactor_scheduler::wait_for_signal(
767 lock_type& lock) const
768 {
769 while ((state_ & signaled_bit) == 0)
770 {
771 state_ += waiter_increment;
772 cond_.wait(lock);
773 state_ -= waiter_increment;
774 }
775 }
776
777 inline void
778 reactor_scheduler::wait_for_signal_for(
779 lock_type& lock, long timeout_us) const
780 {
781 if ((state_ & signaled_bit) == 0)
782 {
783 state_ += waiter_increment;
784 cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
785 state_ -= waiter_increment;
786 }
787 }
788
789 inline void
790 2321x reactor_scheduler::wake_one_thread_and_unlock(
791 lock_type& lock) const
792 {
793 2321x if (maybe_unlock_and_signal_one(lock))
794 return;
795
796 2321x if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
797 {
798 55x task_interrupted_ = true;
799 55x lock.unlock();
800 55x interrupt_reactor();
801 }
802 else
803 {
804 2266x lock.unlock();
805 }
806 }
807
808 249511x inline reactor_scheduler::work_cleanup::~work_cleanup()
809 {
810 249511x if (ctx)
811 {
812 249511x std::int64_t produced = ctx->private_outstanding_work;
813 249511x if (produced > 1)
814 15x sched->outstanding_work_.fetch_add(
815 produced - 1, std::memory_order_relaxed);
816 249496x else if (produced < 1)
817 17958x sched->work_finished();
818 249511x ctx->private_outstanding_work = 0;
819
820 249511x if (!ctx->private_queue.empty())
821 {
822 85987x lock->lock();
823 85987x sched->completed_ops_.splice(ctx->private_queue);
824 }
825 }
826 else
827 {
828 sched->work_finished();
829 }
830 249511x }
831
832 341050x inline reactor_scheduler::task_cleanup::~task_cleanup()
833 {
834 170525x if (!ctx)
835 return;
836
837 170525x if (ctx->private_outstanding_work > 0)
838 {
839 5452x sched->outstanding_work_.fetch_add(
840 5452x ctx->private_outstanding_work, std::memory_order_relaxed);
841 5452x ctx->private_outstanding_work = 0;
842 }
843
844 170525x if (!ctx->private_queue.empty())
845 {
846 5452x if (!lock->owns_lock())
847 lock->lock();
848 5452x sched->completed_ops_.splice(ctx->private_queue);
849 }
850 170525x }
851
// Core event loop step. Called with the mutex held; may return with
// it released after running a handler (callers re-lock).
//
// timeout_us semantics: -1 blocks indefinitely, 0 never blocks,
// positive limits one wait/poll to that many microseconds.
//
// Returns 1 after running exactly one handler, 0 when stopped, out
// of work, or timed out.
inline std::size_t
reactor_scheduler::do_one(
    lock_type& lock, long timeout_us, context_type* ctx)
{
    for (;;)
    {
        if (stopped_.load(std::memory_order_acquire))
            return 0;

        scheduler_op* op = completed_ops_.pop();

        // Handle reactor sentinel — time to poll for I/O
        if (op == &task_op_)
        {
            bool more_handlers =
                !completed_ops_.empty() || (ctx && !ctx->private_queue.empty());

            // Idle and nothing left (or non-blocking poll): put the
            // sentinel back and report no progress.
            if (!more_handlers &&
                (outstanding_work_.load(std::memory_order_acquire) == 0 ||
                 timeout_us == 0))
            {
                completed_ops_.push(&task_op_);
                return 0;
            }

            // With handlers still pending, poll without blocking.
            long task_timeout_us = more_handlers ? 0 : timeout_us;
            // A non-blocking poll needs no interrupt; mark it already
            // interrupted so wake_one_thread_and_unlock skips it.
            task_interrupted_ = task_timeout_us == 0;
            task_running_.store(true, std::memory_order_release);

            // Let another thread pick up the pending handlers while
            // this one runs the reactor (releases the lock).
            if (more_handlers)
                unlock_and_signal_one(lock);

            try
            {
                run_task(lock, ctx, task_timeout_us);
            }
            catch (...)
            {
                // Keep task_running_ consistent even on throw.
                task_running_.store(false, std::memory_order_relaxed);
                throw;
            }

            task_running_.store(false, std::memory_order_relaxed);
            // Re-insert the sentinel so the next dequeuer polls again.
            completed_ops_.push(&task_op_);
            // A bounded wait ends after one reactor cycle.
            if (timeout_us > 0)
                return 0;
            continue;
        }

        // Handle operation
        if (op != nullptr)
        {
            bool more = !completed_ops_.empty();

            // If more work remains, hand it to another thread; whether
            // one actually woke feeds the adaptive "unassisted" flag.
            if (more)
                ctx->unassisted = !unlock_and_signal_one(lock);
            else
            {
                ctx->unassisted = false;
                lock.unlock();
            }

            // Settles work counts / publishes private ops even if the
            // handler throws.
            work_cleanup on_exit{this, &lock, ctx};
            (void)on_exit;

            // Run the handler outside the lock.
            (*op)();
            return 1;
        }

        // Try private queue before blocking
        if (reactor_drain_private_queue(ctx, outstanding_work_, completed_ops_))
            continue;

        if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
            timeout_us == 0)
            return 0;

        // Nothing runnable: wait (bounded or unbounded) for a signal.
        clear_signal();
        if (timeout_us < 0)
            wait_for_signal(lock);
        else
            wait_for_signal_for(lock, timeout_us);
    }
}
936
937 } // namespace boost::corosio::detail
938
939 #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
940