TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP
12 :
13 : #include <boost/corosio/tcp_socket.hpp>
14 : #include <boost/corosio/native/detail/reactor/reactor_basic_socket.hpp>
15 : #include <boost/corosio/detail/dispatch_coro.hpp>
16 : #include <boost/capy/buffers.hpp>
17 :
18 : #include <coroutine>
19 :
20 : #include <errno.h>
21 : #include <sys/socket.h>
22 : #include <sys/uio.h>
23 :
24 : namespace boost::corosio::detail {
25 :
26 : /** CRTP base for reactor-backed stream socket implementations.
27 :
28 : Inherits shared data members and cancel/close/register logic
29 : from reactor_basic_socket. Adds the stream-specific remote
30 : endpoint, shutdown, and I/O dispatch (connect, read, write).
31 :
32 : @tparam Derived The concrete socket type (CRTP).
33 : @tparam Service The backend's socket service type.
34 : @tparam ConnOp The backend's connect op type.
35 : @tparam ReadOp The backend's read op type.
36 : @tparam WriteOp The backend's write op type.
37 : @tparam DescState The backend's descriptor_state type.
38 : @tparam ImplBase The public vtable base
39 : (tcp_socket::implementation or
40 : local_stream_socket::implementation).
41 : @tparam Endpoint The endpoint type (endpoint or local_endpoint).
42 : */
43 : template<
44 : class Derived,
45 : class Service,
46 : class ConnOp,
47 : class ReadOp,
48 : class WriteOp,
49 : class DescState,
50 : class ImplBase = tcp_socket::implementation,
51 : class Endpoint = endpoint>
52 : class reactor_stream_socket
53 : : public reactor_basic_socket<
54 : Derived,
55 : ImplBase,
56 : Service,
57 : DescState,
58 : Endpoint>
59 : {
60 : using base_type = reactor_basic_socket<
61 : Derived,
62 : ImplBase,
63 : Service,
64 : DescState,
65 : Endpoint>;
66 : friend base_type;
67 : friend Derived;
68 :
69 HIT 25677 : explicit reactor_stream_socket(Service& svc) noexcept : base_type(svc) {}
70 :
71 : protected:
72 : Endpoint remote_endpoint_;
73 :
74 : public:
75 : /// Pending connect operation slot.
76 : ConnOp conn_;
77 :
78 : /// Pending read operation slot.
79 : ReadOp rd_;
80 :
81 : /// Pending write operation slot.
82 : WriteOp wr_;
83 :
84 25677 : ~reactor_stream_socket() override = default;
85 :
86 : /// Return the cached remote endpoint.
87 44 : Endpoint remote_endpoint() const noexcept override
88 : {
89 44 : return remote_endpoint_;
90 : }
91 :
92 : // --- Virtual method overrides (satisfy ImplBase pure virtuals) ---
93 :
94 8522 : std::coroutine_handle<> connect(
95 : std::coroutine_handle<> h,
96 : capy::executor_ref ex,
97 : Endpoint ep,
98 : std::stop_token token,
99 : std::error_code* ec) override
100 : {
101 8522 : return do_connect(h, ex, ep, token, ec);
102 : }
103 :
104 203205 : std::coroutine_handle<> read_some(
105 : std::coroutine_handle<> h,
106 : capy::executor_ref ex,
107 : buffer_param param,
108 : std::stop_token token,
109 : std::error_code* ec,
110 : std::size_t* bytes_out) override
111 : {
112 203205 : return do_read_some(h, ex, param, token, ec, bytes_out);
113 : }
114 :
115 202910 : std::coroutine_handle<> write_some(
116 : std::coroutine_handle<> h,
117 : capy::executor_ref ex,
118 : buffer_param param,
119 : std::stop_token token,
120 : std::error_code* ec,
121 : std::size_t* bytes_out) override
122 : {
123 202910 : return do_write_some(h, ex, param, token, ec, bytes_out);
124 : }
125 :
126 : std::error_code
127 6 : shutdown(corosio::shutdown_type what) noexcept override
128 : {
129 6 : return do_shutdown(static_cast<int>(what));
130 : }
131 :
132 184 : void cancel() noexcept override
133 : {
134 184 : this->do_cancel();
135 184 : }
136 :
137 : // --- End virtual overrides ---
138 :
139 : /// Close the socket (non-virtual, called by the service).
140 77034 : void close_socket() noexcept
141 : {
142 77034 : this->do_close_socket();
143 77034 : }
144 :
145 : /** Shut down part or all of the full-duplex connection.
146 :
147 : @param what 0 = receive, 1 = send, 2 = both.
148 : */
149 6 : std::error_code do_shutdown(int what) noexcept
150 : {
151 : int how;
152 6 : switch (what)
153 : {
154 2 : case 0: // shutdown_receive
155 2 : how = SHUT_RD;
156 2 : break;
157 2 : case 1: // shutdown_send
158 2 : how = SHUT_WR;
159 2 : break;
160 2 : case 2: // shutdown_both
161 2 : how = SHUT_RDWR;
162 2 : break;
163 MIS 0 : default:
164 0 : return make_err(EINVAL);
165 : }
166 HIT 6 : if (::shutdown(this->fd_, how) != 0)
167 MIS 0 : return make_err(errno);
168 HIT 6 : return {};
169 : }
170 :
171 : /// Cache local and remote endpoints.
172 17036 : void set_endpoints(Endpoint local, Endpoint remote) noexcept
173 : {
174 17036 : this->local_endpoint_ = std::move(local);
175 17036 : remote_endpoint_ = std::move(remote);
176 17036 : }
177 :
178 : /** Shared connect dispatch.
179 :
180 : Tries the connect syscall speculatively. On synchronous
181 : completion, returns via inline budget or posts through queue.
182 : On EINPROGRESS, registers with the reactor.
183 : */
184 : std::coroutine_handle<> do_connect(
185 : std::coroutine_handle<>,
186 : capy::executor_ref,
187 : Endpoint const&,
188 : std::stop_token const&,
189 : std::error_code*);
190 :
191 : /** Shared scatter-read dispatch.
192 :
193 : Tries readv() speculatively. On success or hard error,
194 : returns via inline budget or posts through queue.
195 : On EAGAIN, registers with the reactor.
196 : */
197 : std::coroutine_handle<> do_read_some(
198 : std::coroutine_handle<>,
199 : capy::executor_ref,
200 : buffer_param,
201 : std::stop_token const&,
202 : std::error_code*,
203 : std::size_t*);
204 :
205 : /** Shared gather-write dispatch.
206 :
207 : Tries the write via WriteOp::write_policy speculatively.
208 : On success or hard error, returns via inline budget or
209 : posts through queue. On EAGAIN, registers with the reactor.
210 : */
211 : std::coroutine_handle<> do_write_some(
212 : std::coroutine_handle<>,
213 : capy::executor_ref,
214 : buffer_param,
215 : std::stop_token const&,
216 : std::error_code*,
217 : std::size_t*);
218 :
219 : /** Close the socket and cancel pending operations.
220 :
221 : Extends the base do_close_socket() to also reset
222 : the remote endpoint.
223 : */
224 77034 : void do_close_socket() noexcept
225 : {
226 77034 : base_type::do_close_socket();
227 77034 : remote_endpoint_ = Endpoint{};
228 77034 : }
229 :
230 : private:
231 : // CRTP callbacks for reactor_basic_socket cancel/close
232 :
233 : template<class Op>
234 192 : reactor_op_base** op_to_desc_slot(Op& op) noexcept
235 : {
236 192 : if (&op == static_cast<void*>(&conn_))
237 MIS 0 : return &this->desc_state_.connect_op;
238 HIT 192 : if (&op == static_cast<void*>(&rd_))
239 192 : return &this->desc_state_.read_op;
240 MIS 0 : if (&op == static_cast<void*>(&wr_))
241 0 : return &this->desc_state_.write_op;
242 0 : return nullptr;
243 : }
244 :
245 : template<class Op>
246 0 : bool* op_to_cancel_flag(Op& op) noexcept
247 : {
248 0 : if (&op == static_cast<void*>(&conn_))
249 0 : return &this->desc_state_.connect_cancel_pending;
250 0 : if (&op == static_cast<void*>(&rd_))
251 0 : return &this->desc_state_.read_cancel_pending;
252 0 : if (&op == static_cast<void*>(&wr_))
253 0 : return &this->desc_state_.write_cancel_pending;
254 0 : return nullptr;
255 : }
256 :
257 : template<class Fn>
258 HIT 77220 : void for_each_op(Fn fn) noexcept
259 : {
260 77220 : fn(conn_);
261 77220 : fn(rd_);
262 77220 : fn(wr_);
263 77220 : }
264 :
265 : template<class Fn>
266 77220 : void for_each_desc_entry(Fn fn) noexcept
267 : {
268 77220 : fn(conn_, this->desc_state_.connect_op);
269 77220 : fn(rd_, this->desc_state_.read_op);
270 77220 : fn(wr_, this->desc_state_.write_op);
271 77220 : }
272 : };
273 :
274 : template<
275 : class Derived,
276 : class Service,
277 : class ConnOp,
278 : class ReadOp,
279 : class WriteOp,
280 : class DescState,
281 : class ImplBase,
282 : class Endpoint>
283 : std::coroutine_handle<>
284 8522 : reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, DescState, ImplBase, Endpoint>::
285 : do_connect(
286 : std::coroutine_handle<> h,
287 : capy::executor_ref ex,
288 : Endpoint const& ep,
289 : std::stop_token const& token,
290 : std::error_code* ec)
291 : {
292 8522 : auto& op = conn_;
293 :
294 8522 : sockaddr_storage storage{};
295 8522 : socklen_t addrlen = to_sockaddr(ep, socket_family(this->fd_), storage);
296 : int result =
297 8522 : ::connect(this->fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);
298 :
299 8522 : if (result == 0)
300 : {
301 4 : sockaddr_storage local_storage{};
302 4 : socklen_t local_len = sizeof(local_storage);
303 4 : if (::getsockname(
304 : this->fd_, reinterpret_cast<sockaddr*>(&local_storage),
305 4 : &local_len) == 0)
306 MIS 0 : this->local_endpoint_ =
307 HIT 4 : from_sockaddr_as(local_storage, local_len, Endpoint{});
308 4 : remote_endpoint_ = ep;
309 : }
310 :
311 8522 : if (result == 0 || errno != EINPROGRESS)
312 : {
313 4 : int err = (result < 0) ? errno : 0;
314 4 : if (this->svc_.scheduler().try_consume_inline_budget())
315 : {
316 MIS 0 : *ec = err ? make_err(err) : std::error_code{};
317 0 : op.cont_op.cont.h = h;
318 0 : return dispatch_coro(ex, op.cont_op.cont);
319 : }
320 HIT 4 : op.reset();
321 4 : op.h = h;
322 4 : op.ex = ex;
323 4 : op.ec_out = ec;
324 4 : op.fd = this->fd_;
325 4 : op.target_endpoint = ep;
326 4 : op.start(token, static_cast<Derived*>(this));
327 4 : op.impl_ptr = this->shared_from_this();
328 4 : op.complete(err, 0);
329 4 : this->svc_.post(&op);
330 4 : return std::noop_coroutine();
331 : }
332 :
333 : // EINPROGRESS — register with reactor
334 8518 : op.reset();
335 8518 : op.h = h;
336 8518 : op.ex = ex;
337 8518 : op.ec_out = ec;
338 8518 : op.fd = this->fd_;
339 8518 : op.target_endpoint = ep;
340 8518 : op.start(token, static_cast<Derived*>(this));
341 8518 : op.impl_ptr = this->shared_from_this();
342 :
343 8518 : this->register_op(
344 8518 : op, this->desc_state_.connect_op, this->desc_state_.write_ready,
345 8518 : this->desc_state_.connect_cancel_pending, true);
346 8518 : return std::noop_coroutine();
347 : }
348 :
349 : template<
350 : class Derived,
351 : class Service,
352 : class ConnOp,
353 : class ReadOp,
354 : class WriteOp,
355 : class DescState,
356 : class ImplBase,
357 : class Endpoint>
358 : std::coroutine_handle<>
359 203205 : reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, DescState, ImplBase, Endpoint>::
360 : do_read_some(
361 : std::coroutine_handle<> h,
362 : capy::executor_ref ex,
363 : buffer_param param,
364 : std::stop_token const& token,
365 : std::error_code* ec,
366 : std::size_t* bytes_out)
367 : {
368 203205 : auto& op = rd_;
369 203205 : op.reset();
370 :
371 203205 : capy::mutable_buffer bufs[ReadOp::max_buffers];
372 203205 : op.iovec_count = static_cast<int>(param.copy_to(bufs, ReadOp::max_buffers));
373 :
374 203205 : if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
375 : {
376 2 : op.empty_buffer_read = true;
377 2 : op.h = h;
378 2 : op.ex = ex;
379 2 : op.ec_out = ec;
380 2 : op.bytes_out = bytes_out;
381 2 : op.start(token, static_cast<Derived*>(this));
382 2 : op.impl_ptr = this->shared_from_this();
383 2 : op.complete(0, 0);
384 2 : this->svc_.post(&op);
385 2 : return std::noop_coroutine();
386 : }
387 :
388 406406 : for (int i = 0; i < op.iovec_count; ++i)
389 : {
390 203203 : op.iovecs[i].iov_base = bufs[i].data();
391 203203 : op.iovecs[i].iov_len = bufs[i].size();
392 : }
393 :
394 : // Speculative read
395 : ssize_t n;
396 : do
397 : {
398 203203 : n = ::readv(this->fd_, op.iovecs, op.iovec_count);
399 : }
400 203203 : while (n < 0 && errno == EINTR);
401 :
402 203203 : if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
403 : {
404 202816 : int err = (n < 0) ? errno : 0;
405 202816 : auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
406 :
407 202816 : if (this->svc_.scheduler().try_consume_inline_budget())
408 : {
409 162286 : if (err)
410 MIS 0 : *ec = make_err(err);
411 HIT 162286 : else if (n == 0)
412 10 : *ec = capy::error::eof;
413 : else
414 162276 : *ec = {};
415 162286 : *bytes_out = bytes;
416 162286 : op.cont_op.cont.h = h;
417 162286 : return dispatch_coro(ex, op.cont_op.cont);
418 : }
419 40530 : op.h = h;
420 40530 : op.ex = ex;
421 40530 : op.ec_out = ec;
422 40530 : op.bytes_out = bytes_out;
423 40530 : op.start(token, static_cast<Derived*>(this));
424 40530 : op.impl_ptr = this->shared_from_this();
425 40530 : op.complete(err, bytes);
426 40530 : this->svc_.post(&op);
427 40530 : return std::noop_coroutine();
428 : }
429 :
430 : // EAGAIN — register with reactor
431 387 : op.h = h;
432 387 : op.ex = ex;
433 387 : op.ec_out = ec;
434 387 : op.bytes_out = bytes_out;
435 387 : op.fd = this->fd_;
436 387 : op.start(token, static_cast<Derived*>(this));
437 387 : op.impl_ptr = this->shared_from_this();
438 :
439 387 : this->register_op(
440 387 : op, this->desc_state_.read_op, this->desc_state_.read_ready,
441 387 : this->desc_state_.read_cancel_pending);
442 387 : return std::noop_coroutine();
443 : }
444 :
445 : template<
446 : class Derived,
447 : class Service,
448 : class ConnOp,
449 : class ReadOp,
450 : class WriteOp,
451 : class DescState,
452 : class ImplBase,
453 : class Endpoint>
454 : std::coroutine_handle<>
455 202910 : reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, DescState, ImplBase, Endpoint>::
456 : do_write_some(
457 : std::coroutine_handle<> h,
458 : capy::executor_ref ex,
459 : buffer_param param,
460 : std::stop_token const& token,
461 : std::error_code* ec,
462 : std::size_t* bytes_out)
463 : {
464 202910 : auto& op = wr_;
465 202910 : op.reset();
466 :
467 202910 : capy::mutable_buffer bufs[WriteOp::max_buffers];
468 202910 : op.iovec_count =
469 202910 : static_cast<int>(param.copy_to(bufs, WriteOp::max_buffers));
470 :
471 202910 : if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
472 : {
473 2 : op.h = h;
474 2 : op.ex = ex;
475 2 : op.ec_out = ec;
476 2 : op.bytes_out = bytes_out;
477 2 : op.start(token, static_cast<Derived*>(this));
478 2 : op.impl_ptr = this->shared_from_this();
479 2 : op.complete(0, 0);
480 2 : this->svc_.post(&op);
481 2 : return std::noop_coroutine();
482 : }
483 :
484 405816 : for (int i = 0; i < op.iovec_count; ++i)
485 : {
486 202908 : op.iovecs[i].iov_base = bufs[i].data();
487 202908 : op.iovecs[i].iov_len = bufs[i].size();
488 : }
489 :
490 : // Speculative write via backend-specific write policy
491 : ssize_t n =
492 202908 : WriteOp::write_policy::write(this->fd_, op.iovecs, op.iovec_count);
493 :
494 202908 : if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
495 : {
496 202908 : int err = (n < 0) ? errno : 0;
497 202908 : auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
498 :
499 202908 : if (this->svc_.scheduler().try_consume_inline_budget())
500 : {
501 162340 : *ec = err ? make_err(err) : std::error_code{};
502 162340 : *bytes_out = bytes;
503 162340 : op.cont_op.cont.h = h;
504 162340 : return dispatch_coro(ex, op.cont_op.cont);
505 : }
506 40568 : op.h = h;
507 40568 : op.ex = ex;
508 40568 : op.ec_out = ec;
509 40568 : op.bytes_out = bytes_out;
510 40568 : op.start(token, static_cast<Derived*>(this));
511 40568 : op.impl_ptr = this->shared_from_this();
512 40568 : op.complete(err, bytes);
513 40568 : this->svc_.post(&op);
514 40568 : return std::noop_coroutine();
515 : }
516 :
517 : // EAGAIN — register with reactor
518 MIS 0 : op.h = h;
519 0 : op.ex = ex;
520 0 : op.ec_out = ec;
521 0 : op.bytes_out = bytes_out;
522 0 : op.fd = this->fd_;
523 0 : op.start(token, static_cast<Derived*>(this));
524 0 : op.impl_ptr = this->shared_from_this();
525 :
526 0 : this->register_op(
527 0 : op, this->desc_state_.write_op, this->desc_state_.write_ready,
528 0 : this->desc_state_.write_cancel_pending, true);
529 0 : return std::noop_coroutine();
530 : }
531 :
532 : } // namespace boost::corosio::detail
533 :
534 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP
|