1  
//
1  
//
2  
// Copyright (c) 2026 Steve Gerbino
2  
// Copyright (c) 2026 Steve Gerbino
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/corosio
7  
// Official repository: https://github.com/cppalliance/corosio
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
12  

12  

13  
#include <boost/corosio/detail/platform.hpp>
13  
#include <boost/corosio/detail/platform.hpp>
14  

14  

15  
#if BOOST_COROSIO_HAS_SELECT
15  
#if BOOST_COROSIO_HAS_SELECT
16  

16  

17  
#include <boost/corosio/detail/config.hpp>
17  
#include <boost/corosio/detail/config.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
19  

19  

20  
#include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
20  
#include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
21  

21  

22  
#include <boost/corosio/native/detail/select/select_op.hpp>
22  
#include <boost/corosio/native/detail/select/select_op.hpp>
23  
#include <boost/corosio/detail/timer_service.hpp>
23  
#include <boost/corosio/detail/timer_service.hpp>
24  
#include <boost/corosio/native/detail/make_err.hpp>
24  
#include <boost/corosio/native/detail/make_err.hpp>
25  
#include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
25  
#include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
26  
#include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
26  
#include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
27  
#include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
27  
#include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
28  
#include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
28  
#include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
29  

29  

30  
#include <boost/corosio/detail/except.hpp>
30  
#include <boost/corosio/detail/except.hpp>
31  

31  

32  
#include <sys/select.h>
32  
#include <sys/select.h>
33  
#include <unistd.h>
33  
#include <unistd.h>
34  
#include <errno.h>
34  
#include <errno.h>
35  
#include <fcntl.h>
35  
#include <fcntl.h>
36  

36  

37  
#include <atomic>
37  
#include <atomic>
38  
#include <chrono>
38  
#include <chrono>
39  
#include <cstdint>
39  
#include <cstdint>
40  
#include <limits>
40  
#include <limits>
41  
#include <mutex>
41  
#include <mutex>
42  
#include <unordered_map>
42  
#include <unordered_map>
43  

43  

44  
namespace boost::corosio::detail {
44  
namespace boost::corosio::detail {
45  

45  

46  
struct select_op;
struct select_descriptor_state;

/** POSIX scheduler using select() for I/O multiplexing.

    This scheduler implements the scheduler interface using the POSIX select()
    call for I/O event notification. It inherits the shared reactor threading
    model from reactor_scheduler: signal state machine, inline completion
    budget, work counting, and the do_one event loop.

    The design mirrors epoll_scheduler for behavioral consistency:
    - Same single-reactor thread coordination model
    - Same deferred I/O pattern (reactor marks ready; workers do I/O)
    - Same timer integration pattern

    Known Limitations:
    - FD_SETSIZE (~1024) limits maximum concurrent connections
    - O(n) scanning: rebuilds fd_sets each iteration
    - Level-triggered only (no edge-triggered mode)

    @par Thread Safety
    All public member functions are thread-safe.
*/
class BOOST_COROSIO_DECL select_scheduler final : public reactor_scheduler
{
public:
    /// Select needs write-direction notification to rebuild fd_sets.
    static constexpr bool needs_write_notification = true;

    /** Construct the scheduler.

        Creates a self-pipe for reactor interruption.

        @param ctx Reference to the owning execution_context.
        @param concurrency_hint Hint for expected thread count (unused).
    */
    select_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);

    /// Destroy the scheduler.
    ~select_scheduler() override;

    select_scheduler(select_scheduler const&)            = delete;
    select_scheduler& operator=(select_scheduler const&) = delete;

    /// Shut down the scheduler, draining pending operations.
    void shutdown() override;

    /** Return the maximum file descriptor value supported.

        Returns FD_SETSIZE - 1, the maximum fd value that can be
        monitored by select(). Operations with fd >= FD_SETSIZE
        will fail with EINVAL.

        @return The maximum supported file descriptor value.
    */
    static constexpr int max_fd() noexcept
    {
        return FD_SETSIZE - 1;
    }

    /** Register a descriptor for persistent monitoring.

        The fd is added to the registered_descs_ map and will be
        included in subsequent select() calls. The reactor is
        interrupted so a blocked select() rebuilds its fd_sets.

        @param fd The file descriptor to register.
        @param desc Pointer to descriptor state for this fd.
    */
    void register_descriptor(int fd, select_descriptor_state* desc) const;

