include/boost/capy/io/any_buffer_sink.hpp

100.0% Lines (220/220) 86.3% List of functions (63/73)
any_buffer_sink.hpp
f(x) Functions (73)
Function Calls Lines Blocks
boost::capy::any_buffer_sink::any_buffer_sink(boost::capy::any_buffer_sink&&) :161 2x 100.0% 100.0% boost::capy::any_buffer_sink::has_value() const :213 26x 100.0% 100.0% boost::capy::any_buffer_sink::operator bool() const :224 3x 100.0% 100.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::buffer_write_sink>::do_destroy_impl(void*) :466 7x 100.0% 100.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::resuming_buffer_sink>::do_destroy_impl(void*) :466 0 0.0% 0.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::throwing_move_buffer_sink>::do_destroy_impl(void*) :466 0 0.0% 0.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::test::buffer_sink>::do_destroy_impl(void*) :466 11x 100.0% 100.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::buffer_write_sink>::do_prepare_impl(void*, std::span<boost::capy::mutable_buffer, 18446744073709551615ul>) :472 6x 100.0% 100.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::resuming_buffer_sink>::do_prepare_impl(void*, std::span<boost::capy::mutable_buffer, 18446744073709551615ul>) :472 2x 100.0% 100.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::throwing_move_buffer_sink>::do_prepare_impl(void*, std::span<boost::capy::mutable_buffer, 18446744073709551615ul>) :472 0 0.0% 0.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::test::buffer_sink>::do_prepare_impl(void*, std::span<boost::capy::mutable_buffer, 18446744073709551615ul>) :472 124x 100.0% 100.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::buffer_write_sink>::construct_commit_awaitable_impl(void*, void*, unsigned long) :481 6x 100.0% 100.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::resuming_buffer_sink>::construct_commit_awaitable_impl(void*, void*, unsigned long) :481 1x 
100.0% 100.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::throwing_move_buffer_sink>::construct_commit_awaitable_impl(void*, void*, unsigned long) :481 0 0.0% 0.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::test::buffer_sink>::construct_commit_awaitable_impl(void*, void*, unsigned long) :481 103x 100.0% 100.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::buffer_write_sink>::construct_commit_eof_awaitable_impl(void*, void*, unsigned long) :508 0 0.0% 0.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::resuming_buffer_sink>::construct_commit_eof_awaitable_impl(void*, void*, unsigned long) :508 1x 100.0% 100.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::throwing_move_buffer_sink>::construct_commit_eof_awaitable_impl(void*, void*, unsigned long) :508 0 0.0% 0.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::test::buffer_sink>::construct_commit_eof_awaitable_impl(void*, void*, unsigned long) :508 70x 100.0% 100.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::buffer_write_sink>::construct_write_some_awaitable_impl(void*, void*, std::span<boost::capy::const_buffer const, 18446744073709551615ul>) :535 6x 100.0% 100.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::resuming_buffer_sink>::construct_write_some_awaitable_impl(void*, void*, std::span<boost::capy::const_buffer const, 18446744073709551615ul>) :535 1x 100.0% 100.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::throwing_move_buffer_sink>::construct_write_some_awaitable_impl(void*, void*, std::span<boost::capy::const_buffer const, 18446744073709551615ul>) :535 0 0.0% 0.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::buffer_write_sink>::construct_write_awaitable_impl(void*, void*, 
std::span<boost::capy::const_buffer const, 18446744073709551615ul>) :565 14x 100.0% 100.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::resuming_buffer_sink>::construct_write_awaitable_impl(void*, void*, std::span<boost::capy::const_buffer const, 18446744073709551615ul>) :565 1x 100.0% 100.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::throwing_move_buffer_sink>::construct_write_awaitable_impl(void*, void*, std::span<boost::capy::const_buffer const, 18446744073709551615ul>) :565 0 0.0% 0.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::buffer_write_sink>::construct_write_eof_buffers_awaitable_impl(void*, void*, std::span<boost::capy::const_buffer const, 18446744073709551615ul>) :595 12x 100.0% 100.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::resuming_buffer_sink>::construct_write_eof_buffers_awaitable_impl(void*, void*, std::span<boost::capy::const_buffer const, 18446744073709551615ul>) :595 1x 100.0% 100.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::throwing_move_buffer_sink>::construct_write_eof_buffers_awaitable_impl(void*, void*, std::span<boost::capy::const_buffer const, 18446744073709551615ul>) :595 0 0.0% 0.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::buffer_write_sink>::construct_write_eof_awaitable_impl(void*, void*) :625 16x 100.0% 100.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::resuming_buffer_sink>::construct_write_eof_awaitable_impl(void*, void*) :625 1x 100.0% 100.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::throwing_move_buffer_sink>::construct_write_eof_awaitable_impl(void*, void*) :625 0 0.0% 0.0% boost::capy::any_buffer_sink::~any_buffer_sink() :733 218x 100.0% 100.0% boost::capy::any_buffer_sink::operator=(boost::capy::any_buffer_sink&&) 
:745 5x 100.0% 100.0% boost::capy::any_buffer_sink::any_buffer_sink<boost::capy::(anonymous namespace)::buffer_write_sink>(boost::capy::(anonymous namespace)::buffer_write_sink) :768 7x 100.0% 80.0% boost::capy::any_buffer_sink::any_buffer_sink<boost::capy::(anonymous namespace)::throwing_move_buffer_sink>(boost::capy::(anonymous namespace)::throwing_move_buffer_sink) :768 1x 75.0% 77.0% boost::capy::any_buffer_sink::any_buffer_sink<boost::capy::test::buffer_sink>(boost::capy::test::buffer_sink) :768 11x 100.0% 80.0% boost::capy::any_buffer_sink::any_buffer_sink<boost::capy::(anonymous namespace)::buffer_write_sink>(boost::capy::(anonymous namespace)::buffer_write_sink*) :796 49x 100.0% 100.0% boost::capy::any_buffer_sink::any_buffer_sink<boost::capy::(anonymous namespace)::resuming_buffer_sink>(boost::capy::(anonymous namespace)::resuming_buffer_sink*) :796 1x 100.0% 100.0% boost::capy::any_buffer_sink::any_buffer_sink<boost::capy::test::buffer_sink>(boost::capy::test::buffer_sink*) :796 145x 100.0% 100.0% boost::capy::any_buffer_sink::prepare(std::span<boost::capy::mutable_buffer, 18446744073709551615ul>) :804 132x 100.0% 100.0% boost::capy::any_buffer_sink::commit(unsigned long) :810 110x 100.0% 100.0% boost::capy::any_buffer_sink::commit(unsigned long)::awaitable::await_ready() :818 110x 100.0% 100.0% boost::capy::any_buffer_sink::commit(unsigned long)::awaitable::await_suspend(std::__n4861::coroutine_handle<void>, boost::capy::io_env const*) :828 1x 100.0% 100.0% boost::capy::any_buffer_sink::commit(unsigned long)::awaitable::await_resume() :835 110x 100.0% 100.0% boost::capy::any_buffer_sink::commit(unsigned long)::awaitable::await_resume()::guard::~guard() :839 110x 100.0% 100.0% boost::capy::any_buffer_sink::commit_eof(unsigned long) :852 55x 100.0% 100.0% boost::capy::any_buffer_sink::commit_eof(unsigned long)::awaitable::await_ready() :860 55x 100.0% 100.0% boost::capy::any_buffer_sink::commit_eof(unsigned 
long)::awaitable::await_suspend(std::__n4861::coroutine_handle<void>, boost::capy::io_env const*) :870 1x 100.0% 100.0% boost::capy::any_buffer_sink::commit_eof(unsigned long)::awaitable::await_resume() :877 55x 100.0% 100.0% boost::capy::any_buffer_sink::commit_eof(unsigned long)::awaitable::await_resume()::guard::~guard() :881 55x 100.0% 100.0% boost::capy::any_buffer_sink::write_some_(std::span<boost::capy::const_buffer const, 18446744073709551615ul>) :894 7x 100.0% 100.0% boost::capy::any_buffer_sink::write_some_(std::span<boost::capy::const_buffer const, 18446744073709551615ul>)::awaitable::await_ready() const :903 7x 100.0% 100.0% boost::capy::any_buffer_sink::write_some_(std::span<boost::capy::const_buffer const, 18446744073709551615ul>)::awaitable::await_suspend(std::__n4861::coroutine_handle<void>, boost::capy::io_env const*) :909 7x 100.0% 100.0% boost::capy::any_buffer_sink::write_some_(std::span<boost::capy::const_buffer const, 18446744073709551615ul>)::awaitable::await_resume() :926 7x 100.0% 100.0% boost::capy::any_buffer_sink::write_some_(std::span<boost::capy::const_buffer const, 18446744073709551615ul>)::awaitable::await_resume()::guard::~guard() :930 7x 100.0% 100.0% boost::capy::any_buffer_sink::write_(std::span<boost::capy::const_buffer const, 18446744073709551615ul>) :944 15x 100.0% 100.0% boost::capy::any_buffer_sink::write_(std::span<boost::capy::const_buffer const, 18446744073709551615ul>)::awaitable::await_ready() const :953 15x 100.0% 100.0% boost::capy::any_buffer_sink::write_(std::span<boost::capy::const_buffer const, 18446744073709551615ul>)::awaitable::await_suspend(std::__n4861::coroutine_handle<void>, boost::capy::io_env const*) :959 15x 100.0% 100.0% boost::capy::any_buffer_sink::write_(std::span<boost::capy::const_buffer const, 18446744073709551615ul>)::awaitable::await_resume() :976 15x 100.0% 100.0% boost::capy::any_buffer_sink::write_(std::span<boost::capy::const_buffer const, 
18446744073709551615ul>)::awaitable::await_resume()::guard::~guard() :980 15x 100.0% 100.0% boost::capy::any_buffer_sink::write_eof_buffers_(std::span<boost::capy::const_buffer const, 18446744073709551615ul>) :994 13x 100.0% 100.0% boost::capy::any_buffer_sink::write_eof_buffers_(std::span<boost::capy::const_buffer const, 18446744073709551615ul>)::awaitable::await_ready() const :1003 13x 100.0% 100.0% boost::capy::any_buffer_sink::write_eof_buffers_(std::span<boost::capy::const_buffer const, 18446744073709551615ul>)::awaitable::await_suspend(std::__n4861::coroutine_handle<void>, boost::capy::io_env const*) :1009 13x 100.0% 100.0% boost::capy::any_buffer_sink::write_eof_buffers_(std::span<boost::capy::const_buffer const, 18446744073709551615ul>)::awaitable::await_resume() :1026 13x 100.0% 100.0% boost::capy::any_buffer_sink::write_eof_buffers_(std::span<boost::capy::const_buffer const, 18446744073709551615ul>)::awaitable::await_resume()::guard::~guard() :1030 13x 100.0% 100.0% boost::capy::task<boost::capy::io_result<unsigned long> > boost::capy::any_buffer_sink::write_some<boost::capy::const_buffer>(boost::capy::const_buffer) :1045 23x 100.0% 44.0% boost::capy::task<boost::capy::io_result<unsigned long> > boost::capy::any_buffer_sink::write<boost::capy::const_buffer>(boost::capy::const_buffer) :1078 39x 100.0% 44.0% boost::capy::any_buffer_sink::write_eof() :1130 33x 100.0% 100.0% boost::capy::any_buffer_sink::write_eof()::awaitable::await_ready() :1137 33x 100.0% 100.0% boost::capy::any_buffer_sink::write_eof()::awaitable::await_suspend(std::__n4861::coroutine_handle<void>, boost::capy::io_env const*) :1161 1x 100.0% 100.0% boost::capy::any_buffer_sink::write_eof()::awaitable::await_resume() :1168 33x 100.0% 100.0% boost::capy::any_buffer_sink::write_eof()::awaitable::await_resume()::guard::~guard() :1172 33x 100.0% 100.0% boost::capy::task<boost::capy::io_result<unsigned long> > 
boost::capy::any_buffer_sink::write_eof<boost::capy::const_buffer>(boost::capy::const_buffer) :1186 41x 100.0% 44.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
11 #define BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/await_suspend_helper.hpp>
15 #include <boost/capy/buffers.hpp>
16 #include <boost/capy/buffers/buffer_copy.hpp>
17 #include <boost/capy/buffers/buffer_param.hpp>
18 #include <boost/capy/concept/buffer_sink.hpp>
19 #include <boost/capy/concept/io_awaitable.hpp>
20 #include <boost/capy/concept/write_sink.hpp>
21 #include <boost/capy/ex/io_env.hpp>
22 #include <boost/capy/io_result.hpp>
23 #include <boost/capy/io_task.hpp>
24
25 #include <concepts>
26 #include <coroutine>
27 #include <cstddef>
28 #include <exception>
29 #include <new>
30 #include <span>
31 #include <stop_token>
32 #include <system_error>
33 #include <utility>
34
35 namespace boost {
36 namespace capy {
37
38 /** Type-erased wrapper for any BufferSink.
39
40 This class provides type erasure for any type satisfying the
41 @ref BufferSink concept, enabling runtime polymorphism for
42 buffer sink operations. It uses cached awaitable storage to achieve
43 zero steady-state allocation after construction.
44
45 The wrapper exposes two interfaces for producing data:
46 the @ref BufferSink interface (`prepare`, `commit`, `commit_eof`)
47 and the @ref WriteSink interface (`write_some`, `write`,
48 `write_eof`). Choose the interface that matches how your data
49 is produced:
50
51 @par Choosing an Interface
52
53 Use the **BufferSink** interface when you are a generator that
54 produces data into externally-provided buffers. The sink owns
55 the memory; you call @ref prepare to obtain writable buffers,
56 fill them, then call @ref commit or @ref commit_eof.
57
58 Use the **WriteSink** interface when you already have buffers
59 containing the data to write:
60 - If the entire body is available up front, call
61 @ref write_eof(buffers) to send everything atomically.
62 - If data arrives incrementally, call @ref write or
63 @ref write_some in a loop, then @ref write_eof() when done.
64 Prefer `write` (complete) unless your streaming pattern
65 benefits from partial writes via `write_some`.
66
67 If the wrapped type only satisfies @ref BufferSink, the
68 @ref WriteSink operations are provided automatically.
69
70 @par Construction Modes
71
72 - **Owning**: Pass by value to transfer ownership. The wrapper
73 allocates storage and owns the sink.
74 - **Reference**: Pass a pointer to wrap without ownership. The
75 pointed-to sink must outlive this wrapper.
76
77 @par Awaitable Preallocation
78 The constructor preallocates storage for the type-erased awaitable.
79 This reserves all virtual address space at server startup
80 so memory usage can be measured up front, rather than
81 allocating piecemeal as traffic arrives.
82
83 @par Thread Safety
84 Not thread-safe. Concurrent operations on the same wrapper
85 are undefined behavior.
86
87 @par Example
88 @code
89 // Owning - takes ownership of the sink
90 any_buffer_sink abs(some_buffer_sink{args...});
91
92 // Reference - wraps without ownership
93 some_buffer_sink sink;
94 any_buffer_sink abs(&sink);
95
96 // BufferSink interface: generate into callee-owned buffers
97 mutable_buffer arr[16];
98 auto bufs = abs.prepare(arr);
99 // Write data into bufs[0..bufs.size())
100 auto [ec] = co_await abs.commit(bytes_written);
101 auto [ec2] = co_await abs.commit_eof(0);
102
103 // WriteSink interface: send caller-owned buffers
104 auto [ec3, n] = co_await abs.write(make_buffer("hello", 5));
105 auto [ec4] = co_await abs.write_eof();
106
107 // Or send everything at once
108 auto [ec5, n2] = co_await abs.write_eof(
109 make_buffer(body_data));
110 @endcode
111
112 @see any_buffer_source, BufferSink, WriteSink
113 */
114 class any_buffer_sink
115 {
116 struct vtable; // per-type table of type-erased sink operations
117 struct awaitable_ops; // ops for awaitables that await-return io_result<>
118 struct write_awaitable_ops; // ops for awaitables that await-return io_result<std::size_t>
119
120 template<BufferSink S>
121 struct vtable_for_impl; // provides the vtable entries for a concrete sink type S
122
123 // hot-path members first for cache locality
124 void* sink_ = nullptr; // the wrapped sink; null means empty (this is what has_value() tests)
125 vtable const* vt_ = nullptr; // operation table of the wrapped type; rebind() compares it with &vtable_for_impl<S>::value
126 void* cached_awaitable_ = nullptr; // NOTE(review): presumably the preallocated awaitable buffer - confirm in the out-of-line members
127 awaitable_ops const* active_ops_ = nullptr; // NOTE(review): presumably the ops of the in-flight io_result<> awaitable - confirm
128 write_awaitable_ops const* active_write_ops_ = nullptr; // NOTE(review): presumably the ops of the in-flight io_result<std::size_t> awaitable - confirm
129 void* storage_ = nullptr; // NOTE(review): presumably backing storage for an owned sink - confirm in the owning constructor
130
131 public:
132 /** Destructor.
133
134 Destroys the owned sink (if any) and releases the cached
135 awaitable storage.
136 */
137 ~any_buffer_sink();
138
139 /** Construct a default instance.
140
141 Constructs an empty wrapper. Operations on a default-constructed
142 wrapper result in undefined behavior. Every member starts out null,
143 so `has_value()` returns `false` for such a wrapper. */
144 any_buffer_sink() = default;
145
146 /** Non-copyable.
147
148 The awaitable cache is per-instance and cannot be shared.
149 */
150 any_buffer_sink(any_buffer_sink const&) = delete;
151 any_buffer_sink& operator=(any_buffer_sink const&) = delete;
152
153 /** Construct by moving.
154
155 Transfers ownership of the wrapped sink (if owned) and
156 cached awaitable storage from `other`. After the move, `other` is
157 in a default-constructed state.
158
159 @param other The wrapper to move from.
160 */
161 2x any_buffer_sink(any_buffer_sink&& other) noexcept
162 2x : sink_(std::exchange(other.sink_, nullptr)) // each member is taken and the source member is reset to null
163 2x , vt_(std::exchange(other.vt_, nullptr))
164 2x , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
165 2x , active_ops_(std::exchange(other.active_ops_, nullptr))
166 2x , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
167 2x , storage_(std::exchange(other.storage_, nullptr))
168 { // nothing left to do: all six members were transferred in the initializer list
169 2x }
170
171 /** Assign by moving.
172
173 Destroys any owned sink and releases existing resources,
174 then transfers ownership from `other`.
175
176 @param other The wrapper to move from.
177 @return Reference to this wrapper.
178 */
179 any_buffer_sink&
180 operator=(any_buffer_sink&& other) noexcept;
181
182 /** Construct by taking ownership of a BufferSink.
183
184 Allocates storage and moves the sink into this wrapper.
185 The wrapper owns the sink and will destroy it. If `S` also
186 satisfies @ref WriteSink, native write operations are
187 forwarded through the virtual boundary.
188
189 @param s The sink to take ownership of.
190 */
191 template<BufferSink S>
192 requires (!std::same_as<std::decay_t<S>, any_buffer_sink>) // this overload is not selected when S is any_buffer_sink itself
193 any_buffer_sink(S s);
194
195 /** Construct by wrapping a BufferSink without ownership.
196
197 Wraps the given sink by pointer. The sink must remain
198 valid for the lifetime of this wrapper. If `S` also
199 satisfies @ref WriteSink, native write operations are
200 forwarded through the virtual boundary.
201
202 @param s Pointer to the sink to wrap.
203 */
204 template<BufferSink S>
205 any_buffer_sink(S* s);
206
207 /** Check if the wrapper contains a valid sink.
208
209 @return `true` if wrapping a sink, `false` if default-constructed
210 or moved-from.
211 */
212 bool
213 26x has_value() const noexcept
214 {
215 26x return sink_ != nullptr; // the sink pointer doubles as the "engaged" flag
216 }
217
218 /** Check if the wrapper contains a valid sink.
219
220 @return `true` if wrapping a sink, `false` if default-constructed
221 or moved-from.
222 */
223 explicit
224 3x operator bool() const noexcept
225 {
226 3x return has_value(); // same test as has_value(); explicit, so only contextual conversion applies
227 }
228
229 /** Prepare writable buffers.
230
231 Fills the provided span with mutable buffer descriptors
232 pointing to the underlying sink's internal storage. This
233 operation is synchronous.
234
235 @param dest Span of mutable_buffer to fill.
236
237 @return A span of filled buffers.
238
239 @par Preconditions
240 The wrapper must contain a valid sink (`has_value() == true`).
241 */
242 std::span<mutable_buffer>
243 prepare(std::span<mutable_buffer> dest);
244
245 /** Commit bytes written to the prepared buffers.
246
247 Commits `n` bytes written to the buffers returned by the
248 most recent call to @ref prepare. The operation may trigger
249 underlying I/O.
250
251 @param n The number of bytes to commit.
252
253 @return An awaitable that await-returns `(error_code)`.
254
255 @par Preconditions
256 The wrapper must contain a valid sink (`has_value() == true`).
257 */
258 auto
259 commit(std::size_t n);
260
261 /** Commit final bytes and signal end-of-stream.
262
263 Commits `n` bytes written to the buffers returned by the
264 most recent call to @ref prepare and finalizes the sink.
265 After success, no further operations are permitted.
266
267 @param n The number of bytes to commit.
268
269 @return An awaitable that await-returns `(error_code)`.
270
271 @par Preconditions
272 The wrapper must contain a valid sink (`has_value() == true`).
273 */
274 auto
275 commit_eof(std::size_t n);
276
277 /** Write some data from a buffer sequence.
278
279 Attempt to write up to `buffer_size( buffers )` bytes from
280 the buffer sequence to the underlying sink. May consume less
281 than the full sequence.
282
283 When the wrapped type provides native @ref WriteSink support,
284 the operation forwards directly. Otherwise it is synthesized
285 from @ref prepare and @ref commit with a buffer copy.
286
287 @param buffers The buffer sequence to write.
288
289 @return An awaitable that await-returns `(error_code,std::size_t)`.
290
291 @par Preconditions
292 The wrapper must contain a valid sink (`has_value() == true`).
293 */
294 template<ConstBufferSequence CB>
295 io_task<std::size_t>
296 write_some(CB buffers);
297
298 /** Write all data from a buffer sequence.
299
300 Writes all data from the buffer sequence to the underlying
301 sink. This method satisfies the @ref WriteSink concept.
302
303 When the wrapped type provides native @ref WriteSink support,
304 each window is forwarded directly. Otherwise the data is
305 copied into the sink via @ref prepare and @ref commit.
306
307 @param buffers The buffer sequence to write.
308
309 @return An awaitable that await-returns `(error_code,std::size_t)`.
310
311 @par Preconditions
312 The wrapper must contain a valid sink (`has_value() == true`).
313 */
314 template<ConstBufferSequence CB>
315 io_task<std::size_t>
316 write(CB buffers);
317
318 /** Atomically write data and signal end-of-stream.
319
320 Writes all data from the buffer sequence to the underlying
321 sink and then signals end-of-stream.
322
323 When the wrapped type provides native @ref WriteSink support,
324 the final window is sent atomically via the underlying
325 `write_eof(buffers)`. Otherwise the data is synthesized
326 through @ref prepare, @ref commit, and @ref commit_eof.
327
328 @param buffers The buffer sequence to write.
329
330 @return An awaitable that await-returns `(error_code,std::size_t)`.
331
332 @par Preconditions
333 The wrapper must contain a valid sink (`has_value() == true`).
334 */
335 template<ConstBufferSequence CB>
336 io_task<std::size_t>
337 write_eof(CB buffers);
338
339 /** Signal end-of-stream.
340
341 Indicates that no more data will be written to the sink.
342 This method satisfies the @ref WriteSink concept.
343
344 When the wrapped type provides native @ref WriteSink support,
345 the underlying `write_eof()` is called. Otherwise the
346 operation is implemented as `commit_eof(0)`.
347
348 @return An awaitable that await-returns `(error_code)`.
349
350 @par Preconditions
351 The wrapper must contain a valid sink (`has_value() == true`).
352 */
353 auto
354 write_eof();
355
356 protected:
357 /** Rebind to a new sink after move.
358
359 Updates the internal pointer to reference a new sink object.
360 Used by owning wrappers after move assignment when the owned
361 object has moved to a new location.
362
363 @param new_sink The new sink to bind to. Must be the same
364 type as the original sink.
365
366 @note Terminates if called with a sink of different type
367 than the original.
368 */
369 template<BufferSink S>
370 void
371 rebind(S& new_sink) noexcept
372 {
373 if(vt_ != &vtable_for_impl<S>::value) // type check by vtable address; an empty wrapper (null vt_) fails it too
374 std::terminate(); // the function is noexcept, so a mismatch cannot be reported by throwing
375 sink_ = &new_sink; // only the sink pointer changes; vt_ and the other members are left as they are
376 }
377
378 private:
379 /** Forward a partial write through the vtable.
380
381 Constructs the underlying `write_some` awaitable in
382 cached storage and returns a type-erased awaitable.
383 */
384 auto
385 write_some_(std::span<const_buffer const> buffers);
386
387 /** Forward a complete write through the vtable.
388
389 Constructs the underlying `write` awaitable in
390 cached storage and returns a type-erased awaitable.
391 */
392 auto
393 write_(std::span<const_buffer const> buffers);
394
395 /** Forward an atomic write-with-EOF through the vtable.
396
397 Constructs the underlying `write_eof(buffers)` awaitable
398 in cached storage and returns a type-erased awaitable.
399 */
400 auto
401 write_eof_buffers_(std::span<const_buffer const> buffers);
402 };
403
404 /** Type-erased ops for awaitables that await-return `io_result<>`. Each entry receives the awaitable object as an untyped `void*`. */
405 struct any_buffer_sink::awaitable_ops
406 {
407 bool (*await_ready)(void*); // forwards to the awaitable's await_ready()
408 std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*); // suspends with the awaiting coroutine's handle and its io_env
409 io_result<> (*await_resume)(void*); // produces the operation's result
410 void (*destroy)(void*) noexcept; // runs the awaitable's destructor; takes no part in freeing the memory it lived in
411 };
412
413 /** Type-erased ops for awaitables that await-return `io_result<std::size_t>`. Same layout as `awaitable_ops` except for the `await_resume` result type. */
414 struct any_buffer_sink::write_awaitable_ops
415 {
416 bool (*await_ready)(void*); // forwards to the awaitable's await_ready()
417 std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*); // suspends with the awaiting coroutine's handle and its io_env
418 io_result<std::size_t> (*await_resume)(void*); // produces the error code together with a byte count
419 void (*destroy)(void*) noexcept; // runs the awaitable's destructor; takes no part in freeing the memory it lived in
420 };
421
422 struct any_buffer_sink::vtable // Per-type dispatch table: every entry takes the wrapped sink as an untyped `void*`.
423 {
424 void (*destroy)(void*) noexcept; // runs the wrapped sink's destructor
425 std::span<mutable_buffer> (*do_prepare)( // synchronous forward to the sink's prepare()
426 void* sink,
427 std::span<mutable_buffer> dest);
428 std::size_t awaitable_size; // largest sizeof among the awaitable types the construct_* entries may create
429 std::size_t awaitable_align; // strictest alignof among those same awaitable types
430 awaitable_ops const* (*construct_commit_awaitable)( // builds the sink's commit(n) awaitable inside `storage`
431 void* sink,
432 void* storage,
433 std::size_t n);
434 awaitable_ops const* (*construct_commit_eof_awaitable)( // builds the sink's commit_eof(n) awaitable inside `storage`
435 void* sink,
436 void* storage,
437 std::size_t n);
438
439 // WriteSink forwarding (null when wrapped type is BufferSink-only)
440 write_awaitable_ops const* (*construct_write_some_awaitable)( // builds the sink's write_some(buffers) awaitable inside `storage`
441 void* sink,
442 void* storage,
443 std::span<const_buffer const> buffers);
444 write_awaitable_ops const* (*construct_write_awaitable)( // builds the sink's write(buffers) awaitable inside `storage`
445 void* sink,
446 void* storage,
447 std::span<const_buffer const> buffers);
448 write_awaitable_ops const* (*construct_write_eof_buffers_awaitable)( // builds the sink's write_eof(buffers) awaitable inside `storage`
449 void* sink,
450 void* storage,
451 std::span<const_buffer const> buffers);
452 awaitable_ops const* (*construct_write_eof_awaitable)( // builds the sink's write_eof() awaitable inside `storage`
453 void* sink,
454 void* storage);
455 };
456
457 template<BufferSink S>
458 struct any_buffer_sink::vtable_for_impl
459 {
460 using CommitAwaitable = decltype(std::declval<S&>().commit( // the type S::commit(std::size_t) returns on a non-const S lvalue
461 std::size_t{}));
462 using CommitEofAwaitable = decltype(std::declval<S&>().commit_eof( // the type S::commit_eof(std::size_t) returns on a non-const S lvalue
463 std::size_t{}));
464
465 static void // Ends the lifetime of the S object at `sink` by calling its destructor.
466 18x do_destroy_impl(void* sink) noexcept
467 {
468 18x static_cast<S*>(sink)->~S(); // explicit destructor call only; no deallocation happens in this function
469 18x }
470
471 static std::span<mutable_buffer> // Recovers the concrete S from `sink` and forwards to S::prepare(dest).
472 132x do_prepare_impl(
473 void* sink, // must point to a live S; the cast below is unchecked
474 std::span<mutable_buffer> dest) // caller-provided array of buffer descriptors, passed through unchanged
475 {
476 132x auto& s = *static_cast<S*>(sink);
477 132x return s.prepare(dest); // whatever span S::prepare returns is handed back as-is
478 }
479
480 static awaitable_ops const* // Builds the awaitable of S::commit(n) in `storage`; returns the ops table that drives it.
481 110x construct_commit_awaitable_impl(
482 void* sink, // must point to a live S
483 void* storage, // must be big and aligned enough for CommitAwaitable; nothing here verifies that
484 std::size_t n) // byte count forwarded to S::commit
485 {
486 110x auto& s = *static_cast<S*>(sink);
487 110x ::new(storage) CommitAwaitable(s.commit(n)); // placement-new: the awaitable lives in the caller's storage, no allocation
488
489 static constexpr awaitable_ops ops = { // one immutable table for this S; unary + turns each captureless lambda into a function pointer
490 +[](void* p) { // p: address of the CommitAwaitable constructed above
491 return static_cast<CommitAwaitable*>(p)->await_ready();
492 },
493 +[](void* p, std::coroutine_handle<> h, io_env const* env) {
494 return detail::call_await_suspend( // NOTE(review): helper is declared elsewhere; presumably invokes await_suspend(h, env) - confirm
495 static_cast<CommitAwaitable*>(p), h, env);
496 },
497 +[](void* p) {
498 return static_cast<CommitAwaitable*>(p)->await_resume();
499 },
500 +[](void* p) noexcept { // destructor only: the storage itself is not released here
501 static_cast<CommitAwaitable*>(p)->~CommitAwaitable();
502 }
503 };
504 110x return &ops; // address of a function-local static, so it stays valid after this call returns
505 }
506
507 static awaitable_ops const* // Builds the awaitable of S::commit_eof(n) in `storage`; returns the ops table that drives it.
508 71x construct_commit_eof_awaitable_impl(
509 void* sink, // must point to a live S
510 void* storage, // must be big and aligned enough for CommitEofAwaitable; nothing here verifies that
511 std::size_t n) // byte count forwarded to S::commit_eof
512 {
513 71x auto& s = *static_cast<S*>(sink);
514 71x ::new(storage) CommitEofAwaitable(s.commit_eof(n)); // placement-new into the caller's storage, no allocation
515
516 static constexpr awaitable_ops ops = { // one immutable table for this S; unary + turns each captureless lambda into a function pointer
517 +[](void* p) { // p: address of the CommitEofAwaitable constructed above
518 return static_cast<CommitEofAwaitable*>(p)->await_ready();
519 },
520 +[](void* p, std::coroutine_handle<> h, io_env const* env) {
521 return detail::call_await_suspend( // NOTE(review): helper is declared elsewhere; presumably invokes await_suspend(h, env) - confirm
522 static_cast<CommitEofAwaitable*>(p), h, env);
523 },
524 +[](void* p) {
525 return static_cast<CommitEofAwaitable*>(p)->await_resume();
526 },
527 +[](void* p) noexcept { // destructor only: the storage itself is not released here
528 static_cast<CommitEofAwaitable*>(p)->~CommitEofAwaitable();
529 }
530 };
531 71x return &ops; // address of a function-local static, so it stays valid after this call returns
532 }
533
534 static write_awaitable_ops const* // Builds the awaitable of S::write_some(buffers) in `storage`; returns the ops table that drives it.
535 7x construct_write_some_awaitable_impl(
536 void* sink, // must point to a live S
537 void* storage, // must be big and aligned enough for the write_some awaitable; nothing here verifies that
538 std::span<const_buffer const> buffers) // forwarded to S::write_some unchanged
539 requires WriteSink<S> // only available when S has native write support
540 {
541 using Aw = decltype(std::declval<S&>().write_some( // awaitable type S::write_some yields for a span argument
542 std::span<const_buffer const>{}));
543 7x auto& s = *static_cast<S*>(sink);
544 7x ::new(storage) Aw(s.write_some(buffers)); // placement-new into the caller's storage, no allocation
545
546 static constexpr write_awaitable_ops ops = { // one immutable table for this S; unary + turns each captureless lambda into a function pointer
547 +[](void* p) { // p: address of the Aw constructed above
548 return static_cast<Aw*>(p)->await_ready();
549 },
550 +[](void* p, std::coroutine_handle<> h, io_env const* env) {
551 return detail::call_await_suspend( // NOTE(review): helper is declared elsewhere; presumably invokes await_suspend(h, env) - confirm
552 static_cast<Aw*>(p), h, env);
553 },
554 +[](void* p) {
555 return static_cast<Aw*>(p)->await_resume();
556 },
557 +[](void* p) noexcept { // destructor only: the storage itself is not released here
558 static_cast<Aw*>(p)->~Aw();
559 }
560 };
561 7x return &ops; // address of a function-local static, so it stays valid after this call returns
562 }
563
564 static write_awaitable_ops const* // Builds the awaitable of S::write(buffers) in `storage`; returns the ops table that drives it.
565 15x construct_write_awaitable_impl(
566 void* sink, // must point to a live S
567 void* storage, // must be big and aligned enough for the write awaitable; nothing here verifies that
568 std::span<const_buffer const> buffers) // forwarded to S::write unchanged
569 requires WriteSink<S> // only available when S has native write support
570 {
571 using Aw = decltype(std::declval<S&>().write( // awaitable type S::write yields for a span argument
572 std::span<const_buffer const>{}));
573 15x auto& s = *static_cast<S*>(sink);
574 15x ::new(storage) Aw(s.write(buffers)); // placement-new into the caller's storage, no allocation
575
576 static constexpr write_awaitable_ops ops = { // one immutable table for this S; unary + turns each captureless lambda into a function pointer
577 +[](void* p) { // p: address of the Aw constructed above
578 return static_cast<Aw*>(p)->await_ready();
579 },
580 +[](void* p, std::coroutine_handle<> h, io_env const* env) {
581 return detail::call_await_suspend( // NOTE(review): helper is declared elsewhere; presumably invokes await_suspend(h, env) - confirm
582 static_cast<Aw*>(p), h, env);
583 },
584 +[](void* p) {
585 return static_cast<Aw*>(p)->await_resume();
586 },
587 +[](void* p) noexcept { // destructor only: the storage itself is not released here
588 static_cast<Aw*>(p)->~Aw();
589 }
590 };
591 15x return &ops; // address of a function-local static, so it stays valid after this call returns
592 }
593
594 static write_awaitable_ops const* // Builds the awaitable of S::write_eof(buffers) in `storage`; returns the ops table that drives it.
595 13x construct_write_eof_buffers_awaitable_impl(
596 void* sink, // must point to a live S
597 void* storage, // must be big and aligned enough for the write_eof(buffers) awaitable; nothing here verifies that
598 std::span<const_buffer const> buffers) // forwarded to S::write_eof unchanged
599 requires WriteSink<S> // only available when S has native write support
600 {
601 using Aw = decltype(std::declval<S&>().write_eof( // awaitable type of the buffer-taking write_eof overload
602 std::span<const_buffer const>{}));
603 13x auto& s = *static_cast<S*>(sink);
604 13x ::new(storage) Aw(s.write_eof(buffers)); // placement-new into the caller's storage, no allocation
605
606 static constexpr write_awaitable_ops ops = { // one immutable table for this S; unary + turns each captureless lambda into a function pointer
607 +[](void* p) { // p: address of the Aw constructed above
608 return static_cast<Aw*>(p)->await_ready();
609 },
610 +[](void* p, std::coroutine_handle<> h, io_env const* env) {
611 return detail::call_await_suspend( // NOTE(review): helper is declared elsewhere; presumably invokes await_suspend(h, env) - confirm
612 static_cast<Aw*>(p), h, env);
613 },
614 +[](void* p) {
615 return static_cast<Aw*>(p)->await_resume();
616 },
617 +[](void* p) noexcept { // destructor only: the storage itself is not released here
618 static_cast<Aw*>(p)->~Aw();
619 }
620 };
621 13x return &ops; // address of a function-local static, so it stays valid after this call returns
622 }
623
624 static awaitable_ops const* // Builds the awaitable of the argument-less S::write_eof() in `storage`; returns the ops table that drives it.
625 17x construct_write_eof_awaitable_impl(
626 void* sink, // must point to a live S
627 void* storage) // must be big and aligned enough for the write_eof() awaitable; nothing here verifies that
628 requires WriteSink<S> // only available when S has native write support
629 {
630 using Aw = decltype(std::declval<S&>().write_eof()); // awaitable type of the no-argument write_eof overload
631 17x auto& s = *static_cast<S*>(sink);
632 17x ::new(storage) Aw(s.write_eof()); // placement-new into the caller's storage, no allocation
633
634 static constexpr awaitable_ops ops = { // awaitable_ops rather than write_awaitable_ops: the table's await_resume returns io_result<>
635 +[](void* p) { // p: address of the Aw constructed above
636 return static_cast<Aw*>(p)->await_ready();
637 },
638 +[](void* p, std::coroutine_handle<> h, io_env const* env) {
639 return detail::call_await_suspend( // NOTE(review): helper is declared elsewhere; presumably invokes await_suspend(h, env) - confirm
640 static_cast<Aw*>(p), h, env);
641 },
642 +[](void* p) {
643 return static_cast<Aw*>(p)->await_resume();
644 },
645 +[](void* p) noexcept { // destructor only: the storage itself is not released here
646 static_cast<Aw*>(p)->~Aw();
647 }
648 };
649 17x return &ops; // address of a function-local static, so it stays valid after this call returns
650 }
651
652 static consteval std::size_t
653 compute_max_size() noexcept
654 {
655 std::size_t s = sizeof(CommitAwaitable) > sizeof(CommitEofAwaitable)
656 ? sizeof(CommitAwaitable)
657 : sizeof(CommitEofAwaitable);
658 if constexpr (WriteSink<S>)
659 {
660 using WS = decltype(std::declval<S&>().write_some(
661 std::span<const_buffer const>{}));
662 using W = decltype(std::declval<S&>().write(
663 std::span<const_buffer const>{}));
664 using WEB = decltype(std::declval<S&>().write_eof(
665 std::span<const_buffer const>{}));
666 using WE = decltype(std::declval<S&>().write_eof());
667
668 if(sizeof(WS) > s) s = sizeof(WS);
669 if(sizeof(W) > s) s = sizeof(W);
670 if(sizeof(WEB) > s) s = sizeof(WEB);
671 if(sizeof(WE) > s) s = sizeof(WE);
672 }
673 return s;
674 }
675
676 static consteval std::size_t
677 compute_max_align() noexcept
678 {
679 std::size_t a = alignof(CommitAwaitable) > alignof(CommitEofAwaitable)
680 ? alignof(CommitAwaitable)
681 : alignof(CommitEofAwaitable);
682 if constexpr (WriteSink<S>)
683 {
684 using WS = decltype(std::declval<S&>().write_some(
685 std::span<const_buffer const>{}));
686 using W = decltype(std::declval<S&>().write(
687 std::span<const_buffer const>{}));
688 using WEB = decltype(std::declval<S&>().write_eof(
689 std::span<const_buffer const>{}));
690 using WE = decltype(std::declval<S&>().write_eof());
691
692 if(alignof(WS) > a) a = alignof(WS);
693 if(alignof(W) > a) a = alignof(W);
694 if(alignof(WEB) > a) a = alignof(WEB);
695 if(alignof(WE) > a) a = alignof(WE);
696 }
697 return a;
698 }
699
700 static consteval vtable
701 make_vtable() noexcept
702 {
703 vtable v{};
704 v.destroy = &do_destroy_impl;
705 v.do_prepare = &do_prepare_impl;
706 v.awaitable_size = compute_max_size();
707 v.awaitable_align = compute_max_align();
708 v.construct_commit_awaitable = &construct_commit_awaitable_impl;
709 v.construct_commit_eof_awaitable = &construct_commit_eof_awaitable_impl;
710 v.construct_write_some_awaitable = nullptr;
711 v.construct_write_awaitable = nullptr;
712 v.construct_write_eof_buffers_awaitable = nullptr;
713 v.construct_write_eof_awaitable = nullptr;
714
715 if constexpr (WriteSink<S>)
716 {
717 v.construct_write_some_awaitable =
718 &construct_write_some_awaitable_impl;
719 v.construct_write_awaitable =
720 &construct_write_awaitable_impl;
721 v.construct_write_eof_buffers_awaitable =
722 &construct_write_eof_buffers_awaitable_impl;
723 v.construct_write_eof_awaitable =
724 &construct_write_eof_awaitable_impl;
725 }
726 return v;
727 }
728
729 static constexpr vtable value = make_vtable();
730 };
731
732 inline
733 218x any_buffer_sink::~any_buffer_sink()
734 {
735 218x if(storage_)
736 {
737 17x vt_->destroy(sink_);
738 17x ::operator delete(storage_);
739 }
740 218x if(cached_awaitable_)
741 211x ::operator delete(cached_awaitable_);
742 218x }
743
744 inline any_buffer_sink&
745 5x any_buffer_sink::operator=(any_buffer_sink&& other) noexcept
746 {
747 5x if(this != &other)
748 {
749 4x if(storage_)
750 {
751 1x vt_->destroy(sink_);
752 1x ::operator delete(storage_);
753 }
754 4x if(cached_awaitable_)
755 2x ::operator delete(cached_awaitable_);
756 4x sink_ = std::exchange(other.sink_, nullptr);
757 4x vt_ = std::exchange(other.vt_, nullptr);
758 4x cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
759 4x storage_ = std::exchange(other.storage_, nullptr);
760 4x active_ops_ = std::exchange(other.active_ops_, nullptr);
761 4x active_write_ops_ = std::exchange(other.active_write_ops_, nullptr);
762 }
763 5x return *this;
764 }
765
766 template<BufferSink S>
767 requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
768 19x any_buffer_sink::any_buffer_sink(S s)
769 19x : vt_(&vtable_for_impl<S>::value)
770 {
771 struct guard {
772 any_buffer_sink* self;
773 bool committed = false;
774 ~guard() {
775 if(!committed && self->storage_) {
776 // sink_ is null if the sink move-ctor threw before
777 // the placement-new assigned it.
778 if(self->sink_)
779 self->vt_->destroy(self->sink_);
780 ::operator delete(self->storage_);
781 self->storage_ = nullptr;
782 self->sink_ = nullptr;
783 }
784 }
785 19x } g{this};
786
787 19x storage_ = ::operator new(sizeof(S));
788 19x sink_ = ::new(storage_) S(std::move(s));
789
790 18x cached_awaitable_ = ::operator new(vt_->awaitable_size);
791
792 18x g.committed = true;
793 19x }
794
795 template<BufferSink S>
796 195x any_buffer_sink::any_buffer_sink(S* s)
797 195x : sink_(s)
798 195x , vt_(&vtable_for_impl<S>::value)
799 {
800 195x cached_awaitable_ = ::operator new(vt_->awaitable_size);
801 195x }
802
803 inline std::span<mutable_buffer>
804 132x any_buffer_sink::prepare(std::span<mutable_buffer> dest)
805 {
806 132x return vt_->do_prepare(sink_, dest);
807 }
808
809 inline auto
810 110x any_buffer_sink::commit(std::size_t n)
811 {
812 struct awaitable
813 {
814 any_buffer_sink* self_;
815 std::size_t n_;
816
817 bool
818 110x await_ready()
819 {
820 220x self_->active_ops_ = self_->vt_->construct_commit_awaitable(
821 110x self_->sink_,
822 110x self_->cached_awaitable_,
823 n_);
824 110x return self_->active_ops_->await_ready(self_->cached_awaitable_);
825 }
826
827 std::coroutine_handle<>
828 1x await_suspend(std::coroutine_handle<> h, io_env const* env)
829 {
830 1x return self_->active_ops_->await_suspend(
831 1x self_->cached_awaitable_, h, env);
832 }
833
834 io_result<>
835 110x await_resume()
836 {
837 struct guard {
838 any_buffer_sink* self;
839 110x ~guard() {
840 110x self->active_ops_->destroy(self->cached_awaitable_);
841 110x self->active_ops_ = nullptr;
842 110x }
843 110x } g{self_};
844 110x return self_->active_ops_->await_resume(
845 193x self_->cached_awaitable_);
846 110x }
847 };
848 110x return awaitable{this, n};
849 }
850
851 inline auto
852 55x any_buffer_sink::commit_eof(std::size_t n)
853 {
854 struct awaitable
855 {
856 any_buffer_sink* self_;
857 std::size_t n_;
858
859 bool
860 55x await_ready()
861 {
862 110x self_->active_ops_ = self_->vt_->construct_commit_eof_awaitable(
863 55x self_->sink_,
864 55x self_->cached_awaitable_,
865 n_);
866 55x return self_->active_ops_->await_ready(self_->cached_awaitable_);
867 }
868
869 std::coroutine_handle<>
870 1x await_suspend(std::coroutine_handle<> h, io_env const* env)
871 {
872 1x return self_->active_ops_->await_suspend(
873 1x self_->cached_awaitable_, h, env);
874 }
875
876 io_result<>
877 55x await_resume()
878 {
879 struct guard {
880 any_buffer_sink* self;
881 55x ~guard() {
882 55x self->active_ops_->destroy(self->cached_awaitable_);
883 55x self->active_ops_ = nullptr;
884 55x }
885 55x } g{self_};
886 55x return self_->active_ops_->await_resume(
887 94x self_->cached_awaitable_);
888 55x }
889 };
890 55x return awaitable{this, n};
891 }
892
893 inline auto
894 7x any_buffer_sink::write_some_(
895 std::span<const_buffer const> buffers)
896 {
897 struct awaitable
898 {
899 any_buffer_sink* self_;
900 std::span<const_buffer const> buffers_;
901
902 bool
903 7x await_ready() const noexcept
904 {
905 7x return false;
906 }
907
908 std::coroutine_handle<>
909 7x await_suspend(std::coroutine_handle<> h, io_env const* env)
910 {
911 14x self_->active_write_ops_ =
912 14x self_->vt_->construct_write_some_awaitable(
913 7x self_->sink_,
914 7x self_->cached_awaitable_,
915 buffers_);
916
917 7x if(self_->active_write_ops_->await_ready(
918 7x self_->cached_awaitable_))
919 6x return h;
920
921 1x return self_->active_write_ops_->await_suspend(
922 1x self_->cached_awaitable_, h, env);
923 }
924
925 io_result<std::size_t>
926 7x await_resume()
927 {
928 struct guard {
929 any_buffer_sink* self;
930 7x ~guard() {
931 7x self->active_write_ops_->destroy(
932 7x self->cached_awaitable_);
933 7x self->active_write_ops_ = nullptr;
934 7x }
935 7x } g{self_};
936 7x return self_->active_write_ops_->await_resume(
937 12x self_->cached_awaitable_);
938 7x }
939 };
940 7x return awaitable{this, buffers};
941 }
942
943 inline auto
944 15x any_buffer_sink::write_(
945 std::span<const_buffer const> buffers)
946 {
947 struct awaitable
948 {
949 any_buffer_sink* self_;
950 std::span<const_buffer const> buffers_;
951
952 bool
953 15x await_ready() const noexcept
954 {
955 15x return false;
956 }
957
958 std::coroutine_handle<>
959 15x await_suspend(std::coroutine_handle<> h, io_env const* env)
960 {
961 30x self_->active_write_ops_ =
962 30x self_->vt_->construct_write_awaitable(
963 15x self_->sink_,
964 15x self_->cached_awaitable_,
965 buffers_);
966
967 15x if(self_->active_write_ops_->await_ready(
968 15x self_->cached_awaitable_))
969 14x return h;
970
971 1x return self_->active_write_ops_->await_suspend(
972 1x self_->cached_awaitable_, h, env);
973 }
974
975 io_result<std::size_t>
976 15x await_resume()
977 {
978 struct guard {
979 any_buffer_sink* self;
980 15x ~guard() {
981 15x self->active_write_ops_->destroy(
982 15x self->cached_awaitable_);
983 15x self->active_write_ops_ = nullptr;
984 15x }
985 15x } g{self_};
986 15x return self_->active_write_ops_->await_resume(
987 26x self_->cached_awaitable_);
988 15x }
989 };
990 15x return awaitable{this, buffers};
991 }
992
993 inline auto
994 13x any_buffer_sink::write_eof_buffers_(
995 std::span<const_buffer const> buffers)
996 {
997 struct awaitable
998 {
999 any_buffer_sink* self_;
1000 std::span<const_buffer const> buffers_;
1001
1002 bool
1003 13x await_ready() const noexcept
1004 {
1005 13x return false;
1006 }
1007
1008 std::coroutine_handle<>
1009 13x await_suspend(std::coroutine_handle<> h, io_env const* env)
1010 {
1011 26x self_->active_write_ops_ =
1012 26x self_->vt_->construct_write_eof_buffers_awaitable(
1013 13x self_->sink_,
1014 13x self_->cached_awaitable_,
1015 buffers_);
1016
1017 13x if(self_->active_write_ops_->await_ready(
1018 13x self_->cached_awaitable_))
1019 12x return h;
1020
1021 1x return self_->active_write_ops_->await_suspend(
1022 1x self_->cached_awaitable_, h, env);
1023 }
1024
1025 io_result<std::size_t>
1026 13x await_resume()
1027 {
1028 struct guard {
1029 any_buffer_sink* self;
1030 13x ~guard() {
1031 13x self->active_write_ops_->destroy(
1032 13x self->cached_awaitable_);
1033 13x self->active_write_ops_ = nullptr;
1034 13x }
1035 13x } g{self_};
1036 13x return self_->active_write_ops_->await_resume(
1037 22x self_->cached_awaitable_);
1038 13x }
1039 };
1040 13x return awaitable{this, buffers};
1041 }
1042
1043 template<ConstBufferSequence CB>
1044 io_task<std::size_t>
1045 23x any_buffer_sink::write_some(CB buffers)
1046 {
1047 buffer_param<CB> bp(buffers);
1048 auto src = bp.data();
1049 if(src.empty())
1050 co_return {{}, 0};
1051
1052 // Native WriteSink path
1053 if(vt_->construct_write_some_awaitable)
1054 co_return co_await write_some_(src);
1055
1056 // Synthesized path: prepare + buffer_copy + commit
1057 mutable_buffer arr[detail::max_iovec_];
1058 auto dst_bufs = prepare(arr);
1059 if(dst_bufs.empty())
1060 {
1061 auto [ec] = co_await commit(0);
1062 if(ec)
1063 co_return {ec, 0};
1064 dst_bufs = prepare(arr);
1065 if(dst_bufs.empty())
1066 co_return {{}, 0};
1067 }
1068
1069 auto n = buffer_copy(dst_bufs, src);
1070 auto [ec] = co_await commit(n);
1071 if(ec)
1072 co_return {ec, 0};
1073 co_return {{}, n};
1074 46x }
1075
1076 template<ConstBufferSequence CB>
1077 io_task<std::size_t>
1078 39x any_buffer_sink::write(CB buffers)
1079 {
1080 buffer_param<CB> bp(buffers);
1081 std::size_t total = 0;
1082
1083 // Native WriteSink path
1084 if(vt_->construct_write_awaitable)
1085 {
1086 for(;;)
1087 {
1088 auto bufs = bp.data();
1089 if(bufs.empty())
1090 break;
1091
1092 auto [ec, n] = co_await write_(bufs);
1093 total += n;
1094 if(ec)
1095 co_return {ec, total};
1096 bp.consume(n);
1097 }
1098 co_return {{}, total};
1099 }
1100
1101 // Synthesized path: prepare + buffer_copy + commit
1102 for(;;)
1103 {
1104 auto src = bp.data();
1105 if(src.empty())
1106 break;
1107
1108 mutable_buffer arr[detail::max_iovec_];
1109 auto dst_bufs = prepare(arr);
1110 if(dst_bufs.empty())
1111 {
1112 auto [ec] = co_await commit(0);
1113 if(ec)
1114 co_return {ec, total};
1115 continue;
1116 }
1117
1118 auto n = buffer_copy(dst_bufs, src);
1119 auto [ec] = co_await commit(n);
1120 if(ec)
1121 co_return {ec, total};
1122 bp.consume(n);
1123 total += n;
1124 }
1125
1126 co_return {{}, total};
1127 78x }
1128
1129 inline auto
1130 33x any_buffer_sink::write_eof()
1131 {
1132 struct awaitable
1133 {
1134 any_buffer_sink* self_;
1135
1136 bool
1137 33x await_ready()
1138 {
1139 33x if(self_->vt_->construct_write_eof_awaitable)
1140 {
1141 // Native WriteSink: forward to underlying write_eof()
1142 34x self_->active_ops_ =
1143 17x self_->vt_->construct_write_eof_awaitable(
1144 17x self_->sink_,
1145 17x self_->cached_awaitable_);
1146 }
1147 else
1148 {
1149 // Synthesized: commit_eof(0)
1150 32x self_->active_ops_ =
1151 16x self_->vt_->construct_commit_eof_awaitable(
1152 16x self_->sink_,
1153 16x self_->cached_awaitable_,
1154 0);
1155 }
1156 66x return self_->active_ops_->await_ready(
1157 33x self_->cached_awaitable_);
1158 }
1159
1160 std::coroutine_handle<>
1161 1x await_suspend(std::coroutine_handle<> h, io_env const* env)
1162 {
1163 1x return self_->active_ops_->await_suspend(
1164 1x self_->cached_awaitable_, h, env);
1165 }
1166
1167 io_result<>
1168 33x await_resume()
1169 {
1170 struct guard {
1171 any_buffer_sink* self;
1172 33x ~guard() {
1173 33x self->active_ops_->destroy(self->cached_awaitable_);
1174 33x self->active_ops_ = nullptr;
1175 33x }
1176 33x } g{self_};
1177 33x return self_->active_ops_->await_resume(
1178 56x self_->cached_awaitable_);
1179 33x }
1180 };
1181 33x return awaitable{this};
1182 }
1183
1184 template<ConstBufferSequence CB>
1185 io_task<std::size_t>
1186 41x any_buffer_sink::write_eof(CB buffers)
1187 {
1188 // Native WriteSink path
1189 if(vt_->construct_write_eof_buffers_awaitable)
1190 {
1191 const_buffer_param<CB> bp(buffers);
1192 std::size_t total = 0;
1193
1194 for(;;)
1195 {
1196 auto bufs = bp.data();
1197 if(bufs.empty())
1198 {
1199 auto [ec] = co_await write_eof();
1200 co_return {ec, total};
1201 }
1202
1203 if(!bp.more())
1204 {
1205 // Last window: send atomically with EOF
1206 auto [ec, n] = co_await write_eof_buffers_(bufs);
1207 total += n;
1208 co_return {ec, total};
1209 }
1210
1211 auto [ec, n] = co_await write_(bufs);
1212 total += n;
1213 if(ec)
1214 co_return {ec, total};
1215 bp.consume(n);
1216 }
1217 }
1218
1219 // Synthesized path: prepare + buffer_copy + commit + commit_eof
1220 buffer_param<CB> bp(buffers);
1221 std::size_t total = 0;
1222
1223 for(;;)
1224 {
1225 auto src = bp.data();
1226 if(src.empty())
1227 break;
1228
1229 mutable_buffer arr[detail::max_iovec_];
1230 auto dst_bufs = prepare(arr);
1231 if(dst_bufs.empty())
1232 {
1233 auto [ec] = co_await commit(0);
1234 if(ec)
1235 co_return {ec, total};
1236 continue;
1237 }
1238
1239 auto n = buffer_copy(dst_bufs, src);
1240 auto [ec] = co_await commit(n);
1241 if(ec)
1242 co_return {ec, total};
1243 bp.consume(n);
1244 total += n;
1245 }
1246
1247 auto [ec] = co_await commit_eof(0);
1248 if(ec)
1249 co_return {ec, total};
1250
1251 co_return {{}, total};
1252 82x }
1253
1254 static_assert(BufferSink<any_buffer_sink>);
1255 static_assert(WriteSink<any_buffer_sink>);
1256
1257 } // namespace capy
1258 } // namespace boost
1259
1260 #endif
1261