TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Michael Vandeberg
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_LOCAL_STREAM_ACCEPTOR_HPP
11 : #define BOOST_COROSIO_LOCAL_STREAM_ACCEPTOR_HPP
12 :
13 : #include <boost/corosio/detail/config.hpp>
14 : #include <boost/corosio/detail/except.hpp>
15 : #include <boost/corosio/io/io_object.hpp>
16 : #include <boost/capy/io_result.hpp>
17 : #include <boost/corosio/local_endpoint.hpp>
18 : #include <boost/corosio/local_stream.hpp>
19 : #include <boost/corosio/local_stream_socket.hpp>
20 : #include <boost/capy/ex/executor_ref.hpp>
21 : #include <boost/capy/ex/execution_context.hpp>
22 : #include <boost/capy/ex/io_env.hpp>
23 : #include <boost/capy/concept/executor.hpp>
24 :
25 : #include <system_error>
26 :
27 : #include <cassert>
28 : #include <concepts>
29 : #include <coroutine>
30 : #include <cstddef>
31 : #include <stop_token>
32 : #include <type_traits>
33 :
34 : namespace boost::corosio {
35 :
36 : /** Options for @ref local_stream_acceptor::bind controlling whether an existing socket path is removed first. */
37 : enum class bind_option
38 : {
39 : /// Default: do not unlink the socket path; removing a stale socket file is left to the caller.
40 : none,
41 : /// Unlink the socket path before binding (ignored for abstract sockets and empty endpoints).
42 : unlink_existing
43 : };
44 :
45 : /** An asynchronous Unix stream acceptor for coroutine I/O.
46 :
47 : This class provides asynchronous Unix domain stream accept
48 : operations that return awaitable types. The acceptor binds
49 : to a local endpoint (filesystem path) and listens for
50 : incoming connections.
51 :
52 : The library does NOT automatically unlink the socket path
53 : on close. Callers are responsible for removing the socket
54 : file before bind or after close, or pass
55 : @ref bind_option::unlink_existing to @ref bind.
56 :
57 : @par Thread Safety
58 : Distinct objects: Safe.@n
59 : Shared objects: Unsafe. An acceptor must not have concurrent
60 : accept operations.
61 :
62 : @par Semantics
63 : Wraps the platform Unix domain socket listener. Operations
64 : dispatch to OS accept APIs via the io_context reactor.
65 : Cancellation propagates through the IoAwaitable stop_token
66 : or via @ref cancel; cancelled operations resume with
67 : `errc::operation_canceled`.
68 :
69 : @par Example
70 : @code
71 : io_context ioc;
72 : local_stream_acceptor acc(ioc);
73 : acc.open();
74 : if (auto ec = acc.bind(
75 : local_endpoint("/tmp/my.sock"),
76 : bind_option::unlink_existing))
77 : return ec;
78 : if (auto ec = acc.listen())
79 : return ec;
80 :
81 : local_stream_socket peer(ioc);
82 : auto [ec] = co_await acc.accept(peer);
83 : if (!ec) {
84 : // peer is now connected
85 : }
86 : @endcode
87 :
88 : @see local_stream_socket, local_endpoint, local_stream
89 : */
90 : class BOOST_COROSIO_DECL local_stream_acceptor : public io_object
91 : {
92 : struct move_accept_awaitable // Awaitable returned by accept(): completes with io_result<local_stream_socket>.
93 : {
94 : local_stream_acceptor& acc_; // acceptor the accept is issued on; also supplies ctx_ for the new peer socket
95 : std::stop_token token_; // empty until await_suspend copies env->stop_token
96 : mutable std::error_code ec_; // result slot handed to implementation::accept as &ec_
97 : mutable io_object::implementation* peer_impl_ = nullptr; // accepted-peer slot handed over as &peer_impl_; stays null if none was produced
98 : // Binds the awaitable to `acc`; no I/O is started here.
99 HIT 2 : explicit move_accept_awaitable(
100 : local_stream_acceptor& acc) noexcept
101 2 : : acc_(acc)
102 : {
103 2 : }
104 : // token_ is only assigned in await_suspend, so on a fresh awaitable this reads a default-constructed stop_token, which never reports a stop request.
105 2 : bool await_ready() const noexcept
106 : {
107 2 : return token_.stop_requested();
108 : }
109 : // Produces the result: cancellation is checked first, then error/no-peer, and only then does a new socket adopt peer_impl_.
110 2 : capy::io_result<local_stream_socket> await_resume() const noexcept
111 : {
112 2 : if (token_.stop_requested()) // NOTE(review): wins even if peer_impl_ was filled in; who reclaims that implementation is not visible here - confirm
113 MIS 0 : return {make_error_code(std::errc::operation_canceled),
114 0 : local_stream_socket()};
115 : // Failure, or success without a peer implementation: return ec_ together with a default-constructed socket.
116 HIT 2 : if (ec_ || !peer_impl_)
117 MIS 0 : return {ec_, local_stream_socket()};
118 : // Success: build the peer on the acceptor's execution context and install the accepted implementation.
119 HIT 2 : local_stream_socket peer(acc_.ctx_);
120 2 : reset_peer_impl(peer, peer_impl_);
121 2 : return {ec_, std::move(peer)};
122 2 : }
123 : // Captures the stop token from env, then either resumes `h` at once (already cancelled) or hands the operation to the backend.
124 2 : auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
125 : -> std::coroutine_handle<>
126 : {
127 2 : token_ = env->stop_token;
128 2 : if (token_.stop_requested())
129 : {
130 MIS 0 : ec_ = make_error_code(std::errc::operation_canceled);
131 0 : return h; // returning the awaiting handle resumes it without starting an accept
132 : }
133 HIT 6 : return acc_.get().accept( // backend gets the out-pointers; the handle it returns is resumed next
134 4 : h, env->executor, token_, &ec_, &peer_impl_);
135 : }
136 : };
137 :
138 : struct accept_awaitable // Awaitable returned by accept(peer): completes with io_result<> and fills the caller's socket.
139 : {
140 : local_stream_acceptor& acc_; // acceptor the accept is issued on
141 : local_stream_socket& peer_; // caller-owned destination socket; held by reference, so it must stay alive until await_resume has run
142 : std::stop_token token_; // empty until await_suspend copies env->stop_token
143 : mutable std::error_code ec_; // result slot handed to implementation::accept as &ec_
144 : mutable io_object::implementation* peer_impl_ = nullptr; // accepted-peer slot handed over as &peer_impl_
145 : // Binds the awaitable to `acc` and the destination socket `peer`; no I/O is started here.
146 2 : accept_awaitable(
147 : local_stream_acceptor& acc, local_stream_socket& peer) noexcept
148 2 : : acc_(acc)
149 2 : , peer_(peer)
150 : {
151 2 : }
152 : // token_ is only assigned in await_suspend, so on a fresh awaitable this reads a default-constructed stop_token, which never reports a stop request.
153 2 : bool await_ready() const noexcept
154 : {
155 2 : return token_.stop_requested();
156 : }
157 : // Produces the result: cancellation is checked first; otherwise peer_ adopts the accepted implementation on success and ec_ is returned.
158 2 : capy::io_result<> await_resume() const noexcept
159 : {
160 2 : if (token_.stop_requested()) // NOTE(review): wins even if peer_impl_ was filled in; who reclaims that implementation is not visible here - confirm
161 MIS 0 : return {make_error_code(std::errc::operation_canceled)};
162 : // Same effect as reset_peer_impl(peer_, peer_impl_), with the extra requirement that ec_ is clear.
163 HIT 2 : if (!ec_ && peer_impl_)
164 2 : peer_.h_.reset(peer_impl_);
165 2 : return {ec_};
166 : }
167 : // Captures the stop token from env, then either resumes `h` at once (already cancelled) or hands the operation to the backend.
168 2 : auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
169 : -> std::coroutine_handle<>
170 : {
171 2 : token_ = env->stop_token;
172 2 : if (token_.stop_requested())
173 : {
174 MIS 0 : ec_ = make_error_code(std::errc::operation_canceled);
175 0 : return h; // returning the awaiting handle resumes it without starting an accept
176 : }
177 HIT 6 : return acc_.get().accept( // backend gets the out-pointers; the handle it returns is resumed next
178 4 : h, env->executor, token_, &ec_, &peer_impl_);
179 : }
180 : };
181 :
182 : public:
183 : ~local_stream_acceptor() override; // declared here; the definition is not in this header
184 : /// Construct from an execution context (definition not in this header).
185 : explicit local_stream_acceptor(capy::execution_context& ctx);
186 : /// Construct from an executor by delegating to the context constructor with ex.context(); the same_as clause keeps this template from matching local_stream_acceptor itself.
187 : template<class Ex>
188 : requires(!std::same_as<std::remove_cvref_t<Ex>, local_stream_acceptor>) &&
189 : capy::Executor<Ex>
190 : explicit local_stream_acceptor(Ex const& ex) : local_stream_acceptor(ex.context())
191 : {
192 : }
193 : /// Move-construct by delegating to the protected (ctx, acceptor&&) constructor, reusing other's execution_context reference.
194 : local_stream_acceptor(local_stream_acceptor&& other) noexcept
195 : : local_stream_acceptor(other.ctx_, std::move(other))
196 : {
197 : }
198 : /// Move-assign: closes this acceptor, then takes other's io_object state. ctx_ is a reference and cannot be rebound, hence the same-context assert.
199 : local_stream_acceptor& operator=(local_stream_acceptor&& other) noexcept
200 : {
201 : assert(&ctx_ == &other.ctx_ &&
202 : "move-assign requires the same execution_context");
203 : if (this != &other) // self-assignment must not close our own handle
204 : {
205 : close();
206 : io_object::operator=(std::move(other));
207 : }
208 : return *this;
209 : }
210 : // Copying is disabled; only the move operations above are provided.
211 : local_stream_acceptor(local_stream_acceptor const&) = delete;
212 : local_stream_acceptor& operator=(local_stream_acceptor const&) = delete;
213 :
214 : /** Create the acceptor socket.
215 :
216 : @param proto The protocol. Defaults to local_stream{}.
217 :
218 : @throws std::system_error on failure.
219 : */
220 : void open(local_stream proto = {});
221 :
222 : /** Bind to a local endpoint.
223 :
224 : @param ep The local endpoint (path) to bind to.
225 : @param opt Bind options. Pass bind_option::unlink_existing
226 : to unlink the socket path before binding (ignored for
227 : abstract sockets and empty endpoints).
228 : With the default bind_option::none nothing is unlinked.
229 : @return An error code on failure, empty on success.
230 :
231 : @throws std::logic_error if the acceptor is not open.
232 : */
233 : [[nodiscard]] std::error_code
234 : bind(corosio::local_endpoint ep, // namespace-qualified so the type is not confused with the member function local_endpoint() declared further down
235 : bind_option opt = bind_option::none);
236 :
237 : /** Start listening for incoming connections.
238 :
239 : @param backlog The maximum pending connection queue length.
240 :
241 : @return An error code on failure, empty on success.
242 :
243 : @throws std::logic_error if the acceptor is not open.
244 : */
245 : [[nodiscard]] std::error_code listen(int backlog = 128);
246 :
247 : /// Close the acceptor. The bound socket path is not unlinked by the library (see the class notes); move assignment calls this on the target first.
248 : void close();
249 :
250 : /** Check if the acceptor has a native handle.
251 :
252 : Returns true once @ref open succeeds and until @ref close is
253 : called. This does not indicate that @ref listen has been
254 : invoked — an open-but-not-listening acceptor will still
255 : report `true`.
256 : */
257 44 : bool is_open() const noexcept
258 : {
259 44 : return h_ && get().is_open(); // h_ is tested first because get() dereferences h_.get() unconditionally
260 : }
261 :
262 : /** Initiate an asynchronous accept operation.
263 :
264 : @param peer The socket to receive the accepted connection. Held by reference, so it must stay alive until the awaitable has been resumed.
265 :
266 : @return An awaitable that completes with io_result<>.
267 :
268 : @throws std::logic_error if the native acceptor handle is
269 : absent (i.e., `!is_open()`). Calling accept on an
270 : open-but-not-listening acceptor does not throw; the
271 : awaitable completes with a kernel error such as
272 : `errc::invalid_argument` (EINVAL).
273 : */
274 2 : auto accept(local_stream_socket& peer)
275 : {
276 2 : if (!is_open()) // synchronous precondition check: fails before any awaitable is created
277 MIS 0 : detail::throw_logic_error("accept: acceptor not open");
278 HIT 2 : return accept_awaitable(*this, peer); // the operation itself starts in await_suspend, not here
279 : }
280 :
281 : /** Initiate an asynchronous accept, returning the socket.
282 :
283 : @return An awaitable that completes with
284 : io_result<local_stream_socket>. On success the socket is constructed on this acceptor's execution context.
285 :
286 : @throws std::logic_error if the native acceptor handle is
287 : absent (i.e., `!is_open()`). Calling accept on an
288 : open-but-not-listening acceptor does not throw; the
289 : awaitable completes with a kernel error such as
290 : `errc::invalid_argument` (EINVAL).
291 : */
292 2 : auto accept()
293 : {
294 2 : if (!is_open()) // synchronous precondition check: fails before any awaitable is created
295 MIS 0 : detail::throw_logic_error("accept: acceptor not open");
296 HIT 2 : return move_accept_awaitable(*this); // the operation itself starts in await_suspend, not here
297 : }
298 : /// Cancel outstanding accept operations; per the class notes, cancelled operations resume with `errc::operation_canceled`. Definition not in this header.
299 : void cancel();
300 :
301 : /** Release ownership of the native socket handle.
302 :
303 : Deregisters the acceptor from the reactor and cancels
304 : pending operations without closing the fd.
305 :
306 : @return The native handle.
307 :
308 : @throws std::logic_error if the acceptor is not open.
309 : */
310 : native_handle_type release();
311 : /// Return the acceptor's local endpoint. Definition not in this header; presumably forwards to implementation::local_endpoint() - confirm. The return type is qualified because this member shares the type's name.
312 : corosio::local_endpoint local_endpoint() const noexcept;
313 : /// Set a socket option. `Option` must provide static level()/name() plus data()/size(). Fails via throw_logic_error if the acceptor is not open and via throw_system_error if the backend returns an error.
314 : template<class Option>
315 : void set_option(Option const& opt)
316 : {
317 : if (!is_open()) // also guarantees h_ is non-empty before get() dereferences it
318 : detail::throw_logic_error("set_option: acceptor not open");
319 : std::error_code ec = get().set_option(
320 : Option::level(), Option::name(), opt.data(), opt.size());
321 : if (ec) // backend hook is noexcept and reports failure by value; convert it to an exception here
322 : detail::throw_system_error(ec, "local_stream_acceptor::set_option");
323 : }
324 : /// Read a socket option into a value-initialized `Option`, which must provide static level()/name() plus data()/size()/resize(). Fails via throw_logic_error if not open and via throw_system_error on a backend error.
325 : template<class Option>
326 : Option get_option() const
327 : {
328 : if (!is_open()) // also guarantees h_ is non-empty before get() dereferences it
329 : detail::throw_logic_error("get_option: acceptor not open");
330 : Option opt{};
331 : std::size_t sz = opt.size(); // starts as the option's own size and is passed by pointer so the backend can update it
332 : std::error_code ec =
333 : get().get_option(Option::level(), Option::name(), opt.data(), &sz);
334 : if (ec)
335 : detail::throw_system_error(ec, "local_stream_acceptor::get_option");
336 : opt.resize(sz); // tell the option object what size ended up in sz
337 : return opt;
338 : }
339 :
340 : /** Define backend hooks for local stream acceptor operations. All hooks are pure virtual; this header contains no concrete backend. */
341 : struct implementation : io_object::implementation
342 : { // accept(): invoked by both awaitables as accept(h, env->executor, token_, &ec_, &peer_impl_); its return value is what await_suspend returns.
343 : virtual std::coroutine_handle<> accept(
344 : std::coroutine_handle<>, // awaiting coroutine
345 : capy::executor_ref, // executor taken from the io_env
346 : std::stop_token, // cancellation token taken from the io_env
347 : std::error_code*, // out: completion status
348 : io_object::implementation**) = 0; // out: implementation of the accepted peer
349 : // Endpoint hook. NOTE(review): presumably backs local_stream_acceptor::local_endpoint(), whose definition is not in this header - confirm.
350 : virtual corosio::local_endpoint local_endpoint() const noexcept = 0;
351 : // Queried by local_stream_acceptor::is_open() once a handle is present.
352 : virtual bool is_open() const noexcept = 0;
353 : // Hook returning a native handle. NOTE(review): presumably used by release() - confirm.
354 : virtual native_handle_type release_socket() noexcept = 0;
355 : // Cancellation hook. NOTE(review): presumably invoked by local_stream_acceptor::cancel() - confirm.
356 : virtual void cancel() noexcept = 0;
357 : // Called by set_option<Option>() with Option::level(), Option::name(), opt.data(), opt.size(); failure is reported by return value.
358 : virtual std::error_code set_option(
359 : int level,
360 : int optname,
361 : void const* data,
362 : std::size_t size) noexcept = 0;
363 : // Called by get_option<Option>(); *size arrives holding opt.size() and the caller resizes the option to *size afterwards.
364 : virtual std::error_code
365 : get_option(int level, int optname, void* data, std::size_t* size)
366 : const noexcept = 0;
367 : };
368 :
369 : protected:
370 : local_stream_acceptor(handle h, capy::execution_context& ctx) noexcept // protected: adopts an existing io_object handle and binds ctx_ to ctx
371 : : io_object(std::move(h))
372 : , ctx_(ctx)
373 : {
374 : }
375 : // Protected target of the public move constructor: moves other's io_object state and binds ctx_ to the supplied context.
376 : local_stream_acceptor(
377 : capy::execution_context& ctx, local_stream_acceptor&& other) noexcept
378 : : io_object(std::move(other))
379 : , ctx_(ctx)
380 : {
381 : }
382 : // Installs `impl` into peer's handle via peer.h_.reset(impl); a null impl leaves peer untouched. Used by move_accept_awaitable::await_resume.
383 2 : static void reset_peer_impl(
384 : local_stream_socket& peer, io_object::implementation* impl) noexcept
385 : {
386 2 : if (impl)
387 2 : peer.h_.reset(impl);
388 2 : }
389 :
390 : private:
391 : capy::execution_context& ctx_; // reference member fixed at construction; used to construct accepted peer sockets and checked by the move-assign assert
392 : // Downcasts the stored handle to this class's implementation type. No null check: callers must ensure h_ is non-empty (is_open() tests h_ before calling).
393 48 : inline implementation& get() const noexcept
394 : {
395 48 : return *static_cast<implementation*>(h_.get());
396 : }
397 : };
398 :
399 : } // namespace boost::corosio
400 :
401 : #endif // BOOST_COROSIO_LOCAL_STREAM_ACCEPTOR_HPP
|