LCOV - code coverage report
Current view: top level - capy/io - any_write_sink.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 98.6 % 213 210 3
Test Date: 2026-06-05 15:12:41 Functions: 68.0 % 122 83 39

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/capy
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
      11                 : #define BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
      12                 : 
      13                 : #include <boost/capy/detail/config.hpp>
      14                 : #include <boost/capy/detail/await_suspend_helper.hpp>
      15                 : #include <boost/capy/buffers.hpp>
      16                 : #include <boost/capy/detail/buffer_array.hpp>
      17                 : #include <boost/capy/buffers/buffer_param.hpp>
      18                 : #include <boost/capy/concept/io_awaitable.hpp>
      19                 : #include <boost/capy/concept/write_sink.hpp>
      20                 : #include <coroutine>
      21                 : #include <boost/capy/ex/io_env.hpp>
      22                 : #include <boost/capy/io_result.hpp>
      23                 : #include <boost/capy/io_task.hpp>
      24                 : 
      25                 : #include <concepts>
      26                 : #include <coroutine>
      27                 : #include <cstddef>
      28                 : #include <exception>
      29                 : #include <new>
      30                 : #include <span>
      31                 : #include <stop_token>
      32                 : #include <system_error>
      33                 : #include <utility>
      34                 : 
      35                 : namespace boost {
      36                 : namespace capy {
      37                 : 
      38                 : /** Type-erased wrapper for any WriteSink.
      39                 : 
      40                 :     This class provides type erasure for any type satisfying the
      41                 :     @ref WriteSink concept, enabling runtime polymorphism for
      42                 :     sink write operations. It uses cached awaitable storage to achieve
      43                 :     zero steady-state allocation after construction.
      44                 : 
      45                 :     The wrapper supports two construction modes:
      46                 :     - **Owning**: Pass by value to transfer ownership. The wrapper
      47                 :       allocates storage and owns the sink.
      48                 :     - **Reference**: Pass a pointer to wrap without ownership. The
      49                 :       pointed-to sink must outlive this wrapper.
      50                 : 
      51                 :     @par Awaitable Preallocation
      52                 :     The constructor preallocates storage for the type-erased awaitable.
      53                 :     This reserves all virtual address space at server startup
      54                 :     so memory usage can be measured up front, rather than
      55                 :     allocating piecemeal as traffic arrives.
      56                 : 
      57                 :     @par Immediate Completion
      58                 :     Operations complete immediately without suspending when the
      59                 :     buffer sequence is empty, or when the underlying sink's
      60                 :     awaitable reports readiness via `await_ready`.
      61                 : 
      62                 :     @par Thread Safety
      63                 :     Not thread-safe. Concurrent operations on the same wrapper
      64                 :     are undefined behavior.
      65                 : 
      66                 :     @par Example
      67                 :     @code
      68                 :     // Owning - takes ownership of the sink
      69                 :     any_write_sink ws(some_sink{args...});
      70                 : 
      71                 :     // Reference - wraps without ownership
      72                 :     some_sink sink;
      73                 :     any_write_sink ws(&sink);
      74                 : 
      75                 :     const_buffer buf(data, size);
      76                 :     auto [ec, n] = co_await ws.write(std::span(&buf, 1));
      77                 :     auto [ec2] = co_await ws.write_eof();
      78                 :     @endcode
      79                 : 
      80                 :     @see any_write_stream, WriteSink
      81                 : */
      82                 : class any_write_sink
      83                 : {
      84                 :     struct vtable;
      85                 :     struct write_awaitable_ops;
      86                 :     struct eof_awaitable_ops;
      87                 : 
      88                 :     template<WriteSink S>
      89                 :     struct vtable_for_impl;
      90                 : 
      91                 :     void* sink_ = nullptr;
      92                 :     vtable const* vt_ = nullptr;
      93                 :     void* cached_awaitable_ = nullptr;
      94                 :     void* storage_ = nullptr;
      95                 :     write_awaitable_ops const* active_write_ops_ = nullptr;
      96                 :     eof_awaitable_ops const* active_eof_ops_ = nullptr;
      97                 : 
      98                 : public:
      99                 :     /** Destructor.
     100                 : 
     101                 :         Destroys the owned sink (if any) and releases the cached
     102                 :         awaitable storage.
     103                 :     */
     104                 :     ~any_write_sink();
     105                 : 
     106                 :     /** Construct a default instance.
     107                 : 
     108                 :         Constructs an empty wrapper. Operations on a default-constructed
     109                 :         wrapper result in undefined behavior.
     110                 :     */
     111                 :     any_write_sink() = default;
     112                 : 
     113                 :     /** Non-copyable.
     114                 : 
     115                 :         The awaitable cache is per-instance and cannot be shared.
     116                 :     */
     117                 :     any_write_sink(any_write_sink const&) = delete;
     118                 :     any_write_sink& operator=(any_write_sink const&) = delete;
     119                 : 
     120                 :     /** Construct by moving.
     121                 : 
     122                 :         Transfers ownership of the wrapped sink (if owned) and
     123                 :         cached awaitable storage from `other`. After the move, `other` is
     124                 :         in a default-constructed state.
     125                 : 
     126                 :         @param other The wrapper to move from.
     127                 :     */
     128 HIT           1 :     any_write_sink(any_write_sink&& other) noexcept
     129               1 :         : sink_(std::exchange(other.sink_, nullptr))
     130               1 :         , vt_(std::exchange(other.vt_, nullptr))
     131               1 :         , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
     132               1 :         , storage_(std::exchange(other.storage_, nullptr))
     133               1 :         , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
     134               1 :         , active_eof_ops_(std::exchange(other.active_eof_ops_, nullptr))
     135                 :     {
     136               1 :     }
     137                 : 
     138                 :     /** Assign by moving.
     139                 : 
     140                 :         Destroys any owned sink and releases existing resources,
     141                 :         then transfers ownership from `other`.
     142                 : 
     143                 :         @param other The wrapper to move from.
     144                 :         @return Reference to this wrapper.
     145                 :     */
     146                 :     any_write_sink&
     147                 :     operator=(any_write_sink&& other) noexcept;
     148                 : 
     149                 :     /** Construct by taking ownership of a WriteSink.
     150                 : 
     151                 :         Allocates storage and moves the sink into this wrapper.
     152                 :         The wrapper owns the sink and will destroy it.
     153                 : 
     154                 :         @param s The sink to take ownership of.
     155                 :     */
     156                 :     template<WriteSink S>
     157                 :         requires (!std::same_as<std::decay_t<S>, any_write_sink>)
     158                 :     any_write_sink(S s);
     159                 : 
     160                 :     /** Construct by wrapping a WriteSink without ownership.
     161                 : 
     162                 :         Wraps the given sink by pointer. The sink must remain
     163                 :         valid for the lifetime of this wrapper.
     164                 : 
     165                 :         @param s Pointer to the sink to wrap.
     166                 :     */
     167                 :     template<WriteSink S>
     168                 :     any_write_sink(S* s);
     169                 : 
     170                 :     /** Check if the wrapper contains a valid sink.
     171                 : 
     172                 :         @return `true` if wrapping a sink, `false` if default-constructed
     173                 :             or moved-from.
     174                 :     */
     175                 :     bool
     176              18 :     has_value() const noexcept
     177                 :     {
     178              18 :         return sink_ != nullptr;
     179                 :     }
     180                 : 
     181                 :     /** Check if the wrapper contains a valid sink.
     182                 : 
     183                 :         @return `true` if wrapping a sink, `false` if default-constructed
     184                 :             or moved-from.
     185                 :     */
     186                 :     explicit
     187               2 :     operator bool() const noexcept
     188                 :     {
     189               2 :         return has_value();
     190                 :     }
     191                 : 
     192                 :     /** Initiate a partial write operation.
     193                 : 
     194                 :         Attempt to write up to `buffer_size( buffers )` bytes from
     195                 :         the provided buffer sequence. May consume less than the
     196                 :         full sequence.
     197                 : 
     198                 :         @param buffers The buffer sequence containing data to write.
     199                 : 
     200                 :         @return An awaitable that await-returns `(error_code,std::size_t)`.
     201                 : 
     202                 :         @par Immediate Completion
     203                 :         The operation completes immediately without suspending
     204                 :         the calling coroutine when:
     205                 :         @li The buffer sequence is empty, returning `{error_code{}, 0}`.
     206                 :         @li The underlying sink's awaitable reports immediate
     207                 :             readiness via `await_ready`.
     208                 : 
     209                 :         @note This is a partial operation and may not process the
     210                 :         entire buffer sequence. Use @ref write for guaranteed
     211                 :         complete transfer.
     212                 : 
     213                 :         @par Preconditions
     214                 :         The wrapper must contain a valid sink (`has_value() == true`).
     215                 :     */
     216                 :     template<ConstBufferSequence CB>
     217                 :     auto
     218                 :     write_some(CB buffers);
     219                 : 
     220                 :     /** Initiate a complete write operation.
     221                 : 
     222                 :         Writes data from the provided buffer sequence. The operation
     223                 :         completes when all bytes have been consumed, or an error
     224                 :         occurs. Forwards to the underlying sink's `write` operation,
     225                 :         windowed through @ref buffer_param when the sequence exceeds
     226                 :         the per-call buffer limit.
     227                 : 
     228                 :         @param buffers The buffer sequence containing data to write.
     229                 : 
     230                 :         @return An awaitable that await-returns `(error_code,std::size_t)`.
     231                 : 
     232                 :         @par Immediate Completion
     233                 :         The operation completes immediately without suspending
     234                 :         the calling coroutine when:
     235                 :         @li The buffer sequence is empty, returning `{error_code{}, 0}`.
     236                 :         @li Every underlying `write` call completes
     237                 :             immediately (the wrapped sink reports readiness
     238                 :             via `await_ready` on each iteration).
     239                 : 
     240                 :         @par Preconditions
     241                 :         The wrapper must contain a valid sink (`has_value() == true`).
     242                 :     */
     243                 :     template<ConstBufferSequence CB>
     244                 :     io_task<std::size_t>
     245                 :     write(CB buffers);
     246                 : 
     247                 :     /** Atomically write data and signal end-of-stream.
     248                 : 
     249                 :         Writes all data from the buffer sequence and then signals
     250                 :         end-of-stream. The implementation decides how to partition
     251                 :         the data across calls to the underlying sink's @ref write
     252                 :         and `write_eof`. When the caller's buffer sequence is
     253                 :         non-empty, the final call to the underlying sink is always
     254                 :         `write_eof` with a non-empty buffer sequence. When the
     255                 :         caller's buffer sequence is empty, only `write_eof()` with
     256                 :         no data is called.
     257                 : 
     258                 :         @param buffers The buffer sequence containing data to write.
     259                 : 
     260                 :         @return An awaitable that await-returns `(error_code,std::size_t)`.
     261                 : 
     262                 :         @par Immediate Completion
     263                 :         The operation completes immediately without suspending
     264                 :         the calling coroutine when:
     265                 :         @li The buffer sequence is empty. Only the @ref write_eof()
     266                 :             call is performed.
     267                 :         @li All underlying operations complete immediately (the
     268                 :             wrapped sink reports readiness via `await_ready`).
     269                 : 
     270                 :         @par Preconditions
     271                 :         The wrapper must contain a valid sink (`has_value() == true`).
     272                 :     */
     273                 :     template<ConstBufferSequence CB>
     274                 :     io_task<std::size_t>
     275                 :     write_eof(CB buffers);
     276                 : 
     277                 :     /** Signal end of data.
     278                 : 
     279                 :         Indicates that no more data will be written to the sink.
     280                 :         The operation completes when the sink is finalized, or
     281                 :         an error occurs.
     282                 : 
     283                 :         @return An awaitable that await-returns `(error_code)`.
     284                 : 
     285                 :         @par Immediate Completion
     286                 :         The operation completes immediately without suspending
     287                 :         the calling coroutine when the underlying sink's awaitable
     288                 :         reports immediate readiness via `await_ready`.
     289                 : 
     290                 :         @par Preconditions
     291                 :         The wrapper must contain a valid sink (`has_value() == true`).
     292                 :     */
     293                 :     auto
     294                 :     write_eof();
     295                 : 
     296                 : protected:
     297                 :     /** Rebind to a new sink after move.
     298                 : 
     299                 :         Updates the internal pointer to reference a new sink object.
     300                 :         Used by owning wrappers after move assignment when the owned
     301                 :         object has moved to a new location.
     302                 : 
     303                 :         @param new_sink The new sink to bind to. Must be the same
     304                 :             type as the original sink.
     305                 : 
     306                 :         @note Terminates if called with a sink of different type
     307                 :             than the original.
     308                 :     */
     309                 :     template<WriteSink S>
     310                 :     void
     311                 :     rebind(S& new_sink) noexcept
     312                 :     {
     313                 :         if(vt_ != &vtable_for_impl<S>::value)
     314                 :             std::terminate();
     315                 :         sink_ = &new_sink;
     316                 :     }
     317                 : 
     318                 : private:
     319                 :     auto
     320                 :     write_some_(std::span<const_buffer const> buffers);
     321                 : 
     322                 :     auto
     323                 :     write_(std::span<const_buffer const> buffers);
     324                 : 
     325                 :     auto
     326                 :     write_eof_buffers_(std::span<const_buffer const> buffers);
     327                 : };
     328                 : 
     329                 : struct any_write_sink::write_awaitable_ops
     330                 : {
     331                 :     bool (*await_ready)(void*);
     332                 :     std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
     333                 :     io_result<std::size_t> (*await_resume)(void*);
     334                 :     void (*destroy)(void*) noexcept;
     335                 : };
     336                 : 
     337                 : struct any_write_sink::eof_awaitable_ops
     338                 : {
     339                 :     bool (*await_ready)(void*);
     340                 :     std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
     341                 :     io_result<> (*await_resume)(void*);
     342                 :     void (*destroy)(void*) noexcept;
     343                 : };
     344                 : 
     345                 : struct any_write_sink::vtable
     346                 : {
     347                 :     write_awaitable_ops const* (*construct_write_some_awaitable)(
     348                 :         void* sink,
     349                 :         void* storage,
     350                 :         std::span<const_buffer const> buffers);
     351                 :     write_awaitable_ops const* (*construct_write_awaitable)(
     352                 :         void* sink,
     353                 :         void* storage,
     354                 :         std::span<const_buffer const> buffers);
     355                 :     write_awaitable_ops const* (*construct_write_eof_buffers_awaitable)(
     356                 :         void* sink,
     357                 :         void* storage,
     358                 :         std::span<const_buffer const> buffers);
     359                 :     eof_awaitable_ops const* (*construct_eof_awaitable)(
     360                 :         void* sink,
     361                 :         void* storage);
     362                 :     std::size_t awaitable_size;
     363                 :     std::size_t awaitable_align;
     364                 :     void (*destroy)(void*) noexcept;
     365                 : };
     366                 : 
     367                 : template<WriteSink S>
     368                 : struct any_write_sink::vtable_for_impl
     369                 : {
     370                 :     using WriteSomeAwaitable = decltype(std::declval<S&>().write_some(
     371                 :         std::span<const_buffer const>{}));
     372                 :     using WriteAwaitable = decltype(std::declval<S&>().write(
     373                 :         std::span<const_buffer const>{}));
     374                 :     using WriteEofBuffersAwaitable = decltype(std::declval<S&>().write_eof(
     375                 :         std::span<const_buffer const>{}));
     376                 :     using EofAwaitable = decltype(std::declval<S&>().write_eof());
     377                 : 
     378                 :     static void
     379               8 :     do_destroy_impl(void* sink) noexcept
     380                 :     {
     381               8 :         static_cast<S*>(sink)->~S();
     382               8 :     }
     383                 : 
     384                 :     static write_awaitable_ops const*
     385              41 :     construct_write_some_awaitable_impl(
     386                 :         void* sink,
     387                 :         void* storage,
     388                 :         std::span<const_buffer const> buffers)
     389                 :     {
     390              41 :         auto& s = *static_cast<S*>(sink);
     391              41 :         ::new(storage) WriteSomeAwaitable(s.write_some(buffers));
     392                 : 
     393                 :         static constexpr write_awaitable_ops ops = {
     394              41 :             +[](void* p) {
     395              41 :                 return static_cast<WriteSomeAwaitable*>(p)->await_ready();
     396                 :             },
     397               3 :             +[](void* p, std::coroutine_handle<> h, io_env const* env) {
     398               3 :                 return detail::call_await_suspend(
     399               3 :                     static_cast<WriteSomeAwaitable*>(p), h, env);
     400                 :             },
     401              39 :             +[](void* p) {
     402              39 :                 return static_cast<WriteSomeAwaitable*>(p)->await_resume();
     403                 :             },
     404              43 :             +[](void* p) noexcept {
     405               2 :                 static_cast<WriteSomeAwaitable*>(p)->~WriteSomeAwaitable();
     406                 :             }
     407                 :         };
     408              41 :         return &ops;
     409                 :     }
     410                 : 
     411                 :     static write_awaitable_ops const*
     412              79 :     construct_write_awaitable_impl(
     413                 :         void* sink,
     414                 :         void* storage,
     415                 :         std::span<const_buffer const> buffers)
     416                 :     {
     417              79 :         auto& s = *static_cast<S*>(sink);
     418              79 :         ::new(storage) WriteAwaitable(s.write(buffers));
     419                 : 
     420                 :         static constexpr write_awaitable_ops ops = {
     421              79 :             +[](void* p) {
     422              79 :                 return static_cast<WriteAwaitable*>(p)->await_ready();
     423                 :             },
     424               1 :             +[](void* p, std::coroutine_handle<> h, io_env const* env) {
     425               1 :                 return detail::call_await_suspend(
     426               1 :                     static_cast<WriteAwaitable*>(p), h, env);
     427                 :             },
     428              79 :             +[](void* p) {
     429              79 :                 return static_cast<WriteAwaitable*>(p)->await_resume();
     430                 :             },
     431              79 :             +[](void* p) noexcept {
     432 MIS           0 :                 static_cast<WriteAwaitable*>(p)->~WriteAwaitable();
     433                 :             }
     434                 :         };
     435 HIT          79 :         return &ops;
     436                 :     }
     437                 : 
     438                 :     static write_awaitable_ops const*
     439              17 :     construct_write_eof_buffers_awaitable_impl(
     440                 :         void* sink,
     441                 :         void* storage,
     442                 :         std::span<const_buffer const> buffers)
     443                 :     {
     444              17 :         auto& s = *static_cast<S*>(sink);
     445              17 :         ::new(storage) WriteEofBuffersAwaitable(s.write_eof(buffers));
     446                 : 
     447                 :         static constexpr write_awaitable_ops ops = {
     448              17 :             +[](void* p) {
     449              17 :                 return static_cast<WriteEofBuffersAwaitable*>(p)->await_ready();
     450                 :             },
     451               1 :             +[](void* p, std::coroutine_handle<> h, io_env const* env) {
     452               1 :                 return detail::call_await_suspend(
     453               1 :                     static_cast<WriteEofBuffersAwaitable*>(p), h, env);
     454                 :             },
     455              17 :             +[](void* p) {
     456              17 :                 return static_cast<WriteEofBuffersAwaitable*>(p)->await_resume();
     457                 :             },
     458              17 :             +[](void* p) noexcept {
     459 MIS           0 :                 static_cast<WriteEofBuffersAwaitable*>(p)->~WriteEofBuffersAwaitable();
     460                 :             }
     461                 :         };
     462 HIT          17 :         return &ops;
     463                 :     }
     464                 : 
     465                 :     static eof_awaitable_ops const*
     466              19 :     construct_eof_awaitable_impl(
     467                 :         void* sink,
     468                 :         void* storage)
     469                 :     {
     470              19 :         auto& s = *static_cast<S*>(sink);
     471              19 :         ::new(storage) EofAwaitable(s.write_eof());
     472                 : 
     473                 :         static constexpr eof_awaitable_ops ops = {
     474              19 :             +[](void* p) {
     475              19 :                 return static_cast<EofAwaitable*>(p)->await_ready();
     476                 :             },
     477               3 :             +[](void* p, std::coroutine_handle<> h, io_env const* env) {
     478               3 :                 return detail::call_await_suspend(
     479               3 :                     static_cast<EofAwaitable*>(p), h, env);
     480                 :             },
     481              17 :             +[](void* p) {
     482              17 :                 return static_cast<EofAwaitable*>(p)->await_resume();
     483                 :             },
     484              21 :             +[](void* p) noexcept {
     485               2 :                 static_cast<EofAwaitable*>(p)->~EofAwaitable();
     486                 :             }
     487                 :         };
     488              19 :         return &ops;
     489                 :     }
     490                 : 
     491                 :     static constexpr std::size_t max4(
     492                 :         std::size_t a, std::size_t b,
     493                 :         std::size_t c, std::size_t d) noexcept
     494                 :     {
     495                 :         std::size_t ab = a > b ? a : b;
     496                 :         std::size_t cd = c > d ? c : d;
     497                 :         return ab > cd ? ab : cd;
     498                 :     }
     499                 : 
     500                 :     static constexpr std::size_t max_awaitable_size =
     501                 :         max4(sizeof(WriteSomeAwaitable),
     502                 :              sizeof(WriteAwaitable),
     503                 :              sizeof(WriteEofBuffersAwaitable),
     504                 :              sizeof(EofAwaitable));
     505                 : 
     506                 :     static constexpr std::size_t max_awaitable_align =
     507                 :         max4(alignof(WriteSomeAwaitable),
     508                 :              alignof(WriteAwaitable),
     509                 :              alignof(WriteEofBuffersAwaitable),
     510                 :              alignof(EofAwaitable));
     511                 : 
     512                 :     static constexpr vtable value = {
     513                 :         &construct_write_some_awaitable_impl,
     514                 :         &construct_write_awaitable_impl,
     515                 :         &construct_write_eof_buffers_awaitable_impl,
     516                 :         &construct_eof_awaitable_impl,
     517                 :         max_awaitable_size,
     518                 :         max_awaitable_align,
     519                 :         &do_destroy_impl
     520                 :     };
     521                 : };
     522                 : 
     523                 : inline
     524             134 : any_write_sink::~any_write_sink()
     525                 : {
     526             134 :     if(storage_)
     527                 :     {
     528               7 :         vt_->destroy(sink_);
     529               7 :         ::operator delete(storage_);
     530                 :     }
     531             134 :     if(cached_awaitable_)
     532                 :     {
     533             126 :         if(active_write_ops_)
     534               1 :             active_write_ops_->destroy(cached_awaitable_);
     535             125 :         else if(active_eof_ops_)
     536               1 :             active_eof_ops_->destroy(cached_awaitable_);
     537             126 :         ::operator delete(cached_awaitable_);
     538                 :     }
     539             134 : }
     540                 : 
     541                 : inline any_write_sink&
     542               4 : any_write_sink::operator=(any_write_sink&& other) noexcept
     543                 : {
     544               4 :     if(this != &other)
     545                 :     {
     546               4 :         if(storage_)
     547                 :         {
     548               1 :             vt_->destroy(sink_);
     549               1 :             ::operator delete(storage_);
     550                 :         }
     551               4 :         if(cached_awaitable_)
     552                 :         {
     553               3 :             if(active_write_ops_)
     554               1 :                 active_write_ops_->destroy(cached_awaitable_);
     555               2 :             else if(active_eof_ops_)
     556               1 :                 active_eof_ops_->destroy(cached_awaitable_);
     557               3 :             ::operator delete(cached_awaitable_);
     558                 :         }
     559               4 :         sink_ = std::exchange(other.sink_, nullptr);
     560               4 :         vt_ = std::exchange(other.vt_, nullptr);
     561               4 :         cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
     562               4 :         storage_ = std::exchange(other.storage_, nullptr);
     563               4 :         active_write_ops_ = std::exchange(other.active_write_ops_, nullptr);
     564               4 :         active_eof_ops_ = std::exchange(other.active_eof_ops_, nullptr);
     565                 :     }
     566               4 :     return *this;
     567                 : }
     568                 : 
     569                 : template<WriteSink S>
     570                 :     requires (!std::same_as<std::decay_t<S>, any_write_sink>)
     571               9 : any_write_sink::any_write_sink(S s)
     572               9 :     : vt_(&vtable_for_impl<S>::value)
     573                 : {
     574                 :     struct guard {
     575                 :         any_write_sink* self;
     576                 :         bool committed = false;
     577               9 :         ~guard() {
     578               9 :             if(!committed && self->storage_) {
     579                 :                 // sink_ is null if the sink move-ctor threw before
     580                 :                 // the placement-new assigned it.
     581               1 :                 if(self->sink_)
     582 MIS           0 :                     self->vt_->destroy(self->sink_);
     583 HIT           1 :                 ::operator delete(self->storage_);
     584               1 :                 self->storage_ = nullptr;
     585               1 :                 self->sink_ = nullptr;
     586                 :             }
     587               9 :         }
     588               9 :     } g{this};
     589                 : 
     590               9 :     storage_ = ::operator new(sizeof(S));
     591               9 :     sink_ = ::new(storage_) S(std::move(s));
     592                 : 
     593                 :     // Preallocate the awaitable storage (sized for max of write/eof)
     594               8 :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     595                 : 
     596               8 :     g.committed = true;
     597               9 : }
     598                 : 
     599                 : template<WriteSink S>
     600             121 : any_write_sink::any_write_sink(S* s)
     601             121 :     : sink_(s)
     602             121 :     , vt_(&vtable_for_impl<S>::value)
     603                 : {
     604                 :     // Preallocate the awaitable storage (sized for max of write/eof)
     605             121 :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     606             121 : }
     607                 : 
     608                 : inline auto
     609                 : any_write_sink::write_some_(
     610                 :     std::span<const_buffer const> buffers)
     611                 : {
     612                 :     struct awaitable
     613                 :     {
     614                 :         any_write_sink* self_;
     615                 :         std::span<const_buffer const> buffers_;
     616                 : 
     617                 :         bool
     618                 :         await_ready() const noexcept
     619                 :         {
     620                 :             return false;
     621                 :         }
     622                 : 
     623                 :         std::coroutine_handle<>
     624                 :         await_suspend(std::coroutine_handle<> h, io_env const* env)
     625                 :         {
     626                 :             self_->active_write_ops_ = self_->vt_->construct_write_some_awaitable(
     627                 :                 self_->sink_,
     628                 :                 self_->cached_awaitable_,
     629                 :                 buffers_);
     630                 : 
     631                 :             if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
     632                 :                 return h;
     633                 : 
     634                 :             return self_->active_write_ops_->await_suspend(
     635                 :                 self_->cached_awaitable_, h, env);
     636                 :         }
     637                 : 
     638                 :         io_result<std::size_t>
     639                 :         await_resume()
     640                 :         {
     641                 :             struct guard {
     642                 :                 any_write_sink* self;
     643                 :                 ~guard() {
     644                 :                     self->active_write_ops_->destroy(self->cached_awaitable_);
     645                 :                     self->active_write_ops_ = nullptr;
     646                 :                 }
     647                 :             } g{self_};
     648                 :             return self_->active_write_ops_->await_resume(
     649                 :                 self_->cached_awaitable_);
     650                 :         }
     651                 :     };
     652                 :     return awaitable{this, buffers};
     653                 : }
     654                 : 
     655                 : inline auto
     656              79 : any_write_sink::write_(
     657                 :     std::span<const_buffer const> buffers)
     658                 : {
     659                 :     struct awaitable
     660                 :     {
     661                 :         any_write_sink* self_;
     662                 :         std::span<const_buffer const> buffers_;
     663                 : 
     664                 :         bool
     665              79 :         await_ready() const noexcept
     666                 :         {
     667              79 :             return false;
     668                 :         }
     669                 : 
     670                 :         std::coroutine_handle<>
     671              79 :         await_suspend(std::coroutine_handle<> h, io_env const* env)
     672                 :         {
     673             158 :             self_->active_write_ops_ = self_->vt_->construct_write_awaitable(
     674              79 :                 self_->sink_,
     675              79 :                 self_->cached_awaitable_,
     676                 :                 buffers_);
     677                 : 
     678              79 :             if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
     679              78 :                 return h;
     680                 : 
     681               1 :             return self_->active_write_ops_->await_suspend(
     682               1 :                 self_->cached_awaitable_, h, env);
     683                 :         }
     684                 : 
     685                 :         io_result<std::size_t>
     686              79 :         await_resume()
     687                 :         {
     688                 :             struct guard {
     689                 :                 any_write_sink* self;
     690              79 :                 ~guard() {
     691              79 :                     self->active_write_ops_->destroy(self->cached_awaitable_);
     692              79 :                     self->active_write_ops_ = nullptr;
     693              79 :                 }
     694              79 :             } g{self_};
     695              79 :             return self_->active_write_ops_->await_resume(
     696             137 :                 self_->cached_awaitable_);
     697              79 :         }
     698                 :     };
     699              79 :     return awaitable{this, buffers};
     700                 : }
     701                 : 
     702                 : inline auto
     703              19 : any_write_sink::write_eof()
     704                 : {
     705                 :     struct awaitable
     706                 :     {
     707                 :         any_write_sink* self_;
     708                 : 
     709                 :         bool
     710              19 :         await_ready() const noexcept
     711                 :         {
     712              19 :             return false;
     713                 :         }
     714                 : 
     715                 :         std::coroutine_handle<>
     716              19 :         await_suspend(std::coroutine_handle<> h, io_env const* env)
     717                 :         {
     718                 :             // Construct the underlying awaitable into cached storage
     719              38 :             self_->active_eof_ops_ = self_->vt_->construct_eof_awaitable(
     720              19 :                 self_->sink_,
     721              19 :                 self_->cached_awaitable_);
     722                 : 
     723                 :             // Check if underlying is immediately ready
     724              19 :             if(self_->active_eof_ops_->await_ready(self_->cached_awaitable_))
     725              16 :                 return h;
     726                 : 
     727                 :             // Forward to underlying awaitable
     728               3 :             return self_->active_eof_ops_->await_suspend(
     729               3 :                 self_->cached_awaitable_, h, env);
     730                 :         }
     731                 : 
     732                 :         io_result<>
     733              17 :         await_resume()
     734                 :         {
     735                 :             struct guard {
     736                 :                 any_write_sink* self;
     737              17 :                 ~guard() {
     738              17 :                     self->active_eof_ops_->destroy(self->cached_awaitable_);
     739              17 :                     self->active_eof_ops_ = nullptr;
     740              17 :                 }
     741              17 :             } g{self_};
     742              17 :             return self_->active_eof_ops_->await_resume(
     743              29 :                 self_->cached_awaitable_);
     744              17 :         }
     745                 :     };
     746              19 :     return awaitable{this};
     747                 : }
     748                 : 
     749                 : inline auto
     750              17 : any_write_sink::write_eof_buffers_(
     751                 :     std::span<const_buffer const> buffers)
     752                 : {
     753                 :     struct awaitable
     754                 :     {
     755                 :         any_write_sink* self_;
     756                 :         std::span<const_buffer const> buffers_;
     757                 : 
     758                 :         bool
     759              17 :         await_ready() const noexcept
     760                 :         {
     761              17 :             return false;
     762                 :         }
     763                 : 
     764                 :         std::coroutine_handle<>
     765              17 :         await_suspend(std::coroutine_handle<> h, io_env const* env)
     766                 :         {
     767              34 :             self_->active_write_ops_ =
     768              34 :                 self_->vt_->construct_write_eof_buffers_awaitable(
     769              17 :                     self_->sink_,
     770              17 :                     self_->cached_awaitable_,
     771                 :                     buffers_);
     772                 : 
     773              17 :             if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
     774              16 :                 return h;
     775                 : 
     776               1 :             return self_->active_write_ops_->await_suspend(
     777               1 :                 self_->cached_awaitable_, h, env);
     778                 :         }
     779                 : 
     780                 :         io_result<std::size_t>
     781              17 :         await_resume()
     782                 :         {
     783                 :             struct guard {
     784                 :                 any_write_sink* self;
     785              17 :                 ~guard() {
     786              17 :                     self->active_write_ops_->destroy(self->cached_awaitable_);
     787              17 :                     self->active_write_ops_ = nullptr;
     788              17 :                 }
     789              17 :             } g{self_};
     790              17 :             return self_->active_write_ops_->await_resume(
     791              29 :                 self_->cached_awaitable_);
     792              17 :         }
     793                 :     };
     794              17 :     return awaitable{this, buffers};
     795                 : }
     796                 : 
     797                 : template<ConstBufferSequence CB>
     798                 : auto
     799              43 : any_write_sink::write_some(CB buffers)
     800                 : {
     801                 :     struct awaitable
     802                 :     {
     803                 :         any_write_sink* self_;
     804                 :         detail::const_buffer_array<detail::max_iovec_> ba_;
     805                 : 
     806              43 :         awaitable(
     807                 :             any_write_sink* self,
     808                 :             CB const& buffers)
     809              43 :             : self_(self)
     810              43 :             , ba_(buffers)
     811                 :         {
     812              43 :         }
     813                 : 
     814                 :         bool
     815              43 :         await_ready() const noexcept
     816                 :         {
     817              43 :             return ba_.to_span().empty();
     818                 :         }
     819                 : 
     820                 :         std::coroutine_handle<>
     821              41 :         await_suspend(std::coroutine_handle<> h, io_env const* env)
     822                 :         {
     823              41 :             self_->active_write_ops_ = self_->vt_->construct_write_some_awaitable(
     824              41 :                 self_->sink_,
     825              41 :                 self_->cached_awaitable_,
     826              41 :                 ba_.to_span());
     827                 : 
     828              41 :             if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
     829              38 :                 return h;
     830                 : 
     831               3 :             return self_->active_write_ops_->await_suspend(
     832               3 :                 self_->cached_awaitable_, h, env);
     833                 :         }
     834                 : 
     835                 :         io_result<std::size_t>
     836              41 :         await_resume()
     837                 :         {
     838              41 :             if(ba_.to_span().empty())
     839               2 :                 return {{}, 0};
     840                 : 
     841                 :             struct guard {
     842                 :                 any_write_sink* self;
     843              39 :                 ~guard() {
     844              39 :                     self->active_write_ops_->destroy(self->cached_awaitable_);
     845              39 :                     self->active_write_ops_ = nullptr;
     846              39 :                 }
     847              39 :             } g{self_};
     848              39 :             return self_->active_write_ops_->await_resume(
     849              39 :                 self_->cached_awaitable_);
     850              39 :         }
     851                 :     };
     852              43 :     return awaitable{this, buffers};
     853                 : }
     854                 : 
     855                 : template<ConstBufferSequence CB>
     856                 : io_task<std::size_t>
     857              69 : any_write_sink::write(CB buffers)
     858                 : {
     859                 :     buffer_param<CB> bp(buffers);
     860                 :     std::size_t total = 0;
     861                 : 
     862                 :     for(;;)
     863                 :     {
     864                 :         auto bufs = bp.data();
     865                 :         if(bufs.empty())
     866                 :             break;
     867                 : 
     868                 :         auto [ec, n] = co_await write_(bufs);
     869                 :         total += n;
     870                 :         if(ec)
     871                 :             co_return {ec, total};
     872                 :         bp.consume(n);
     873                 :     }
     874                 : 
     875                 :     co_return {{}, total};
     876             138 : }
     877                 : 
     878                 : template<ConstBufferSequence CB>
     879                 : io_task<std::size_t>
     880              27 : any_write_sink::write_eof(CB buffers)
     881                 : {
     882                 :     const_buffer_param<CB> bp(buffers);
     883                 :     std::size_t total = 0;
     884                 : 
     885                 :     for(;;)
     886                 :     {
     887                 :         auto bufs = bp.data();
     888                 :         if(bufs.empty())
     889                 :         {
     890                 :             auto [ec] = co_await write_eof();
     891                 :             co_return {ec, total};
     892                 :         }
     893                 : 
     894                 :         if(! bp.more())
     895                 :         {
     896                 :             // Last window — send atomically with EOF
     897                 :             auto [ec, n] = co_await write_eof_buffers_(bufs);
     898                 :             total += n;
     899                 :             co_return {ec, total};
     900                 :         }
     901                 : 
     902                 :         auto [ec, n] = co_await write_(bufs);
     903                 :         total += n;
     904                 :         if(ec)
     905                 :             co_return {ec, total};
     906                 :         bp.consume(n);
     907                 :     }
     908              54 : }
     909                 : 
     910                 : } // namespace capy
     911                 : } // namespace boost
     912                 : 
     913                 : #endif
        

Generated by: LCOV version 2.3