TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
11 : #define BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/await_suspend_helper.hpp>
15 : #include <boost/capy/buffers.hpp>
16 : #include <boost/capy/buffers/buffer_copy.hpp>
17 : #include <boost/capy/buffers/buffer_param.hpp>
18 : #include <boost/capy/concept/buffer_sink.hpp>
19 : #include <boost/capy/concept/io_awaitable.hpp>
20 : #include <boost/capy/concept/write_sink.hpp>
21 : #include <boost/capy/ex/io_env.hpp>
22 : #include <boost/capy/io_result.hpp>
23 : #include <boost/capy/io_task.hpp>
24 :
25 : #include <concepts>
26 : #include <coroutine>
27 : #include <cstddef>
28 : #include <exception>
29 : #include <new>
30 : #include <span>
31 : #include <stop_token>
32 : #include <system_error>
33 : #include <utility>
34 :
35 : namespace boost {
36 : namespace capy {
37 :
38 : /** Type-erased wrapper for any BufferSink.
39 :
40 : This class provides type erasure for any type satisfying the
41 : @ref BufferSink concept, enabling runtime polymorphism for
42 : buffer sink operations. It uses cached awaitable storage to achieve
43 : zero steady-state allocation after construction.
44 :
45 : The wrapper exposes two interfaces for producing data:
46 : the @ref BufferSink interface (`prepare`, `commit`, `commit_eof`)
47 : and the @ref WriteSink interface (`write_some`, `write`,
48 : `write_eof`). Choose the interface that matches how your data
49 : is produced:
50 :
51 : @par Choosing an Interface
52 :
53 : Use the **BufferSink** interface when you are a generator that
54 : produces data into externally-provided buffers. The sink owns
55 : the memory; you call @ref prepare to obtain writable buffers,
56 : fill them, then call @ref commit or @ref commit_eof.
57 :
58 : Use the **WriteSink** interface when you already have buffers
59 : containing the data to write:
60 : - If the entire body is available up front, call
61 : @ref write_eof(buffers) to send everything atomically.
62 : - If data arrives incrementally, call @ref write or
63 : @ref write_some in a loop, then @ref write_eof() when done.
64 : Prefer `write` (complete) unless your streaming pattern
65 : benefits from partial writes via `write_some`.
66 :
67 : If the wrapped type only satisfies @ref BufferSink, the
68 : @ref WriteSink operations are provided automatically.
69 :
70 : @par Construction Modes
71 :
72 : - **Owning**: Pass by value to transfer ownership. The wrapper
73 : allocates storage and owns the sink.
74 : - **Reference**: Pass a pointer to wrap without ownership. The
75 : pointed-to sink must outlive this wrapper.
76 :
77 : @par Awaitable Preallocation
78 : The constructor preallocates storage for the type-erased awaitable.
79 : This reserves all virtual address space at server startup
80 : so memory usage can be measured up front, rather than
81 : allocating piecemeal as traffic arrives.
82 :
83 : @par Thread Safety
84 : Not thread-safe. Concurrent operations on the same wrapper
85 : are undefined behavior.
86 :
87 : @par Example
88 : @code
89 : // Owning - takes ownership of the sink
90 : any_buffer_sink abs(some_buffer_sink{args...});
91 :
92 : // Reference - wraps without ownership
93 : some_buffer_sink sink;
94 : any_buffer_sink abs(&sink);
95 :
96 : // BufferSink interface: generate into callee-owned buffers
97 : mutable_buffer arr[16];
98 : auto bufs = abs.prepare(arr);
99 : // Write data into bufs[0..bufs.size())
100 : auto [ec] = co_await abs.commit(bytes_written);
101 : auto [ec2] = co_await abs.commit_eof(0);
102 :
103 : // WriteSink interface: send caller-owned buffers
104 : auto [ec3, n] = co_await abs.write(make_buffer("hello", 5));
105 : auto [ec4] = co_await abs.write_eof();
106 :
107 : // Or send everything at once
108 : auto [ec5, n2] = co_await abs.write_eof(
109 : make_buffer(body_data));
110 : @endcode
111 :
112 : @see any_buffer_source, BufferSink, WriteSink
113 : */
114 : class any_buffer_sink
115 : {
116 : struct vtable;
117 : struct awaitable_ops;
118 : struct write_awaitable_ops;
119 :
120 : template<BufferSink S>
121 : struct vtable_for_impl;
122 :
123 : // hot-path members first for cache locality
124 : void* sink_ = nullptr;
125 : vtable const* vt_ = nullptr;
126 : void* cached_awaitable_ = nullptr;
127 : awaitable_ops const* active_ops_ = nullptr;
128 : write_awaitable_ops const* active_write_ops_ = nullptr;
129 : void* storage_ = nullptr;
130 :
131 : public:
132 : /** Destructor.
133 :
134 : Destroys the owned sink (if any) and releases the cached
135 : awaitable storage.
136 : */
137 : ~any_buffer_sink();
138 :
139 : /** Construct a default instance.
140 :
141 : Constructs an empty wrapper. Operations on a default-constructed
142 : wrapper result in undefined behavior.
143 : */
144 : any_buffer_sink() = default;
145 :
146 : /** Non-copyable.
147 :
148 : The awaitable cache is per-instance and cannot be shared.
149 : */
150 : any_buffer_sink(any_buffer_sink const&) = delete;
151 : any_buffer_sink& operator=(any_buffer_sink const&) = delete;
152 :
153 : /** Construct by moving.
154 :
155 : Transfers ownership of the wrapped sink (if owned) and
156 : cached awaitable storage from `other`. After the move, `other` is
157 : in a default-constructed state.
158 :
159 : @param other The wrapper to move from.
160 : */
161 HIT 2 : any_buffer_sink(any_buffer_sink&& other) noexcept
162 2 : : sink_(std::exchange(other.sink_, nullptr))
163 2 : , vt_(std::exchange(other.vt_, nullptr))
164 2 : , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
165 2 : , active_ops_(std::exchange(other.active_ops_, nullptr))
166 2 : , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
167 2 : , storage_(std::exchange(other.storage_, nullptr))
168 : {
169 2 : }
170 :
171 : /** Assign by moving.
172 :
173 : Destroys any owned sink and releases existing resources,
174 : then transfers ownership from `other`.
175 :
176 : @param other The wrapper to move from.
177 : @return Reference to this wrapper.
178 : */
179 : any_buffer_sink&
180 : operator=(any_buffer_sink&& other) noexcept;
181 :
182 : /** Construct by taking ownership of a BufferSink.
183 :
184 : Allocates storage and moves the sink into this wrapper.
185 : The wrapper owns the sink and will destroy it. If `S` also
186 : satisfies @ref WriteSink, native write operations are
187 : forwarded through the virtual boundary.
188 :
189 : @param s The sink to take ownership of.
190 : */
191 : template<BufferSink S>
192 : requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
193 : any_buffer_sink(S s);
194 :
195 : /** Construct by wrapping a BufferSink without ownership.
196 :
197 : Wraps the given sink by pointer. The sink must remain
198 : valid for the lifetime of this wrapper. If `S` also
199 : satisfies @ref WriteSink, native write operations are
200 : forwarded through the virtual boundary.
201 :
202 : @param s Pointer to the sink to wrap.
203 : */
204 : template<BufferSink S>
205 : any_buffer_sink(S* s);
206 :
207 : /** Check if the wrapper contains a valid sink.
208 :
209 : @return `true` if wrapping a sink, `false` if default-constructed
210 : or moved-from.
211 : */
212 : bool
213 26 : has_value() const noexcept
214 : {
215 26 : return sink_ != nullptr;
216 : }
217 :
218 : /** Check if the wrapper contains a valid sink.
219 :
220 : @return `true` if wrapping a sink, `false` if default-constructed
221 : or moved-from.
222 : */
223 : explicit
224 3 : operator bool() const noexcept
225 : {
226 3 : return has_value();
227 : }
228 :
229 : /** Prepare writable buffers.
230 :
231 : Fills the provided span with mutable buffer descriptors
232 : pointing to the underlying sink's internal storage. This
233 : operation is synchronous.
234 :
235 : @param dest Span of mutable_buffer to fill.
236 :
237 : @return A span of filled buffers.
238 :
239 : @par Preconditions
240 : The wrapper must contain a valid sink (`has_value() == true`).
241 : */
242 : std::span<mutable_buffer>
243 : prepare(std::span<mutable_buffer> dest);
244 :
245 : /** Commit bytes written to the prepared buffers.
246 :
247 : Commits `n` bytes written to the buffers returned by the
248 : most recent call to @ref prepare. The operation may trigger
249 : underlying I/O.
250 :
251 : @param n The number of bytes to commit.
252 :
253 : @return An awaitable that await-returns `(error_code)`.
254 :
255 : @par Preconditions
256 : The wrapper must contain a valid sink (`has_value() == true`).
257 : */
258 : auto
259 : commit(std::size_t n);
260 :
261 : /** Commit final bytes and signal end-of-stream.
262 :
263 : Commits `n` bytes written to the buffers returned by the
264 : most recent call to @ref prepare and finalizes the sink.
265 : After success, no further operations are permitted.
266 :
267 : @param n The number of bytes to commit.
268 :
269 : @return An awaitable that await-returns `(error_code)`.
270 :
271 : @par Preconditions
272 : The wrapper must contain a valid sink (`has_value() == true`).
273 : */
274 : auto
275 : commit_eof(std::size_t n);
276 :
277 : /** Write some data from a buffer sequence.
278 :
279 : Attempt to write up to `buffer_size( buffers )` bytes from
280 : the buffer sequence to the underlying sink. May consume less
281 : than the full sequence.
282 :
283 : When the wrapped type provides native @ref WriteSink support,
284 : the operation forwards directly. Otherwise it is synthesized
285 : from @ref prepare and @ref commit with a buffer copy.
286 :
287 : @param buffers The buffer sequence to write.
288 :
289 : @return An awaitable that await-returns `(error_code,std::size_t)`.
290 :
291 : @par Preconditions
292 : The wrapper must contain a valid sink (`has_value() == true`).
293 : */
294 : template<ConstBufferSequence CB>
295 : io_task<std::size_t>
296 : write_some(CB buffers);
297 :
298 : /** Write all data from a buffer sequence.
299 :
300 : Writes all data from the buffer sequence to the underlying
301 : sink. This method satisfies the @ref WriteSink concept.
302 :
303 : When the wrapped type provides native @ref WriteSink support,
304 : each window is forwarded directly. Otherwise the data is
305 : copied into the sink via @ref prepare and @ref commit.
306 :
307 : @param buffers The buffer sequence to write.
308 :
309 : @return An awaitable that await-returns `(error_code,std::size_t)`.
310 :
311 : @par Preconditions
312 : The wrapper must contain a valid sink (`has_value() == true`).
313 : */
314 : template<ConstBufferSequence CB>
315 : io_task<std::size_t>
316 : write(CB buffers);
317 :
318 : /** Atomically write data and signal end-of-stream.
319 :
320 : Writes all data from the buffer sequence to the underlying
321 : sink and then signals end-of-stream.
322 :
323 : When the wrapped type provides native @ref WriteSink support,
324 : the final window is sent atomically via the underlying
325 : `write_eof(buffers)`. Otherwise the data is synthesized
326 : through @ref prepare, @ref commit, and @ref commit_eof.
327 :
328 : @param buffers The buffer sequence to write.
329 :
330 : @return An awaitable that await-returns `(error_code,std::size_t)`.
331 :
332 : @par Preconditions
333 : The wrapper must contain a valid sink (`has_value() == true`).
334 : */
335 : template<ConstBufferSequence CB>
336 : io_task<std::size_t>
337 : write_eof(CB buffers);
338 :
339 : /** Signal end-of-stream.
340 :
341 : Indicates that no more data will be written to the sink.
342 : This method satisfies the @ref WriteSink concept.
343 :
344 : When the wrapped type provides native @ref WriteSink support,
345 : the underlying `write_eof()` is called. Otherwise the
346 : operation is implemented as `commit_eof(0)`.
347 :
348 : @return An awaitable that await-returns `(error_code)`.
349 :
350 : @par Preconditions
351 : The wrapper must contain a valid sink (`has_value() == true`).
352 : */
353 : auto
354 : write_eof();
355 :
356 : protected:
357 : /** Rebind to a new sink after move.
358 :
359 : Updates the internal pointer to reference a new sink object.
360 : Used by owning wrappers after move assignment when the owned
361 : object has moved to a new location.
362 :
363 : @param new_sink The new sink to bind to. Must be the same
364 : type as the original sink.
365 :
366 : @note Terminates if called with a sink of different type
367 : than the original.
368 : */
369 : template<BufferSink S>
370 : void
371 : rebind(S& new_sink) noexcept
372 : {
373 : if(vt_ != &vtable_for_impl<S>::value)
374 : std::terminate();
375 : sink_ = &new_sink;
376 : }
377 :
378 : private:
379 : /** Forward a partial write through the vtable.
380 :
381 : Constructs the underlying `write_some` awaitable in
382 : cached storage and returns a type-erased awaitable.
383 : */
384 : auto
385 : write_some_(std::span<const_buffer const> buffers);
386 :
387 : /** Forward a complete write through the vtable.
388 :
389 : Constructs the underlying `write` awaitable in
390 : cached storage and returns a type-erased awaitable.
391 : */
392 : auto
393 : write_(std::span<const_buffer const> buffers);
394 :
395 : /** Forward an atomic write-with-EOF through the vtable.
396 :
397 : Constructs the underlying `write_eof(buffers)` awaitable
398 : in cached storage and returns a type-erased awaitable.
399 : */
400 : auto
401 : write_eof_buffers_(std::span<const_buffer const> buffers);
402 : };
403 :
404 : /** Type-erased ops for awaitables that await-return `io_result<>`. */
405 : struct any_buffer_sink::awaitable_ops
406 : {
407 : bool (*await_ready)(void*);
408 : std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
409 : io_result<> (*await_resume)(void*);
410 : void (*destroy)(void*) noexcept;
411 : };
412 :
413 : /** Type-erased ops for awaitables that await-return `io_result<std::size_t>`. */
414 : struct any_buffer_sink::write_awaitable_ops
415 : {
416 : bool (*await_ready)(void*);
417 : std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
418 : io_result<std::size_t> (*await_resume)(void*);
419 : void (*destroy)(void*) noexcept;
420 : };
421 :
422 : struct any_buffer_sink::vtable
423 : {
424 : void (*destroy)(void*) noexcept;
425 : std::span<mutable_buffer> (*do_prepare)(
426 : void* sink,
427 : std::span<mutable_buffer> dest);
428 : std::size_t awaitable_size;
429 : std::size_t awaitable_align;
430 : awaitable_ops const* (*construct_commit_awaitable)(
431 : void* sink,
432 : void* storage,
433 : std::size_t n);
434 : awaitable_ops const* (*construct_commit_eof_awaitable)(
435 : void* sink,
436 : void* storage,
437 : std::size_t n);
438 :
439 : // WriteSink forwarding (null when wrapped type is BufferSink-only)
440 : write_awaitable_ops const* (*construct_write_some_awaitable)(
441 : void* sink,
442 : void* storage,
443 : std::span<const_buffer const> buffers);
444 : write_awaitable_ops const* (*construct_write_awaitable)(
445 : void* sink,
446 : void* storage,
447 : std::span<const_buffer const> buffers);
448 : write_awaitable_ops const* (*construct_write_eof_buffers_awaitable)(
449 : void* sink,
450 : void* storage,
451 : std::span<const_buffer const> buffers);
452 : awaitable_ops const* (*construct_write_eof_awaitable)(
453 : void* sink,
454 : void* storage);
455 : };
456 :
457 : template<BufferSink S>
458 : struct any_buffer_sink::vtable_for_impl
459 : {
460 : using CommitAwaitable = decltype(std::declval<S&>().commit(
461 : std::size_t{}));
462 : using CommitEofAwaitable = decltype(std::declval<S&>().commit_eof(
463 : std::size_t{}));
464 :
465 : static void
466 18 : do_destroy_impl(void* sink) noexcept
467 : {
468 18 : static_cast<S*>(sink)->~S();
469 18 : }
470 :
471 : static std::span<mutable_buffer>
472 132 : do_prepare_impl(
473 : void* sink,
474 : std::span<mutable_buffer> dest)
475 : {
476 132 : auto& s = *static_cast<S*>(sink);
477 132 : return s.prepare(dest);
478 : }
479 :
480 : static awaitable_ops const*
481 110 : construct_commit_awaitable_impl(
482 : void* sink,
483 : void* storage,
484 : std::size_t n)
485 : {
486 110 : auto& s = *static_cast<S*>(sink);
487 110 : ::new(storage) CommitAwaitable(s.commit(n));
488 :
489 : static constexpr awaitable_ops ops = {
490 110 : +[](void* p) {
491 110 : return static_cast<CommitAwaitable*>(p)->await_ready();
492 : },
493 1 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
494 1 : return detail::call_await_suspend(
495 1 : static_cast<CommitAwaitable*>(p), h, env);
496 : },
497 110 : +[](void* p) {
498 110 : return static_cast<CommitAwaitable*>(p)->await_resume();
499 : },
500 110 : +[](void* p) noexcept {
501 110 : static_cast<CommitAwaitable*>(p)->~CommitAwaitable();
502 : }
503 : };
504 110 : return &ops;
505 : }
506 :
507 : static awaitable_ops const*
508 71 : construct_commit_eof_awaitable_impl(
509 : void* sink,
510 : void* storage,
511 : std::size_t n)
512 : {
513 71 : auto& s = *static_cast<S*>(sink);
514 71 : ::new(storage) CommitEofAwaitable(s.commit_eof(n));
515 :
516 : static constexpr awaitable_ops ops = {
517 71 : +[](void* p) {
518 71 : return static_cast<CommitEofAwaitable*>(p)->await_ready();
519 : },
520 1 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
521 1 : return detail::call_await_suspend(
522 1 : static_cast<CommitEofAwaitable*>(p), h, env);
523 : },
524 71 : +[](void* p) {
525 71 : return static_cast<CommitEofAwaitable*>(p)->await_resume();
526 : },
527 71 : +[](void* p) noexcept {
528 71 : static_cast<CommitEofAwaitable*>(p)->~CommitEofAwaitable();
529 : }
530 : };
531 71 : return &ops;
532 : }
533 :
534 : static write_awaitable_ops const*
535 7 : construct_write_some_awaitable_impl(
536 : void* sink,
537 : void* storage,
538 : std::span<const_buffer const> buffers)
539 : requires WriteSink<S>
540 : {
541 : using Aw = decltype(std::declval<S&>().write_some(
542 : std::span<const_buffer const>{}));
543 7 : auto& s = *static_cast<S*>(sink);
544 7 : ::new(storage) Aw(s.write_some(buffers));
545 :
546 : static constexpr write_awaitable_ops ops = {
547 7 : +[](void* p) {
548 7 : return static_cast<Aw*>(p)->await_ready();
549 : },
550 1 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
551 1 : return detail::call_await_suspend(
552 1 : static_cast<Aw*>(p), h, env);
553 : },
554 7 : +[](void* p) {
555 7 : return static_cast<Aw*>(p)->await_resume();
556 : },
557 7 : +[](void* p) noexcept {
558 7 : static_cast<Aw*>(p)->~Aw();
559 : }
560 : };
561 7 : return &ops;
562 : }
563 :
564 : static write_awaitable_ops const*
565 15 : construct_write_awaitable_impl(
566 : void* sink,
567 : void* storage,
568 : std::span<const_buffer const> buffers)
569 : requires WriteSink<S>
570 : {
571 : using Aw = decltype(std::declval<S&>().write(
572 : std::span<const_buffer const>{}));
573 15 : auto& s = *static_cast<S*>(sink);
574 15 : ::new(storage) Aw(s.write(buffers));
575 :
576 : static constexpr write_awaitable_ops ops = {
577 15 : +[](void* p) {
578 15 : return static_cast<Aw*>(p)->await_ready();
579 : },
580 1 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
581 1 : return detail::call_await_suspend(
582 1 : static_cast<Aw*>(p), h, env);
583 : },
584 15 : +[](void* p) {
585 15 : return static_cast<Aw*>(p)->await_resume();
586 : },
587 15 : +[](void* p) noexcept {
588 15 : static_cast<Aw*>(p)->~Aw();
589 : }
590 : };
591 15 : return &ops;
592 : }
593 :
594 : static write_awaitable_ops const*
595 13 : construct_write_eof_buffers_awaitable_impl(
596 : void* sink,
597 : void* storage,
598 : std::span<const_buffer const> buffers)
599 : requires WriteSink<S>
600 : {
601 : using Aw = decltype(std::declval<S&>().write_eof(
602 : std::span<const_buffer const>{}));
603 13 : auto& s = *static_cast<S*>(sink);
604 13 : ::new(storage) Aw(s.write_eof(buffers));
605 :
606 : static constexpr write_awaitable_ops ops = {
607 13 : +[](void* p) {
608 13 : return static_cast<Aw*>(p)->await_ready();
609 : },
610 1 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
611 1 : return detail::call_await_suspend(
612 1 : static_cast<Aw*>(p), h, env);
613 : },
614 13 : +[](void* p) {
615 13 : return static_cast<Aw*>(p)->await_resume();
616 : },
617 13 : +[](void* p) noexcept {
618 13 : static_cast<Aw*>(p)->~Aw();
619 : }
620 : };
621 13 : return &ops;
622 : }
623 :
624 : static awaitable_ops const*
625 17 : construct_write_eof_awaitable_impl(
626 : void* sink,
627 : void* storage)
628 : requires WriteSink<S>
629 : {
630 : using Aw = decltype(std::declval<S&>().write_eof());
631 17 : auto& s = *static_cast<S*>(sink);
632 17 : ::new(storage) Aw(s.write_eof());
633 :
634 : static constexpr awaitable_ops ops = {
635 17 : +[](void* p) {
636 17 : return static_cast<Aw*>(p)->await_ready();
637 : },
638 1 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
639 1 : return detail::call_await_suspend(
640 1 : static_cast<Aw*>(p), h, env);
641 : },
642 17 : +[](void* p) {
643 17 : return static_cast<Aw*>(p)->await_resume();
644 : },
645 17 : +[](void* p) noexcept {
646 17 : static_cast<Aw*>(p)->~Aw();
647 : }
648 : };
649 17 : return &ops;
650 : }
651 :
652 : static consteval std::size_t
653 : compute_max_size() noexcept
654 : {
655 : std::size_t s = sizeof(CommitAwaitable) > sizeof(CommitEofAwaitable)
656 : ? sizeof(CommitAwaitable)
657 : : sizeof(CommitEofAwaitable);
658 : if constexpr (WriteSink<S>)
659 : {
660 : using WS = decltype(std::declval<S&>().write_some(
661 : std::span<const_buffer const>{}));
662 : using W = decltype(std::declval<S&>().write(
663 : std::span<const_buffer const>{}));
664 : using WEB = decltype(std::declval<S&>().write_eof(
665 : std::span<const_buffer const>{}));
666 : using WE = decltype(std::declval<S&>().write_eof());
667 :
668 : if(sizeof(WS) > s) s = sizeof(WS);
669 : if(sizeof(W) > s) s = sizeof(W);
670 : if(sizeof(WEB) > s) s = sizeof(WEB);
671 : if(sizeof(WE) > s) s = sizeof(WE);
672 : }
673 : return s;
674 : }
675 :
676 : static consteval std::size_t
677 : compute_max_align() noexcept
678 : {
679 : std::size_t a = alignof(CommitAwaitable) > alignof(CommitEofAwaitable)
680 : ? alignof(CommitAwaitable)
681 : : alignof(CommitEofAwaitable);
682 : if constexpr (WriteSink<S>)
683 : {
684 : using WS = decltype(std::declval<S&>().write_some(
685 : std::span<const_buffer const>{}));
686 : using W = decltype(std::declval<S&>().write(
687 : std::span<const_buffer const>{}));
688 : using WEB = decltype(std::declval<S&>().write_eof(
689 : std::span<const_buffer const>{}));
690 : using WE = decltype(std::declval<S&>().write_eof());
691 :
692 : if(alignof(WS) > a) a = alignof(WS);
693 : if(alignof(W) > a) a = alignof(W);
694 : if(alignof(WEB) > a) a = alignof(WEB);
695 : if(alignof(WE) > a) a = alignof(WE);
696 : }
697 : return a;
698 : }
699 :
700 : static consteval vtable
701 : make_vtable() noexcept
702 : {
703 : vtable v{};
704 : v.destroy = &do_destroy_impl;
705 : v.do_prepare = &do_prepare_impl;
706 : v.awaitable_size = compute_max_size();
707 : v.awaitable_align = compute_max_align();
708 : v.construct_commit_awaitable = &construct_commit_awaitable_impl;
709 : v.construct_commit_eof_awaitable = &construct_commit_eof_awaitable_impl;
710 : v.construct_write_some_awaitable = nullptr;
711 : v.construct_write_awaitable = nullptr;
712 : v.construct_write_eof_buffers_awaitable = nullptr;
713 : v.construct_write_eof_awaitable = nullptr;
714 :
715 : if constexpr (WriteSink<S>)
716 : {
717 : v.construct_write_some_awaitable =
718 : &construct_write_some_awaitable_impl;
719 : v.construct_write_awaitable =
720 : &construct_write_awaitable_impl;
721 : v.construct_write_eof_buffers_awaitable =
722 : &construct_write_eof_buffers_awaitable_impl;
723 : v.construct_write_eof_awaitable =
724 : &construct_write_eof_awaitable_impl;
725 : }
726 : return v;
727 : }
728 :
729 : static constexpr vtable value = make_vtable();
730 : };
731 :
732 : inline
733 218 : any_buffer_sink::~any_buffer_sink()
734 : {
735 218 : if(storage_)
736 : {
737 17 : vt_->destroy(sink_);
738 17 : ::operator delete(storage_);
739 : }
740 218 : if(cached_awaitable_)
741 211 : ::operator delete(cached_awaitable_);
742 218 : }
743 :
744 : inline any_buffer_sink&
745 5 : any_buffer_sink::operator=(any_buffer_sink&& other) noexcept
746 : {
747 5 : if(this != &other)
748 : {
749 4 : if(storage_)
750 : {
751 1 : vt_->destroy(sink_);
752 1 : ::operator delete(storage_);
753 : }
754 4 : if(cached_awaitable_)
755 2 : ::operator delete(cached_awaitable_);
756 4 : sink_ = std::exchange(other.sink_, nullptr);
757 4 : vt_ = std::exchange(other.vt_, nullptr);
758 4 : cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
759 4 : storage_ = std::exchange(other.storage_, nullptr);
760 4 : active_ops_ = std::exchange(other.active_ops_, nullptr);
761 4 : active_write_ops_ = std::exchange(other.active_write_ops_, nullptr);
762 : }
763 5 : return *this;
764 : }
765 :
766 : template<BufferSink S>
767 : requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
768 19 : any_buffer_sink::any_buffer_sink(S s)
769 19 : : vt_(&vtable_for_impl<S>::value)
770 : {
771 : struct guard {
772 : any_buffer_sink* self;
773 : bool committed = false;
774 19 : ~guard() {
775 19 : if(!committed && self->storage_) {
776 : // sink_ is null if the sink move-ctor threw before
777 : // the placement-new assigned it.
778 1 : if(self->sink_)
779 MIS 0 : self->vt_->destroy(self->sink_);
780 HIT 1 : ::operator delete(self->storage_);
781 1 : self->storage_ = nullptr;
782 1 : self->sink_ = nullptr;
783 : }
784 19 : }
785 19 : } g{this};
786 :
787 19 : storage_ = ::operator new(sizeof(S));
788 19 : sink_ = ::new(storage_) S(std::move(s));
789 :
790 18 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
791 :
792 18 : g.committed = true;
793 19 : }
794 :
795 : template<BufferSink S>
796 195 : any_buffer_sink::any_buffer_sink(S* s)
797 195 : : sink_(s)
798 195 : , vt_(&vtable_for_impl<S>::value)
799 : {
800 195 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
801 195 : }
802 :
803 : inline std::span<mutable_buffer>
804 132 : any_buffer_sink::prepare(std::span<mutable_buffer> dest)
805 : {
806 132 : return vt_->do_prepare(sink_, dest);
807 : }
808 :
809 : inline auto
810 110 : any_buffer_sink::commit(std::size_t n)
811 : {
812 : struct awaitable
813 : {
814 : any_buffer_sink* self_;
815 : std::size_t n_;
816 :
817 : bool
818 110 : await_ready()
819 : {
820 220 : self_->active_ops_ = self_->vt_->construct_commit_awaitable(
821 110 : self_->sink_,
822 110 : self_->cached_awaitable_,
823 : n_);
824 110 : return self_->active_ops_->await_ready(self_->cached_awaitable_);
825 : }
826 :
827 : std::coroutine_handle<>
828 1 : await_suspend(std::coroutine_handle<> h, io_env const* env)
829 : {
830 1 : return self_->active_ops_->await_suspend(
831 1 : self_->cached_awaitable_, h, env);
832 : }
833 :
834 : io_result<>
835 110 : await_resume()
836 : {
837 : struct guard {
838 : any_buffer_sink* self;
839 110 : ~guard() {
840 110 : self->active_ops_->destroy(self->cached_awaitable_);
841 110 : self->active_ops_ = nullptr;
842 110 : }
843 110 : } g{self_};
844 110 : return self_->active_ops_->await_resume(
845 193 : self_->cached_awaitable_);
846 110 : }
847 : };
848 110 : return awaitable{this, n};
849 : }
850 :
851 : inline auto
852 55 : any_buffer_sink::commit_eof(std::size_t n)
853 : {
854 : struct awaitable
855 : {
856 : any_buffer_sink* self_;
857 : std::size_t n_;
858 :
859 : bool
860 55 : await_ready()
861 : {
862 110 : self_->active_ops_ = self_->vt_->construct_commit_eof_awaitable(
863 55 : self_->sink_,
864 55 : self_->cached_awaitable_,
865 : n_);
866 55 : return self_->active_ops_->await_ready(self_->cached_awaitable_);
867 : }
868 :
869 : std::coroutine_handle<>
870 1 : await_suspend(std::coroutine_handle<> h, io_env const* env)
871 : {
872 1 : return self_->active_ops_->await_suspend(
873 1 : self_->cached_awaitable_, h, env);
874 : }
875 :
876 : io_result<>
877 55 : await_resume()
878 : {
879 : struct guard {
880 : any_buffer_sink* self;
881 55 : ~guard() {
882 55 : self->active_ops_->destroy(self->cached_awaitable_);
883 55 : self->active_ops_ = nullptr;
884 55 : }
885 55 : } g{self_};
886 55 : return self_->active_ops_->await_resume(
887 94 : self_->cached_awaitable_);
888 55 : }
889 : };
890 55 : return awaitable{this, n};
891 : }
892 :
893 : inline auto
894 7 : any_buffer_sink::write_some_(
895 : std::span<const_buffer const> buffers)
896 : {
897 : struct awaitable
898 : {
899 : any_buffer_sink* self_;
900 : std::span<const_buffer const> buffers_;
901 :
902 : bool
903 7 : await_ready() const noexcept
904 : {
905 7 : return false;
906 : }
907 :
908 : std::coroutine_handle<>
909 7 : await_suspend(std::coroutine_handle<> h, io_env const* env)
910 : {
911 14 : self_->active_write_ops_ =
912 14 : self_->vt_->construct_write_some_awaitable(
913 7 : self_->sink_,
914 7 : self_->cached_awaitable_,
915 : buffers_);
916 :
917 7 : if(self_->active_write_ops_->await_ready(
918 7 : self_->cached_awaitable_))
919 6 : return h;
920 :
921 1 : return self_->active_write_ops_->await_suspend(
922 1 : self_->cached_awaitable_, h, env);
923 : }
924 :
925 : io_result<std::size_t>
926 7 : await_resume()
927 : {
928 : struct guard {
929 : any_buffer_sink* self;
930 7 : ~guard() {
931 7 : self->active_write_ops_->destroy(
932 7 : self->cached_awaitable_);
933 7 : self->active_write_ops_ = nullptr;
934 7 : }
935 7 : } g{self_};
936 7 : return self_->active_write_ops_->await_resume(
937 12 : self_->cached_awaitable_);
938 7 : }
939 : };
940 7 : return awaitable{this, buffers};
941 : }
942 :
943 : inline auto
944 15 : any_buffer_sink::write_(
945 : std::span<const_buffer const> buffers)
946 : {
947 : struct awaitable
948 : {
949 : any_buffer_sink* self_;
950 : std::span<const_buffer const> buffers_;
951 :
952 : bool
953 15 : await_ready() const noexcept
954 : {
955 15 : return false;
956 : }
957 :
958 : std::coroutine_handle<>
959 15 : await_suspend(std::coroutine_handle<> h, io_env const* env)
960 : {
961 30 : self_->active_write_ops_ =
962 30 : self_->vt_->construct_write_awaitable(
963 15 : self_->sink_,
964 15 : self_->cached_awaitable_,
965 : buffers_);
966 :
967 15 : if(self_->active_write_ops_->await_ready(
968 15 : self_->cached_awaitable_))
969 14 : return h;
970 :
971 1 : return self_->active_write_ops_->await_suspend(
972 1 : self_->cached_awaitable_, h, env);
973 : }
974 :
975 : io_result<std::size_t>
976 15 : await_resume()
977 : {
978 : struct guard {
979 : any_buffer_sink* self;
980 15 : ~guard() {
981 15 : self->active_write_ops_->destroy(
982 15 : self->cached_awaitable_);
983 15 : self->active_write_ops_ = nullptr;
984 15 : }
985 15 : } g{self_};
986 15 : return self_->active_write_ops_->await_resume(
987 26 : self_->cached_awaitable_);
988 15 : }
989 : };
990 15 : return awaitable{this, buffers};
991 : }
992 :
993 : inline auto
994 13 : any_buffer_sink::write_eof_buffers_(
995 : std::span<const_buffer const> buffers)
996 : {
997 : struct awaitable
998 : {
999 : any_buffer_sink* self_;
1000 : std::span<const_buffer const> buffers_;
1001 :
1002 : bool
1003 13 : await_ready() const noexcept
1004 : {
1005 13 : return false;
1006 : }
1007 :
1008 : std::coroutine_handle<>
1009 13 : await_suspend(std::coroutine_handle<> h, io_env const* env)
1010 : {
1011 26 : self_->active_write_ops_ =
1012 26 : self_->vt_->construct_write_eof_buffers_awaitable(
1013 13 : self_->sink_,
1014 13 : self_->cached_awaitable_,
1015 : buffers_);
1016 :
1017 13 : if(self_->active_write_ops_->await_ready(
1018 13 : self_->cached_awaitable_))
1019 12 : return h;
1020 :
1021 1 : return self_->active_write_ops_->await_suspend(
1022 1 : self_->cached_awaitable_, h, env);
1023 : }
1024 :
1025 : io_result<std::size_t>
1026 13 : await_resume()
1027 : {
1028 : struct guard {
1029 : any_buffer_sink* self;
1030 13 : ~guard() {
1031 13 : self->active_write_ops_->destroy(
1032 13 : self->cached_awaitable_);
1033 13 : self->active_write_ops_ = nullptr;
1034 13 : }
1035 13 : } g{self_};
1036 13 : return self_->active_write_ops_->await_resume(
1037 22 : self_->cached_awaitable_);
1038 13 : }
1039 : };
1040 13 : return awaitable{this, buffers};
1041 : }
1042 :
1043 : template<ConstBufferSequence CB>
1044 : io_task<std::size_t>
1045 23 : any_buffer_sink::write_some(CB buffers)
1046 : {
1047 : buffer_param<CB> bp(buffers);
1048 : auto src = bp.data();
1049 : if(src.empty())
1050 : co_return {{}, 0};
1051 :
1052 : // Native WriteSink path
1053 : if(vt_->construct_write_some_awaitable)
1054 : co_return co_await write_some_(src);
1055 :
1056 : // Synthesized path: prepare + buffer_copy + commit
1057 : mutable_buffer arr[detail::max_iovec_];
1058 : auto dst_bufs = prepare(arr);
1059 : if(dst_bufs.empty())
1060 : {
1061 : auto [ec] = co_await commit(0);
1062 : if(ec)
1063 : co_return {ec, 0};
1064 : dst_bufs = prepare(arr);
1065 : if(dst_bufs.empty())
1066 : co_return {{}, 0};
1067 : }
1068 :
1069 : auto n = buffer_copy(dst_bufs, src);
1070 : auto [ec] = co_await commit(n);
1071 : if(ec)
1072 : co_return {ec, 0};
1073 : co_return {{}, n};
1074 46 : }
1075 :
1076 : template<ConstBufferSequence CB>
1077 : io_task<std::size_t>
1078 39 : any_buffer_sink::write(CB buffers)
1079 : {
1080 : buffer_param<CB> bp(buffers);
1081 : std::size_t total = 0;
1082 :
1083 : // Native WriteSink path
1084 : if(vt_->construct_write_awaitable)
1085 : {
1086 : for(;;)
1087 : {
1088 : auto bufs = bp.data();
1089 : if(bufs.empty())
1090 : break;
1091 :
1092 : auto [ec, n] = co_await write_(bufs);
1093 : total += n;
1094 : if(ec)
1095 : co_return {ec, total};
1096 : bp.consume(n);
1097 : }
1098 : co_return {{}, total};
1099 : }
1100 :
1101 : // Synthesized path: prepare + buffer_copy + commit
1102 : for(;;)
1103 : {
1104 : auto src = bp.data();
1105 : if(src.empty())
1106 : break;
1107 :
1108 : mutable_buffer arr[detail::max_iovec_];
1109 : auto dst_bufs = prepare(arr);
1110 : if(dst_bufs.empty())
1111 : {
1112 : auto [ec] = co_await commit(0);
1113 : if(ec)
1114 : co_return {ec, total};
1115 : continue;
1116 : }
1117 :
1118 : auto n = buffer_copy(dst_bufs, src);
1119 : auto [ec] = co_await commit(n);
1120 : if(ec)
1121 : co_return {ec, total};
1122 : bp.consume(n);
1123 : total += n;
1124 : }
1125 :
1126 : co_return {{}, total};
1127 78 : }
1128 :
1129 : inline auto
1130 33 : any_buffer_sink::write_eof()
1131 : {
1132 : struct awaitable
1133 : {
1134 : any_buffer_sink* self_;
1135 :
1136 : bool
1137 33 : await_ready()
1138 : {
1139 33 : if(self_->vt_->construct_write_eof_awaitable)
1140 : {
1141 : // Native WriteSink: forward to underlying write_eof()
1142 34 : self_->active_ops_ =
1143 17 : self_->vt_->construct_write_eof_awaitable(
1144 17 : self_->sink_,
1145 17 : self_->cached_awaitable_);
1146 : }
1147 : else
1148 : {
1149 : // Synthesized: commit_eof(0)
1150 32 : self_->active_ops_ =
1151 16 : self_->vt_->construct_commit_eof_awaitable(
1152 16 : self_->sink_,
1153 16 : self_->cached_awaitable_,
1154 : 0);
1155 : }
1156 66 : return self_->active_ops_->await_ready(
1157 33 : self_->cached_awaitable_);
1158 : }
1159 :
1160 : std::coroutine_handle<>
1161 1 : await_suspend(std::coroutine_handle<> h, io_env const* env)
1162 : {
1163 1 : return self_->active_ops_->await_suspend(
1164 1 : self_->cached_awaitable_, h, env);
1165 : }
1166 :
1167 : io_result<>
1168 33 : await_resume()
1169 : {
1170 : struct guard {
1171 : any_buffer_sink* self;
1172 33 : ~guard() {
1173 33 : self->active_ops_->destroy(self->cached_awaitable_);
1174 33 : self->active_ops_ = nullptr;
1175 33 : }
1176 33 : } g{self_};
1177 33 : return self_->active_ops_->await_resume(
1178 56 : self_->cached_awaitable_);
1179 33 : }
1180 : };
1181 33 : return awaitable{this};
1182 : }
1183 :
1184 : template<ConstBufferSequence CB>
1185 : io_task<std::size_t>
1186 41 : any_buffer_sink::write_eof(CB buffers)
1187 : {
1188 : // Native WriteSink path
1189 : if(vt_->construct_write_eof_buffers_awaitable)
1190 : {
1191 : const_buffer_param<CB> bp(buffers);
1192 : std::size_t total = 0;
1193 :
1194 : for(;;)
1195 : {
1196 : auto bufs = bp.data();
1197 : if(bufs.empty())
1198 : {
1199 : auto [ec] = co_await write_eof();
1200 : co_return {ec, total};
1201 : }
1202 :
1203 : if(!bp.more())
1204 : {
1205 : // Last window: send atomically with EOF
1206 : auto [ec, n] = co_await write_eof_buffers_(bufs);
1207 : total += n;
1208 : co_return {ec, total};
1209 : }
1210 :
1211 : auto [ec, n] = co_await write_(bufs);
1212 : total += n;
1213 : if(ec)
1214 : co_return {ec, total};
1215 : bp.consume(n);
1216 : }
1217 : }
1218 :
1219 : // Synthesized path: prepare + buffer_copy + commit + commit_eof
1220 : buffer_param<CB> bp(buffers);
1221 : std::size_t total = 0;
1222 :
1223 : for(;;)
1224 : {
1225 : auto src = bp.data();
1226 : if(src.empty())
1227 : break;
1228 :
1229 : mutable_buffer arr[detail::max_iovec_];
1230 : auto dst_bufs = prepare(arr);
1231 : if(dst_bufs.empty())
1232 : {
1233 : auto [ec] = co_await commit(0);
1234 : if(ec)
1235 : co_return {ec, total};
1236 : continue;
1237 : }
1238 :
1239 : auto n = buffer_copy(dst_bufs, src);
1240 : auto [ec] = co_await commit(n);
1241 : if(ec)
1242 : co_return {ec, total};
1243 : bp.consume(n);
1244 : total += n;
1245 : }
1246 :
1247 : auto [ec] = co_await commit_eof(0);
1248 : if(ec)
1249 : co_return {ec, total};
1250 :
1251 : co_return {{}, total};
1252 82 : }
1253 :
1254 : static_assert(BufferSink<any_buffer_sink>);
1255 : static_assert(WriteSink<any_buffer_sink>);
1256 :
1257 : } // namespace capy
1258 : } // namespace boost
1259 :
1260 : #endif
|