TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
12 :
13 : #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
14 : #include <boost/corosio/io/io_object.hpp>
15 : #include <boost/corosio/endpoint.hpp>
16 : #include <boost/corosio/detail/continuation_op.hpp>
17 : #include <boost/capy/ex/executor_ref.hpp>
18 :
19 : #include <atomic>
20 : #include <coroutine>
21 : #include <cstddef>
22 : #include <memory>
23 : #include <optional>
24 : #include <stop_token>
25 : #include <system_error>
26 :
27 : #include <errno.h>
28 :
29 : #include <netinet/in.h>
30 : #include <sys/socket.h>
31 : #include <sys/uio.h>
32 :
33 : namespace boost::corosio::detail {
34 :
35 : /** Base operation for reactor-based backends.
36 :
37 : Holds per-operation state that depends on the concrete backend
38 : socket/acceptor types: coroutine handle, executor, output
39 : pointers, file descriptor, stop_callback, and type-specific
40 : impl pointers.
41 :
42 : Fields shared across all backends (errn, bytes_transferred,
43 : cancelled, impl_ptr, perform_io, complete) live in
44 : reactor_op_base so the scheduler and descriptor_state can
45 : access them without template instantiation.
46 :
47 : @tparam Socket The backend socket impl type (forward-declared).
48 : @tparam Acceptor The backend acceptor impl type (forward-declared).
49 : */
50 : template<class Socket, class Acceptor>
51 : struct reactor_op : reactor_op_base
52 : {
53 : /// Stop-token callback that invokes cancel() on the target op.
54 : struct canceller
55 : {
56 : reactor_op* op;
57 HIT 200 : void operator()() const noexcept
58 : {
59 200 : op->cancel();
60 200 : }
61 : };
62 :
63 : /// Caller's coroutine handle to resume on completion.
64 : std::coroutine_handle<> h;
65 :
66 : /// Scheduler-ready continuation for executor dispatch/post (wraps h).
67 : detail::continuation_op cont_op;
68 :
69 : /// Executor for dispatching the completion.
70 : capy::executor_ref ex;
71 :
72 : /// Output pointer for the error code.
73 : std::error_code* ec_out = nullptr;
74 :
75 : /// Output pointer for bytes transferred.
76 : std::size_t* bytes_out = nullptr;
77 :
78 : /// File descriptor this operation targets.
79 : int fd = -1;
80 :
81 : /// Stop-token callback registration.
82 : std::optional<std::stop_callback<canceller>> stop_cb;
83 :
84 : /// Owning socket impl (for stop_token cancellation).
85 : Socket* socket_impl_ = nullptr;
86 :
87 : /// Owning acceptor impl (for stop_token cancellation).
88 : Acceptor* acceptor_impl_ = nullptr;
89 :
90 77794 : reactor_op() = default;
91 :
92 : /// Reset operation state for reuse.
93 423273 : void reset() noexcept
94 : {
95 423273 : fd = -1;
96 423273 : errn = 0;
97 423273 : bytes_transferred = 0;
98 423273 : cancelled.store(false, std::memory_order_relaxed);
99 423273 : impl_ptr.reset();
100 423273 : socket_impl_ = nullptr;
101 423273 : acceptor_impl_ = nullptr;
102 423273 : }
103 :
104 : /// Return true if this is a read-direction operation.
105 40590 : virtual bool is_read_operation() const noexcept
106 : {
107 40590 : return false;
108 : }
109 :
110 : /// Cancel this operation via the owning impl.
111 : virtual void cancel() noexcept = 0;
112 :
113 : /// Destroy without invoking.
114 MIS 0 : void destroy() override
115 : {
116 0 : stop_cb.reset();
117 0 : reactor_op_base::destroy();
118 0 : }
119 :
120 : /// Arm the stop-token callback for a socket operation.
121 HIT 90073 : void start(std::stop_token const& token, Socket* impl)
122 : {
123 90073 : cancelled.store(false, std::memory_order_release);
124 90073 : stop_cb.reset();
125 90073 : socket_impl_ = impl;
126 90073 : acceptor_impl_ = nullptr;
127 :
128 90073 : if (token.stop_possible())
129 198 : stop_cb.emplace(token, canceller{this});
130 90073 : }
131 :
132 : /// Arm the stop-token callback for an acceptor operation.
133 8530 : void start(std::stop_token const& token, Acceptor* impl)
134 : {
135 8530 : cancelled.store(false, std::memory_order_release);
136 8530 : stop_cb.reset();
137 8530 : socket_impl_ = nullptr;
138 8530 : acceptor_impl_ = impl;
139 :
140 8530 : if (token.stop_possible())
141 9 : stop_cb.emplace(token, canceller{this});
142 8530 : }
143 : };
144 :
145 : /** Shared connect operation.
146 :
147 : Checks SO_ERROR for connect completion status. The operator()()
148 : and cancel() are provided by the concrete backend type.
149 :
150 : @tparam Base The backend's base op type.
151 : @tparam Endpoint The endpoint type (endpoint or local_endpoint).
152 : */
153 : template<class Base, class Endpoint = endpoint>
154 : struct reactor_connect_op : Base
155 : {
156 : /// Endpoint to connect to.
157 : Endpoint target_endpoint;
158 :
159 : /// Reset operation state for reuse.
160 8532 : void reset() noexcept
161 : {
162 8532 : Base::reset();
163 8532 : target_endpoint = Endpoint{};
164 8532 : }
165 :
166 8517 : void perform_io() noexcept override
167 : {
168 8517 : int err = 0;
169 8517 : socklen_t len = sizeof(err);
170 8517 : if (::getsockopt(this->fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
171 MIS 0 : err = errno;
172 HIT 8517 : this->complete(err, 0);
173 8517 : }
174 : };
175 :
176 : /** Shared scatter-read operation.
177 :
178 : Uses readv() with an EINTR retry loop.
179 :
180 : @tparam Base The backend's base op type.
181 : */
182 : template<class Base>
183 : struct reactor_read_op : Base
184 : {
185 : /// Maximum scatter-gather buffer count.
186 : static constexpr std::size_t max_buffers = 16;
187 :
188 : /// Scatter-gather I/O vectors.
189 : iovec iovecs[max_buffers];
190 :
191 : /// Number of active I/O vectors.
192 : int iovec_count = 0;
193 :
194 : /// True for zero-length reads (completed immediately).
195 : bool empty_buffer_read = false;
196 :
197 : /// Return true (this is a read-direction operation).
198 40620 : bool is_read_operation() const noexcept override
199 : {
200 40620 : return !empty_buffer_read;
201 : }
202 :
203 203205 : void reset() noexcept
204 : {
205 203205 : Base::reset();
206 203205 : iovec_count = 0;
207 203205 : empty_buffer_read = false;
208 203205 : }
209 :
210 325 : void perform_io() noexcept override
211 : {
212 : ssize_t n;
213 : do
214 : {
215 325 : n = ::readv(this->fd, iovecs, iovec_count);
216 : }
217 325 : while (n < 0 && errno == EINTR);
218 :
219 325 : if (n >= 0)
220 96 : this->complete(0, static_cast<std::size_t>(n));
221 : else
222 229 : this->complete(errno, 0);
223 325 : }
224 : };
225 :
226 : /** Shared gather-write operation.
227 :
228 : Delegates the actual syscall to WritePolicy::write(fd, iovecs, count),
229 : which returns ssize_t (bytes written or -1 with errno set).
230 :
231 : @tparam Base The backend's base op type.
232 : @tparam WritePolicy Provides `static ssize_t write(int, iovec*, int)`.
233 : */
234 : template<class Base, class WritePolicy>
235 : struct reactor_write_op : Base
236 : {
237 : /// The write syscall policy type.
238 : using write_policy = WritePolicy;
239 :
240 : /// Maximum scatter-gather buffer count.
241 : static constexpr std::size_t max_buffers = 16;
242 :
243 : /// Scatter-gather I/O vectors.
244 : iovec iovecs[max_buffers];
245 :
246 : /// Number of active I/O vectors.
247 : int iovec_count = 0;
248 :
249 202910 : void reset() noexcept
250 : {
251 202910 : Base::reset();
252 202910 : iovec_count = 0;
253 202910 : }
254 :
255 MIS 0 : void perform_io() noexcept override
256 : {
257 0 : ssize_t n = WritePolicy::write(this->fd, iovecs, iovec_count);
258 0 : if (n >= 0)
259 0 : this->complete(0, static_cast<std::size_t>(n));
260 : else
261 0 : this->complete(errno, 0);
262 0 : }
263 : };
264 :
265 : /** Shared accept operation.
266 :
267 : Delegates the actual syscall to AcceptPolicy::do_accept(fd, peer_storage),
268 : which returns the accepted fd or -1 with errno set.
269 :
270 : @tparam Base The backend's base op type.
271 : @tparam AcceptPolicy Provides `static int do_accept(int, sockaddr_storage&)`.
272 : */
273 : template<class Base, class AcceptPolicy>
274 : struct reactor_accept_op : Base
275 : {
276 : /// File descriptor of the accepted connection.
277 : int accepted_fd = -1;
278 :
279 : /// Pointer to the peer socket implementation.
280 : io_object::implementation* peer_impl = nullptr;
281 :
282 : /// Output pointer for the accepted implementation.
283 : io_object::implementation** impl_out = nullptr;
284 :
285 : /// Peer address storage filled by accept.
286 : sockaddr_storage peer_storage{};
287 :
288 HIT 8530 : void reset() noexcept
289 : {
290 8530 : Base::reset();
291 8530 : accepted_fd = -1;
292 8530 : peer_impl = nullptr;
293 8530 : impl_out = nullptr;
294 8530 : peer_storage = {};
295 8530 : }
296 :
297 8512 : void perform_io() noexcept override
298 : {
299 8512 : int new_fd = AcceptPolicy::do_accept(this->fd, peer_storage);
300 8512 : if (new_fd >= 0)
301 : {
302 8512 : accepted_fd = new_fd;
303 8512 : this->complete(0, 0);
304 : }
305 : else
306 : {
307 MIS 0 : this->complete(errno, 0);
308 : }
309 HIT 8512 : }
310 : };
311 :
312 : /** Shared connected send operation for datagram sockets.
313 :
314 : Uses sendmsg() with msg_name=nullptr (connected mode).
315 :
316 : @tparam Base The backend's base op type.
317 : */
318 : template<class Base>
319 : struct reactor_send_op : Base
320 : {
321 : /// Maximum scatter-gather buffer count.
322 : static constexpr std::size_t max_buffers = 16;
323 :
324 : /// Scatter-gather I/O vectors.
325 : iovec iovecs[max_buffers];
326 :
327 : /// Number of active I/O vectors.
328 : int iovec_count = 0;
329 :
330 : /// User-supplied message flags.
331 : int msg_flags = 0;
332 :
333 14 : void reset() noexcept
334 : {
335 14 : Base::reset();
336 14 : iovec_count = 0;
337 14 : msg_flags = 0;
338 14 : }
339 :
340 MIS 0 : void perform_io() noexcept override
341 : {
342 0 : msghdr msg{};
343 0 : msg.msg_iov = iovecs;
344 0 : msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
345 :
346 : #ifdef MSG_NOSIGNAL
347 0 : int send_flags = msg_flags | MSG_NOSIGNAL;
348 : #else
349 : int send_flags = msg_flags;
350 : #endif
351 :
352 : ssize_t n;
353 : do
354 : {
355 0 : n = ::sendmsg(this->fd, &msg, send_flags);
356 : }
357 0 : while (n < 0 && errno == EINTR);
358 :
359 0 : if (n >= 0)
360 0 : this->complete(0, static_cast<std::size_t>(n));
361 : else
362 0 : this->complete(errno, 0);
363 0 : }
364 : };
365 :
366 : /** Shared connected recv operation for datagram sockets.
367 :
368 : Uses recvmsg() with msg_name=nullptr (connected mode).
369 : Unlike reactor_read_op, does not map n==0 to EOF
370 : (zero-length datagrams are valid).
371 :
372 : @tparam Base The backend's base op type.
373 : */
374 : template<class Base>
375 : struct reactor_recv_op : Base
376 : {
377 : /// Maximum scatter-gather buffer count.
378 : static constexpr std::size_t max_buffers = 16;
379 :
380 : /// Scatter-gather I/O vectors.
381 : iovec iovecs[max_buffers];
382 :
383 : /// Number of active I/O vectors.
384 : int iovec_count = 0;
385 :
386 : /// User-supplied message flags.
387 : int msg_flags = 0;
388 :
389 : /// Return true (this is a read-direction operation).
390 HIT 4 : bool is_read_operation() const noexcept override
391 : {
392 4 : return true;
393 : }
394 :
395 14 : void reset() noexcept
396 : {
397 14 : Base::reset();
398 14 : iovec_count = 0;
399 14 : msg_flags = 0;
400 14 : }
401 :
402 MIS 0 : void perform_io() noexcept override
403 : {
404 0 : msghdr msg{};
405 0 : msg.msg_iov = iovecs;
406 0 : msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
407 :
408 : ssize_t n;
409 : do
410 : {
411 0 : n = ::recvmsg(this->fd, &msg, msg_flags);
412 : }
413 0 : while (n < 0 && errno == EINTR);
414 :
415 0 : if (n >= 0)
416 0 : this->complete(0, static_cast<std::size_t>(n));
417 : else
418 0 : this->complete(errno, 0);
419 0 : }
420 : };
421 :
422 : /** Shared send_to operation for datagram sockets.
423 :
424 : Uses sendmsg() with the destination endpoint in msg_name.
425 :
426 : @tparam Base The backend's base op type.
427 : */
428 : template<class Base>
429 : struct reactor_send_to_op : Base
430 : {
431 : /// Maximum scatter-gather buffer count.
432 : static constexpr std::size_t max_buffers = 16;
433 :
434 : /// Scatter-gather I/O vectors.
435 : iovec iovecs[max_buffers];
436 :
437 : /// Number of active I/O vectors.
438 : int iovec_count = 0;
439 :
440 : /// Destination address storage.
441 : sockaddr_storage dest_storage{};
442 :
443 : /// Destination address length.
444 : socklen_t dest_len = 0;
445 :
446 : /// User-supplied message flags.
447 : int msg_flags = 0;
448 :
449 HIT 28 : void reset() noexcept
450 : {
451 28 : Base::reset();
452 28 : iovec_count = 0;
453 28 : dest_storage = {};
454 28 : dest_len = 0;
455 28 : msg_flags = 0;
456 28 : }
457 :
458 MIS 0 : void perform_io() noexcept override
459 : {
460 0 : msghdr msg{};
461 0 : msg.msg_name = &dest_storage;
462 0 : msg.msg_namelen = dest_len;
463 0 : msg.msg_iov = iovecs;
464 0 : msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
465 :
466 : #ifdef MSG_NOSIGNAL
467 0 : int send_flags = msg_flags | MSG_NOSIGNAL;
468 : #else
469 : int send_flags = msg_flags;
470 : #endif
471 :
472 : ssize_t n;
473 : do
474 : {
475 0 : n = ::sendmsg(this->fd, &msg, send_flags);
476 : }
477 0 : while (n < 0 && errno == EINTR);
478 :
479 0 : if (n >= 0)
480 0 : this->complete(0, static_cast<std::size_t>(n));
481 : else
482 0 : this->complete(errno, 0);
483 0 : }
484 : };
485 :
486 : /** Shared recv_from operation for datagram sockets.
487 :
488 : Uses recvmsg() with msg_name to capture the source endpoint.
489 :
490 : @tparam Base The backend's base op type.
491 : @tparam Endpoint The endpoint type (endpoint or local_endpoint).
492 : */
493 : template<class Base, class Endpoint = endpoint>
494 : struct reactor_recv_from_op : Base
495 : {
496 : /// Maximum scatter-gather buffer count.
497 : static constexpr std::size_t max_buffers = 16;
498 :
499 : /// Scatter-gather I/O vectors.
500 : iovec iovecs[max_buffers];
501 :
502 : /// Number of active I/O vectors.
503 : int iovec_count = 0;
504 :
505 : /// Source address storage filled by recvmsg.
506 : sockaddr_storage source_storage{};
507 :
508 : /// Actual source address length returned by recvmsg.
509 : socklen_t source_addrlen = 0;
510 :
511 : /// Output pointer for the source endpoint (set by do_recv_from).
512 : Endpoint* source_out = nullptr;
513 :
514 : /// User-supplied message flags.
515 : int msg_flags = 0;
516 :
517 : /// Return true (this is a read-direction operation).
518 0 : bool is_read_operation() const noexcept override
519 : {
520 0 : return true;
521 : }
522 :
523 HIT 40 : void reset() noexcept
524 : {
525 40 : Base::reset();
526 40 : iovec_count = 0;
527 40 : source_storage = {};
528 40 : source_addrlen = 0;
529 40 : source_out = nullptr;
530 40 : msg_flags = 0;
531 40 : }
532 :
533 2 : void perform_io() noexcept override
534 : {
535 2 : msghdr msg{};
536 2 : msg.msg_name = &source_storage;
537 2 : msg.msg_namelen = sizeof(source_storage);
538 2 : msg.msg_iov = iovecs;
539 2 : msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
540 :
541 : ssize_t n;
542 : do
543 : {
544 2 : n = ::recvmsg(this->fd, &msg, msg_flags);
545 : }
546 2 : while (n < 0 && errno == EINTR);
547 :
548 2 : if (n >= 0)
549 : {
550 2 : source_addrlen = msg.msg_namelen;
551 2 : this->complete(0, static_cast<std::size_t>(n));
552 : }
553 : else
554 MIS 0 : this->complete(errno, 0);
555 HIT 2 : }
556 : };
557 :
558 : } // namespace boost::corosio::detail
559 :
560 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
|