TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP
12 :
13 : #include <boost/corosio/tcp_socket.hpp>
14 : #include <boost/corosio/native/detail/reactor/reactor_basic_socket.hpp>
15 : #include <boost/corosio/detail/dispatch_coro.hpp>
16 : #include <boost/capy/buffers.hpp>
17 :
18 : #include <coroutine>
19 :
20 : #include <errno.h>
21 : #include <sys/socket.h>
22 : #include <sys/uio.h>
23 :
24 : namespace boost::corosio::detail {
25 :
26 : /** CRTP base for reactor-backed stream socket implementations.
27 :
28 : Inherits shared data members and cancel/close/register logic
29 : from reactor_basic_socket. Adds the stream-specific remote
30 : endpoint, shutdown, and I/O dispatch (connect, read, write).
31 :
32 : @tparam Derived The concrete socket type (CRTP).
33 : @tparam Service The backend's socket service type.
34 : @tparam ConnOp The backend's connect op type.
35 : @tparam ReadOp The backend's read op type.
36 : @tparam WriteOp The backend's write op type.
37 : @tparam DescState The backend's descriptor_state type.
38 : @tparam ImplBase The public vtable base
39 : (tcp_socket::implementation or
40 : local_stream_socket::implementation).
41 : @tparam Endpoint The endpoint type (endpoint or local_endpoint).
42 : */
43 : template<
44 : class Derived,
45 : class Service,
46 : class ConnOp,
47 : class ReadOp,
48 : class WriteOp,
49 : class DescState,
50 : class ImplBase = tcp_socket::implementation,
51 : class Endpoint = endpoint>
52 : class reactor_stream_socket
53 : : public reactor_basic_socket<
54 : Derived,
55 : ImplBase,
56 : Service,
57 : DescState,
58 : Endpoint>
59 : {
60 : using base_type = reactor_basic_socket<
61 : Derived,
62 : ImplBase,
63 : Service,
64 : DescState,
65 : Endpoint>;
66 : friend base_type;
67 : friend Derived;
68 :
69 HIT 24072 : explicit reactor_stream_socket(Service& svc) noexcept : base_type(svc) {}
70 :
71 : protected:
72 : Endpoint remote_endpoint_;
73 :
74 : public:
75 : /// Pending connect operation slot.
76 : ConnOp conn_;
77 :
78 : /// Pending read operation slot.
79 : ReadOp rd_;
80 :
81 : /// Pending write operation slot.
82 : WriteOp wr_;
83 :
84 24072 : ~reactor_stream_socket() override = default;
85 :
86 : /// Return the cached remote endpoint.
87 44 : Endpoint remote_endpoint() const noexcept override
88 : {
89 44 : return remote_endpoint_;
90 : }
91 :
92 : // --- Virtual method overrides (satisfy ImplBase pure virtuals) ---
93 :
94 7987 : std::coroutine_handle<> connect(
95 : std::coroutine_handle<> h,
96 : capy::executor_ref ex,
97 : Endpoint ep,
98 : std::stop_token token,
99 : std::error_code* ec) override
100 : {
101 7987 : return do_connect(h, ex, ep, token, ec);
102 : }
103 :
104 197286 : std::coroutine_handle<> read_some(
105 : std::coroutine_handle<> h,
106 : capy::executor_ref ex,
107 : buffer_param param,
108 : std::stop_token token,
109 : std::error_code* ec,
110 : std::size_t* bytes_out) override
111 : {
112 197286 : return do_read_some(h, ex, param, token, ec, bytes_out);
113 : }
114 :
115 196992 : std::coroutine_handle<> write_some(
116 : std::coroutine_handle<> h,
117 : capy::executor_ref ex,
118 : buffer_param param,
119 : std::stop_token token,
120 : std::error_code* ec,
121 : std::size_t* bytes_out) override
122 : {
123 196992 : return do_write_some(h, ex, param, token, ec, bytes_out);
124 : }
125 :
126 : std::error_code
127 6 : shutdown(corosio::shutdown_type what) noexcept override
128 : {
129 6 : return do_shutdown(static_cast<int>(what));
130 : }
131 :
132 184 : void cancel() noexcept override
133 : {
134 184 : this->do_cancel();
135 184 : }
136 :
137 : // --- End virtual overrides ---
138 :
139 : /// Close the socket (non-virtual, called by the service).
140 : void close_socket() noexcept
141 : {
142 : this->do_close_socket();
143 : }
144 :
145 : /** Shut down part or all of the full-duplex connection.
146 :
147 : @param what 0 = receive, 1 = send, 2 = both.
148 : */
149 6 : std::error_code do_shutdown(int what) noexcept
150 : {
151 : int how;
152 6 : switch (what)
153 : {
154 2 : case 0: // shutdown_receive
155 2 : how = SHUT_RD;
156 2 : break;
157 2 : case 1: // shutdown_send
158 2 : how = SHUT_WR;
159 2 : break;
160 2 : case 2: // shutdown_both
161 2 : how = SHUT_RDWR;
162 2 : break;
163 MIS 0 : default:
164 0 : return make_err(EINVAL);
165 : }
166 HIT 6 : if (::shutdown(this->fd_, how) != 0)
167 MIS 0 : return make_err(errno);
168 HIT 6 : return {};
169 : }
170 :
171 : /// Cache local and remote endpoints.
172 15982 : void set_endpoints(Endpoint local, Endpoint remote) noexcept
173 : {
174 15982 : this->local_endpoint_ = std::move(local);
175 15982 : remote_endpoint_ = std::move(remote);
176 15982 : }
177 :
178 : /** Shared connect dispatch.
179 :
180 : Tries the connect syscall speculatively. On synchronous
181 : completion, returns via inline budget or posts through queue.
182 : On EINPROGRESS, registers with the reactor.
183 : */
184 : std::coroutine_handle<> do_connect(
185 : std::coroutine_handle<>,
186 : capy::executor_ref,
187 : Endpoint const&,
188 : std::stop_token const&,
189 : std::error_code*);
190 :
191 : /** Shared scatter-read dispatch.
192 :
193 : Tries readv() speculatively. On success or hard error,
194 : returns via inline budget or posts through queue.
195 : On EAGAIN, registers with the reactor.
196 : */
197 : std::coroutine_handle<> do_read_some(
198 : std::coroutine_handle<>,
199 : capy::executor_ref,
200 : buffer_param,
201 : std::stop_token const&,
202 : std::error_code*,
203 : std::size_t*);
204 :
205 : /** Shared gather-write dispatch.
206 :
207 : Tries the write via WriteOp::write_policy speculatively.
208 : On success or hard error, returns via inline budget or
209 : posts through queue. On EAGAIN, registers with the reactor.
210 : */
211 : std::coroutine_handle<> do_write_some(
212 : std::coroutine_handle<>,
213 : capy::executor_ref,
214 : buffer_param,
215 : std::stop_token const&,
216 : std::error_code*,
217 : std::size_t*);
218 :
219 : /** Close the socket and cancel pending operations.
220 :
221 : Extends the base do_close_socket() to also reset
222 : the remote endpoint.
223 : */
224 72219 : void do_close_socket() noexcept
225 : {
226 72219 : base_type::do_close_socket();
227 72219 : remote_endpoint_ = Endpoint{};
228 72219 : }
229 :
230 : /** Release the socket without closing the fd.
231 :
232 : Extends the base do_release_socket() to also reset
233 : the remote endpoint.
234 : */
235 2 : native_handle_type do_release_socket() noexcept
236 : {
237 2 : auto fd = base_type::do_release_socket();
238 2 : remote_endpoint_ = Endpoint{};
239 2 : return fd;
240 : }
241 :
242 : private:
243 : // CRTP callbacks for reactor_basic_socket cancel/close
244 :
245 : template<class Op>
246 191 : reactor_op_base** op_to_desc_slot(Op& op) noexcept
247 : {
248 191 : if (&op == static_cast<void*>(&conn_))
249 MIS 0 : return &this->desc_state_.connect_op;
250 HIT 191 : if (&op == static_cast<void*>(&rd_))
251 191 : return &this->desc_state_.read_op;
252 MIS 0 : if (&op == static_cast<void*>(&wr_))
253 0 : return &this->desc_state_.write_op;
254 0 : return nullptr;
255 : }
256 :
257 : template<class Op>
258 0 : bool* op_to_cancel_flag(Op& op) noexcept
259 : {
260 0 : if (&op == static_cast<void*>(&conn_))
261 0 : return &this->desc_state_.connect_cancel_pending;
262 0 : if (&op == static_cast<void*>(&rd_))
263 0 : return &this->desc_state_.read_cancel_pending;
264 0 : if (&op == static_cast<void*>(&wr_))
265 0 : return &this->desc_state_.write_cancel_pending;
266 0 : return nullptr;
267 : }
268 :
269 : template<class Fn>
270 HIT 72405 : void for_each_op(Fn fn) noexcept
271 : {
272 72405 : fn(conn_);
273 72405 : fn(rd_);
274 72405 : fn(wr_);
275 72405 : }
276 :
277 : template<class Fn>
278 72405 : void for_each_desc_entry(Fn fn) noexcept
279 : {
280 72405 : fn(conn_, this->desc_state_.connect_op);
281 72405 : fn(rd_, this->desc_state_.read_op);
282 72405 : fn(wr_, this->desc_state_.write_op);
283 72405 : }
284 : };
285 :
286 : template<
287 : class Derived,
288 : class Service,
289 : class ConnOp,
290 : class ReadOp,
291 : class WriteOp,
292 : class DescState,
293 : class ImplBase,
294 : class Endpoint>
295 : std::coroutine_handle<>
296 7987 : reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, DescState, ImplBase, Endpoint>::
297 : do_connect(
298 : std::coroutine_handle<> h,
299 : capy::executor_ref ex,
300 : Endpoint const& ep,
301 : std::stop_token const& token,
302 : std::error_code* ec)
303 : {
304 7987 : auto& op = conn_;
305 :
306 7987 : sockaddr_storage storage{};
307 7987 : socklen_t addrlen = to_sockaddr(ep, socket_family(this->fd_), storage);
308 : int result =
309 7987 : ::connect(this->fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);
310 :
311 7987 : if (result == 0)
312 : {
313 4 : sockaddr_storage local_storage{};
314 4 : socklen_t local_len = sizeof(local_storage);
315 4 : if (::getsockname(
316 : this->fd_, reinterpret_cast<sockaddr*>(&local_storage),
317 4 : &local_len) == 0)
318 MIS 0 : this->local_endpoint_ =
319 HIT 4 : from_sockaddr_as(local_storage, local_len, Endpoint{});
320 4 : remote_endpoint_ = ep;
321 : }
322 :
323 7987 : if (result == 0 || errno != EINPROGRESS)
324 : {
325 4 : int err = (result < 0) ? errno : 0;
326 4 : if (this->svc_.scheduler().try_consume_inline_budget())
327 : {
328 MIS 0 : *ec = err ? make_err(err) : std::error_code{};
329 0 : op.cont_op.cont.h = h;
330 0 : return dispatch_coro(ex, op.cont_op.cont);
331 : }
332 HIT 4 : op.reset();
333 4 : op.h = h;
334 4 : op.ex = ex;
335 4 : op.ec_out = ec;
336 4 : op.fd = this->fd_;
337 4 : op.target_endpoint = ep;
338 4 : op.start(token, static_cast<Derived*>(this));
339 4 : op.impl_ptr = this->shared_from_this();
340 4 : op.complete(err, 0);
341 4 : this->svc_.post(&op);
342 4 : return std::noop_coroutine();
343 : }
344 :
345 : // EINPROGRESS — register with reactor
346 7983 : op.reset();
347 7983 : op.h = h;
348 7983 : op.ex = ex;
349 7983 : op.ec_out = ec;
350 7983 : op.fd = this->fd_;
351 7983 : op.target_endpoint = ep;
352 7983 : op.start(token, static_cast<Derived*>(this));
353 7983 : op.impl_ptr = this->shared_from_this();
354 :
355 7983 : this->register_op(
356 7983 : op, this->desc_state_.connect_op, this->desc_state_.write_ready,
357 7983 : this->desc_state_.connect_cancel_pending, true);
358 7983 : return std::noop_coroutine();
359 : }
360 :
361 : template<
362 : class Derived,
363 : class Service,
364 : class ConnOp,
365 : class ReadOp,
366 : class WriteOp,
367 : class DescState,
368 : class ImplBase,
369 : class Endpoint>
370 : std::coroutine_handle<>
371 197286 : reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, DescState, ImplBase, Endpoint>::
372 : do_read_some(
373 : std::coroutine_handle<> h,
374 : capy::executor_ref ex,
375 : buffer_param param,
376 : std::stop_token const& token,
377 : std::error_code* ec,
378 : std::size_t* bytes_out)
379 : {
380 197286 : auto& op = rd_;
381 197286 : op.reset();
382 :
383 197286 : capy::mutable_buffer bufs[ReadOp::max_buffers];
384 197286 : op.iovec_count = static_cast<int>(param.copy_to(bufs, ReadOp::max_buffers));
385 :
386 197286 : if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
387 : {
388 2 : op.empty_buffer_read = true;
389 2 : op.h = h;
390 2 : op.ex = ex;
391 2 : op.ec_out = ec;
392 2 : op.bytes_out = bytes_out;
393 2 : op.start(token, static_cast<Derived*>(this));
394 2 : op.impl_ptr = this->shared_from_this();
395 2 : op.complete(0, 0);
396 2 : this->svc_.post(&op);
397 2 : return std::noop_coroutine();
398 : }
399 :
400 394568 : for (int i = 0; i < op.iovec_count; ++i)
401 : {
402 197284 : op.iovecs[i].iov_base = bufs[i].data();
403 197284 : op.iovecs[i].iov_len = bufs[i].size();
404 : }
405 :
406 : // Speculative read
407 : ssize_t n;
408 : do
409 : {
410 197284 : n = ::readv(this->fd_, op.iovecs, op.iovec_count);
411 : }
412 197284 : while (n < 0 && errno == EINTR);
413 :
414 197284 : if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
415 : {
416 196898 : int err = (n < 0) ? errno : 0;
417 196898 : auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
418 :
419 196898 : if (this->svc_.scheduler().try_consume_inline_budget())
420 : {
421 157551 : if (err)
422 MIS 0 : *ec = make_err(err);
423 HIT 157551 : else if (n == 0)
424 10 : *ec = capy::error::eof;
425 : else
426 157541 : *ec = {};
427 157551 : *bytes_out = bytes;
428 157551 : op.cont_op.cont.h = h;
429 157551 : return dispatch_coro(ex, op.cont_op.cont);
430 : }
431 39347 : op.h = h;
432 39347 : op.ex = ex;
433 39347 : op.ec_out = ec;
434 39347 : op.bytes_out = bytes_out;
435 39347 : op.start(token, static_cast<Derived*>(this));
436 39347 : op.impl_ptr = this->shared_from_this();
437 39347 : op.complete(err, bytes);
438 39347 : this->svc_.post(&op);
439 39347 : return std::noop_coroutine();
440 : }
441 :
442 : // EAGAIN — register with reactor
443 386 : op.h = h;
444 386 : op.ex = ex;
445 386 : op.ec_out = ec;
446 386 : op.bytes_out = bytes_out;
447 386 : op.fd = this->fd_;
448 386 : op.start(token, static_cast<Derived*>(this));
449 386 : op.impl_ptr = this->shared_from_this();
450 :
451 386 : this->register_op(
452 386 : op, this->desc_state_.read_op, this->desc_state_.read_ready,
453 386 : this->desc_state_.read_cancel_pending);
454 386 : return std::noop_coroutine();
455 : }
456 :
457 : template<
458 : class Derived,
459 : class Service,
460 : class ConnOp,
461 : class ReadOp,
462 : class WriteOp,
463 : class DescState,
464 : class ImplBase,
465 : class Endpoint>
466 : std::coroutine_handle<>
467 196992 : reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, DescState, ImplBase, Endpoint>::
468 : do_write_some(
469 : std::coroutine_handle<> h,
470 : capy::executor_ref ex,
471 : buffer_param param,
472 : std::stop_token const& token,
473 : std::error_code* ec,
474 : std::size_t* bytes_out)
475 : {
476 196992 : auto& op = wr_;
477 196992 : op.reset();
478 :
479 196992 : capy::mutable_buffer bufs[WriteOp::max_buffers];
480 196992 : op.iovec_count =
481 196992 : static_cast<int>(param.copy_to(bufs, WriteOp::max_buffers));
482 :
483 196992 : if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
484 : {
485 2 : op.h = h;
486 2 : op.ex = ex;
487 2 : op.ec_out = ec;
488 2 : op.bytes_out = bytes_out;
489 2 : op.start(token, static_cast<Derived*>(this));
490 2 : op.impl_ptr = this->shared_from_this();
491 2 : op.complete(0, 0);
492 2 : this->svc_.post(&op);
493 2 : return std::noop_coroutine();
494 : }
495 :
496 393980 : for (int i = 0; i < op.iovec_count; ++i)
497 : {
498 196990 : op.iovecs[i].iov_base = bufs[i].data();
499 196990 : op.iovecs[i].iov_len = bufs[i].size();
500 : }
501 :
502 : // Speculative write via backend-specific write policy
503 : ssize_t n =
504 196990 : WriteOp::write_policy::write(this->fd_, op.iovecs, op.iovec_count);
505 :
506 196990 : if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
507 : {
508 196990 : int err = (n < 0) ? errno : 0;
509 196990 : auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
510 :
511 196990 : if (this->svc_.scheduler().try_consume_inline_budget())
512 : {
513 157606 : *ec = err ? make_err(err) : std::error_code{};
514 157606 : *bytes_out = bytes;
515 157606 : op.cont_op.cont.h = h;
516 157606 : return dispatch_coro(ex, op.cont_op.cont);
517 : }
518 39384 : op.h = h;
519 39384 : op.ex = ex;
520 39384 : op.ec_out = ec;
521 39384 : op.bytes_out = bytes_out;
522 39384 : op.start(token, static_cast<Derived*>(this));
523 39384 : op.impl_ptr = this->shared_from_this();
524 39384 : op.complete(err, bytes);
525 39384 : this->svc_.post(&op);
526 39384 : return std::noop_coroutine();
527 : }
528 :
529 : // EAGAIN — register with reactor
530 MIS 0 : op.h = h;
531 0 : op.ex = ex;
532 0 : op.ec_out = ec;
533 0 : op.bytes_out = bytes_out;
534 0 : op.fd = this->fd_;
535 0 : op.start(token, static_cast<Derived*>(this));
536 0 : op.impl_ptr = this->shared_from_this();
537 :
538 0 : this->register_op(
539 0 : op, this->desc_state_.write_op, this->desc_state_.write_ready,
540 0 : this->desc_state_.write_cancel_pending, true);
541 0 : return std::noop_coroutine();
542 : }
543 :
544 : } // namespace boost::corosio::detail
545 :
546 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP
|