include/boost/corosio/local_stream_acceptor.hpp

88.2% Lines (45/51) 100.0% List of functions (13/13)
local_stream_acceptor.hpp
f(x) Functions (13)
Function Calls Lines Blocks
boost::corosio::local_stream_acceptor::move_accept_awaitable::move_accept_awaitable(boost::corosio::local_stream_acceptor&) :69 2x 100.0% 100.0% boost::corosio::local_stream_acceptor::move_accept_awaitable::await_ready() const :75 2x 100.0% 100.0% boost::corosio::local_stream_acceptor::move_accept_awaitable::await_resume() const :80 2x 70.0% 62.0% boost::corosio::local_stream_acceptor::move_accept_awaitable::await_suspend(std::__n4861::coroutine_handle<void>, boost::capy::io_env const*) :94 2x 100.0% 82.0% boost::corosio::local_stream_acceptor::accept_awaitable::accept_awaitable(boost::corosio::local_stream_acceptor&, boost::corosio::local_stream_socket&) :111 2x 100.0% 100.0% boost::corosio::local_stream_acceptor::accept_awaitable::await_ready() const :118 2x 100.0% 100.0% boost::corosio::local_stream_acceptor::accept_awaitable::await_resume() const :123 2x 83.3% 77.0% boost::corosio::local_stream_acceptor::accept_awaitable::await_suspend(std::__n4861::coroutine_handle<void>, boost::capy::io_env const*) :133 2x 100.0% 82.0% boost::corosio::local_stream_acceptor::is_open() const :211 44x 100.0% 100.0% boost::corosio::local_stream_acceptor::accept(boost::corosio::local_stream_socket&) :224 2x 75.0% 80.0% boost::corosio::local_stream_acceptor::accept() :238 2x 75.0% 80.0% boost::corosio::local_stream_acceptor::reset_peer_impl(boost::corosio::local_stream_socket&, boost::corosio::io_object::implementation*) :329 2x 100.0% 100.0% boost::corosio::local_stream_acceptor::get() const :339 48x 100.0% 100.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_LOCAL_STREAM_ACCEPTOR_HPP
11 #define BOOST_COROSIO_LOCAL_STREAM_ACCEPTOR_HPP
12
13 #include <boost/corosio/detail/config.hpp>
14 #include <boost/corosio/detail/except.hpp>
15 #include <boost/corosio/io/io_object.hpp>
16 #include <boost/capy/io_result.hpp>
17 #include <boost/corosio/local_endpoint.hpp>
18 #include <boost/corosio/local_stream.hpp>
19 #include <boost/corosio/local_stream_socket.hpp>
20 #include <boost/capy/ex/executor_ref.hpp>
21 #include <boost/capy/ex/execution_context.hpp>
22 #include <boost/capy/ex/io_env.hpp>
23 #include <boost/capy/concept/executor.hpp>
24
25 #include <system_error>
26
27 #include <cassert>
28 #include <concepts>
29 #include <coroutine>
30 #include <cstddef>
31 #include <stop_token>
32 #include <type_traits>
33
34 namespace boost::corosio {
35
36 /* An asynchronous Unix stream acceptor for coroutine I/O.
37
38 This class provides asynchronous Unix domain stream accept
39 operations that return awaitable types. The acceptor binds
40 to a local endpoint (filesystem path) and listens for
41 incoming connections.
42
43 The library does NOT automatically unlink the socket path
44 on close. Callers are responsible for removing the socket
45 file before bind or after close.
46
47 Thread Safety:
48 Distinct objects: Safe.
49 Shared objects: Unsafe. An acceptor must not have
50 concurrent accept operations.
51 */
52 /** Options for local_stream_acceptor::bind(). */
53 enum class bind_option
54 {
55 none,
56 /// Unlink the socket path before binding (ignored for abstract paths).
57 unlink_existing
58 };
59
60 class BOOST_COROSIO_DECL local_stream_acceptor : public io_object
61 {
62 struct move_accept_awaitable
63 {
64 local_stream_acceptor& acc_;
65 std::stop_token token_;
66 mutable std::error_code ec_;
67 mutable io_object::implementation* peer_impl_ = nullptr;
68
69 2x explicit move_accept_awaitable(
70 local_stream_acceptor& acc) noexcept
71 2x : acc_(acc)
72 {
73 2x }
74
75 2x bool await_ready() const noexcept
76 {
77 2x return token_.stop_requested();
78 }
79
80 2x capy::io_result<local_stream_socket> await_resume() const noexcept
81 {
82 2x if (token_.stop_requested())
83 return {make_error_code(std::errc::operation_canceled),
84 local_stream_socket()};
85
86 2x if (ec_ || !peer_impl_)
87 return {ec_, local_stream_socket()};
88
89 2x local_stream_socket peer(acc_.ctx_);
90 2x reset_peer_impl(peer, peer_impl_);
91 2x return {ec_, std::move(peer)};
92 2x }
93
94 2x auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
95 -> std::coroutine_handle<>
96 {
97 2x token_ = env->stop_token;
98 6x return acc_.get().accept(
99 6x h, env->executor, token_, &ec_, &peer_impl_);
100 }
101 };
102
103 struct accept_awaitable
104 {
105 local_stream_acceptor& acc_;
106 local_stream_socket& peer_;
107 std::stop_token token_;
108 mutable std::error_code ec_;
109 mutable io_object::implementation* peer_impl_ = nullptr;
110
111 2x accept_awaitable(
112 local_stream_acceptor& acc, local_stream_socket& peer) noexcept
113 2x : acc_(acc)
114 2x , peer_(peer)
115 {
116 2x }
117
118 2x bool await_ready() const noexcept
119 {
120 2x return token_.stop_requested();
121 }
122
123 2x capy::io_result<> await_resume() const noexcept
124 {
125 2x if (token_.stop_requested())
126 return {make_error_code(std::errc::operation_canceled)};
127
128 2x if (!ec_ && peer_impl_)
129 2x peer_.h_.reset(peer_impl_);
130 2x return {ec_};
131 }
132
133 2x auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
134 -> std::coroutine_handle<>
135 {
136 2x token_ = env->stop_token;
137 6x return acc_.get().accept(
138 6x h, env->executor, token_, &ec_, &peer_impl_);
139 }
140 };
141
142 public:
143 ~local_stream_acceptor() override;
144
145 explicit local_stream_acceptor(capy::execution_context& ctx);
146
147 template<class Ex>
148 requires(!std::same_as<std::remove_cvref_t<Ex>, local_stream_acceptor>) &&
149 capy::Executor<Ex>
150 explicit local_stream_acceptor(Ex const& ex) : local_stream_acceptor(ex.context())
151 {
152 }
153
154 local_stream_acceptor(local_stream_acceptor&& other) noexcept
155 : local_stream_acceptor(other.ctx_, std::move(other))
156 {
157 }
158
159 local_stream_acceptor& operator=(local_stream_acceptor&& other) noexcept
160 {
161 assert(&ctx_ == &other.ctx_ &&
162 "move-assign requires the same execution_context");
163 if (this != &other)
164 {
165 close();
166 io_object::operator=(std::move(other));
167 }
168 return *this;
169 }
170
171 local_stream_acceptor(local_stream_acceptor const&) = delete;
172 local_stream_acceptor& operator=(local_stream_acceptor const&) = delete;
173
174 /** Create the acceptor socket.
175
176 @param proto The protocol. Defaults to local_stream{}.
177
178 @throws std::system_error on failure.
179 */
180 void open(local_stream proto = {});
181
182 /** Bind to a local endpoint.
183
184 @param ep The local endpoint (path) to bind to.
185 @param opt Bind options. Pass bind_option::unlink_existing
186 to unlink the socket path before binding (ignored for
187 abstract sockets and empty endpoints).
188
189 @return An error code on failure, empty on success.
190
191 @throws std::logic_error if the acceptor is not open.
192 */
193 [[nodiscard]] std::error_code
194 bind(corosio::local_endpoint ep,
195 bind_option opt = bind_option::none);
196
197 /** Start listening for incoming connections.
198
199 @param backlog The maximum pending connection queue length.
200
201 @return An error code on failure, empty on success.
202
203 @throws std::logic_error if the acceptor is not open.
204 */
205 [[nodiscard]] std::error_code listen(int backlog = 128);
206
207 /// Close the acceptor.
208 void close();
209
210 /// Check if the acceptor is listening.
211 44x bool is_open() const noexcept
212 {
213 44x return h_ && get().is_open();
214 }
215
216 /** Initiate an asynchronous accept operation.
217
218 @param peer The socket to receive the accepted connection.
219
220 @return An awaitable that completes with io_result<>.
221
222 @throws std::logic_error if the acceptor is not listening.
223 */
224 2x auto accept(local_stream_socket& peer)
225 {
226 2x if (!is_open())
227 detail::throw_logic_error("accept: acceptor not listening");
228 2x return accept_awaitable(*this, peer);
229 }
230
231 /** Initiate an asynchronous accept, returning the socket.
232
233 @return An awaitable that completes with
234 io_result<local_stream_socket>.
235
236 @throws std::logic_error if the acceptor is not listening.
237 */
238 2x auto accept()
239 {
240 2x if (!is_open())
241 detail::throw_logic_error("accept: acceptor not listening");
242 2x return move_accept_awaitable(*this);
243 }
244
245 void cancel();
246
247 /** Release ownership of the native socket handle.
248
249 Deregisters the acceptor from the reactor and cancels
250 pending operations without closing the fd.
251
252 @return The native handle, or -1 if not open.
253
254 @throws std::logic_error if the acceptor is not open.
255 */
256 native_handle_type release();
257
258 corosio::local_endpoint local_endpoint() const noexcept;
259
260 template<class Option>
261 void set_option(Option const& opt)
262 {
263 if (!is_open())
264 detail::throw_logic_error("set_option: acceptor not open");
265 std::error_code ec = get().set_option(
266 Option::level(), Option::name(), opt.data(), opt.size());
267 if (ec)
268 detail::throw_system_error(ec, "local_stream_acceptor::set_option");
269 }
270
271 template<class Option>
272 Option get_option() const
273 {
274 if (!is_open())
275 detail::throw_logic_error("get_option: acceptor not open");
276 Option opt{};
277 std::size_t sz = opt.size();
278 std::error_code ec =
279 get().get_option(Option::level(), Option::name(), opt.data(), &sz);
280 if (ec)
281 detail::throw_system_error(ec, "local_stream_acceptor::get_option");
282 opt.resize(sz);
283 return opt;
284 }
285
286 /** Define backend hooks for local stream acceptor operations. */
287 struct implementation : io_object::implementation
288 {
289 virtual std::coroutine_handle<> accept(
290 std::coroutine_handle<>,
291 capy::executor_ref,
292 std::stop_token,
293 std::error_code*,
294 io_object::implementation**) = 0;
295
296 virtual corosio::local_endpoint local_endpoint() const noexcept = 0;
297
298 virtual bool is_open() const noexcept = 0;
299
300 virtual native_handle_type release_socket() noexcept = 0;
301
302 virtual void cancel() noexcept = 0;
303
304 virtual std::error_code set_option(
305 int level,
306 int optname,
307 void const* data,
308 std::size_t size) noexcept = 0;
309
310 virtual std::error_code
311 get_option(int level, int optname, void* data, std::size_t* size)
312 const noexcept = 0;
313 };
314
315 protected:
316 local_stream_acceptor(handle h, capy::execution_context& ctx) noexcept
317 : io_object(std::move(h))
318 , ctx_(ctx)
319 {
320 }
321
322 local_stream_acceptor(
323 capy::execution_context& ctx, local_stream_acceptor&& other) noexcept
324 : io_object(std::move(other))
325 , ctx_(ctx)
326 {
327 }
328
329 2x static void reset_peer_impl(
330 local_stream_socket& peer, io_object::implementation* impl) noexcept
331 {
332 2x if (impl)
333 2x peer.h_.reset(impl);
334 2x }
335
336 private:
337 capy::execution_context& ctx_;
338
339 48x inline implementation& get() const noexcept
340 {
341 48x return *static_cast<implementation*>(h_.get());
342 }
343 };
344
345 } // namespace boost::corosio
346
347 #endif // BOOST_COROSIO_LOCAL_STREAM_ACCEPTOR_HPP
348