TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
12 :
13 : #include <boost/corosio/detail/config.hpp>
14 : #include <boost/capy/ex/execution_context.hpp>
15 :
16 : #include <boost/corosio/detail/scheduler.hpp>
17 : #include <boost/corosio/detail/scheduler_op.hpp>
18 : #include <boost/corosio/detail/thread_local_ptr.hpp>
19 :
20 : #include <atomic>
21 : #include <chrono>
22 : #include <coroutine>
23 : #include <cstddef>
24 : #include <cstdint>
25 : #include <limits>
26 : #include <memory>
27 : #include <stdexcept>
28 :
29 : #include <boost/corosio/detail/conditionally_enabled_mutex.hpp>
30 : #include <boost/corosio/detail/conditionally_enabled_event.hpp>
31 :
32 : namespace boost::corosio::detail {
33 :
34 : // Forward declarations
35 : class reactor_scheduler;
36 : class timer_service;
37 :
/** Per-thread state for a reactor scheduler.

    Each thread running a scheduler's event loop has one of these
    on a thread-local stack. It holds a private work queue and
    inline completion budget for speculative I/O fast paths.
*/
struct BOOST_COROSIO_SYMBOL_VISIBLE reactor_scheduler_context
{
    /// Scheduler this context belongs to; used as the lookup key
    /// by reactor_find_context().
    reactor_scheduler const* key;

    /// Next context frame on this thread's stack (nullptr at the bottom).
    reactor_scheduler_context* next;

    /// Private work queue for reduced contention; drained to the
    /// scheduler's global queue when the frame is popped or the
    /// thread runs out of local work.
    op_queue private_queue;

    /// Unflushed work count for the private queue; flushed into the
    /// scheduler's global outstanding-work counter in batches.
    std::int64_t private_outstanding_work;

    /// Remaining inline completions allowed this cycle.
    int inline_budget;

    /// Maximum inline budget (adaptive, 2-16).
    int inline_budget_max;

    /// True if no other thread absorbed queued work last cycle.
    bool unassisted;

    /// Construct a context frame linked to @a n.
    reactor_scheduler_context(
        reactor_scheduler const* k,
        reactor_scheduler_context* n);
};
72 :
/// Thread-local context stack for reactor schedulers.
/// Frames are linked through reactor_scheduler_context::next, so one
/// thread can hold frames for multiple schedulers simultaneously.
inline thread_local_ptr<reactor_scheduler_context> reactor_context_stack;
75 :
76 : /// Find the context frame for a scheduler on this thread.
77 : inline reactor_scheduler_context*
78 HIT 756975 : reactor_find_context(reactor_scheduler const* self) noexcept
79 : {
80 756975 : for (auto* c = reactor_context_stack.get(); c != nullptr; c = c->next)
81 : {
82 753927 : if (c->key == self)
83 753927 : return c;
84 : }
85 3048 : return nullptr;
86 : }
87 :
88 : /// Flush private work count to global counter.
89 : inline void
90 MIS 0 : reactor_flush_private_work(
91 : reactor_scheduler_context* ctx,
92 : std::atomic<std::int64_t>& outstanding_work) noexcept
93 : {
94 0 : if (ctx && ctx->private_outstanding_work > 0)
95 : {
96 0 : outstanding_work.fetch_add(
97 : ctx->private_outstanding_work, std::memory_order_relaxed);
98 0 : ctx->private_outstanding_work = 0;
99 : }
100 0 : }
101 :
102 : /** Drain private queue to global queue, flushing work count first.
103 :
104 : @return True if any ops were drained.
105 : */
106 : inline bool
107 0 : reactor_drain_private_queue(
108 : reactor_scheduler_context* ctx,
109 : std::atomic<std::int64_t>& outstanding_work,
110 : op_queue& completed_ops) noexcept
111 : {
112 0 : if (!ctx || ctx->private_queue.empty())
113 0 : return false;
114 :
115 0 : reactor_flush_private_work(ctx, outstanding_work);
116 0 : completed_ops.splice(ctx->private_queue);
117 0 : return true;
118 : }
119 :
120 : /** Non-template base for reactor-backed scheduler implementations.
121 :
122 : Provides the complete threading model shared by epoll, kqueue,
123 : and select schedulers: signal state machine, inline completion
124 : budget, work counting, run/poll methods, and the do_one event
125 : loop.
126 :
127 : Derived classes provide platform-specific hooks by overriding:
128 : - `run_task(lock, ctx)` to run the reactor poll
129 : - `interrupt_reactor()` to wake a blocked reactor
130 :
131 : De-templated from the original CRTP design to eliminate
132 : duplicate instantiations when multiple backends are compiled
133 : into the same binary. Virtual dispatch for run_task (called
134 : once per reactor cycle, before a blocking syscall) has
135 : negligible overhead.
136 :
137 : @par Thread Safety
138 : All public member functions are thread-safe.
139 : */
class reactor_scheduler
    : public scheduler
    , public capy::execution_context::service
{
public:
    using key_type = scheduler;
    using context_type = reactor_scheduler_context;
    using mutex_type = conditionally_enabled_mutex;
    using lock_type = mutex_type::scoped_lock;
    using event_type = conditionally_enabled_event;

    /// Epoll and kqueue do not need write-direction notification.
    static constexpr bool needs_write_notification = false;

    /// Post a coroutine for deferred execution.
    void post(std::coroutine_handle<> h) const override;

    /// Post a scheduler operation for deferred execution.
    void post(scheduler_op* h) const override;

    /// Return true if called from a thread running this scheduler.
    bool running_in_this_thread() const noexcept override;

    /// Request the scheduler to stop dispatching handlers.
    void stop() override;

    /// Return true if the scheduler has been stopped.
    bool stopped() const noexcept override;

    /// Reset the stopped state so `run()` can resume.
    void restart() override;

    /// Run the event loop until no work remains.
    std::size_t run() override;

    /// Run until one handler completes or no work remains.
    std::size_t run_one() override;

    /// Run until one handler completes or @a usec elapses.
    std::size_t wait_one(long usec) override;

    /// Run ready handlers without blocking.
    std::size_t poll() override;

    /// Run at most one ready handler without blocking.
    std::size_t poll_one() override;

    /// Increment the outstanding work count.
    void work_started() noexcept override;

    /// Decrement the outstanding work count, stopping on zero.
    void work_finished() noexcept override;

    /** Reset the thread's inline completion budget.

        Called at the start of each posted completion handler to
        grant a fresh budget for speculative inline completions.
    */
    void reset_inline_budget() const noexcept;

    /** Consume one unit of inline budget if available.

        @return True if budget was available and consumed.
    */
    bool try_consume_inline_budget() const noexcept;

    /** Offset a forthcoming work_finished from work_cleanup.

        Called by descriptor_state when all I/O returned EAGAIN and
        no handler will be executed. Must be called from a scheduler
        thread.
    */
    void compensating_work_started() const noexcept;

    /** Drain work from thread context's private queue to global queue.

        Flushes private work count to the global counter, then
        transfers the queue under mutex protection.

        @param queue The private queue to drain.
        @param count Private work count to flush before draining.
    */
    void drain_thread_queue(op_queue& queue, std::int64_t count) const;

    /** Post completed operations for deferred invocation.

        If called from a thread running this scheduler, operations
        go to the thread's private queue (fast path). Otherwise,
        operations are added to the global queue under mutex and a
        waiter is signaled.

        @par Preconditions
        work_started() must have been called for each operation.

        @param ops Queue of operations to post.
    */
    void post_deferred_completions(op_queue& ops) const;

    /** Apply runtime configuration to the scheduler.

        Called by `io_context` after construction. Values that do
        not apply to this backend are silently ignored.

        @param max_events Event buffer size for epoll/kqueue.
        @param budget_init Starting inline completion budget.
        @param budget_max Hard ceiling on adaptive budget ramp-up.
        @param unassisted Budget when single-threaded.
    */
    virtual void configure_reactor(
        unsigned max_events,
        unsigned budget_init,
        unsigned budget_max,
        unsigned unassisted);

    /// Return the configured initial inline budget.
    unsigned inline_budget_initial() const noexcept
    {
        return inline_budget_initial_;
    }

    /// Return true if single-threaded (lockless) mode is active.
    bool is_single_threaded() const noexcept
    {
        return single_threaded_;
    }

    /** Enable or disable single-threaded (lockless) mode.

        When enabled, all scheduler mutex and condition variable
        operations become no-ops. Cross-thread post() is
        undefined behavior.
    */
    void configure_single_threaded(bool v) noexcept
    {
        single_threaded_ = v;
        mutex_.set_enabled(!v);
        cond_.set_enabled(!v);
    }

protected:
    // Owned elsewhere; presumably installed by the derived backend or
    // io_context — TODO(review): confirm ownership of timer_svc_.
    timer_service* timer_svc_ = nullptr;
    bool single_threaded_ = false;

    reactor_scheduler() = default;

    /** Drain completed_ops during shutdown.

        Pops all operations from the global queue and destroys them,
        skipping the task sentinel. Signals all waiting threads.
        Derived classes call this from their shutdown() override
        before performing platform-specific cleanup.
    */
    void shutdown_drain();

    /// RAII guard that re-inserts the task sentinel after `run_task`.
    struct task_cleanup
    {
        reactor_scheduler const* sched;
        lock_type* lock;
        context_type* ctx;
        ~task_cleanup();
    };

    // mutex_ guards completed_ops_, state_, and task_interrupted_;
    // both mutex_ and cond_ are disabled in single-threaded mode.
    mutable mutex_type mutex_{true};
    mutable event_type cond_{true};
    mutable op_queue completed_ops_;
    mutable std::atomic<std::int64_t> outstanding_work_{0};
    std::atomic<bool> stopped_{false};
    // True while a thread is inside run_task(); read by posters to
    // decide whether the reactor needs an explicit wakeup.
    mutable std::atomic<bool> task_running_{false};
    mutable bool task_interrupted_ = false;

    // Runtime-configurable reactor tuning parameters.
    // Defaults match the library's built-in values.
    unsigned max_events_per_poll_ = 128;
    unsigned inline_budget_initial_ = 2;
    unsigned inline_budget_max_ = 16;
    unsigned unassisted_budget_ = 4;

    /// Bit 0 of `state_`: set when the condvar should be signaled.
    static constexpr std::size_t signaled_bit = 1;

    /// Increment per waiting thread in `state_`.
    static constexpr std::size_t waiter_increment = 2;
    // Encoding: (number of waiters * 2) | signaled flag. Accessed only
    // while holding mutex_.
    mutable std::size_t state_ = 0;

    /// Sentinel op that triggers a reactor poll when dequeued.
    struct task_op final : scheduler_op
    {
        void operator()() override {}
        void destroy() override {}
    };
    task_op task_op_;

    /// Run the platform-specific reactor poll.
    virtual void
    run_task(lock_type& lock, context_type* ctx,
        long timeout_us) = 0;

    /// Wake a blocked reactor (e.g. write to eventfd or pipe).
    virtual void interrupt_reactor() const = 0;

private:
    /// RAII guard run after each handler; reconciles the thread's
    /// private work count with the global counter.
    struct work_cleanup
    {
        reactor_scheduler* sched;
        lock_type* lock;
        context_type* ctx;
        ~work_cleanup();
    };

    std::size_t do_one(
        lock_type& lock, long timeout_us, context_type* ctx);

    void signal_all(lock_type& lock) const;
    bool maybe_unlock_and_signal_one(lock_type& lock) const;
    bool unlock_and_signal_one(lock_type& lock) const;
    void clear_signal() const;
    void wait_for_signal(lock_type& lock) const;
    void wait_for_signal_for(
        lock_type& lock, long timeout_us) const;
    void wake_one_thread_and_unlock(lock_type& lock) const;
};
362 :
/** RAII guard that pushes/pops a scheduler context frame.

    On construction, pushes a new context frame onto the
    thread-local stack. On destruction, drains any remaining
    private queue items to the global queue and pops the frame.
*/
struct reactor_thread_context_guard
{
    /// The context frame managed by this guard.
    reactor_scheduler_context frame_;

    /// Construct the guard, pushing a frame for @a sched.
    explicit reactor_thread_context_guard(
        reactor_scheduler const* sched) noexcept
        : frame_(sched, reactor_context_stack.get())
    {
        reactor_context_stack.set(&frame_);
    }

    /// Destroy the guard, draining private work and popping the frame.
    ~reactor_thread_context_guard() noexcept
    {
        // Hand any leftover ops (and their work count) back to the
        // scheduler's global queue so they are not lost with the frame.
        if (!frame_.private_queue.empty())
            frame_.key->drain_thread_queue(
                frame_.private_queue, frame_.private_outstanding_work);
        // Pop this frame: restore the previous top of the stack.
        reactor_context_stack.set(frame_.next);
    }
};
391 :
392 : // ---- Inline implementations ------------------------------------------------
393 :
// Push-frame constructor: links this frame above @a n and seeds the
// adaptive inline budget ceiling from the scheduler's configured
// initial value. The budget itself starts at zero until the first
// reset_inline_budget() call.
inline
reactor_scheduler_context::reactor_scheduler_context(
    reactor_scheduler const* k,
    reactor_scheduler_context* n)
    : key(k)
    , next(n)
    , private_outstanding_work(0)
    , inline_budget(0)
    , inline_budget_max(
        static_cast<int>(k->inline_budget_initial()))
    , unassisted(false)
{
}
407 :
408 : inline void
409 MIS 0 : reactor_scheduler::configure_reactor(
410 : unsigned max_events,
411 : unsigned budget_init,
412 : unsigned budget_max,
413 : unsigned unassisted)
414 : {
415 0 : if (max_events < 1 ||
416 0 : max_events > static_cast<unsigned>(std::numeric_limits<int>::max()))
417 : throw std::out_of_range(
418 0 : "max_events_per_poll must be in [1, INT_MAX]");
419 0 : if (budget_max < 1 ||
420 0 : budget_max > static_cast<unsigned>(std::numeric_limits<int>::max()))
421 : throw std::out_of_range(
422 0 : "inline_budget_max must be in [1, INT_MAX]");
423 :
424 : // Clamp initial and unassisted to budget_max.
425 0 : if (budget_init > budget_max)
426 0 : budget_init = budget_max;
427 0 : if (unassisted > budget_max)
428 0 : unassisted = budget_max;
429 :
430 0 : max_events_per_poll_ = max_events;
431 0 : inline_budget_initial_ = budget_init;
432 0 : inline_budget_max_ = budget_max;
433 0 : unassisted_budget_ = unassisted;
434 0 : }
435 :
436 : inline void
437 HIT 98603 : reactor_scheduler::reset_inline_budget() const noexcept
438 : {
439 98603 : if (auto* ctx = reactor_find_context(this))
440 : {
441 : // Cap when no other thread absorbed queued work
442 98603 : if (ctx->unassisted)
443 : {
444 98603 : ctx->inline_budget_max =
445 98603 : static_cast<int>(unassisted_budget_);
446 98603 : ctx->inline_budget =
447 98603 : static_cast<int>(unassisted_budget_);
448 98603 : return;
449 : }
450 : // Ramp up when previous cycle fully consumed budget
451 MIS 0 : if (ctx->inline_budget == 0)
452 0 : ctx->inline_budget_max = (std::min)(
453 0 : ctx->inline_budget_max * 2,
454 0 : static_cast<int>(inline_budget_max_));
455 0 : else if (ctx->inline_budget < ctx->inline_budget_max)
456 0 : ctx->inline_budget_max =
457 0 : static_cast<int>(inline_budget_initial_);
458 0 : ctx->inline_budget = ctx->inline_budget_max;
459 : }
460 : }
461 :
462 : inline bool
463 HIT 405830 : reactor_scheduler::try_consume_inline_budget() const noexcept
464 : {
465 405830 : if (auto* ctx = reactor_find_context(this))
466 : {
467 405830 : if (ctx->inline_budget > 0)
468 : {
469 324672 : --ctx->inline_budget;
470 324672 : return true;
471 : }
472 : }
473 81158 : return false;
474 : }
475 :
476 : inline void
477 2155 : reactor_scheduler::post(std::coroutine_handle<> h) const
478 : {
479 : struct post_handler final : scheduler_op
480 : {
481 : std::coroutine_handle<> h_;
482 :
483 2155 : explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
484 4310 : ~post_handler() override = default;
485 :
486 2146 : void operator()() override
487 : {
488 2146 : auto saved = h_;
489 2146 : delete this;
490 : // Ensure stores from the posting thread are visible
491 : std::atomic_thread_fence(std::memory_order_acquire);
492 2146 : saved.resume();
493 2146 : }
494 :
495 9 : void destroy() override
496 : {
497 9 : auto saved = h_;
498 9 : delete this;
499 9 : saved.destroy();
500 9 : }
501 : };
502 :
503 2155 : auto ph = std::make_unique<post_handler>(h);
504 :
505 2155 : if (auto* ctx = reactor_find_context(this))
506 : {
507 6 : ++ctx->private_outstanding_work;
508 6 : ctx->private_queue.push(ph.release());
509 6 : return;
510 : }
511 :
512 2149 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
513 :
514 2149 : lock_type lock(mutex_);
515 2149 : completed_ops_.push(ph.release());
516 2149 : wake_one_thread_and_unlock(lock);
517 2155 : }
518 :
519 : inline void
520 99898 : reactor_scheduler::post(scheduler_op* h) const
521 : {
522 99898 : if (auto* ctx = reactor_find_context(this))
523 : {
524 99726 : ++ctx->private_outstanding_work;
525 99726 : ctx->private_queue.push(h);
526 99726 : return;
527 : }
528 :
529 172 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
530 :
531 172 : lock_type lock(mutex_);
532 172 : completed_ops_.push(h);
533 172 : wake_one_thread_and_unlock(lock);
534 172 : }
535 :
536 : inline bool
537 1326 : reactor_scheduler::running_in_this_thread() const noexcept
538 : {
539 1326 : return reactor_find_context(this) != nullptr;
540 : }
541 :
542 : inline void
543 440 : reactor_scheduler::stop()
544 : {
545 440 : lock_type lock(mutex_);
546 440 : if (!stopped_.load(std::memory_order_acquire))
547 : {
548 400 : stopped_.store(true, std::memory_order_release);
549 400 : signal_all(lock);
550 400 : interrupt_reactor();
551 : }
552 440 : }
553 :
// Observe the stop flag; pairs with the release store in stop().
inline bool
reactor_scheduler::stopped() const noexcept
{
    return stopped_.load(std::memory_order_acquire);
}
559 :
// Clear the stop flag so a subsequent run()/poll() can make progress.
inline void
reactor_scheduler::restart()
{
    stopped_.store(false, std::memory_order_release);
}
565 :
566 : inline std::size_t
567 412 : reactor_scheduler::run()
568 : {
569 824 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
570 : {
571 30 : stop();
572 30 : return 0;
573 : }
574 :
575 382 : reactor_thread_context_guard ctx(this);
576 382 : lock_type lock(mutex_);
577 :
578 382 : std::size_t n = 0;
579 : for (;;)
580 : {
581 268648 : if (!do_one(lock, -1, &ctx.frame_))
582 382 : break;
583 268266 : if (n != (std::numeric_limits<std::size_t>::max)())
584 268266 : ++n;
585 268266 : if (!lock.owns_lock())
586 177515 : lock.lock();
587 : }
588 382 : return n;
589 382 : }
590 :
591 : inline std::size_t
592 2 : reactor_scheduler::run_one()
593 : {
594 4 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
595 : {
596 MIS 0 : stop();
597 0 : return 0;
598 : }
599 :
600 HIT 2 : reactor_thread_context_guard ctx(this);
601 2 : lock_type lock(mutex_);
602 2 : return do_one(lock, -1, &ctx.frame_);
603 2 : }
604 :
605 : inline std::size_t
606 102 : reactor_scheduler::wait_one(long usec)
607 : {
608 204 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
609 : {
610 10 : stop();
611 10 : return 0;
612 : }
613 :
614 92 : reactor_thread_context_guard ctx(this);
615 92 : lock_type lock(mutex_);
616 92 : return do_one(lock, usec, &ctx.frame_);
617 92 : }
618 :
619 : inline std::size_t
620 6 : reactor_scheduler::poll()
621 : {
622 12 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
623 : {
624 1 : stop();
625 1 : return 0;
626 : }
627 :
628 5 : reactor_thread_context_guard ctx(this);
629 5 : lock_type lock(mutex_);
630 :
631 5 : std::size_t n = 0;
632 : for (;;)
633 : {
634 11 : if (!do_one(lock, 0, &ctx.frame_))
635 5 : break;
636 6 : if (n != (std::numeric_limits<std::size_t>::max)())
637 6 : ++n;
638 6 : if (!lock.owns_lock())
639 6 : lock.lock();
640 : }
641 5 : return n;
642 5 : }
643 :
644 : inline std::size_t
645 4 : reactor_scheduler::poll_one()
646 : {
647 8 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
648 : {
649 2 : stop();
650 2 : return 0;
651 : }
652 :
653 2 : reactor_thread_context_guard ctx(this);
654 2 : lock_type lock(mutex_);
655 2 : return do_one(lock, 0, &ctx.frame_);
656 2 : }
657 :
// Record one new unit of outstanding work. Relaxed order suffices:
// the count only gates shutdown, it does not publish data.
inline void
reactor_scheduler::work_started() noexcept
{
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
}
663 :
664 : inline void
665 39207 : reactor_scheduler::work_finished() noexcept
666 : {
667 78414 : if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
668 392 : stop();
669 39207 : }
670 :
671 : inline void
672 149163 : reactor_scheduler::compensating_work_started() const noexcept
673 : {
674 149163 : auto* ctx = reactor_find_context(this);
675 149163 : if (ctx)
676 149163 : ++ctx->private_outstanding_work;
677 149163 : }
678 :
679 : inline void
680 MIS 0 : reactor_scheduler::drain_thread_queue(
681 : op_queue& queue, std::int64_t count) const
682 : {
683 0 : if (count > 0)
684 0 : outstanding_work_.fetch_add(count, std::memory_order_relaxed);
685 :
686 0 : lock_type lock(mutex_);
687 0 : completed_ops_.splice(queue);
688 0 : if (count > 0)
689 0 : maybe_unlock_and_signal_one(lock);
690 0 : }
691 :
692 : inline void
693 HIT 17128 : reactor_scheduler::post_deferred_completions(op_queue& ops) const
694 : {
695 17128 : if (ops.empty())
696 17128 : return;
697 :
698 MIS 0 : if (auto* ctx = reactor_find_context(this))
699 : {
700 0 : ctx->private_queue.splice(ops);
701 0 : return;
702 : }
703 :
704 0 : lock_type lock(mutex_);
705 0 : completed_ops_.splice(ops);
706 0 : wake_one_thread_and_unlock(lock);
707 0 : }
708 :
709 : inline void
710 HIT 587 : reactor_scheduler::shutdown_drain()
711 : {
712 587 : lock_type lock(mutex_);
713 :
714 1279 : while (auto* h = completed_ops_.pop())
715 : {
716 692 : if (h == &task_op_)
717 587 : continue;
718 105 : lock.unlock();
719 105 : h->destroy();
720 105 : lock.lock();
721 692 : }
722 :
723 587 : signal_all(lock);
724 587 : }
725 :
// Latch the signaled bit (consumed by wait_for_signal loops) and wake
// every thread blocked on the condition variable. Caller holds mutex_.
inline void
reactor_scheduler::signal_all(lock_type&) const
{
    state_ |= signaled_bit;
    cond_.notify_all();
}
732 :
733 : inline bool
734 2321 : reactor_scheduler::maybe_unlock_and_signal_one(
735 : lock_type& lock) const
736 : {
737 2321 : state_ |= signaled_bit;
738 2321 : if (state_ > signaled_bit)
739 : {
740 MIS 0 : lock.unlock();
741 0 : cond_.notify_one();
742 0 : return true;
743 : }
744 HIT 2321 : return false;
745 : }
746 :
747 : inline bool
748 319544 : reactor_scheduler::unlock_and_signal_one(
749 : lock_type& lock) const
750 : {
751 319544 : state_ |= signaled_bit;
752 319544 : bool have_waiters = state_ > signaled_bit;
753 319544 : lock.unlock();
754 319544 : if (have_waiters)
755 MIS 0 : cond_.notify_one();
756 HIT 319544 : return have_waiters;
757 : }
758 :
// Consume the latched signal before blocking. Caller holds mutex_.
inline void
reactor_scheduler::clear_signal() const
{
    state_ &= ~signaled_bit;
}
764 :
// Block until the signaled bit is set. The waiter count (encoded in
// state_ in units of waiter_increment) must bracket the wait exactly,
// so signalers can tell whether notify_one() has a recipient.
inline void
reactor_scheduler::wait_for_signal(
    lock_type& lock) const
{
    while ((state_ & signaled_bit) == 0)
    {
        state_ += waiter_increment;
        cond_.wait(lock);
        state_ -= waiter_increment;
    }
}
776 :
777 : inline void
778 0 : reactor_scheduler::wait_for_signal_for(
779 : lock_type& lock, long timeout_us) const
780 : {
781 0 : if ((state_ & signaled_bit) == 0)
782 : {
783 0 : state_ += waiter_increment;
784 0 : cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
785 0 : state_ -= waiter_increment;
786 : }
787 0 : }
788 :
789 : inline void
790 HIT 2321 : reactor_scheduler::wake_one_thread_and_unlock(
791 : lock_type& lock) const
792 : {
793 2321 : if (maybe_unlock_and_signal_one(lock))
794 MIS 0 : return;
795 :
796 HIT 2321 : if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
797 : {
798 57 : task_interrupted_ = true;
799 57 : lock.unlock();
800 57 : interrupt_reactor();
801 : }
802 : else
803 : {
804 2264 : lock.unlock();
805 : }
806 : }
807 :
// Runs after each handler invoked by do_one. The handler itself
// consumed one unit of work, so the net adjustment to the global
// counter is (private units produced during the handler) - 1:
//   produced > 1 : publish the surplus
//   produced == 1: cancels exactly; nothing to do
//   produced < 1 : retire the handler's unit via work_finished()
inline reactor_scheduler::work_cleanup::~work_cleanup()
{
    if (ctx)
    {
        std::int64_t produced = ctx->private_outstanding_work;
        if (produced > 1)
            sched->outstanding_work_.fetch_add(
                produced - 1, std::memory_order_relaxed);
        else if (produced < 1)
            sched->work_finished();
        ctx->private_outstanding_work = 0;

        // Make ops queued by the handler visible to other threads.
        if (!ctx->private_queue.empty())
        {
            lock->lock();
            sched->completed_ops_.splice(ctx->private_queue);
        }
    }
    else
    {
        // No context frame: just retire the handler's work unit.
        sched->work_finished();
    }
}
831 :
// Runs after each run_task (reactor poll) invoked by do_one. Unlike
// work_cleanup there is no handler unit to offset, so the private
// work count is flushed verbatim, then any privately queued ops are
// spliced into the global queue (reacquiring the lock if run_task
// returned with it released).
inline reactor_scheduler::task_cleanup::~task_cleanup()
{
    if (!ctx)
        return;

    if (ctx->private_outstanding_work > 0)
    {
        sched->outstanding_work_.fetch_add(
            ctx->private_outstanding_work, std::memory_order_relaxed);
        ctx->private_outstanding_work = 0;
    }

    if (!ctx->private_queue.empty())
    {
        if (!lock->owns_lock())
            lock->lock();
        sched->completed_ops_.splice(ctx->private_queue);
    }
}
851 :
// Core event-loop step. Pops one entry from the global queue and acts
// on it; entered (and re-entered) with `lock` held.
//
// Three cases per iteration:
//   - the task sentinel: run the platform reactor poll, re-insert the
//     sentinel, and loop (or return if a positive timeout was given);
//   - a real op: release the lock, run it, return 1 (work_cleanup
//     reconciles counters and may leave the lock released);
//   - empty queue: drain the private queue, or block on the condvar.
//
// timeout_us semantics: <0 wait indefinitely, 0 never block, >0 wait
// at most that many microseconds.
inline std::size_t
reactor_scheduler::do_one(
    lock_type& lock, long timeout_us, context_type* ctx)
{
    for (;;)
    {
        if (stopped_.load(std::memory_order_acquire))
            return 0;

        scheduler_op* op = completed_ops_.pop();

        // Handle reactor sentinel — time to poll for I/O
        if (op == &task_op_)
        {
            bool more_handlers =
                !completed_ops_.empty() || (ctx && !ctx->private_queue.empty());

            // Nothing to run and nothing to wait for (or a non-blocking
            // call): put the sentinel back and report no work done.
            if (!more_handlers &&
                (outstanding_work_.load(std::memory_order_acquire) == 0 ||
                 timeout_us == 0))
            {
                completed_ops_.push(&task_op_);
                return 0;
            }

            // Poll without blocking when handlers are pending; a zero
            // timeout also means no interrupt write is needed.
            long task_timeout_us = more_handlers ? 0 : timeout_us;
            task_interrupted_ = task_timeout_us == 0;
            task_running_.store(true, std::memory_order_release);

            // Let another thread pick up the pending handlers while
            // this thread is inside the reactor.
            if (more_handlers)
                unlock_and_signal_one(lock);

            try
            {
                run_task(lock, ctx, task_timeout_us);
            }
            catch (...)
            {
                task_running_.store(false, std::memory_order_relaxed);
                throw;
            }

            task_running_.store(false, std::memory_order_relaxed);
            // Re-arm the sentinel so a later iteration polls again.
            completed_ops_.push(&task_op_);
            if (timeout_us > 0)
                return 0;
            continue;
        }

        // Handle operation
        if (op != nullptr)
        {
            bool more = !completed_ops_.empty();

            // Record whether another thread assisted; feeds the
            // adaptive inline-budget heuristic.
            if (more)
                ctx->unassisted = !unlock_and_signal_one(lock);
            else
            {
                ctx->unassisted = false;
                lock.unlock();
            }

            // Reconciles work counters even if the op throws.
            work_cleanup on_exit{this, &lock, ctx};
            (void)on_exit;

            (*op)();
            return 1;
        }

        // Try private queue before blocking
        if (reactor_drain_private_queue(ctx, outstanding_work_, completed_ops_))
            continue;

        if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
            timeout_us == 0)
            return 0;

        // Queue empty but work outstanding elsewhere: wait for a signal.
        clear_signal();
        if (timeout_us < 0)
            wait_for_signal(lock);
        else
            wait_for_signal_for(lock, timeout_us);
    }
}
936 :
937 : } // namespace boost::corosio::detail
938 :
939 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
|