TLA Line data Source code
//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
12 :
13 : #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
14 : #include <boost/corosio/io/io_object.hpp>
15 : #include <boost/corosio/endpoint.hpp>
16 : #include <boost/corosio/detail/continuation_op.hpp>
17 : #include <boost/capy/ex/executor_ref.hpp>
18 :
19 : #include <atomic>
20 : #include <coroutine>
21 : #include <cstddef>
22 : #include <memory>
23 : #include <optional>
24 : #include <stop_token>
25 : #include <system_error>
26 :
27 : #include <errno.h>
28 :
29 : #include <netinet/in.h>
30 : #include <sys/socket.h>
31 : #include <sys/uio.h>
32 :
33 : namespace boost::corosio::detail {
34 :
35 : /** Base operation for reactor-based backends.
36 :
37 : Holds per-operation state that depends on the concrete backend
38 : socket/acceptor types: coroutine handle, executor, output
39 : pointers, file descriptor, stop_callback, and type-specific
40 : impl pointers.
41 :
42 : Fields shared across all backends (errn, bytes_transferred,
43 : cancelled, impl_ptr, perform_io, complete) live in
44 : reactor_op_base so the scheduler and descriptor_state can
45 : access them without template instantiation.
46 :
47 : @tparam Socket The backend socket impl type (forward-declared).
48 : @tparam Acceptor The backend acceptor impl type (forward-declared).
49 : */
50 : template<class Socket, class Acceptor>
51 : struct reactor_op : reactor_op_base
52 : {
53 : /// Stop-token callback that invokes cancel() on the target op.
54 : struct canceller
55 : {
56 : reactor_op* op;
57 HIT 199 : void operator()() const noexcept
58 : {
59 199 : op->cancel();
60 199 : }
61 : };
62 :
63 : /// Caller's coroutine handle to resume on completion.
64 : std::coroutine_handle<> h;
65 :
66 : /// Scheduler-ready continuation for executor dispatch/post (wraps h).
67 : detail::continuation_op cont_op;
68 :
69 : /// Executor for dispatching the completion.
70 : capy::executor_ref ex;
71 :
72 : /// Output pointer for the error code.
73 : std::error_code* ec_out = nullptr;
74 :
75 : /// Output pointer for bytes transferred.
76 : std::size_t* bytes_out = nullptr;
77 :
78 : /// File descriptor this operation targets.
79 : int fd = -1;
80 :
81 : /// Stop-token callback registration.
82 : std::optional<std::stop_callback<canceller>> stop_cb;
83 :
84 : /// Owning socket impl (for stop_token cancellation).
85 : Socket* socket_impl_ = nullptr;
86 :
87 : /// Owning acceptor impl (for stop_token cancellation).
88 : Acceptor* acceptor_impl_ = nullptr;
89 :
90 73019 : reactor_op() = default;
91 :
92 : /// Reset operation state for reuse.
93 410378 : void reset() noexcept
94 : {
95 410378 : fd = -1;
96 410378 : errn = 0;
97 410378 : bytes_transferred = 0;
98 410378 : cancelled.store(false, std::memory_order_relaxed);
99 410378 : impl_ptr.reset();
100 410378 : socket_impl_ = nullptr;
101 410378 : acceptor_impl_ = nullptr;
102 410378 : }
103 :
104 : /// Return true if this is a read-direction operation.
105 39410 : virtual bool is_read_operation() const noexcept
106 : {
107 39410 : return false;
108 : }
109 :
110 : /// Cancel this operation via the owning impl.
111 : virtual void cancel() noexcept = 0;
112 :
113 : /// Destroy without invoking.
114 MIS 0 : void destroy() override
115 : {
116 0 : stop_cb.reset();
117 0 : reactor_op_base::destroy();
118 0 : }
119 :
120 : /// Arm the stop-token callback for a socket operation.
121 HIT 87182 : void start(std::stop_token const& token, Socket* impl)
122 : {
123 87182 : cancelled.store(false, std::memory_order_release);
124 87182 : stop_cb.reset();
125 87182 : socket_impl_ = impl;
126 87182 : acceptor_impl_ = nullptr;
127 :
128 87182 : if (token.stop_possible())
129 197 : stop_cb.emplace(token, canceller{this});
130 87182 : }
131 :
132 : /// Arm the stop-token callback for an acceptor operation.
133 7995 : void start(std::stop_token const& token, Acceptor* impl)
134 : {
135 7995 : cancelled.store(false, std::memory_order_release);
136 7995 : stop_cb.reset();
137 7995 : socket_impl_ = nullptr;
138 7995 : acceptor_impl_ = impl;
139 :
140 7995 : if (token.stop_possible())
141 9 : stop_cb.emplace(token, canceller{this});
142 7995 : }
143 : };
144 :
145 : /** Shared connect operation.
146 :
147 : Checks SO_ERROR for connect completion status. The operator()()
148 : and cancel() are provided by the concrete backend type.
149 :
150 : @tparam Base The backend's base op type.
151 : @tparam Endpoint The endpoint type (endpoint or local_endpoint).
152 : */
153 : template<class Base, class Endpoint = endpoint>
154 : struct reactor_connect_op : Base
155 : {
156 : /// Endpoint to connect to.
157 : Endpoint target_endpoint;
158 :
159 : /// Reset operation state for reuse.
160 8001 : void reset() noexcept
161 : {
162 8001 : Base::reset();
163 8001 : target_endpoint = Endpoint{};
164 8001 : }
165 :
166 7982 : void perform_io() noexcept override
167 : {
168 7982 : int err = 0;
169 7982 : socklen_t len = sizeof(err);
170 7982 : if (::getsockopt(this->fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
171 MIS 0 : err = errno;
172 HIT 7982 : this->complete(err, 0);
173 7982 : }
174 : };
175 :
/** Shared scatter-read operation.

    Uses readv() with an EINTR retry loop.

    @tparam Base The backend's base op type.
*/
template<class Base>
struct reactor_read_op : Base
{
    /// Maximum scatter-gather buffer count.
    static constexpr std::size_t max_buffers = 16;

    /// Scatter-gather I/O vectors.
    iovec iovecs[max_buffers];

    /// Number of active I/O vectors.
    int iovec_count = 0;

    /// True for zero-length reads (completed immediately).
    bool empty_buffer_read = false;

    /// Return true unless this is a zero-length (empty buffer) read.
    bool is_read_operation() const noexcept override
    {
        return !empty_buffer_read;
    }

    /// Reset operation state for reuse (the iovec contents are not cleared).
    void reset() noexcept
    {
        Base::reset();
        iovec_count       = 0;
        empty_buffer_read = false;
    }

    /** Perform the readv() and report the result.

        Retries while the call is interrupted (EINTR). Completes with
        (0, bytes read) on success or (errno, 0) on failure.
    */
    void perform_io() noexcept override
    {
        ssize_t n;
        do
        {
            n = ::readv(this->fd, iovecs, iovec_count);
        }
        while (n < 0 && errno == EINTR);

        if (n >= 0)
            this->complete(0, static_cast<std::size_t>(n));
        else
            this->complete(errno, 0);
    }
};
225 :
/** Shared gather-write operation.

    Delegates the actual syscall to WritePolicy::write(fd, iovecs, count),
    which returns ssize_t (bytes written or -1 with errno set).

    @tparam Base The backend's base op type.
    @tparam WritePolicy Provides `static ssize_t write(int, iovec*, int)`.
*/
template<class Base, class WritePolicy>
struct reactor_write_op : Base
{
    /// The write syscall policy type.
    using write_policy = WritePolicy;

    /// Maximum scatter-gather buffer count.
    static constexpr std::size_t max_buffers = 16;

    /// Scatter-gather I/O vectors.
    iovec iovecs[max_buffers];

    /// Number of active I/O vectors.
    int iovec_count = 0;

    /// Reset operation state for reuse (the iovec contents are not cleared).
    void reset() noexcept
    {
        Base::reset();
        iovec_count = 0;
    }

    /** Invoke the write policy once and report the result.

        No retry is done here; any EINTR handling is left to the policy.
        Completes with (0, bytes written) when the policy returns a
        non-negative count, otherwise with (errno, 0).
    */
    void perform_io() noexcept override
    {
        ssize_t n = WritePolicy::write(this->fd, iovecs, iovec_count);
        if (n >= 0)
            this->complete(0, static_cast<std::size_t>(n));
        else
            this->complete(errno, 0);
    }
};
264 :
265 : /** Shared accept operation.
266 :
267 : Delegates the actual syscall to
268 : AcceptPolicy::do_accept(fd, peer_storage, peer_addrlen),
269 : which returns the accepted fd or -1 with errno set and writes
270 : the real peer address length into peer_addrlen.
271 :
272 : @tparam Base The backend's base op type.
273 : @tparam AcceptPolicy Provides
274 : `static int do_accept(int, sockaddr_storage&, socklen_t&)`.
275 : */
276 : template<class Base, class AcceptPolicy>
277 : struct reactor_accept_op : Base
278 : {
279 : /// File descriptor of the accepted connection.
280 : int accepted_fd = -1;
281 :
282 : /// Pointer to the peer socket implementation.
283 : io_object::implementation* peer_impl = nullptr;
284 :
285 : /// Output pointer for the accepted implementation.
286 : io_object::implementation** impl_out = nullptr;
287 :
288 : /// Peer address storage filled by accept.
289 : sockaddr_storage peer_storage{};
290 :
291 : /// Actual peer address length returned by accept.
292 : socklen_t peer_addrlen = 0;
293 :
294 HIT 7995 : void reset() noexcept
295 : {
296 7995 : Base::reset();
297 7995 : accepted_fd = -1;
298 7995 : peer_impl = nullptr;
299 7995 : impl_out = nullptr;
300 7995 : peer_storage = {};
301 7995 : peer_addrlen = 0;
302 7995 : }
303 :
304 7977 : void perform_io() noexcept override
305 : {
306 : int new_fd =
307 7977 : AcceptPolicy::do_accept(this->fd, peer_storage, peer_addrlen);
308 7977 : if (new_fd >= 0)
309 : {
310 7977 : accepted_fd = new_fd;
311 7977 : this->complete(0, 0);
312 : }
313 : else
314 : {
315 MIS 0 : this->complete(errno, 0);
316 : }
317 HIT 7977 : }
318 : };
319 :
/** Shared connected send operation for datagram sockets.

    Uses sendmsg() with msg_name=nullptr (connected mode).

    @tparam Base The backend's base op type.
*/
template<class Base>
struct reactor_send_op : Base
{
    /// Maximum scatter-gather buffer count.
    static constexpr std::size_t max_buffers = 16;

    /// Scatter-gather I/O vectors.
    iovec iovecs[max_buffers];

    /// Number of active I/O vectors.
    int iovec_count = 0;

    /// User-supplied message flags.
    int msg_flags = 0;

    /// Reset operation state for reuse (the iovec contents are not cleared).
    void reset() noexcept
    {
        Base::reset();
        iovec_count = 0;
        msg_flags   = 0;
    }

    /** Perform the sendmsg() and report the result.

        Retries while the call is interrupted (EINTR). Completes with
        (0, bytes sent) on success or (errno, 0) on failure.
    */
    void perform_io() noexcept override
    {
        msghdr msg{};
        msg.msg_iov = iovecs;
        // msg_iovlen is `int` in POSIX but size_t in glibc; cast to the
        // member's own type so neither platform sees a narrowing.
        msg.msg_iovlen = static_cast<decltype(msg.msg_iovlen)>(iovec_count);

        // Add MSG_NOSIGNAL to the caller's flags where the platform defines it.
#ifdef MSG_NOSIGNAL
        int send_flags = msg_flags | MSG_NOSIGNAL;
#else
        int send_flags = msg_flags;
#endif

        ssize_t n;
        do
        {
            n = ::sendmsg(this->fd, &msg, send_flags);
        }
        while (n < 0 && errno == EINTR);

        if (n >= 0)
            this->complete(0, static_cast<std::size_t>(n));
        else
            this->complete(errno, 0);
    }
};
373 :
/** Shared connected recv operation for datagram sockets.

    Uses recvmsg() with msg_name=nullptr (connected mode).
    A return of n==0 is reported as a successful zero-byte
    completion, not as EOF (zero-length datagrams are valid).

    @tparam Base The backend's base op type.
*/
template<class Base>
struct reactor_recv_op : Base
{
    /// Maximum scatter-gather buffer count.
    static constexpr std::size_t max_buffers = 16;

    /// Scatter-gather I/O vectors.
    iovec iovecs[max_buffers];

    /// Number of active I/O vectors.
    int iovec_count = 0;

    /// User-supplied message flags.
    int msg_flags = 0;

    /// Return true (this is a read-direction operation).
    bool is_read_operation() const noexcept override
    {
        return true;
    }

    /// Reset operation state for reuse (the iovec contents are not cleared).
    void reset() noexcept
    {
        Base::reset();
        iovec_count = 0;
        msg_flags   = 0;
    }

    /** Perform the recvmsg() and report the result.

        Retries while the call is interrupted (EINTR). Completes with
        (0, bytes received) on success or (errno, 0) on failure.
    */
    void perform_io() noexcept override
    {
        msghdr msg{};
        msg.msg_iov = iovecs;
        // msg_iovlen is `int` in POSIX but size_t in glibc; cast to the
        // member's own type so neither platform sees a narrowing.
        msg.msg_iovlen = static_cast<decltype(msg.msg_iovlen)>(iovec_count);

        ssize_t n;
        do
        {
            n = ::recvmsg(this->fd, &msg, msg_flags);
        }
        while (n < 0 && errno == EINTR);

        if (n >= 0)
            this->complete(0, static_cast<std::size_t>(n));
        else
            this->complete(errno, 0);
    }
};
429 :
/** Shared send_to operation for datagram sockets.

    Uses sendmsg() with the destination endpoint in msg_name.

    @tparam Base The backend's base op type.
*/
template<class Base>
struct reactor_send_to_op : Base
{
    /// Maximum scatter-gather buffer count.
    static constexpr std::size_t max_buffers = 16;

    /// Scatter-gather I/O vectors.
    iovec iovecs[max_buffers];

    /// Number of active I/O vectors.
    int iovec_count = 0;

    /// Destination address storage.
    sockaddr_storage dest_storage{};

    /// Destination address length.
    socklen_t dest_len = 0;

    /// User-supplied message flags.
    int msg_flags = 0;

    /// Reset operation state for reuse (the iovec contents are not cleared).
    void reset() noexcept
    {
        Base::reset();
        iovec_count  = 0;
        dest_storage = {};
        dest_len     = 0;
        msg_flags    = 0;
    }

    /** Perform the sendmsg() to dest_storage and report the result.

        Retries while the call is interrupted (EINTR). Completes with
        (0, bytes sent) on success or (errno, 0) on failure.
    */
    void perform_io() noexcept override
    {
        msghdr msg{};
        msg.msg_name    = &dest_storage;
        msg.msg_namelen = dest_len;
        msg.msg_iov     = iovecs;
        // msg_iovlen is `int` in POSIX but size_t in glibc; cast to the
        // member's own type so neither platform sees a narrowing.
        msg.msg_iovlen = static_cast<decltype(msg.msg_iovlen)>(iovec_count);

        // Add MSG_NOSIGNAL to the caller's flags where the platform defines it.
#ifdef MSG_NOSIGNAL
        int send_flags = msg_flags | MSG_NOSIGNAL;
#else
        int send_flags = msg_flags;
#endif

        ssize_t n;
        do
        {
            n = ::sendmsg(this->fd, &msg, send_flags);
        }
        while (n < 0 && errno == EINTR);

        if (n >= 0)
            this->complete(0, static_cast<std::size_t>(n));
        else
            this->complete(errno, 0);
    }
};
493 :
494 : /** Shared recv_from operation for datagram sockets.
495 :
496 : Uses recvmsg() with msg_name to capture the source endpoint.
497 :
498 : @tparam Base The backend's base op type.
499 : @tparam Endpoint The endpoint type (endpoint or local_endpoint).
500 : */
501 : template<class Base, class Endpoint = endpoint>
502 : struct reactor_recv_from_op : Base
503 : {
504 : /// Maximum scatter-gather buffer count.
505 : static constexpr std::size_t max_buffers = 16;
506 :
507 : /// Scatter-gather I/O vectors.
508 : iovec iovecs[max_buffers];
509 :
510 : /// Number of active I/O vectors.
511 : int iovec_count = 0;
512 :
513 : /// Source address storage filled by recvmsg.
514 : sockaddr_storage source_storage{};
515 :
516 : /// Actual source address length returned by recvmsg.
517 : socklen_t source_addrlen = 0;
518 :
519 : /// Output pointer for the source endpoint (set by do_recv_from).
520 : Endpoint* source_out = nullptr;
521 :
522 : /// User-supplied message flags.
523 : int msg_flags = 0;
524 :
525 : /// Return true (this is a read-direction operation).
526 0 : bool is_read_operation() const noexcept override
527 : {
528 0 : return true;
529 : }
530 :
531 HIT 40 : void reset() noexcept
532 : {
533 40 : Base::reset();
534 40 : iovec_count = 0;
535 40 : source_storage = {};
536 40 : source_addrlen = 0;
537 40 : source_out = nullptr;
538 40 : msg_flags = 0;
539 40 : }
540 :
541 2 : void perform_io() noexcept override
542 : {
543 2 : msghdr msg{};
544 2 : msg.msg_name = &source_storage;
545 2 : msg.msg_namelen = sizeof(source_storage);
546 2 : msg.msg_iov = iovecs;
547 2 : msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
548 :
549 : ssize_t n;
550 : do
551 : {
552 2 : n = ::recvmsg(this->fd, &msg, msg_flags);
553 : }
554 2 : while (n < 0 && errno == EINTR);
555 :
556 2 : if (n >= 0)
557 : {
558 2 : source_addrlen = msg.msg_namelen;
559 2 : this->complete(0, static_cast<std::size_t>(n));
560 : }
561 : else
562 MIS 0 : this->complete(errno, 0);
563 HIT 2 : }
564 : };
565 :
566 : } // namespace boost::corosio::detail
567 :
568 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
|