1 +
//
 
2 +
// Copyright (c) 2026 Michael Vandeberg
 
3 +
//
 
4 +
// Distributed under the Boost Software License, Version 1.0. (See accompanying
 
5 +
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
 
6 +
//
 
7 +
// Official repository: https://github.com/cppalliance/corosio
 
8 +
//
 
9 +

 
10 +
#ifndef BOOST_COROSIO_LOCAL_STREAM_ACCEPTOR_HPP
 
11 +
#define BOOST_COROSIO_LOCAL_STREAM_ACCEPTOR_HPP
 
12 +

 
13 +
#include <boost/corosio/detail/config.hpp>
 
14 +
#include <boost/corosio/detail/except.hpp>
 
15 +
#include <boost/corosio/io/io_object.hpp>
 
16 +
#include <boost/capy/io_result.hpp>
 
17 +
#include <boost/corosio/local_endpoint.hpp>
 
18 +
#include <boost/corosio/local_stream.hpp>
 
19 +
#include <boost/corosio/local_stream_socket.hpp>
 
20 +
#include <boost/capy/ex/executor_ref.hpp>
 
21 +
#include <boost/capy/ex/execution_context.hpp>
 
22 +
#include <boost/capy/ex/io_env.hpp>
 
23 +
#include <boost/capy/concept/executor.hpp>
 
24 +

 
25 +
#include <system_error>
 
26 +

 
27 +
#include <cassert>
 
28 +
#include <concepts>
 
29 +
#include <coroutine>
 
30 +
#include <cstddef>
 
31 +
#include <stop_token>
 
32 +
#include <type_traits>
 
