TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Michael Vandeberg
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_LOCAL_STREAM_ACCEPTOR_HPP
11 : #define BOOST_COROSIO_LOCAL_STREAM_ACCEPTOR_HPP
12 :
13 : #include <boost/corosio/detail/config.hpp>
14 : #include <boost/corosio/detail/except.hpp>
15 : #include <boost/corosio/io/io_object.hpp>
16 : #include <boost/capy/io_result.hpp>
17 : #include <boost/corosio/local_endpoint.hpp>
18 : #include <boost/corosio/local_stream.hpp>
19 : #include <boost/corosio/local_stream_socket.hpp>
20 : #include <boost/capy/ex/executor_ref.hpp>
21 : #include <boost/capy/ex/execution_context.hpp>
22 : #include <boost/capy/ex/io_env.hpp>
23 : #include <boost/capy/concept/executor.hpp>
24 :
25 : #include <system_error>
26 :
27 : #include <cassert>
28 : #include <concepts>
29 : #include <coroutine>
30 : #include <cstddef>
31 : #include <stop_token>
32 : #include <type_traits>
33 :
34 : namespace boost::corosio {
35 :
36 : /* An asynchronous Unix stream acceptor for coroutine I/O.
37 :
38 : This class provides asynchronous Unix domain stream accept
39 : operations that return awaitable types. The acceptor binds
40 : to a local endpoint (filesystem path) and listens for
41 : incoming connections.
42 :
43 : The library does NOT automatically unlink the socket path
44 : on close. Callers are responsible for removing the socket
45 : file before bind or after close.
46 :
47 : Thread Safety:
48 : Distinct objects: Safe.
49 : Shared objects: Unsafe. An acceptor must not have
50 : concurrent accept operations.
51 : */
52 : /** Options for local_stream_acceptor::bind(). */
53 : enum class bind_option
54 : {
55 : none,
56 : /// Unlink the socket path before binding (ignored for abstract paths).
57 : unlink_existing
58 : };
59 :
60 : class BOOST_COROSIO_DECL local_stream_acceptor : public io_object
61 : {
62 : struct move_accept_awaitable
63 : {
64 : local_stream_acceptor& acc_;
65 : std::stop_token token_;
66 : mutable std::error_code ec_;
67 : mutable io_object::implementation* peer_impl_ = nullptr;
68 :
69 HIT 2 : explicit move_accept_awaitable(
70 : local_stream_acceptor& acc) noexcept
71 2 : : acc_(acc)
72 : {
73 2 : }
74 :
75 2 : bool await_ready() const noexcept
76 : {
77 2 : return token_.stop_requested();
78 : }
79 :
80 2 : capy::io_result<local_stream_socket> await_resume() const noexcept
81 : {
82 2 : if (token_.stop_requested())
83 MIS 0 : return {make_error_code(std::errc::operation_canceled),
84 0 : local_stream_socket()};
85 :
86 HIT 2 : if (ec_ || !peer_impl_)
87 MIS 0 : return {ec_, local_stream_socket()};
88 :
89 HIT 2 : local_stream_socket peer(acc_.ctx_);
90 2 : reset_peer_impl(peer, peer_impl_);
91 2 : return {ec_, std::move(peer)};
92 2 : }
93 :
94 2 : auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
95 : -> std::coroutine_handle<>
96 : {
97 2 : token_ = env->stop_token;
98 6 : return acc_.get().accept(
99 6 : h, env->executor, token_, &ec_, &peer_impl_);
100 : }
101 : };
102 :
103 : struct accept_awaitable
104 : {
105 : local_stream_acceptor& acc_;
106 : local_stream_socket& peer_;
107 : std::stop_token token_;
108 : mutable std::error_code ec_;
109 : mutable io_object::implementation* peer_impl_ = nullptr;
110 :
111 2 : accept_awaitable(
112 : local_stream_acceptor& acc, local_stream_socket& peer) noexcept
113 2 : : acc_(acc)
114 2 : , peer_(peer)
115 : {
116 2 : }
117 :
118 2 : bool await_ready() const noexcept
119 : {
120 2 : return token_.stop_requested();
121 : }
122 :
123 2 : capy::io_result<> await_resume() const noexcept
124 : {
125 2 : if (token_.stop_requested())
126 MIS 0 : return {make_error_code(std::errc::operation_canceled)};
127 :
128 HIT 2 : if (!ec_ && peer_impl_)
129 2 : peer_.h_.reset(peer_impl_);
130 2 : return {ec_};
131 : }
132 :
133 2 : auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
134 : -> std::coroutine_handle<>
135 : {
136 2 : token_ = env->stop_token;
137 6 : return acc_.get().accept(
138 6 : h, env->executor, token_, &ec_, &peer_impl_);
139 : }
140 : };
141 :
142 : public:
143 : ~local_stream_acceptor() override;
144 :
145 : explicit local_stream_acceptor(capy::execution_context& ctx);
146 :
147 : template<class Ex>
148 : requires(!std::same_as<std::remove_cvref_t<Ex>, local_stream_acceptor>) &&
149 : capy::Executor<Ex>
150 : explicit local_stream_acceptor(Ex const& ex) : local_stream_acceptor(ex.context())
151 : {
152 : }
153 :
154 : local_stream_acceptor(local_stream_acceptor&& other) noexcept
155 : : local_stream_acceptor(other.ctx_, std::move(other))
156 : {
157 : }
158 :
159 : local_stream_acceptor& operator=(local_stream_acceptor&& other) noexcept
160 : {
161 : assert(&ctx_ == &other.ctx_ &&
162 : "move-assign requires the same execution_context");
163 : if (this != &other)
164 : {
165 : close();
166 : io_object::operator=(std::move(other));
167 : }
168 : return *this;
169 : }
170 :
171 : local_stream_acceptor(local_stream_acceptor const&) = delete;
172 : local_stream_acceptor& operator=(local_stream_acceptor const&) = delete;
173 :
174 : /** Create the acceptor socket.
175 :
176 : @param proto The protocol. Defaults to local_stream{}.
177 :
178 : @throws std::system_error on failure.
179 : */
180 : void open(local_stream proto = {});
181 :
182 : /** Bind to a local endpoint.
183 :
184 : @param ep The local endpoint (path) to bind to.
185 : @param opt Bind options. Pass bind_option::unlink_existing
186 : to unlink the socket path before binding (ignored for
187 : abstract sockets and empty endpoints).
188 :
189 : @return An error code on failure, empty on success.
190 :
191 : @throws std::logic_error if the acceptor is not open.
192 : */
193 : [[nodiscard]] std::error_code
194 : bind(corosio::local_endpoint ep,
195 : bind_option opt = bind_option::none);
196 :
197 : /** Start listening for incoming connections.
198 :
199 : @param backlog The maximum pending connection queue length.
200 :
201 : @return An error code on failure, empty on success.
202 :
203 : @throws std::logic_error if the acceptor is not open.
204 : */
205 : [[nodiscard]] std::error_code listen(int backlog = 128);
206 :
207 : /// Close the acceptor.
208 : void close();
209 :
210 : /// Check if the acceptor is listening.
211 44 : bool is_open() const noexcept
212 : {
213 44 : return h_ && get().is_open();
214 : }
215 :
216 : /** Initiate an asynchronous accept operation.
217 :
218 : @param peer The socket to receive the accepted connection.
219 :
220 : @return An awaitable that completes with io_result<>.
221 :
222 : @throws std::logic_error if the acceptor is not listening.
223 : */
224 2 : auto accept(local_stream_socket& peer)
225 : {
226 2 : if (!is_open())
227 MIS 0 : detail::throw_logic_error("accept: acceptor not listening");
228 HIT 2 : return accept_awaitable(*this, peer);
229 : }
230 :
231 : /** Initiate an asynchronous accept, returning the socket.
232 :
233 : @return An awaitable that completes with
234 : io_result<local_stream_socket>.
235 :
236 : @throws std::logic_error if the acceptor is not listening.
237 : */
238 2 : auto accept()
239 : {
240 2 : if (!is_open())
241 MIS 0 : detail::throw_logic_error("accept: acceptor not listening");
242 HIT 2 : return move_accept_awaitable(*this);
243 : }
244 :
245 : void cancel();
246 :
247 : /** Release ownership of the native socket handle.
248 :
249 : Deregisters the acceptor from the reactor and cancels
250 : pending operations without closing the fd.
251 :
252 : @return The native handle, or -1 if not open.
253 :
254 : @throws std::logic_error if the acceptor is not open.
255 : */
256 : native_handle_type release();
257 :
258 : corosio::local_endpoint local_endpoint() const noexcept;
259 :
260 : template<class Option>
261 : void set_option(Option const& opt)
262 : {
263 : if (!is_open())
264 : detail::throw_logic_error("set_option: acceptor not open");
265 : std::error_code ec = get().set_option(
266 : Option::level(), Option::name(), opt.data(), opt.size());
267 : if (ec)
268 : detail::throw_system_error(ec, "local_stream_acceptor::set_option");
269 : }
270 :
271 : template<class Option>
272 : Option get_option() const
273 : {
274 : if (!is_open())
275 : detail::throw_logic_error("get_option: acceptor not open");
276 : Option opt{};
277 : std::size_t sz = opt.size();
278 : std::error_code ec =
279 : get().get_option(Option::level(), Option::name(), opt.data(), &sz);
280 : if (ec)
281 : detail::throw_system_error(ec, "local_stream_acceptor::get_option");
282 : opt.resize(sz);
283 : return opt;
284 : }
285 :
286 : /** Define backend hooks for local stream acceptor operations. */
287 : struct implementation : io_object::implementation
288 : {
289 : virtual std::coroutine_handle<> accept(
290 : std::coroutine_handle<>,
291 : capy::executor_ref,
292 : std::stop_token,
293 : std::error_code*,
294 : io_object::implementation**) = 0;
295 :
296 : virtual corosio::local_endpoint local_endpoint() const noexcept = 0;
297 :
298 : virtual bool is_open() const noexcept = 0;
299 :
300 : virtual native_handle_type release_socket() noexcept = 0;
301 :
302 : virtual void cancel() noexcept = 0;
303 :
304 : virtual std::error_code set_option(
305 : int level,
306 : int optname,
307 : void const* data,
308 : std::size_t size) noexcept = 0;
309 :
310 : virtual std::error_code
311 : get_option(int level, int optname, void* data, std::size_t* size)
312 : const noexcept = 0;
313 : };
314 :
315 : protected:
316 : local_stream_acceptor(handle h, capy::execution_context& ctx) noexcept
317 : : io_object(std::move(h))
318 : , ctx_(ctx)
319 : {
320 : }
321 :
322 : local_stream_acceptor(
323 : capy::execution_context& ctx, local_stream_acceptor&& other) noexcept
324 : : io_object(std::move(other))
325 : , ctx_(ctx)
326 : {
327 : }
328 :
329 2 : static void reset_peer_impl(
330 : local_stream_socket& peer, io_object::implementation* impl) noexcept
331 : {
332 2 : if (impl)
333 2 : peer.h_.reset(impl);
334 2 : }
335 :
336 : private:
337 : capy::execution_context& ctx_;
338 :
339 48 : inline implementation& get() const noexcept
340 : {
341 48 : return *static_cast<implementation*>(h_.get());
342 : }
343 : };
344 :
345 : } // namespace boost::corosio
346 :
347 : #endif // BOOST_COROSIO_LOCAL_STREAM_ACCEPTOR_HPP
|