include/boost/corosio/native/detail/select/select_scheduler.hpp

87.9% Lines (145/165) 100.0% List of functions (10/10)
select_scheduler.hpp
f(x) Functions (10)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_SELECT
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19
20 #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
21
22 #include <boost/corosio/native/detail/select/select_op.hpp>
23 #include <boost/corosio/detail/timer_service.hpp>
24 #include <boost/corosio/native/detail/make_err.hpp>
25 #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
26 #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
27 #include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
28 #include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
29
30 #include <boost/corosio/detail/except.hpp>
31
32 #include <sys/select.h>
33 #include <unistd.h>
34 #include <errno.h>
35 #include <fcntl.h>
36
37 #include <atomic>
38 #include <chrono>
39 #include <cstdint>
40 #include <limits>
41 #include <mutex>
42 #include <unordered_map>
43
44 namespace boost::corosio::detail {
45
struct select_op;
struct select_descriptor_state;

/** POSIX scheduler using select() for I/O multiplexing.

    This scheduler implements the scheduler interface using the POSIX select()
    call for I/O event notification. It inherits the shared reactor threading
    model from reactor_scheduler: signal state machine, inline completion
    budget, work counting, and the do_one event loop.

    The design mirrors epoll_scheduler for behavioral consistency:
    - Same single-reactor thread coordination model
    - Same deferred I/O pattern (reactor marks ready; workers do I/O)
    - Same timer integration pattern

    A self-pipe is used to wake a reactor thread blocked in select():
    writing one byte to the write end makes the read end readable.

    Known Limitations:
    - Descriptors whose value is >= FD_SETSIZE (typically 1024) cannot
      be registered; register_descriptor() rejects them with EINVAL.
    - O(n) scanning: rebuilds fd_sets each iteration
    - Level-triggered only (no edge-triggered mode)

    @par Thread Safety
    All public member functions are thread-safe.
*/
class BOOST_COROSIO_DECL select_scheduler final : public reactor_scheduler
{
public:
    /// Select needs write-direction notification to rebuild fd_sets.
    static constexpr bool needs_write_notification = true;

    /** Construct the scheduler.

        Creates a non-blocking, close-on-exec self-pipe for reactor
        interruption, hooks the timer service so a change of the earliest
        expiry wakes the reactor, and creates the resolver, signal, stream
        file and random access file services on @p ctx.

        @param ctx Reference to the owning execution_context.
        @param concurrency_hint Hint for expected thread count (unused).

        @throws system_error if creating or configuring the pipe fails.
    */
    select_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);

    /// Destroy the scheduler, closing both ends of the self-pipe.
    ~select_scheduler() override;

    select_scheduler(select_scheduler const&) = delete;
    select_scheduler& operator=(select_scheduler const&) = delete;

    /// Shut down the scheduler, draining pending operations, then wake
    /// a reactor thread that may be blocked in select().
    void shutdown() override;

    /** Return the maximum file descriptor value supported.

        Returns FD_SETSIZE - 1, the maximum fd value that can be
        monitored by select(). Registering an fd >= FD_SETSIZE
        fails with EINVAL.

        @return The maximum supported file descriptor value.
    */
    static constexpr int max_fd() noexcept
    {
        return FD_SETSIZE - 1;
    }

    /** Register a descriptor for persistent monitoring.

        Resets the descriptor state (fd, owning scheduler, ready flags,
        pending ready events, impl reference), records it for inclusion
        in subsequent select() calls, and interrupts the reactor so a
        blocked select() rebuilds its fd_sets.

        @param fd The file descriptor to register.
        @param desc Pointer to descriptor state for this fd.

        @throws system_error with EINVAL if @p fd is negative or
                >= FD_SETSIZE.
    */
    void register_descriptor(int fd, select_descriptor_state* desc) const;

    /** Deregister a persistently registered descriptor.

        Does nothing if @p fd is not registered. The reactor is not
        interrupted; an in-progress select() keeps its own snapshot.

        @param fd The file descriptor to deregister.
    */
    void deregister_descriptor(int fd) const;

    /** Interrupt the reactor so it rebuilds its fd_sets.

        Called when a write or connect op is registered after
        the reactor's snapshot was taken. Without this, select()
        may stay blocked without watching the fd for writability.
    */
    void notify_reactor() const;

private:
    // Build fd_sets from a snapshot of registered_descs_, wait in select(),
    // process expired timers, and queue descriptors that reported events.
    void
    run_task(lock_type& lock, context_type* ctx,
             long timeout_us) override;
    // Wake a blocked select() by writing one byte to the self-pipe.
    void interrupt_reactor() const override;
    // Combine the requested timeout with the nearest timer expiry.
    long calculate_timeout(long requested_timeout_us) const;

    // Self-pipe for interrupting select()
    int pipe_fds_[2]; // [0]=read, [1]=write

    // Per-fd tracking for fd_set building; guarded by mutex_ in
    // register_descriptor()/deregister_descriptor().
    mutable std::unordered_map<int, select_descriptor_state*> registered_descs_;
    // NOTE(review): maintained by register/deregister but not read in this
    // file; run_task() computes nfds from its own snapshot.
    mutable int max_fd_ = -1;
};
145
146 230x inline select_scheduler::select_scheduler(capy::execution_context& ctx, int)
147 230x : pipe_fds_{-1, -1}
148 230x , max_fd_(-1)
149 {
150 230x if (::pipe(pipe_fds_) < 0)
151 detail::throw_system_error(make_err(errno), "pipe");
152
153 690x for (int i = 0; i < 2; ++i)
154 {
155 460x int flags = ::fcntl(pipe_fds_[i], F_GETFL, 0);
156 460x if (flags == -1)
157 {
158 int errn = errno;
159 ::close(pipe_fds_[0]);
160 ::close(pipe_fds_[1]);
161 detail::throw_system_error(make_err(errn), "fcntl F_GETFL");
162 }
163 460x if (::fcntl(pipe_fds_[i], F_SETFL, flags | O_NONBLOCK) == -1)
164 {
165 int errn = errno;
166 ::close(pipe_fds_[0]);
167 ::close(pipe_fds_[1]);
168 detail::throw_system_error(make_err(errn), "fcntl F_SETFL");
169 }
170 460x if (::fcntl(pipe_fds_[i], F_SETFD, FD_CLOEXEC) == -1)
171 {
172 int errn = errno;
173 ::close(pipe_fds_[0]);
174 ::close(pipe_fds_[1]);
175 detail::throw_system_error(make_err(errn), "fcntl F_SETFD");
176 }
177 }
178
179 230x timer_svc_ = &get_timer_service(ctx, *this);
180 230x timer_svc_->set_on_earliest_changed(
181 2528x timer_service::callback(this, [](void* p) {
182 2298x static_cast<select_scheduler*>(p)->interrupt_reactor();
183 2298x }));
184
185 230x get_resolver_service(ctx, *this);
186 230x get_signal_service(ctx, *this);
187 230x get_stream_file_service(ctx, *this);
188 230x get_random_access_file_service(ctx, *this);
189
190 230x completed_ops_.push(&task_op_);
191 230x }
192
193 460x inline select_scheduler::~select_scheduler()
194 {
195 230x if (pipe_fds_[0] >= 0)
196 230x ::close(pipe_fds_[0]);
197 230x if (pipe_fds_[1] >= 0)
198 230x ::close(pipe_fds_[1]);
199 460x }
200
201 inline void
202 230x select_scheduler::shutdown()
203 {
204 230x shutdown_drain();
205
206 230x if (pipe_fds_[1] >= 0)
207 230x interrupt_reactor();
208 230x }
209
210 inline void
211 4296x select_scheduler::register_descriptor(
212 int fd, select_descriptor_state* desc) const
213 {
214 4296x if (fd < 0 || fd >= FD_SETSIZE)
215 detail::throw_system_error(make_err(EINVAL), "select: fd out of range");
216
217 4296x desc->registered_events = reactor_event_read | reactor_event_write;
218 4296x desc->fd = fd;
219 4296x desc->scheduler_ = this;
220 4296x desc->mutex.set_enabled(!single_threaded_);
221 4296x desc->ready_events_.store(0, std::memory_order_relaxed);
222
223 {
224 4296x conditionally_enabled_mutex::scoped_lock lock(desc->mutex);
225 4296x desc->impl_ref_.reset();
226 4296x desc->read_ready = false;
227 4296x desc->write_ready = false;
228 4296x }
229
230 {
231 4296x mutex_type::scoped_lock lock(mutex_);
232 4296x registered_descs_[fd] = desc;
233 4296x if (fd > max_fd_)
234 4292x max_fd_ = fd;
235 4296x }
236
237 4296x interrupt_reactor();
238 4296x }
239
240 inline void
241 4296x select_scheduler::deregister_descriptor(int fd) const
242 {
243 4296x mutex_type::scoped_lock lock(mutex_);
244
245 4296x auto it = registered_descs_.find(fd);
246 4296x if (it == registered_descs_.end())
247 return;
248
249 4296x registered_descs_.erase(it);
250
251 4296x if (fd == max_fd_)
252 {
253 4233x max_fd_ = pipe_fds_[0];
254 8356x for (auto& [registered_fd, state] : registered_descs_)
255 {
256 4123x if (registered_fd > max_fd_)
257 4113x max_fd_ = registered_fd;
258 }
259 }
260 4296x }
261
262 inline void
263 2074x select_scheduler::notify_reactor() const
264 {
265 2074x interrupt_reactor();
266 2074x }
267
268 inline void
269 9053x select_scheduler::interrupt_reactor() const
270 {
271 9053x char byte = 1;
272 9053x [[maybe_unused]] auto r = ::write(pipe_fds_[1], &byte, 1);
273 9053x }
274
275 inline long
276 115269x select_scheduler::calculate_timeout(long requested_timeout_us) const
277 {
278 115269x if (requested_timeout_us == 0)
279 return 0;
280
281 115269x auto nearest = timer_svc_->nearest_expiry();
282 115269x if (nearest == timer_service::time_point::max())
283 46x return requested_timeout_us;
284
285 115223x auto now = std::chrono::steady_clock::now();
286 115223x if (nearest <= now)
287 433x return 0;
288
289 auto timer_timeout_us =
290 114790x std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
291 114790x .count();
292
293 114790x constexpr auto long_max =
294 static_cast<long long>((std::numeric_limits<long>::max)());
295 auto capped_timer_us =
296 114790x (std::min)((std::max)(static_cast<long long>(timer_timeout_us),
297 114790x static_cast<long long>(0)),
298 114790x long_max);
299
300 114790x if (requested_timeout_us < 0)
301 114784x return static_cast<long>(capped_timer_us);
302
303 return static_cast<long>(
304 6x (std::min)(static_cast<long long>(requested_timeout_us),
305 6x capped_timer_us));
306 }
307
308 inline void
309 138308x select_scheduler::run_task(
310 lock_type& lock, context_type* ctx, long timeout_us)
311 {
312 long effective_timeout_us =
313 138308x task_interrupted_ ? 0 : calculate_timeout(timeout_us);
314
315 // Snapshot registered descriptors while holding lock.
316 // Record which fds need write monitoring to avoid a hot loop:
317 // select is level-triggered so writable sockets (nearly always
318 // writable) would cause select() to return immediately every
319 // iteration if unconditionally added to write_fds.
320 struct fd_entry
321 {
322 int fd;
323 select_descriptor_state* desc;
324 bool needs_write;
325 };
326 fd_entry snapshot[FD_SETSIZE];
327 138308x int snapshot_count = 0;
328
329 413074x for (auto& [fd, desc] : registered_descs_)
330 {
331 274766x if (snapshot_count < FD_SETSIZE)
332 {
333 274766x conditionally_enabled_mutex::scoped_lock desc_lock(desc->mutex);
334 274766x snapshot[snapshot_count].fd = fd;
335 274766x snapshot[snapshot_count].desc = desc;
336 274766x snapshot[snapshot_count].needs_write =
337 274766x (desc->write_op || desc->connect_op);
338 274766x ++snapshot_count;
339 274766x }
340 }
341
342 138308x if (lock.owns_lock())
343 115269x lock.unlock();
344
345 138308x task_cleanup on_exit{this, &lock, ctx};
346
347 fd_set read_fds, write_fds, except_fds;
348 2351236x FD_ZERO(&read_fds);
349 2351236x FD_ZERO(&write_fds);
350 2351236x FD_ZERO(&except_fds);
351
352 138308x FD_SET(pipe_fds_[0], &read_fds);
353 138308x int nfds = pipe_fds_[0];
354
355 413074x for (int i = 0; i < snapshot_count; ++i)
356 {
357 274766x int fd = snapshot[i].fd;
358 274766x FD_SET(fd, &read_fds);
359 274766x if (snapshot[i].needs_write)
360 2074x FD_SET(fd, &write_fds);
361 274766x FD_SET(fd, &except_fds);
362 274766x if (fd > nfds)
363 138041x nfds = fd;
364 }
365
366 struct timeval tv;
367 138308x struct timeval* tv_ptr = nullptr;
368 138308x if (effective_timeout_us >= 0)
369 {
370 138262x tv.tv_sec = effective_timeout_us / 1000000;
371 138262x tv.tv_usec = effective_timeout_us % 1000000;
372 138262x tv_ptr = &tv;
373 }
374
375 138308x int ready = ::select(nfds + 1, &read_fds, &write_fds, &except_fds, tv_ptr);
376
377 // EINTR: signal interrupted select(), just retry.
378 // EBADF: an fd was closed between snapshot and select(); retry
379 // with a fresh snapshot from registered_descs_.
380 138308x if (ready < 0)
381 {
382 if (errno == EINTR || errno == EBADF)
383 return;
384 detail::throw_system_error(make_err(errno), "select");
385 }
386
387 // Process timers outside the lock
388 138308x timer_svc_->process_expired();
389
390 138308x op_queue local_ops;
391
392 138308x if (ready > 0)
393 {
394 122412x if (FD_ISSET(pipe_fds_[0], &read_fds))
395 {
396 char buf[256];
397 8920x while (::read(pipe_fds_[0], buf, sizeof(buf)) > 0)
398 {
399 }
400 }
401
402 350419x for (int i = 0; i < snapshot_count; ++i)
403 {
404 228007x int fd = snapshot[i].fd;
405 228007x select_descriptor_state* desc = snapshot[i].desc;
406
407 228007x std::uint32_t flags = 0;
408 228007x if (FD_ISSET(fd, &read_fds))
409 120205x flags |= reactor_event_read;
410 228007x if (FD_ISSET(fd, &write_fds))
411 2074x flags |= reactor_event_write;
412 228007x if (FD_ISSET(fd, &except_fds))
413 flags |= reactor_event_error;
414
415 228007x if (flags == 0)
416 105731x continue;
417
418 122276x desc->add_ready_events(flags);
419
420 122276x bool expected = false;
421 122276x if (desc->is_enqueued_.compare_exchange_strong(
422 expected, true, std::memory_order_release,
423 std::memory_order_relaxed))
424 {
425 122276x local_ops.push(desc);
426 }
427 }
428 }
429
430 138308x lock.lock();
431
432 138308x if (!local_ops.empty())
433 120206x completed_ops_.splice(local_ops);
434 138308x }
435
436 } // namespace boost::corosio::detail
437
438 #endif // BOOST_COROSIO_HAS_SELECT
439
440 #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
441