TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
12 :
13 : #include <boost/corosio/detail/dispatch_coro.hpp>
14 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
15 : #include <boost/corosio/native/detail/make_err.hpp>
16 : #include <boost/corosio/io/io_object.hpp>
17 :
18 : #include <coroutine>
19 : #include <mutex>
20 : #include <utility>
21 :
22 : #include <netinet/in.h>
23 : #include <sys/socket.h>
24 : #include <unistd.h>
25 :
26 : namespace boost::corosio::detail {
27 :
28 : /** Complete a base read/write operation.
29 :
30 : Translates the recorded errno and cancellation state into
31 : an error_code, stores the byte count, then resumes the
32 : caller via symmetric transfer.
33 :
34 : @tparam Op The concrete operation type.
35 : @param op The operation to complete.
36 : */
37 : template<typename Op>
38 : void
39 HIT 79153 : complete_io_op(Op& op)
40 : {
41 79153 : op.stop_cb.reset();
42 79153 : op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
43 :
44 79153 : if (op.cancelled.load(std::memory_order_acquire))
45 305 : *op.ec_out = capy::error::canceled;
46 78848 : else if (op.errn != 0)
47 MIS 0 : *op.ec_out = make_err(op.errn);
48 HIT 78848 : else if (op.is_read_operation() && op.bytes_transferred == 0)
49 MIS 0 : *op.ec_out = capy::error::eof;
50 : else
51 HIT 78848 : *op.ec_out = {};
52 :
53 79153 : *op.bytes_out = op.bytes_transferred;
54 :
55 79153 : op.cont_op.cont.h = op.h;
56 79153 : capy::executor_ref saved_ex(op.ex);
57 79153 : auto prevent = std::move(op.impl_ptr);
58 79153 : dispatch_coro(saved_ex, op.cont_op.cont).resume();
59 79153 : }
60 :
61 : /** Complete a connect operation with endpoint caching.
62 :
63 : On success, queries the local endpoint via getsockname and
64 : caches both endpoints in the socket impl. Then resumes the
65 : caller via symmetric transfer.
66 :
67 : @tparam Op The concrete connect operation type.
68 : @param op The operation to complete.
69 : */
70 : template<typename Op>
71 : void
72 8001 : complete_connect_op(Op& op)
73 : {
74 8001 : op.stop_cb.reset();
75 8001 : op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
76 :
77 8001 : bool success =
78 8001 : (op.errn == 0 && !op.cancelled.load(std::memory_order_acquire));
79 :
80 8001 : if (success && op.socket_impl_)
81 : {
82 : using ep_type = decltype(op.target_endpoint);
83 7997 : ep_type local_ep;
84 7997 : sockaddr_storage local_storage{};
85 7997 : socklen_t local_len = sizeof(local_storage);
86 7997 : if (::getsockname(
87 : op.fd, reinterpret_cast<sockaddr*>(&local_storage),
88 7997 : &local_len) == 0)
89 7993 : local_ep =
90 7997 : from_sockaddr_as(local_storage, local_len, ep_type{});
91 7997 : op.socket_impl_->set_endpoints(local_ep, op.target_endpoint);
92 : }
93 :
94 8001 : if (op.cancelled.load(std::memory_order_acquire))
95 MIS 0 : *op.ec_out = capy::error::canceled;
96 HIT 8001 : else if (op.errn != 0)
97 4 : *op.ec_out = make_err(op.errn);
98 : else
99 7997 : *op.ec_out = {};
100 :
101 8001 : op.cont_op.cont.h = op.h;
102 8001 : capy::executor_ref saved_ex(op.ex);
103 8001 : auto prevent = std::move(op.impl_ptr);
104 8001 : dispatch_coro(saved_ex, op.cont_op.cont).resume();
105 8001 : }
106 :
107 : /** Construct and register a peer socket from an accepted fd.
108 :
109 : Creates a new socket impl via the acceptor's associated
110 : socket service, registers it with the scheduler, and caches
111 : the local and remote endpoints.
112 :
113 : @tparam SocketImpl The concrete socket implementation type.
114 : @tparam AcceptorImpl The concrete acceptor implementation type.
115 : @param acceptor_impl The acceptor that accepted the connection.
116 : @param accepted_fd The accepted file descriptor (set to -1 on success).
117 : @param peer_storage The peer address from accept().
118 : @param peer_addrlen The actual peer address length from accept().
119 : @param impl_out Output pointer for the new socket impl.
120 : @param ec_out Output pointer for any error.
121 : @return True on success, false on failure.
122 : */
123 : template<typename SocketImpl, typename AcceptorImpl>
124 : bool
125 7983 : setup_accepted_socket(
126 : AcceptorImpl* acceptor_impl,
127 : int& accepted_fd,
128 : sockaddr_storage const& peer_storage,
129 : socklen_t peer_addrlen,
130 : io_object::implementation** impl_out,
131 : std::error_code* ec_out)
132 : {
133 7983 : auto* socket_svc = acceptor_impl->service().stream_service();
134 7983 : if (!socket_svc)
135 : {
136 MIS 0 : *ec_out = make_err(ENOENT);
137 0 : return false;
138 : }
139 :
140 HIT 7983 : auto& impl = static_cast<SocketImpl&>(*socket_svc->construct());
141 7983 : impl.set_socket(accepted_fd);
142 :
143 7983 : impl.desc_state_.fd = accepted_fd;
144 : {
145 7983 : std::lock_guard lock(impl.desc_state_.mutex);
146 7983 : impl.desc_state_.read_op = nullptr;
147 7983 : impl.desc_state_.write_op = nullptr;
148 7983 : impl.desc_state_.connect_op = nullptr;
149 7983 : }
150 7983 : socket_svc->scheduler().register_descriptor(accepted_fd, &impl.desc_state_);
151 :
152 : using ep_type = decltype(acceptor_impl->local_endpoint());
153 7983 : impl.set_endpoints(
154 : acceptor_impl->local_endpoint(),
155 7983 : from_sockaddr_as(peer_storage, peer_addrlen, ep_type{}));
156 :
157 7983 : if (impl_out)
158 7983 : *impl_out = &impl;
159 7983 : accepted_fd = -1;
160 7983 : return true;
161 : }
162 :
163 : /** Complete an accept operation.
164 :
165 : Sets up the peer socket on success, or closes the accepted
166 : fd on failure. Then resumes the caller via symmetric transfer.
167 :
168 : @tparam SocketImpl The concrete socket implementation type.
169 : @tparam Op The concrete accept operation type.
170 : @param op The operation to complete.
171 : */
172 : template<typename SocketImpl, typename Op>
173 : void
174 7995 : complete_accept_op(Op& op)
175 : {
176 7995 : op.stop_cb.reset();
177 7995 : op.acceptor_impl_->desc_state_.scheduler_->reset_inline_budget();
178 :
179 7995 : bool success =
180 7995 : (op.errn == 0 && !op.cancelled.load(std::memory_order_acquire));
181 :
182 7995 : if (op.cancelled.load(std::memory_order_acquire))
183 12 : *op.ec_out = capy::error::canceled;
184 7983 : else if (op.errn != 0)
185 MIS 0 : *op.ec_out = make_err(op.errn);
186 : else
187 HIT 7983 : *op.ec_out = {};
188 :
189 7995 : if (success && op.accepted_fd >= 0 && op.acceptor_impl_)
190 : {
191 7983 : if (!setup_accepted_socket<SocketImpl>(
192 7983 : op.acceptor_impl_, op.accepted_fd, op.peer_storage,
193 : op.peer_addrlen, op.impl_out, op.ec_out))
194 MIS 0 : success = false;
195 : }
196 :
197 HIT 7995 : if (!success || !op.acceptor_impl_)
198 : {
199 12 : if (op.accepted_fd >= 0)
200 : {
201 MIS 0 : ::close(op.accepted_fd);
202 0 : op.accepted_fd = -1;
203 : }
204 HIT 12 : if (op.impl_out)
205 12 : *op.impl_out = nullptr;
206 : }
207 :
208 7995 : op.cont_op.cont.h = op.h;
209 7995 : capy::executor_ref saved_ex(op.ex);
210 7995 : auto prevent = std::move(op.impl_ptr);
211 7995 : dispatch_coro(saved_ex, op.cont_op.cont).resume();
212 7995 : }
213 :
214 : /** Complete a connected datagram operation (send or recv).
215 :
216 : No source endpoint to capture. Critically, does NOT map
217 : bytes_transferred == 0 to EOF — zero-length datagrams are valid
218 : events on connected datagram sockets.
219 :
220 : @tparam Op The concrete datagram operation type.
221 : @param op The operation to complete.
222 : */
223 : template<typename Op>
224 : void
225 10 : complete_datagram_op(Op& op)
226 : {
227 10 : op.stop_cb.reset();
228 10 : op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
229 :
230 10 : if (op.cancelled.load(std::memory_order_acquire))
231 2 : *op.ec_out = capy::error::canceled;
232 8 : else if (op.errn != 0)
233 MIS 0 : *op.ec_out = make_err(op.errn);
234 : else
235 HIT 8 : *op.ec_out = {};
236 :
237 10 : *op.bytes_out = op.bytes_transferred;
238 :
239 10 : op.cont_op.cont.h = op.h;
240 10 : capy::executor_ref saved_ex(op.ex);
241 10 : auto prevent = std::move(op.impl_ptr);
242 10 : dispatch_coro(saved_ex, op.cont_op.cont).resume();
243 10 : }
244 :
245 : /** Complete a datagram operation with source endpoint capture.
246 :
247 : For recv_from operations, writes the source endpoint from the
248 : recorded sockaddr_storage into the caller's endpoint pointer.
249 : Then resumes the caller via symmetric transfer.
250 :
251 : @tparam Op The concrete datagram operation type.
252 : @param op The operation to complete.
253 : @param source_out Optional pointer to store source endpoint
254 : (non-null for recv_from, null for send_to).
255 : */
256 : template<typename Op, typename Endpoint>
257 : void
258 18 : complete_datagram_op(Op& op, Endpoint* source_out)
259 : {
260 18 : op.stop_cb.reset();
261 18 : op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
262 :
263 18 : if (op.cancelled.load(std::memory_order_acquire))
264 6 : *op.ec_out = capy::error::canceled;
265 12 : else if (op.errn != 0)
266 MIS 0 : *op.ec_out = make_err(op.errn);
267 : else
268 HIT 12 : *op.ec_out = {};
269 :
270 18 : *op.bytes_out = op.bytes_transferred;
271 :
272 28 : if (source_out && !op.cancelled.load(std::memory_order_acquire) &&
273 10 : op.errn == 0)
274 20 : *source_out = from_sockaddr_as(
275 10 : op.source_storage,
276 : op.source_addrlen,
277 : Endpoint{});
278 :
279 18 : op.cont_op.cont.h = op.h;
280 18 : capy::executor_ref saved_ex(op.ex);
281 18 : auto prevent = std::move(op.impl_ptr);
282 18 : dispatch_coro(saved_ex, op.cont_op.cont).resume();
283 18 : }
284 :
285 : } // namespace boost::corosio::detail
286 :
287 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
|