TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_SELECT
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 :
20 : #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
21 :
22 : #include <boost/corosio/native/detail/select/select_op.hpp>
23 : #include <boost/corosio/detail/timer_service.hpp>
24 : #include <boost/corosio/native/detail/make_err.hpp>
25 : #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
26 : #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
27 : #include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
28 : #include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
29 :
30 : #include <boost/corosio/detail/except.hpp>
31 :
32 : #include <sys/select.h>
33 : #include <unistd.h>
34 : #include <errno.h>
35 : #include <fcntl.h>
36 :
37 : #include <atomic>
38 : #include <chrono>
39 : #include <cstdint>
40 : #include <limits>
41 : #include <mutex>
42 : #include <unordered_map>
43 :
44 : namespace boost::corosio::detail {
45 :
46 : struct select_op;
47 : struct select_descriptor_state;
48 :
/** POSIX scheduler using select() for I/O multiplexing.

    This scheduler implements the scheduler interface using the POSIX select()
    call for I/O event notification. It inherits the shared reactor threading
    model from reactor_scheduler: signal state machine, inline completion
    budget, work counting, and the do_one event loop.

    The design mirrors epoll_scheduler for behavioral consistency:
    - Same single-reactor thread coordination model
    - Same deferred I/O pattern (reactor marks ready; workers do I/O)
    - Same timer integration pattern

    Known Limitations:
    - FD_SETSIZE (~1024) limits the descriptor *values* that can be
      monitored: any fd >= FD_SETSIZE is rejected at registration,
      regardless of how many descriptors are actually in use
    - O(n) scanning: rebuilds fd_sets each iteration
    - Level-triggered only (no edge-triggered mode)

    @par Thread Safety
    All public member functions are thread-safe.
*/
class BOOST_COROSIO_DECL select_scheduler final : public reactor_scheduler
{
public:
    /// Select needs write-direction notification to rebuild fd_sets.
    static constexpr bool needs_write_notification = true;

    /** Construct the scheduler.

        Creates a non-blocking, close-on-exec self-pipe used to
        interrupt a blocked select().

        @param ctx Reference to the owning execution_context.
        @param concurrency_hint Hint for expected thread count (unused).
    */
    select_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);

    /// Destroy the scheduler, closing the self-pipe.
    ~select_scheduler() override;

    select_scheduler(select_scheduler const&) = delete;
    select_scheduler& operator=(select_scheduler const&) = delete;

    /// Shut down the scheduler, draining pending operations.
    void shutdown() override;

    /** Return the maximum file descriptor value supported.

        Returns FD_SETSIZE - 1, the maximum fd value that can be
        monitored by select(). Operations with fd >= FD_SETSIZE
        will fail with EINVAL.

        @return The maximum supported file descriptor value.
    */
    static constexpr int max_fd() noexcept
    {
        return FD_SETSIZE - 1;
    }

    /** Register a descriptor for persistent monitoring.

        The fd is added to the registered_descs_ map and will be
        included in subsequent select() calls. The reactor is
        interrupted so a blocked select() rebuilds its fd_sets.

        @param fd The file descriptor to register.
        @param desc Pointer to descriptor state for this fd.

        @throws system_error with EINVAL if fd is negative or
        >= FD_SETSIZE.
    */
    void register_descriptor(int fd, select_descriptor_state* desc) const;

    /** Deregister a persistently registered descriptor.

        Unknown descriptors are ignored.

        @param fd The file descriptor to deregister.
    */
    void deregister_descriptor(int fd) const;

    /** Interrupt the reactor so it rebuilds its fd_sets.

        Called when a write or connect op is registered after
        the reactor's snapshot was taken. Without this, select()
        could stay blocked without watching the fd for writability.
    */
    void notify_reactor() const;

private:
    // Performs one select() pass: snapshot, wait, dispatch ready descriptors.
    void
    run_task(lock_type& lock, context_type* ctx,
        long timeout_us) override;
    // Wakes a blocked select() by writing to the self-pipe.
    void interrupt_reactor() const override;
    // Combines the caller's timeout with the nearest timer expiry (microseconds).
    long calculate_timeout(long requested_timeout_us) const;

    // Self-pipe for interrupting select()
    int pipe_fds_[2]; // [0]=read, [1]=write

    // Per-fd tracking for fd_set building. Mutable because the const
    // registration functions update them; guarded by mutex_.
    mutable std::unordered_map<int, select_descriptor_state*> registered_descs_;
    // Highest registered fd, maintained by register/deregister_descriptor.
    mutable int max_fd_ = -1;
};
145 :
146 HIT 230 : inline select_scheduler::select_scheduler(capy::execution_context& ctx, int)
147 230 : : pipe_fds_{-1, -1}
148 230 : , max_fd_(-1)
149 : {
150 230 : if (::pipe(pipe_fds_) < 0)
151 MIS 0 : detail::throw_system_error(make_err(errno), "pipe");
152 :
153 HIT 690 : for (int i = 0; i < 2; ++i)
154 : {
155 460 : int flags = ::fcntl(pipe_fds_[i], F_GETFL, 0);
156 460 : if (flags == -1)
157 : {
158 MIS 0 : int errn = errno;
159 0 : ::close(pipe_fds_[0]);
160 0 : ::close(pipe_fds_[1]);
161 0 : detail::throw_system_error(make_err(errn), "fcntl F_GETFL");
162 : }
163 HIT 460 : if (::fcntl(pipe_fds_[i], F_SETFL, flags | O_NONBLOCK) == -1)
164 : {
165 MIS 0 : int errn = errno;
166 0 : ::close(pipe_fds_[0]);
167 0 : ::close(pipe_fds_[1]);
168 0 : detail::throw_system_error(make_err(errn), "fcntl F_SETFL");
169 : }
170 HIT 460 : if (::fcntl(pipe_fds_[i], F_SETFD, FD_CLOEXEC) == -1)
171 : {
172 MIS 0 : int errn = errno;
173 0 : ::close(pipe_fds_[0]);
174 0 : ::close(pipe_fds_[1]);
175 0 : detail::throw_system_error(make_err(errn), "fcntl F_SETFD");
176 : }
177 : }
178 :
179 HIT 230 : timer_svc_ = &get_timer_service(ctx, *this);
180 230 : timer_svc_->set_on_earliest_changed(
181 4071 : timer_service::callback(this, [](void* p) {
182 3841 : static_cast<select_scheduler*>(p)->interrupt_reactor();
183 3841 : }));
184 :
185 230 : get_resolver_service(ctx, *this);
186 230 : get_signal_service(ctx, *this);
187 230 : get_stream_file_service(ctx, *this);
188 230 : get_random_access_file_service(ctx, *this);
189 :
190 230 : completed_ops_.push(&task_op_);
191 230 : }
192 :
193 460 : inline select_scheduler::~select_scheduler()
194 : {
195 230 : if (pipe_fds_[0] >= 0)
196 230 : ::close(pipe_fds_[0]);
197 230 : if (pipe_fds_[1] >= 0)
198 230 : ::close(pipe_fds_[1]);
199 460 : }
200 :
201 : inline void
202 230 : select_scheduler::shutdown()
203 : {
204 230 : shutdown_drain();
205 :
206 230 : if (pipe_fds_[1] >= 0)
207 230 : interrupt_reactor();
208 230 : }
209 :
210 : inline void
211 7383 : select_scheduler::register_descriptor(
212 : int fd, select_descriptor_state* desc) const
213 : {
214 7383 : if (fd < 0 || fd >= FD_SETSIZE)
215 MIS 0 : detail::throw_system_error(make_err(EINVAL), "select: fd out of range");
216 :
217 HIT 7383 : desc->registered_events = reactor_event_read | reactor_event_write;
218 7383 : desc->fd = fd;
219 7383 : desc->scheduler_ = this;
220 7383 : desc->mutex.set_enabled(!single_threaded_);
221 7383 : desc->ready_events_.store(0, std::memory_order_relaxed);
222 :
223 : {
224 7383 : conditionally_enabled_mutex::scoped_lock lock(desc->mutex);
225 7383 : desc->impl_ref_.reset();
226 7383 : desc->read_ready = false;
227 7383 : desc->write_ready = false;
228 7383 : }
229 :
230 : {
231 7383 : mutex_type::scoped_lock lock(mutex_);
232 7383 : registered_descs_[fd] = desc;
233 7383 : if (fd > max_fd_)
234 7379 : max_fd_ = fd;
235 7383 : }
236 :
237 7383 : interrupt_reactor();
238 7383 : }
239 :
240 : inline void
241 7383 : select_scheduler::deregister_descriptor(int fd) const
242 : {
243 7383 : mutex_type::scoped_lock lock(mutex_);
244 :
245 7383 : auto it = registered_descs_.find(fd);
246 7383 : if (it == registered_descs_.end())
247 MIS 0 : return;
248 :
249 HIT 7383 : registered_descs_.erase(it);
250 :
251 7383 : if (fd == max_fd_)
252 : {
253 7321 : max_fd_ = pipe_fds_[0];
254 14532 : for (auto& [registered_fd, state] : registered_descs_)
255 : {
256 7211 : if (registered_fd > max_fd_)
257 7201 : max_fd_ = registered_fd;
258 : }
259 : }
260 7383 : }
261 :
262 : inline void
263 3617 : select_scheduler::notify_reactor() const
264 : {
265 3617 : interrupt_reactor();
266 3617 : }
267 :
268 : inline void
269 15226 : select_scheduler::interrupt_reactor() const
270 : {
271 15226 : char byte = 1;
272 15226 : [[maybe_unused]] auto r = ::write(pipe_fds_[1], &byte, 1);
273 15226 : }
274 :
275 : inline long
276 121870 : select_scheduler::calculate_timeout(long requested_timeout_us) const
277 : {
278 121870 : if (requested_timeout_us == 0)
279 MIS 0 : return 0;
280 :
281 HIT 121870 : auto nearest = timer_svc_->nearest_expiry();
282 121870 : if (nearest == timer_service::time_point::max())
283 46 : return requested_timeout_us;
284 :
285 121824 : auto now = std::chrono::steady_clock::now();
286 121824 : if (nearest <= now)
287 374 : return 0;
288 :
289 : auto timer_timeout_us =
290 121450 : std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
291 121450 : .count();
292 :
293 121450 : constexpr auto long_max =
294 : static_cast<long long>((std::numeric_limits<long>::max)());
295 : auto capped_timer_us =
296 121450 : (std::min)((std::max)(static_cast<long long>(timer_timeout_us),
297 121450 : static_cast<long long>(0)),
298 121450 : long_max);
299 :
300 121450 : if (requested_timeout_us < 0)
301 121444 : return static_cast<long>(capped_timer_us);
302 :
303 : return static_cast<long>(
304 6 : (std::min)(static_cast<long long>(requested_timeout_us),
305 6 : capped_timer_us));
306 : }
307 :
/** Run one reactor pass: snapshot descriptors, block in select(), and
    queue every descriptor that reported activity.

    Called with the scheduler lock possibly held; the lock is released
    around select() and re-acquired (lock.lock()) before ready descriptors
    are spliced into completed_ops_.

    @param lock Scheduler lock; unlocked if owned, relocked on normal exit.
    @param ctx Thread context passed through to task_cleanup.
    @param timeout_us Requested timeout in microseconds (see calculate_timeout).
*/
inline void
select_scheduler::run_task(
    lock_type& lock, context_type* ctx, long timeout_us)
{
    // A pending interrupt means new work may already exist: poll only.
    long effective_timeout_us =
        task_interrupted_ ? 0 : calculate_timeout(timeout_us);

    // Snapshot registered descriptors while holding lock.
    // Record which fds need write monitoring to avoid a hot loop:
    // select is level-triggered so writable sockets (nearly always
    // writable) would cause select() to return immediately every
    // iteration if unconditionally added to write_fds.
    struct fd_entry
    {
        int fd;
        select_descriptor_state* desc;
        bool needs_write;
    };
    // register_descriptor() rejects fd >= FD_SETSIZE, so FD_SETSIZE
    // entries always suffice; the bound check below is defensive.
    fd_entry snapshot[FD_SETSIZE];
    int snapshot_count = 0;

    for (auto& [fd, desc] : registered_descs_)
    {
        if (snapshot_count < FD_SETSIZE)
        {
            // write_op/connect_op are read under the descriptor's own mutex.
            conditionally_enabled_mutex::scoped_lock desc_lock(desc->mutex);
            snapshot[snapshot_count].fd = fd;
            snapshot[snapshot_count].desc = desc;
            snapshot[snapshot_count].needs_write =
                (desc->write_op || desc->connect_op);
            ++snapshot_count;
        }
    }

    if (lock.owns_lock())
        lock.unlock();

    // NOTE(review): task_cleanup is defined in reactor_scheduler; presumably
    // it restores the lock/task state on every exit path, including the
    // early return after a failed select() below — confirm there.
    task_cleanup on_exit{this, &lock, ctx};

    fd_set read_fds, write_fds, except_fds;
    FD_ZERO(&read_fds);
    FD_ZERO(&write_fds);
    FD_ZERO(&except_fds);

    // The self-pipe read end is always watched so interrupt_reactor()
    // can wake this call.
    FD_SET(pipe_fds_[0], &read_fds);
    int nfds = pipe_fds_[0];

    for (int i = 0; i < snapshot_count; ++i)
    {
        int fd = snapshot[i].fd;
        FD_SET(fd, &read_fds);
        if (snapshot[i].needs_write)
            FD_SET(fd, &write_fds);
        FD_SET(fd, &except_fds);
        if (fd > nfds)
            nfds = fd;
    }

    // Negative timeout => tv_ptr stays null => select() blocks indefinitely.
    // NOTE(review): BSD/macOS select() reports EINVAL for very large
    // tv_sec values (e.g. > 100000000 s), which would reach the throw
    // below for a far-future timer — confirm on those platforms.
    struct timeval tv;
    struct timeval* tv_ptr = nullptr;
    if (effective_timeout_us >= 0)
    {
        tv.tv_sec = effective_timeout_us / 1000000;
        tv.tv_usec = effective_timeout_us % 1000000;
        tv_ptr = &tv;
    }

    int ready = ::select(nfds + 1, &read_fds, &write_fds, &except_fds, tv_ptr);

    // EINTR: signal interrupted select(), just retry.
    // EBADF: an fd was closed between snapshot and select(); retry
    // with a fresh snapshot from registered_descs_.
    if (ready < 0)
    {
        if (errno == EINTR || errno == EBADF)
            return;
        detail::throw_system_error(make_err(errno), "select");
    }

    // Process timers outside the lock
    timer_svc_->process_expired();

    op_queue local_ops;

    if (ready > 0)
    {
        // Drain all wake-up bytes; the read end is non-blocking, so the
        // loop ends once the pipe is empty.
        if (FD_ISSET(pipe_fds_[0], &read_fds))
        {
            char buf[256];
            while (::read(pipe_fds_[0], buf, sizeof(buf)) > 0)
            {
            }
        }

        for (int i = 0; i < snapshot_count; ++i)
        {
            int fd = snapshot[i].fd;
            select_descriptor_state* desc = snapshot[i].desc;

            // Translate fd_set membership into reactor event bits; the
            // exceptional-condition set is reported as reactor_event_error.
            std::uint32_t flags = 0;
            if (FD_ISSET(fd, &read_fds))
                flags |= reactor_event_read;
            if (FD_ISSET(fd, &write_fds))
                flags |= reactor_event_write;
            if (FD_ISSET(fd, &except_fds))
                flags |= reactor_event_error;

            if (flags == 0)
                continue;

            desc->add_ready_events(flags);

            // Queue the descriptor only if it is not already queued;
            // the CAS on is_enqueued_ keeps it in the queue at most once.
            bool expected = false;
            if (desc->is_enqueued_.compare_exchange_strong(
                    expected, true, std::memory_order_release,
                    std::memory_order_relaxed))
            {
                local_ops.push(desc);
            }
        }
    }

    lock.lock();

    if (!local_ops.empty())
        completed_ops_.splice(local_ops);
}
435 :
436 : } // namespace boost::corosio::detail
437 :
438 : #endif // BOOST_COROSIO_HAS_SELECT
439 :
440 : #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
|