33 +

 
34 +
namespace boost::corosio {
 
35 +

 
36 +
/** Options for @ref local_stream_acceptor::bind. */
 
37 +
enum class bind_option
 
38 +
{
 
39 +
    /// Default: do not unlink the socket path.
 
40 +
    none,
 
41 +
    /// Unlink the socket path before binding (ignored for abstract paths).
 
42 +
    unlink_existing
 
43 +
};
 
44 +

 
45 +
/** An asynchronous Unix stream acceptor for coroutine I/O.
 
46 +

 
47 +
    This class provides asynchronous Unix domain stream accept
 
48 +
    operations that return awaitable types. The acceptor binds
 
49 +
    to a local endpoint (filesystem path) and listens for
 
50 +
    incoming connections.
 
51 +

 
52 +
    The library does NOT automatically unlink the socket path
 
53 +
    on close. Callers are responsible for removing the socket
 
54 +
    file before bind or after close, or pass
 
55 +
    @ref bind_option::unlink_existing to @ref bind.
 
56 +

 
57 +
    @par Thread Safety
 
58 +
    Distinct objects: Safe.@n
 
59 +
    Shared objects: Unsafe. An acceptor must not have concurrent
 
60 +
    accept operations.
 
61 +

 
62 +
    @par Semantics
 
63 +
    Wraps the platform Unix domain socket listener. Operations
 
64 +
    dispatch to OS accept APIs via the io_context reactor.
 
65 +
    Cancellation propagates through the IoAwaitable stop_token
 
66 +
    or via @ref cancel; cancelled operations resume with
 
67 +
    `errc::operation_canceled`.
 
68 +

 
69 +
    @par Example
 
70 +
    @code
 
71 +
    io_context ioc;
 
72 +
    local_stream_acceptor acc(ioc);
 
73 +
    acc.open();
 
74 +
    if (auto ec = acc.bind(
 
75 +
            local_endpoint("/tmp/my.sock"),
 
76 +
            bind_option::unlink_existing))
 
77 +
        return ec;
 
78 +
    if (auto ec = acc.listen())
 
79 +
        return ec;
 
80 +

 
81 +
    local_stream_socket peer(ioc);
 
82 +
    auto [ec] = co_await acc.accept(peer);
 
83 +
    if (!ec) {
 
84 +
        // peer is now connected
 
85 +
    }
 
86 +
    @endcode
 
87 +

 
88 +
    @see local_stream_socket, local_endpoint, local_stream
 
89 +
*/
 
90 +
class BOOST_COROSIO_DECL local_stream_acceptor : public io_object
 
91 +
{
 
92 +
    struct move_accept_awaitable
 
93 +
    {
 
94 +
        local_stream_acceptor& acc_;
 
95 +
        std::stop_token token_;
 
96 +
        mutable std::error_code ec_;
 
97 +
        mutable io_object::implementation* peer_impl_ = nullptr;
 
98 +

 
99 +
        explicit move_accept_awaitable(
 
100 +
            local_stream_acceptor& acc) noexcept
 
101 +
            : acc_(acc)
 
102 +
        {
 
103 +
        }
 
104 +

 
105 +
        bool await_ready() const noexcept
 
106 +
        {
 
107 +
            return token_.stop_requested();
 
108 +
        }
 
109 +

 
110 +
        capy::io_result<local_stream_socket> await_resume() const noexcept
 
111 +
        {
 
112 +
            if (token_.stop_requested())
 
113 +
                return {make_error_code(std::errc::operation_canceled),
 
114 +
                        local_stream_socket()};
 
115 +

 
116 +
            if (ec_ || !peer_impl_)
 
117 +
                return {ec_, local_stream_socket()};
 
118 +

 
119 +
            local_stream_socket peer(acc_.ctx_);
 
120 +
            reset_peer_impl(peer, peer_impl_);
 
121 +
            return {ec_, std::move(peer)};
 
122 +
        }
 
123 +

 
124 +
        auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
 
125 +
            -> std::coroutine_handle<>
 
126 +
        {
 
127 +
            token_ = env->stop_token;
 
128 +
            if (token_.stop_requested())
 
129 +
            {
 
130 +
                ec_ = make_error_code(std::errc::operation_canceled);
 
131 +
                return h;
 
132 +
            }
 
133 +
            return acc_.get().accept(
 
134 +
                h, env->executor, token_, &ec_, &peer_impl_);
 
135 +
        }
 
136 +
    };
 
137 +

 
138 +
    struct accept_awaitable
 
139 +
    {
 
140 +
        local_stream_acceptor& acc_;
 
141 +
        local_stream_socket& peer_;
 
142 +
        std::stop_token token_;
 
143 +
        mutable std::error_code ec_;
 
144 +
        mutable io_object::implementation* peer_impl_ = nullptr;
 
145 +

 
146 +
        accept_awaitable(
 
147 +
            local_stream_acceptor& acc, local_stream_socket& peer) noexcept
 
148 +
            : acc_(acc)
 
149 +
            , peer_(peer)
 
150 +
        {
 
151 +
        }
 
152 +

 
153 +
        bool await_ready() const noexcept
 
154 +
        {
 
155 +
            return token_.stop_requested();
 
156 +
        }
 
157 +

 
158 +
        capy::io_result<> await_resume() const noexcept
 
159 +
        {
 
160 +
            if (token_.stop_requested())
 
161 +
                return {make_error_code(std::errc::operation_canceled)};
 
162 +

 
163 +
            if (!ec_ && peer_impl_)
 
164 +
                peer_.h_.reset(peer_impl_);
 
165 +
            return {ec_};
 
166 +
        }
 
167 +

 
168 +
        auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
 
169 +
            -> std::coroutine_handle<>
 
170 +
        {
 
171 +
            token_ = env->stop_token;
 
172 +
            if (token_.stop_requested())
 
173 +
            {
 
174 +
                ec_ = make_error_code(std::errc::operation_canceled);
 
175 +
                return h;
 
176 +
            }
 
177 +
            return acc_.get().accept(
 
178 +
                h, env->executor, token_, &ec_, &peer_impl_);
 
179 +
        }
 
180 +
    };
 
181 +

 
182 +
public:
 
183 +
    ~local_stream_acceptor() override;
 
184 +

 
185 +
    explicit local_stream_acceptor(capy::execution_context& ctx);
 
186 +

 
187 +
    template<class Ex>
 
188 +
        requires(!std::same_as<std::remove_cvref_t<Ex>, local_stream_acceptor>) &&
 
189 +
        capy::Executor<Ex>
 
190 +
    explicit local_stream_acceptor(Ex const& ex) : local_stream_acceptor(ex.context())
 
191 +
    {
 
192 +
    }
 
193 +

 
194 +
    local_stream_acceptor(local_stream_acceptor&& other) noexcept
 
195 +
        : local_stream_acceptor(other.ctx_, std::move(other))
 
196 +
    {
 
197 +
    }
 
198 +

 
199 +
    local_stream_acceptor& operator=(local_stream_acceptor&& other) noexcept
 
200 +
    {
 
201 +
        assert(&ctx_ == &other.ctx_ &&
 
202 +
            "move-assign requires the same execution_context");
 
203 +
        if (this != &other)
 
204 +
        {
 
205 +
            close();
 
206 +
            io_object::operator=(std::move(other));
 
207 +
        }
 
208 +
        return *this;
 
209 +
    }
 
210 +

 
211 +
    local_stream_acceptor(local_stream_acceptor const&)            = delete;
 
212 +
    local_stream_acceptor& operator=(local_stream_acceptor const&) = delete;
 
213 +

 
214 +
    /** Create the acceptor socket.
 
215 +

 
216 +
        @param proto The protocol. Defaults to local_stream{}.
 
217 +

 
218 +
        @throws std::system_error on failure.
 
219 +
    */
 
220 +
    void open(local_stream proto = {});
 
221 +

 
222 +
    /** Bind to a local endpoint.
 
223 +

 
224 +
        @param ep The local endpoint (path) to bind to.
 
225 +
        @param opt Bind options. Pass bind_option::unlink_existing
 
226 +
            to unlink the socket path before binding (ignored for
 
227 +
            abstract sockets and empty endpoints).
 
228 +

 
229 +
        @return An error code on failure, empty on success.
 
230 +

 
231 +
        @throws std::logic_error if the acceptor is not open.
 
232 +
    */
 
233 +
    [[nodiscard]] std::error_code
 
234 +
    bind(corosio::local_endpoint ep,
 
235 +
         bind_option opt = bind_option::none);
 
236 +

 
237 +
    /** Start listening for incoming connections.
 
238 +

 
239 +
        @param backlog The maximum pending connection queue length.
 
240 +

 
241 +
        @return An error code on failure, empty on success.
 
242 +

 
243 +
        @throws std::logic_error if the acceptor is not open.
 
244 +
    */
 
245 +
    [[nodiscard]] std::error_code listen(int backlog = 128);
 
246 +

 
247 +
    /// Close the acceptor.
 
248 +
    void close();
 
249 +

 
250 +
    /** Check if the acceptor has a native handle.
 
251 +

 
252 +
        Returns true once @ref open succeeds and until @ref close is
 
253 +
        called. This does not indicate that @ref listen has been
 
254 +
        invoked — an open-but-not-listening acceptor will still
 
255 +
        report `true`.
 
256 +
    */
 
257 +
    bool is_open() const noexcept
 
258 +
    {
 
259 +
        return h_ && get().is_open();
 
260 +
    }
 
261 +

 
262 +
    /** Initiate an asynchronous accept operation.
 
263 +

 
264 +
        @param peer The socket to receive the accepted connection.
 
265 +

 
266 +
        @return An awaitable that completes with io_result<>.
 
267 +

 
268 +
        @throws std::logic_error if the native acceptor handle is
 
269 +
            absent (i.e., `!is_open()`). Calling accept on an
 
270 +
            open-but-not-listening acceptor does not throw; the
 
271 +
            awaitable completes with a kernel error such as
 
272 +
            `errc::invalid_argument` (EINVAL).
 
273 +
    */
 
274 +
    auto accept(local_stream_socket& peer)
 
275 +
    {
 
276 +
        if (!is_open())
 
277 +
            detail::throw_logic_error("accept: acceptor not open");
 
278 +
        return accept_awaitable(*this, peer);
 
279 +
    }
 
280 +

 
281 +
    /** Initiate an asynchronous accept, returning the socket.
 
282 +

 
283 +
        @return An awaitable that completes with
 
284 +
            io_result<local_stream_socket>.
 
285 +

 
286 +
        @throws std::logic_error if the native acceptor handle is
 
287 +
            absent (i.e., `!is_open()`). Calling accept on an
 
288 +
            open-but-not-listening acceptor does not throw; the
 
289 +
            awaitable completes with a kernel error such as
 
290 +
            `errc::invalid_argument` (EINVAL).
 
291 +
    */
 
292 +
    auto accept()
 
293 +
    {
 
294 +
        if (!is_open())
 
295 +
            detail::throw_logic_error("accept: acceptor not open");
 
296 +
        return move_accept_awaitable(*this);
 
297 +
    }
 
298 +

 
299 +
    void cancel();
 
300 +

 
301 +
    /** Release ownership of the native socket handle.
 
302 +

 
303 +
        Deregisters the acceptor from the reactor and cancels
 
304 +
        pending operations without closing the fd.
 
305 +

 
306 +
        @return The native handle.
 
307 +

 
308 +
        @throws std::logic_error if the acceptor is not open.
 
309 +
    */
 
310 +
    native_handle_type release();
 
311 +

 
312 +
    corosio::local_endpoint local_endpoint() const noexcept;
 
313 +

 
314 +
    template<class Option>
 
315 +
    void set_option(Option const& opt)
 
316 +
    {
 
317 +
        if (!is_open())
 
318 +
            detail::throw_logic_error("set_option: acceptor not open");
 
319 +
        std::error_code ec = get().set_option(
 
320 +
            Option::level(), Option::name(), opt.data(), opt.size());
 
321 +
        if (ec)
 
322 +
            detail::throw_system_error(ec, "local_stream_acceptor::set_option");
 
323 +
    }
 
324 +

 
325 +
    template<class Option>
 
326 +
    Option get_option() const
 
327 +
    {
 
328 +
        if (!is_open())
 
329 +
            detail::throw_logic_error("get_option: acceptor not open");
 
330 +
        Option opt{};
 
331 +
        std::size_t sz = opt.size();
 
332 +
        std::error_code ec =
 
333 +
            get().get_option(Option::level(), Option::name(), opt.data(), &sz);
 
334 +
        if (ec)
 
335 +
            detail::throw_system_error(ec, "local_stream_acceptor::get_option");
 
336 +
        opt.resize(sz);
 
337 +
        return opt;
 
338 +
    }
 
339 +

 
340 +
    /** Define backend hooks for local stream acceptor operations. */
 
341 +
    struct implementation : io_object::implementation
 
342 +
    {
 
343 +
        virtual std::coroutine_handle<> accept(
 
344 +
            std::coroutine_handle<>,
 
345 +
            capy::executor_ref,
 
346 +
            std::stop_token,
 
347 +
            std::error_code*,
 
348 +
            io_object::implementation**) = 0;
 
349 +

 
350 +
        virtual corosio::local_endpoint local_endpoint() const noexcept = 0;
 
351 +

 
352 +
        virtual bool is_open() const noexcept = 0;
 
353 +

 
354 +
        virtual native_handle_type release_socket() noexcept = 0;
 
355 +

 
356 +
        virtual void cancel() noexcept = 0;
 
357 +

 
358 +
        virtual std::error_code set_option(
 
359 +
            int level,
 
360 +
            int optname,
 
361 +
            void const* data,
 
362 +
            std::size_t size) noexcept = 0;
 
363 +

 
364 +
        virtual std::error_code
 
365 +
        get_option(int level, int optname, void* data, std::size_t* size)
 
366 +
            const noexcept = 0;
 
367 +
    };
 
368 +

 
369 +
protected:
 
370 +
    local_stream_acceptor(handle h, capy::execution_context& ctx) noexcept
 
371 +
        : io_object(std::move(h))
 
372 +
        , ctx_(ctx)
 
373 +
    {
 
374 +
    }
 
375 +

 
376 +
    local_stream_acceptor(
 
377 +
        capy::execution_context& ctx, local_stream_acceptor&& other) noexcept
 
378 +
        : io_object(std::move(other))
 
379 +
        , ctx_(ctx)
 
380 +
    {
 
381 +
    }
 
382 +

 
383 +
    static void reset_peer_impl(
 
384 +
        local_stream_socket& peer, io_object::implementation* impl) noexcept
 
385 +
    {
 
386 +
        if (impl)
 
387 +
            peer.h_.reset(impl);
 
388 +
    }
 
389 +

 
390 +
private:
 
391 +
    capy::execution_context& ctx_;
 
392 +

 
393 +
    inline implementation& get() const noexcept
 
394 +
    {
 
395 +
        return *static_cast<implementation*>(h_.get());
 
396 +
    }
 
397 +
};
 
398 +

 
399 +
} // namespace boost::corosio
 
400 +

 
401 +
#endif // BOOST_COROSIO_LOCAL_STREAM_ACCEPTOR_HPP