LCOV - code coverage report
Current view: top level - capy/io - any_write_stream.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 99.0 % 102 101 1
Test Date: 2026-06-05 15:12:41 Functions: 83.6 % 67 56 11

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/capy
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
      11                 : #define BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
      12                 : 
      13                 : #include <boost/capy/detail/config.hpp>
      14                 : #include <boost/capy/detail/await_suspend_helper.hpp>
      15                 : #include <boost/capy/buffers.hpp>
      16                 : #include <boost/capy/detail/buffer_array.hpp>
      17                 : #include <boost/capy/concept/io_awaitable.hpp>
      18                 : #include <boost/capy/concept/write_stream.hpp>
      19                 : #include <coroutine>
      20                 : #include <boost/capy/ex/io_env.hpp>
      21                 : #include <boost/capy/io_result.hpp>
      22                 : 
      23                 : #include <concepts>
      24                 : #include <coroutine>
      25                 : #include <cstddef>
      26                 : #include <exception>
      27                 : #include <new>
      28                 : #include <span>
      29                 : #include <stop_token>
      30                 : #include <system_error>
      31                 : #include <utility>
      32                 : 
      33                 : namespace boost {
      34                 : namespace capy {
      35                 : 
      36                 : /** Type-erased wrapper for any WriteStream.
      37                 : 
      38                 :     This class provides type erasure for any type satisfying the
      39                 :     @ref WriteStream concept, enabling runtime polymorphism for
      40                 :     write operations. It uses cached awaitable storage to achieve
      41                 :     zero steady-state allocation after construction.
      42                 : 
      43                 :     The wrapper supports two construction modes:
      44                 :     - **Owning**: Pass by value to transfer ownership. The wrapper
      45                 :       allocates storage and owns the stream.
      46                 :     - **Reference**: Pass a pointer to wrap without ownership. The
      47                 :       pointed-to stream must outlive this wrapper.
      48                 : 
      49                 :     @par Awaitable Preallocation
      50                 :     The constructor preallocates storage for the type-erased awaitable.
      51                 :     This reserves all virtual address space at server startup
      52                 :     so memory usage can be measured up front, rather than
      53                 :     allocating piecemeal as traffic arrives.
      54                 : 
      55                 :     @par Immediate Completion
      56                 :     Operations complete immediately without suspending when the
      57                 :     buffer sequence is empty, or when the underlying stream's
      58                 :     awaitable reports readiness via `await_ready`.
      59                 : 
      60                 :     @par Thread Safety
      61                 :     Not thread-safe. Concurrent operations on the same wrapper
      62                 :     are undefined behavior.
      63                 : 
      64                 :     @par Example
      65                 :     @code
      66                 :     // Owning - takes ownership of the stream
      67                 :     any_write_stream stream(socket{ioc});
      68                 : 
      69                 :     // Reference - wraps without ownership
      70                 :     socket sock(ioc);
      71                 :     any_write_stream stream(&sock);
      72                 : 
      73                 :     const_buffer buf(data, size);
      74                 :     auto [ec, n] = co_await stream.write_some(std::span(&buf, 1));
      75                 :     @endcode
      76                 : 
      77                 :     @see any_read_stream, any_stream, WriteStream
      78                 : */
      79                 : class any_write_stream
      80                 : {
      81                 :     struct vtable;
      82                 : 
      83                 :     template<WriteStream S>
      84                 :     struct vtable_for_impl;
      85                 : 
      86                 :     // ordered for cache line coherence
      87                 :     void* stream_ = nullptr; // wrapped stream; null when the wrapper is empty
      88                 :     vtable const* vt_ = nullptr; // operations for the concrete stream type
      89                 :     void* cached_awaitable_ = nullptr; // raw storage reused for each operation's awaitable
      90                 :     void* storage_ = nullptr; // memory holding an owned stream; null unless owning
      91                 :     bool awaitable_active_ = false; // true while an awaitable object lives in cached_awaitable_
      92                 : 
      93                 : public:
      94                 :     /** Destructor.
      95                 : 
      96                 :         Destroys the owned stream (if any) and releases the cached
      97                 :         awaitable storage.
                         : 
                         :         An awaitable left alive in the cache by an operation that
                         :         was started but never resumed is destroyed as well.
      98                 :     */
      99                 :     ~any_write_stream();
     100                 : 
     101                 :     /** Construct a default instance.
     102                 : 
     103                 :         Constructs an empty wrapper. Operations on a default-constructed
     104                 :         wrapper result in undefined behavior.
     105                 :     */
     106 HIT           3 :     any_write_stream() = default;
     107                 : 
     108                 :     /** Non-copyable.
     109                 : 
     110                 :         The awaitable cache is per-instance and cannot be shared.
     111                 :     */
     112                 :     any_write_stream(any_write_stream const&) = delete;
     113                 :     any_write_stream& operator=(any_write_stream const&) = delete;
     114                 : 
     115                 :     /** Construct by moving.
     116                 : 
     117                 :         Transfers ownership of the wrapped stream (if owned) and
     118                 :         cached awaitable storage from `other`. After the move, `other` is
     119                 :         in a default-constructed state.
     120                 : 
     121                 :         @param other The wrapper to move from.
                         : 
                         :         @note NOTE(review): an awaitable returned by `write_some`
                         :         stores a pointer to the wrapper that created it, so moving a
                         :         wrapper while such an awaitable is outstanding leaves that
                         :         awaitable referring to `other`. Confirm callers never move a
                         :         wrapper with an operation in flight.
     122                 :     */
     123               2 :     any_write_stream(any_write_stream&& other) noexcept
     124               2 :         : stream_(std::exchange(other.stream_, nullptr))
     125               2 :         , vt_(std::exchange(other.vt_, nullptr))
     126               2 :         , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
     127               2 :         , storage_(std::exchange(other.storage_, nullptr))
     128               2 :         , awaitable_active_(std::exchange(other.awaitable_active_, false))
     129                 :     {
     130               2 :     }
     131                 : 
     132                 :     /** Assign by moving.
     133                 : 
     134                 :         Destroys any owned stream and releases existing resources,
     135                 :         then transfers ownership from `other`.
     136                 : 
     137                 :         @param other The wrapper to move from.
     138                 :         @return Reference to this wrapper.
     139                 :     */
     140                 :     any_write_stream&
     141                 :     operator=(any_write_stream&& other) noexcept;
     142                 : 
     143                 :     /** Construct by taking ownership of a WriteStream.
     144                 : 
     145                 :         Allocates storage and moves the stream into this wrapper.
     146                 :         The wrapper owns the stream and will destroy it.
                         : 
                         :         Storage for the type-erased awaitable is allocated here as
                         :         well. If the stream's move constructor or the awaitable
                         :         allocation throws, the memory already acquired is released
                         :         (and an already-constructed stream destroyed) before the
                         :         exception propagates.
                         : 
                         :         This constructor is not `explicit`, so a stream converts
                         :         implicitly to `any_write_stream`.
     147                 : 
     148                 :         @param s The stream to take ownership of.
     149                 :     */
     150                 :     template<WriteStream S>
     151                 :         requires (!std::same_as<std::decay_t<S>, any_write_stream>)
     152                 :     any_write_stream(S s);
     153                 : 
     154                 :     /** Construct by wrapping a WriteStream without ownership.
     155                 : 
     156                 :         Wraps the given stream by pointer. The stream must remain
     157                 :         valid for the lifetime of this wrapper.
                         : 
                         :         Storage for the type-erased awaitable is allocated here. The
                         :         pointer is stored as given; if `s` is null, `has_value()`
                         :         returns `false`.
     158                 : 
     159                 :         @param s Pointer to the stream to wrap.
     160                 :     */
     161                 :     template<WriteStream S>
     162                 :     any_write_stream(S* s);
     163                 : 
     164                 :     /** Check if the wrapper contains a valid stream.
     165                 : 
     166                 :         @return `true` if wrapping a stream, `false` if default-constructed
     167                 :             or moved-from.
     168                 :     */
     169                 :     bool
     170              26 :     has_value() const noexcept
     171                 :     {
     172              26 :         return stream_ != nullptr;
     173                 :     }
     174                 : 
     175                 :     /** Check if the wrapper contains a valid stream.
     176                 : 
     177                 :         @return `true` if wrapping a stream, `false` if default-constructed
     178                 :             or moved-from.
     179                 :     */
     180                 :     explicit
     181               3 :     operator bool() const noexcept
     182                 :     {
     183               3 :         return has_value();
     184                 :     }
     185                 : 
     186                 :     /** Initiate an asynchronous write operation.
     187                 : 
     188                 :         Writes data from the provided buffer sequence. The operation
     189                 :         completes when at least one byte has been written, or an error
     190                 :         occurs.
     191                 : 
     192                 :         @param buffers The buffer sequence containing data to write.
     193                 :             Passed by value to ensure the sequence lives in the
     194                 :             coroutine frame across suspension points.
     195                 : 
     196                 :         @return An awaitable that await-returns `(error_code,std::size_t)`.
     197                 : 
     198                 :         @par Immediate Completion
     199                 :         The operation completes immediately without suspending
     200                 :         the calling coroutine when:
     201                 :         @li The buffer sequence is empty, returning `{error_code{}, 0}`.
     202                 :         @li The underlying stream's awaitable reports immediate
     203                 :             readiness via `await_ready`.
     204                 : 
     205                 :         @note This is a partial operation and may not process the
     206                 :         entire buffer sequence. Use the composed @ref write algorithm
     207                 :         for guaranteed complete transfer.
     208                 : 
     209                 :         @par Preconditions
     210                 :         The wrapper must contain a valid stream (`has_value() == true`).
                         : 
                         :         Only one operation may be outstanding at a time: every
                         :         operation constructs its awaitable in the same cached storage.
                         : 
                         :         @note The returned awaitable holds the buffers in a
                         :         `detail::const_buffer_array<detail::max_iovec_>`.
                         :         NOTE(review): how a sequence with more than
                         :         `detail::max_iovec_` buffers is handled is decided by that
                         :         helper and is not visible here; confirm.
     211                 :     */
     212                 :     template<ConstBufferSequence CB>
     213                 :     auto
     214                 :     write_some(CB buffers);
     215                 : 
     216                 : protected:
     217                 :     /** Rebind to a new stream after move.
     218                 : 
     219                 :         Updates the internal pointer to reference a new stream object.
     220                 :         Used by owning wrappers after move assignment when the owned
     221                 :         object has moved to a new location.
     222                 : 
     223                 :         @param new_stream The new stream to bind to. Must be the same
     224                 :             type as the original stream.
     225                 : 
     226                 :         @note Terminates if called with a stream of different type
     227                 :             than the original.
     228                 :     */
     229                 :     template<WriteStream S>
     230                 :     void
     231                 :     rebind(S& new_stream) noexcept
     232                 :     {
     233                 :         if(vt_ != &vtable_for_impl<S>::value) // type check: each S has its own vtable object
     234                 :             std::terminate();
     235                 :         stream_ = &new_stream;
     236                 :     }
     237                 : };
     238                 : 
                           // Table of type-erased operations for one concrete stream type.
                           // The entries are filled in by vtable_for_impl<S>::value.
     239                 : struct any_write_stream::vtable
     240                 : {
     241                 :     // ordered by call frequency for cache line coherence
     242                 :     void (*construct_awaitable)(
     243                 :         void* stream,
     244                 :         void* storage,
     245                 :         std::span<const_buffer const> buffers); // placement-constructs stream.write_some(buffers) in storage
     246                 :     bool (*await_ready)(void*); // forwards to the awaitable at the given address
     247                 :     std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*); // forwards via detail::call_await_suspend
     248                 :     io_result<std::size_t> (*await_resume)(void*); // forwards to the awaitable at the given address
     249                 :     void (*destroy_awaitable)(void*) noexcept; // runs the awaitable's destructor; storage is not freed
     250                 :     std::size_t awaitable_size; // sizeof the awaitable; sizes the cached storage
     251                 :     std::size_t awaitable_align; // alignof the awaitable; NOTE(review): never read in the code shown here
     252                 :     void (*destroy)(void*) noexcept; // runs the stream's destructor; memory is not freed
     253                 : };
     254                 : 
                           // Provides the vtable for a concrete stream type S. Each
                           // instantiation has its own `value` object, which is what rebind()
                           // compares against to check the stream type.
     255                 : template<WriteStream S>
     256                 : struct any_write_stream::vtable_for_impl
     257                 : {
                               // Type returned by S::write_some when called with a span of
                               // const_buffer; this is the object kept in the cached storage.
     258                 :     using Awaitable = decltype(std::declval<S&>().write_some(
     259                 :         std::span<const_buffer const>{}));
     260                 : 
                               // Runs the stream's destructor in place. The memory itself is
                               // released separately by the caller.
     261                 :     static void
     262               3 :     do_destroy_impl(void* stream) noexcept
     263                 :     {
     264               3 :         static_cast<S*>(stream)->~S();
     265               3 :     }
     266                 : 
                               // Starts a write on the stream and placement-constructs the
                               // resulting awaitable in `storage`.
     267                 :     static void
     268              75 :     construct_awaitable_impl(
     269                 :         void* stream,
     270                 :         void* storage,
     271                 :         std::span<const_buffer const> buffers)
     272                 :     {
     273              75 :         auto& s = *static_cast<S*>(stream);
     274              75 :         ::new(storage) Awaitable(s.write_some(buffers));
     275              75 :     }
     276                 : 
                               // The initializers are positional, so their order must follow the
                               // member order of `vtable`. The unary + converts each captureless
                               // lambda to a plain function pointer.
     277                 :     static constexpr vtable value = {
     278                 :         &construct_awaitable_impl,
     279              75 :         +[](void* p) {
     280              75 :             return static_cast<Awaitable*>(p)->await_ready();
     281                 :         },
     282               2 :         +[](void* p, std::coroutine_handle<> h, io_env const* env) {
     283               2 :             return detail::call_await_suspend(
     284               2 :                 static_cast<Awaitable*>(p), h, env);
     285                 :         },
     286              73 :         +[](void* p) {
     287              73 :             return static_cast<Awaitable*>(p)->await_resume();
     288                 :         },
     289              77 :         +[](void* p) noexcept {
     290              12 :             static_cast<Awaitable*>(p)->~Awaitable();
     291                 :         },
     292                 :         sizeof(Awaitable),
     293                 :         alignof(Awaitable),
     294                 :         &do_destroy_impl
     295                 :     };
     296                 : };
     297                 : 
                           // Releases everything the wrapper owns: the owned stream (if any),
                           // an awaitable still alive in the cache (if any), and the cache.
                           //
                           // NOTE(review): the owned stream is destroyed before a still-active
                           // awaitable that was created from it. Confirm that no wrapped
                           // awaitable's destructor touches its stream; otherwise the awaitable
                           // should be destroyed first.
     298                 : inline
     299             101 : any_write_stream::~any_write_stream()
     300                 : {
                               // storage_ is non-null only when the wrapper owns its stream.
     301             101 :     if(storage_)
     302                 :     {
     303               2 :         vt_->destroy(stream_);
     304               2 :         ::operator delete(storage_);
     305                 :     }
     306             101 :     if(cached_awaitable_)
     307                 :     {
                                   // Set when an operation was suspended and never resumed.
     308              87 :         if(awaitable_active_)
     309               1 :             vt_->destroy_awaitable(cached_awaitable_);
     310              87 :         ::operator delete(cached_awaitable_);
     311                 :     }
     312             101 : }
     313                 : 
                           // Move assignment: releases this wrapper's resources in the same
                           // way the destructor does, then takes over every member of `other`,
                           // leaving `other` empty. Self-assignment changes nothing.
                           //
                           // NOTE(review): as in the destructor, the owned stream is destroyed
                           // before a still-active awaitable created from it; confirm that
                           // order is safe for every wrapped awaitable type.
     314                 : inline any_write_stream&
     315               9 : any_write_stream::operator=(any_write_stream&& other) noexcept
     316                 : {
     317               9 :     if(this != &other)
     318                 :     {
                                   // storage_ is non-null only when this wrapper owns its stream.
     319               9 :         if(storage_)
     320                 :         {
     321               1 :             vt_->destroy(stream_);
     322               1 :             ::operator delete(storage_);
     323                 :         }
     324               9 :         if(cached_awaitable_)
     325                 :         {
     326               4 :             if(awaitable_active_)
     327               1 :                 vt_->destroy_awaitable(cached_awaitable_);
     328               4 :             ::operator delete(cached_awaitable_);
     329                 :         }
                                   // Every old pointer is overwritten below, so none dangles.
     330               9 :         stream_ = std::exchange(other.stream_, nullptr);
     331               9 :         vt_ = std::exchange(other.vt_, nullptr);
     332               9 :         cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
     333               9 :         storage_ = std::exchange(other.storage_, nullptr);
     334               9 :         awaitable_active_ = std::exchange(other.awaitable_active_, false);
     335                 :     }
     336               9 :     return *this;
     337                 : }
     338                 : 
                           // Owning constructor: moves `s` into separately allocated storage
                           // and preallocates the storage for the type-erased awaitable.
     339                 : template<WriteStream S>
     340                 :     requires (!std::same_as<std::decay_t<S>, any_write_stream>)
     341               4 : any_write_stream::any_write_stream(S s)
     342               4 :     : vt_(&vtable_for_impl<S>::value)
     343                 : {
                               // Rolls back a partially built wrapper if a step below throws.
                               // The class destructor does not run for an object whose
                               // constructor exits by exception, so cleanup has to happen here.
     344                 :     struct guard {
     345                 :         any_write_stream* self;
     346                 :         bool committed = false;
     347               4 :         ~guard() {
     348               4 :             if(!committed && self->storage_) {
     349                 :                 // stream_ is null if the stream move-ctor threw before
     350                 :                 // the placement-new assigned it.
                                           // It is non-null when the stream was built and the
                                           // awaitable allocation below threw afterwards.
     351               1 :                 if(self->stream_)
     352 MIS           0 :                     self->vt_->destroy(self->stream_);
     353 HIT           1 :                 ::operator delete(self->storage_);
     354               1 :                 self->storage_ = nullptr;
     355               1 :                 self->stream_ = nullptr;
     356                 :             }
     357               4 :         }
     358               4 :     } g{this};
     359                 : 
                               // NOTE(review): plain ::operator new guarantees only the default
                               // new alignment and alignof(S) is not passed; confirm that
                               // over-aligned stream types are not expected here.
     360               4 :     storage_ = ::operator new(sizeof(S));
     361               4 :     stream_ = ::new(storage_) S(std::move(s));
     362                 : 
     363                 :     // Preallocate the awaitable storage
                               // NOTE(review): sized from awaitable_size only; awaitable_align is
                               // not used for this allocation.
     364               3 :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     365                 : 
                               // Everything succeeded: disarm the rollback.
     366               3 :     g.committed = true;
     367               4 : }
     368                 : 
                           // Reference constructor: stores the caller's pointer without taking
                           // ownership. storage_ stays null, so the destructor and move
                           // assignment never destroy the stream.
     369                 : template<WriteStream S>
     370              88 : any_write_stream::any_write_stream(S* s)
     371              88 :     : stream_(s)
     372              88 :     , vt_(&vtable_for_impl<S>::value)
     373                 : {
     374                 :     // Preallocate the awaitable storage
                               // NOTE(review): sized from awaitable_size only; awaitable_align is
                               // not used for this allocation.
     375              88 :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     376              88 : }
     377                 : 
                           // Returns the awaitable for one write operation. The stream's own
                           // awaitable is not created here; it is constructed in the wrapper's
                           // cached storage when the returned awaitable is suspended.
     378                 : template<ConstBufferSequence CB>
     379                 : auto
     380              79 : any_write_stream::write_some(CB buffers)
     381                 : {
     382                 :     struct awaitable
     383                 :     {
     384                 :         any_write_stream* self_;
                                   // Built from the caller's sequence (presumably a copy of its
                                   // buffer descriptors; confirm against const_buffer_array).
     385                 :         detail::const_buffer_array<detail::max_iovec_> ba_;
     386                 : 
     387              79 :         awaitable(
     388                 :             any_write_stream* self,
     389                 :             CB const& buffers) noexcept
     390              79 :             : self_(self)
     391              79 :             , ba_(buffers)
     392                 :         {
     393              79 :         }
     394                 : 
                                   // Ready at once when there are no buffers: the stream is never
                                   // involved and await_resume reports {error_code{}, 0}.
     395                 :         bool
     396              79 :         await_ready() const noexcept
     397                 :         {
     398              79 :             return ba_.to_span().empty();
     399                 :         }
     400                 : 
                                   // Constructs the stream's awaitable in the cached storage and
                                   // either resumes the caller at once or forwards the suspension.
                                   // NOTE(review): awaitable_active_ is not checked before
                                   // constructing, so this relies on the previous operation having
                                   // been resumed (one operation at a time).
     401                 :         std::coroutine_handle<>
     402              75 :         await_suspend(std::coroutine_handle<> h, io_env const* env)
     403                 :         {
     404              75 :             self_->vt_->construct_awaitable(
     405              75 :                 self_->stream_,
     406              75 :                 self_->cached_awaitable_,
     407              75 :                 ba_.to_span());
                                       // Set only after construction succeeded, so a throwing
                                       // construct_awaitable leaves the flag false.
     408              75 :             self_->awaitable_active_ = true;
     409                 : 
                                       // The stream's awaitable is already complete: returning h
                                       // resumes the calling coroutine immediately.
     410              75 :             if(self_->vt_->await_ready(self_->cached_awaitable_))
     411              73 :                 return h;
     412                 : 
     413               2 :             return self_->vt_->await_suspend(
     414               2 :                 self_->cached_awaitable_, h, env);
     415                 :         }
     416                 : 
                                   // Produces the result and destroys the stream's awaitable.
     417                 :         io_result<std::size_t>
     418              77 :         await_resume()
     419                 :         {
                                       // The flag is false when await_suspend never ran, which is
                                       // the empty-buffer case handled by await_ready.
     420              77 :             if(!self_->awaitable_active_)
     421               4 :                 return {{}, 0};
                                       // Destroys the stream's awaitable and clears the flag on
                                       // every exit path, including an exception thrown by the
                                       // forwarded await_resume.
     422                 :             struct guard {
     423                 :                 any_write_stream* self;
     424              73 :                 ~guard() {
     425              73 :                     self->vt_->destroy_awaitable(self->cached_awaitable_);
     426              73 :                     self->awaitable_active_ = false;
     427              73 :                 }
     428              73 :             } g{self_};
     429              73 :             return self_->vt_->await_resume(
     430              73 :                 self_->cached_awaitable_);
     431              73 :         }
     432                 :     };
     433              79 :     return awaitable{this, buffers};
     434                 : }
     435                 : 
     436                 : } // namespace capy
     437                 : } // namespace boost
     438                 : 
     439                 : #endif
        

Generated by: LCOV version 2.3