    /** Deregister a persistently registered descriptor.

        @param fd The file descriptor to deregister.
    */
    void deregister_descriptor(int fd) const;

    /** Interrupt the reactor so it rebuilds its fd_sets.

        Called when a write or connect op is registered after
        the reactor's snapshot was taken. Without this, select()
        may block not watching for writability on the fd.
    */
    void notify_reactor() const;

private:
    void
    run_task(lock_type& lock, context_type* ctx,
        long timeout_us) override;
    void interrupt_reactor() const override;
    long calculate_timeout(long requested_timeout_us) const;

    // Self-pipe for interrupting select()
    int pipe_fds_[2]; // [0]=read, [1]=write

    // Per-fd tracking for fd_set building.
    // NOTE(review): mutable + const member functions suggests these are
    // guarded by the inherited mutex_ — confirm against reactor_scheduler.
    mutable std::unordered_map<int, select_descriptor_state*> registered_descs_;
    // Highest registered fd; select() needs nfds = max fd + 1.
    mutable int max_fd_ = -1;
};
142  

145  

143  
inline select_scheduler::select_scheduler(capy::execution_context& ctx, int)
146  
inline select_scheduler::select_scheduler(capy::execution_context& ctx, int)
144  
    : pipe_fds_{-1, -1}
147  
    : pipe_fds_{-1, -1}
145  
    , max_fd_(-1)
148  
    , max_fd_(-1)
146  
{
149  
{
147  
    if (::pipe(pipe_fds_) < 0)
150  
    if (::pipe(pipe_fds_) < 0)
148  
        detail::throw_system_error(make_err(errno), "pipe");
151  
        detail::throw_system_error(make_err(errno), "pipe");
149  

152  

150  
    for (int i = 0; i < 2; ++i)
153  
    for (int i = 0; i < 2; ++i)
151  
    {
154  
    {
152  
        int flags = ::fcntl(pipe_fds_[i], F_GETFL, 0);
155  
        int flags = ::fcntl(pipe_fds_[i], F_GETFL, 0);
153  
        if (flags == -1)
156  
        if (flags == -1)
154  
        {
157  
        {
155  
            int errn = errno;
158  
            int errn = errno;
156  
            ::close(pipe_fds_[0]);
159  
            ::close(pipe_fds_[0]);
157  
            ::close(pipe_fds_[1]);
160  
            ::close(pipe_fds_[1]);
158  
            detail::throw_system_error(make_err(errn), "fcntl F_GETFL");
161  
            detail::throw_system_error(make_err(errn), "fcntl F_GETFL");
159  
        }
162  
        }
160  
        if (::fcntl(pipe_fds_[i], F_SETFL, flags | O_NONBLOCK) == -1)
163  
        if (::fcntl(pipe_fds_[i], F_SETFL, flags | O_NONBLOCK) == -1)
161  
        {
164  
        {
162  
            int errn = errno;
165  
            int errn = errno;
163  
            ::close(pipe_fds_[0]);
166  
            ::close(pipe_fds_[0]);
164  
            ::close(pipe_fds_[1]);
167  
            ::close(pipe_fds_[1]);
165  
            detail::throw_system_error(make_err(errn), "fcntl F_SETFL");
168  
            detail::throw_system_error(make_err(errn), "fcntl F_SETFL");
166  
        }
169  
        }
167  
        if (::fcntl(pipe_fds_[i], F_SETFD, FD_CLOEXEC) == -1)
170  
        if (::fcntl(pipe_fds_[i], F_SETFD, FD_CLOEXEC) == -1)
168  
        {
171  
        {
169  
            int errn = errno;
172  
            int errn = errno;
170  
            ::close(pipe_fds_[0]);
173  
            ::close(pipe_fds_[0]);
171  
            ::close(pipe_fds_[1]);
174  
            ::close(pipe_fds_[1]);
172  
            detail::throw_system_error(make_err(errn), "fcntl F_SETFD");
175  
            detail::throw_system_error(make_err(errn), "fcntl F_SETFD");
173  
        }
176  
        }
174  
    }
177  
    }
175  

178  

176  
    timer_svc_ = &get_timer_service(ctx, *this);
179  
    timer_svc_ = &get_timer_service(ctx, *this);
177  
    timer_svc_->set_on_earliest_changed(
180  
    timer_svc_->set_on_earliest_changed(
178  
        timer_service::callback(this, [](void* p) {
181  
        timer_service::callback(this, [](void* p) {
179  
            static_cast<select_scheduler*>(p)->interrupt_reactor();
182  
            static_cast<select_scheduler*>(p)->interrupt_reactor();
180  
        }));
183  
        }));
181  

184  

182  
    get_resolver_service(ctx, *this);
185  
    get_resolver_service(ctx, *this);
183  
    get_signal_service(ctx, *this);
186  
    get_signal_service(ctx, *this);
184  
    get_stream_file_service(ctx, *this);
187  
    get_stream_file_service(ctx, *this);
185  
    get_random_access_file_service(ctx, *this);
188  
    get_random_access_file_service(ctx, *this);
186  

189  

187  
    completed_ops_.push(&task_op_);
190  
    completed_ops_.push(&task_op_);
188  
}
191  
}
189  

192  

190  
inline select_scheduler::~select_scheduler()
{
    // Release both ends of the self-pipe; -1 marks an end that was
    // never opened (constructor threw before/during pipe()).
    for (int fd : pipe_fds_)
    {
        if (fd >= 0)
            ::close(fd);
    }
}
197  

200  

198  
inline void
201  
inline void
199  
select_scheduler::shutdown()
202  
select_scheduler::shutdown()
200  
{
203  
{
201  
    shutdown_drain();
204  
    shutdown_drain();
202  

205  

203  
    if (pipe_fds_[1] >= 0)
206  
    if (pipe_fds_[1] >= 0)
204  
        interrupt_reactor();
207  
        interrupt_reactor();
205  
}
208  
}
206  

209  

207  
inline void
210  
inline void
208  
select_scheduler::register_descriptor(
211  
select_scheduler::register_descriptor(
209  
    int fd, select_descriptor_state* desc) const
212  
    int fd, select_descriptor_state* desc) const
210  
{
213  
{
211  
    if (fd < 0 || fd >= FD_SETSIZE)
214  
    if (fd < 0 || fd >= FD_SETSIZE)
212  
        detail::throw_system_error(make_err(EINVAL), "select: fd out of range");
215  
        detail::throw_system_error(make_err(EINVAL), "select: fd out of range");
213  

216  

214  
    desc->registered_events = reactor_event_read | reactor_event_write;
217  
    desc->registered_events = reactor_event_read | reactor_event_write;
215  
    desc->fd                = fd;
218  
    desc->fd                = fd;
216  
    desc->scheduler_        = this;
219  
    desc->scheduler_        = this;
217  
    desc->mutex.set_enabled(!single_threaded_);
220  
    desc->mutex.set_enabled(!single_threaded_);
218  
    desc->ready_events_.store(0, std::memory_order_relaxed);
221  
    desc->ready_events_.store(0, std::memory_order_relaxed);
219  

222  

220  
    {
223  
    {
221  
        conditionally_enabled_mutex::scoped_lock lock(desc->mutex);
224  
        conditionally_enabled_mutex::scoped_lock lock(desc->mutex);
222  
        desc->impl_ref_.reset();
225  
        desc->impl_ref_.reset();
223  
        desc->read_ready  = false;
226  
        desc->read_ready  = false;
224  
        desc->write_ready = false;
227  
        desc->write_ready = false;
225  
    }
228  
    }
226  

229  

227  
    {
230  
    {
228  
        mutex_type::scoped_lock lock(mutex_);
231  
        mutex_type::scoped_lock lock(mutex_);
229  
        registered_descs_[fd] = desc;
232  
        registered_descs_[fd] = desc;
230  
        if (fd > max_fd_)
233  
        if (fd > max_fd_)
231  
            max_fd_ = fd;
234  
            max_fd_ = fd;
232  
    }
235  
    }
233  

236  

234  
    interrupt_reactor();
237  
    interrupt_reactor();
235  
}
238  
}
236  

239  

237  
inline void
240  
inline void
238  
select_scheduler::deregister_descriptor(int fd) const
241  
select_scheduler::deregister_descriptor(int fd) const
239  
{
242  
{
240  
    mutex_type::scoped_lock lock(mutex_);
243  
    mutex_type::scoped_lock lock(mutex_);
241  

244  

242  
    auto it = registered_descs_.find(fd);
245  
    auto it = registered_descs_.find(fd);
243  
    if (it == registered_descs_.end())
246  
    if (it == registered_descs_.end())
244  
        return;
247  
        return;
245  

248  

246  
    registered_descs_.erase(it);
249  
    registered_descs_.erase(it);
247  

250  

248  
    if (fd == max_fd_)
251  
    if (fd == max_fd_)
249  
    {
252  
    {
250  
        max_fd_ = pipe_fds_[0];
253  
        max_fd_ = pipe_fds_[0];
251  
        for (auto& [registered_fd, state] : registered_descs_)
254  
        for (auto& [registered_fd, state] : registered_descs_)
252  
        {
255  
        {
253  
            if (registered_fd > max_fd_)
256  
            if (registered_fd > max_fd_)
254  
                max_fd_ = registered_fd;
257  
                max_fd_ = registered_fd;
255  
        }
258  
        }
256  
    }
259  
    }
257  
}
260  
}
258  

261  

259  
inline void
select_scheduler::notify_reactor() const
{
    // Write-interest may have changed after the reactor snapshotted its
    // fd_sets; wake select() so the sets are rebuilt with the new state.
    interrupt_reactor();
}
264  

267  

265  
inline void
268  
inline void
266  
select_scheduler::interrupt_reactor() const
269  
select_scheduler::interrupt_reactor() const
267  
{
270  
{
268  
    char byte               = 1;
271  
    char byte               = 1;
269  
    [[maybe_unused]] auto r = ::write(pipe_fds_[1], &byte, 1);
272  
    [[maybe_unused]] auto r = ::write(pipe_fds_[1], &byte, 1);
270  
}
273  
}
271  

274  

272  
inline long
275  
inline long
273  
select_scheduler::calculate_timeout(long requested_timeout_us) const
276  
select_scheduler::calculate_timeout(long requested_timeout_us) const
274  
{
277  
{
275  
    if (requested_timeout_us == 0)
278  
    if (requested_timeout_us == 0)
276  
        return 0;
279  
        return 0;
277  

280  

278  
    auto nearest = timer_svc_->nearest_expiry();
281  
    auto nearest = timer_svc_->nearest_expiry();
279  
    if (nearest == timer_service::time_point::max())
282  
    if (nearest == timer_service::time_point::max())
280  
        return requested_timeout_us;
283  
        return requested_timeout_us;
281  

284  

282  
    auto now = std::chrono::steady_clock::now();
285  
    auto now = std::chrono::steady_clock::now();
283  
    if (nearest <= now)
286  
    if (nearest <= now)
284  
        return 0;
287  
        return 0;
285  

288  

286  
    auto timer_timeout_us =
289  
    auto timer_timeout_us =
287  
        std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
290  
        std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
288  
            .count();
291  
            .count();
289  

292  

290  
    constexpr auto long_max =
293  
    constexpr auto long_max =
291  
        static_cast<long long>((std::numeric_limits<long>::max)());
294  
        static_cast<long long>((std::numeric_limits<long>::max)());
292  
    auto capped_timer_us =
295  
    auto capped_timer_us =
293  
        (std::min)((std::max)(static_cast<long long>(timer_timeout_us),
296  
        (std::min)((std::max)(static_cast<long long>(timer_timeout_us),
294  
                              static_cast<long long>(0)),
297  
                              static_cast<long long>(0)),
295  
                   long_max);
298  
                   long_max);
296  

299  

297  
    if (requested_timeout_us < 0)
300  
    if (requested_timeout_us < 0)
298  
        return static_cast<long>(capped_timer_us);
301  
        return static_cast<long>(capped_timer_us);
299  

302  

300  
    return static_cast<long>(
303  
    return static_cast<long>(
301  
        (std::min)(static_cast<long long>(requested_timeout_us),
304  
        (std::min)(static_cast<long long>(requested_timeout_us),
302  
                   capped_timer_us));
305  
                   capped_timer_us));
303  
}
306  
}
304  

307  

305  
inline void
select_scheduler::run_task(
    lock_type& lock, context_type* ctx, long timeout_us)
{
    // If an interrupt is already pending, poll (0) instead of blocking;
    // otherwise bound the wait by the nearest timer expiry.
    long effective_timeout_us =
        task_interrupted_ ? 0 : calculate_timeout(timeout_us);

    // Snapshot registered descriptors while holding lock.
    // Record which fds need write monitoring to avoid a hot loop:
    // select is level-triggered so writable sockets (nearly always
    // writable) would cause select() to return immediately every
    // iteration if unconditionally added to write_fds.
    struct fd_entry
    {
        int fd;
        select_descriptor_state* desc;
        bool needs_write;
    };
    // Fixed-size array is safe: register_descriptor rejects
    // fd >= FD_SETSIZE, so at most FD_SETSIZE descriptors exist.
    fd_entry snapshot[FD_SETSIZE];
    int snapshot_count = 0;

    for (auto& [fd, desc] : registered_descs_)
    {
        if (snapshot_count < FD_SETSIZE)
        {
            // Per-descriptor lock: write_op/connect_op are mutated by
            // worker threads.
            conditionally_enabled_mutex::scoped_lock desc_lock(desc->mutex);
            snapshot[snapshot_count].fd   = fd;
            snapshot[snapshot_count].desc = desc;
            snapshot[snapshot_count].needs_write =
                (desc->write_op || desc->connect_op);
            ++snapshot_count;
        }
    }

    // Release the scheduler lock before blocking in select().
    if (lock.owns_lock())
        lock.unlock();

    // RAII guard; presumably restores reactor bookkeeping on scope exit
    // (including the early returns below) — see reactor_scheduler.
    task_cleanup on_exit{this, &lock, ctx};

    fd_set read_fds, write_fds, except_fds;
    FD_ZERO(&read_fds);
    FD_ZERO(&write_fds);
    FD_ZERO(&except_fds);

    // The self-pipe read end is always watched so interrupt_reactor()
    // can wake a blocked select().
    FD_SET(pipe_fds_[0], &read_fds);
    int nfds = pipe_fds_[0];

    for (int i = 0; i < snapshot_count; ++i)
    {
        int fd = snapshot[i].fd;
        FD_SET(fd, &read_fds);
        if (snapshot[i].needs_write)
            FD_SET(fd, &write_fds);
        FD_SET(fd, &except_fds);
        if (fd > nfds)
            nfds = fd;
    }

    // Negative timeout means block indefinitely (null timeval).
    struct timeval tv;
    struct timeval* tv_ptr = nullptr;
    if (effective_timeout_us >= 0)
    {
        tv.tv_sec  = effective_timeout_us / 1000000;
        tv.tv_usec = effective_timeout_us % 1000000;
        tv_ptr     = &tv;
    }

    int ready = ::select(nfds + 1, &read_fds, &write_fds, &except_fds, tv_ptr);

    // EINTR: signal interrupted select(), just retry.
    // EBADF: an fd was closed between snapshot and select(); retry
    // with a fresh snapshot from registered_descs_.
    if (ready < 0)
    {
        if (errno == EINTR || errno == EBADF)
            return;
        detail::throw_system_error(make_err(errno), "select");
    }

    // Process timers outside the lock
    timer_svc_->process_expired();

    op_queue local_ops;

    if (ready > 0)
    {
        // Drain the self-pipe so queued wakeups do not keep the fd
        // readable forever (pipe is non-blocking; loop ends on EAGAIN).
        if (FD_ISSET(pipe_fds_[0], &read_fds))
        {
            char buf[256];
            while (::read(pipe_fds_[0], buf, sizeof(buf)) > 0)
            {
            }
        }

        for (int i = 0; i < snapshot_count; ++i)
        {
            int fd                        = snapshot[i].fd;
            select_descriptor_state* desc = snapshot[i].desc;

            // Translate fd_set membership into reactor event flags.
            std::uint32_t flags = 0;
            if (FD_ISSET(fd, &read_fds))
                flags |= reactor_event_read;
            if (FD_ISSET(fd, &write_fds))
                flags |= reactor_event_write;
            if (FD_ISSET(fd, &except_fds))
                flags |= reactor_event_error;

            if (flags == 0)
                continue;

            desc->add_ready_events(flags);

            // Enqueue the descriptor exactly once: the CAS ensures a
            // descriptor already queued (and not yet drained) is not
            // pushed again.
            bool expected = false;
            if (desc->is_enqueued_.compare_exchange_strong(
                    expected, true, std::memory_order_release,
                    std::memory_order_relaxed))
            {
                local_ops.push(desc);
            }
        }
    }

    // Reacquire the scheduler lock before touching completed_ops_.
    lock.lock();

    if (!local_ops.empty())
        completed_ops_.splice(local_ops);
}
432  

435  

433  
} // namespace boost::corosio::detail
436  
} // namespace boost::corosio::detail
434  

437  

435  
#endif // BOOST_COROSIO_HAS_SELECT
438  
#endif // BOOST_COROSIO_HAS_SELECT
436  

439  

437  
#endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
440  
#endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP