20#include "internal/busy_wait_hint.h"
30template<
typename T,
size_t N>
31requires (0 < N) && (std::has_single_bit(N))
// Non-blocking single-item send. Claims the next write position, move-assigns
// `item` into that slot, then advances write_committed past it. Returns false
// without claiming anything when the occupancy check finds N or more slots in use.
// NOTE: this listing skips some source lines (the numbering jumps, e.g. 35 -> 37
// and 49 -> 56), so the opener of the loop closed at line 43 and the tail of the
// function are not shown here.
34 auto try_send(T&& item)
noexcept ->
bool override {
35 size_t index, next_index;
// Snapshot of the write-claim cursor; it doubles as the CAS "expected" value below.
37 size_t write_claimed_expected = write_claimed.load(std::memory_order_relaxed);
// Relaxed load of the read-commit cursor, used here only for the occupancy estimate.
39 size_t last_read_idx = read_committed.load(std::memory_order_relaxed);
// Distance between the write-claim cursor and the read-commit cursor.
40 const ptrdiff_t occupied = write_claimed_expected - last_read_idx;
// NOTE(review): `occupied` is ptrdiff_t while N is declared size_t, so this is a
// signed/unsigned comparison — confirm the implicit conversion is intended.
41 if (occupied >= N) {
return false; }
// Tentatively take slot `index`; the claim only becomes real if the CAS below succeeds.
42 index = write_claimed_expected; next_index = write_claimed_expected + 1;
43 }
// Repeat while the CAS fails to move write_claimed from the observed value to next_index.
while (!write_claimed.compare_exchange_strong(write_claimed_expected, next_index));
// The claimed slot is written with a move assignment.
45 buf[index_wrapped(index)] = std::move(item);
// Advance write_committed from `index` to `next_index`. The CAS can only succeed
// once write_committed equals `index`; presumably this makes commits happen in
// claim order — confirm against the full source.
47 size_t write_committed_expected = index;
48 while (!write_committed.compare_exchange_strong(write_committed_expected, next_index)) {
// A failed compare_exchange_strong overwrites `expected` with the observed value,
// so it is reset before the next attempt.
49 write_committed_expected = index;
// Non-blocking bulk send. Claims up to `items.size()` consecutive write positions
// (limited to N - occupied), copies that many elements from the front of `items`
// into the ring buffer, advances write_committed, and returns the trailing part of
// `items` that was not copied. When the occupancy check finds N or more slots in
// use, `items` is returned unchanged.
// NOTE: this listing skips some source lines; the declaration of `count`, the
// opener of the loop closed at line 67 and the opener of the branch closed at
// line 75 are not shown here.
56 auto try_send(span<const T>&& items)
noexcept -> span<const T>
override {
58 size_t index, next_index;
// Snapshot of the write-claim cursor; it doubles as the CAS "expected" value below.
60 size_t write_claimed_expected = write_claimed.load(std::memory_order_relaxed);
// Relaxed load of the read-commit cursor, used here only for the occupancy estimate.
62 size_t last_read_idx = read_committed.load(std::memory_order_relaxed);
// Distance between the write-claim cursor and the read-commit cursor.
63 const ptrdiff_t occupied = write_claimed_expected - last_read_idx;
// NOTE(review): `occupied` is ptrdiff_t while N is declared size_t, so this is a
// signed/unsigned comparison — confirm the implicit conversion is intended.
64 if (occupied >= N) {
return items; }
// Send as many elements as were offered, capped at N - occupied.
65 count = std::min(items.size(),
static_cast<size_t>(N - occupied));
66 index = write_claimed_expected; next_index = write_claimed_expected + count;
67 }
// Repeat while the CAS fails to move write_claimed from the observed value to next_index.
while (!write_claimed.compare_exchange_strong(write_claimed_expected, next_index));
// Buffer position of the first claimed slot.
69 const size_t i = index_wrapped(index);
// i + count - N: how many elements would run past the end of buf.
70 const ptrdiff_t spill = i + count - N;
// Wrapping case: the first `fits` elements go at position i, the remaining
// `spill` elements go at the start of buf.
// NOTE(review): elements are copied with memcpy here, whereas the single-item
// overload uses move assignment — presumably T is expected to be trivially
// copyable; confirm.
72 const auto fits =
static_cast<size_t>(count - spill);
73 std::memcpy(&buf[i], items.data(),
sizeof(T) * fits);
74 std::memcpy(buf.data(), &items[fits],
sizeof(T) * spill);
75 }
// Non-wrapping case: one contiguous copy of `count` elements.
else { std::memcpy(&buf[i], items.data(),
sizeof(T) * count); }
// Advance write_committed from `index` to `next_index`; the CAS can only succeed
// once write_committed equals `index`.
77 size_t write_committed_expected = index;
78 while (!write_committed.compare_exchange_strong(write_committed_expected, next_index)) {
// A failed compare_exchange_strong overwrites `expected` with the observed value,
// so it is reset before the next attempt.
79 write_committed_expected = index;
// The first `count` elements were copied; hand back the rest.
83 return items.last(items.size() - count);
86 void send(T&& item)
noexcept override {
90 void send(span<const T>&& items)
noexcept override {
// Non-blocking single-item receive. Claims the next read position, assigns from
// that slot into `dest`, then advances read_committed past it. Returns false
// without claiming anything when write_committed is not ahead of read_claimed.
// NOTE: this listing skips some source lines (the numbering jumps, e.g. 95 -> 97
// and 109 -> 116), so the opener of the loop closed at line 103 and the tail of
// the function are not shown here.
94 auto try_recv(T& dest)
noexcept ->
bool override {
95 size_t index, next_index;
// Snapshot of the read-claim cursor; it doubles as the CAS "expected" value below.
97 size_t read_claimed_expected = read_claimed.load(std::memory_order_relaxed);
// Relaxed load of the write-commit cursor, used here only to count ready items.
99 size_t last_written_idx = write_committed.load(std::memory_order_relaxed);
// Distance between the write-commit cursor and the read-claim cursor.
100 const ptrdiff_t ready = last_written_idx - read_claimed_expected;
// Nothing to receive when that distance is zero or negative.
101 if (0 >= ready) {
return false; }
// Tentatively take slot `index`; the claim only becomes real if the CAS below succeeds.
102 index = read_claimed_expected, next_index = read_claimed_expected + 1;
103 }
// Repeat while the CAS fails to move read_claimed from the observed value to next_index.
while (!read_claimed.compare_exchange_strong(read_claimed_expected, next_index));
// NOTE(review): std::move is applied to `&buf[...]`, i.e. to a pointer to the
// slot rather than to the slot itself, so `dest` is assigned from a T*. This
// looks unintended (std::move(buf[index_wrapped(index)]) would move the
// element) — confirm and fix in the real source.
105 dest = std::move(&buf[index_wrapped(index)]);
// Advance read_committed from `index` to `next_index`; the CAS can only succeed
// once read_committed equals `index`.
107 size_t read_committed_expected = index;
108 while (!read_committed.compare_exchange_strong(read_committed_expected, next_index)) {
// A failed compare_exchange_strong overwrites `expected` with the observed value,
// so it is reset before the next attempt.
109 read_committed_expected = index;
// Non-blocking bulk receive. Claims up to `dest.size()` consecutive read positions
// (limited to the number of ready items), copies that many elements from the ring
// buffer into the front of `dest`, advances read_committed, and returns the
// leading `count` elements of `dest`.
// NOTE: this listing skips some source lines; the declaration of `count`, the
// opener of the loop closed at line 127 and the opener of the branch closed at
// line 135 are not shown here.
116 auto try_recv(span<T>& dest)
noexcept -> span<T>
override {
118 size_t index, next_index;
// Snapshot of the read-claim cursor; it doubles as the CAS "expected" value below.
120 size_t read_claimed_expected = read_claimed.load(std::memory_order_relaxed);
// Relaxed load of the write-commit cursor, used here only to count ready items.
122 size_t last_written_idx = write_committed.load(std::memory_order_relaxed);
// Distance between the write-commit cursor and the read-claim cursor.
123 const ptrdiff_t ready = last_written_idx - read_claimed_expected;
// NOTE(review): the declared return type is span<T>, but the empty case returns
// the literal 0 — confirm this is intended; an empty span (e.g. dest.first(0))
// looks like what was meant.
124 if (0 >= ready) {
return 0; }
// Receive as many elements as `dest` can hold, capped at the number ready.
125 count = std::min(dest.size(),
static_cast<size_t>(ready));
126 index = read_claimed_expected, next_index = read_claimed_expected + count;
127 }
// Repeat while the CAS fails to move read_claimed from the observed value to next_index.
while (!read_claimed.compare_exchange_strong(read_claimed_expected, next_index));
// Buffer position of the first claimed slot.
129 const size_t i = index_wrapped(index);
// i + count - N: how many elements lie past the end of buf.
130 const ptrdiff_t spill = i + count - N;
// Wrapping case: the first `fits` elements come from position i, the remaining
// `spill` elements come from the start of buf.
// NOTE(review): elements are copied with memcpy — presumably T is expected to be
// trivially copyable; confirm.
132 const auto fits =
static_cast<size_t>(count - spill);
133 std::memcpy(dest.data(), &buf[i],
sizeof(T) * fits);
134 std::memcpy(&dest[fits], buf.data(),
sizeof(T) * spill);
135 }
// Non-wrapping case: one contiguous copy of `count` elements.
else { std::memcpy(dest.data(), &buf[i],
sizeof(T) * count); }
// Advance read_committed from `index` to `next_index`; the CAS can only succeed
// once read_committed equals `index`.
137 size_t read_committed_expected = index;
138 while (!read_committed.compare_exchange_strong(read_committed_expected, next_index)) {
// A failed compare_exchange_strong overwrites `expected` with the observed value,
// so it is reset before the next attempt.
139 read_committed_expected = index;
// Only the first `count` elements of `dest` were filled.
143 return dest.first(count);
146 void recv(T& dest)
noexcept override {
150 void recv(span<T>& dest)
noexcept override {
// Converts a cursor value into a position inside buf by masking with N - 1.
// The template's requires-clause (std::has_single_bit(N)) restricts N to a power
// of two, for which this mask is equivalent to index % N.
195 static constexpr auto index_wrapped(
size_t index)
noexcept ->
size_t {
return index & (N - 1); };
// Fixed-size element storage, value-initialized; positions come from index_wrapped().
196 std::array<T, N> buf = {};
// The four cursors below all start at 0 and are each aligned to
// std::hardware_destructive_interference_size (presumably to keep them on
// separate cache lines and avoid false sharing — confirm).
// read_committed: loaded by try_send for its occupancy check; advanced by
// try_recv after the data has been copied out.
197 alignas(std::hardware_destructive_interference_size) std::atomic_size_t read_committed = 0;
// read_claimed: advanced by try_recv (via CAS) to reserve positions to read.
198 alignas(std::hardware_destructive_interference_size) std::atomic_size_t read_claimed = 0;
// write_committed: loaded by try_recv to count ready items; advanced by
// try_send after the data has been written.
199 alignas(std::hardware_destructive_interference_size) std::atomic_size_t write_committed = 0;
// write_claimed: advanced by try_send (via CAS) to reserve positions to write.
200 alignas(std::hardware_destructive_interference_size) std::atomic_size_t write_claimed = 0;
Abstract common interface for all queues.
Definition rtchan.hpp:52
Multiple-producer, multiple-consumer queue.
Definition mpmc.hpp:32
auto try_recv(T &dest) noexcept -> bool override
Tries to receive an item from the channel, placing it into dest.
Definition mpmc.hpp:94
void send(span< const T > &&items) noexcept override
Sends all items in items over the channel. If the channel cannot fit all of items,...
Definition mpmc.hpp:90
void send(T &&item) noexcept override
Sends item over the channel. If the channel is full, this function will block until space is available.
Definition mpmc.hpp:86
auto reserve_send(size_t count) noexcept -> span< T > override
Reserves space for count items in the channel to be manually written into. This function may spin unt...
Definition mpmc.hpp:162
auto try_commit_recv(span< const T > &reservation) noexcept -> bool override
Tries to release the items in reservation, indicating that they are finished being received and are s...
Definition mpmc.hpp:186
void commit_send(span< const T > &reservation) noexcept override
Commits the items in reservation, indicating that they are finished being sent and are ready to be re...
Definition mpmc.hpp:170
void commit_recv(span< const T > &reservation) noexcept override
Releases the items in reservation, indicating that they are finished being received and are safe to b...
Definition mpmc.hpp:190
auto reserve_recv(size_t count) noexcept -> span< T > override
Reserves space for count items in the channel to be manually read from.
Definition mpmc.hpp:182
auto try_recv(span< T > &dest) noexcept -> span< T > override
Tries to receive items up to the capacity of dest from the channel into dest.
Definition mpmc.hpp:116
void recv(T &dest) noexcept override
Receives an item from the channel into dest. If the channel is empty, this function will block until ...
Definition mpmc.hpp:146
auto try_send(span< const T > &&items) noexcept -> span< const T > override
Tries to send as many items in items over the channel as will fit.
Definition mpmc.hpp:56
auto try_send(T &&item) noexcept -> bool override
Tries to send item over the channel, aborting if there is not enough room.
Definition mpmc.hpp:34
auto try_reserve_recv(size_t count) noexcept -> span< T > override
Tries to reserve space for count items in the channel to be manually read from.
Definition mpmc.hpp:174
void recv(span< T > &dest) noexcept override
Receives items equal to the capacity of dest from the channel into dest. If the channel does not have...
Definition mpmc.hpp:150
auto try_reserve_send(size_t count) noexcept -> span< T > override
Tries to reserve space for count items in the channel to be manually written into.
Definition mpmc.hpp:154
auto try_commit_send(span< const T > &reservation) noexcept -> bool override
Tries to commit the items in reservation, indicating that they are finished being sent and are ready ...
Definition mpmc.hpp:166
auto reserve_send_up_to(size_t count) noexcept -> span< T > override
Tries to reserve space for up to count items in the channel to be manually written into.
Definition mpmc.hpp:158
auto reserve_recv_up_to(size_t count) noexcept -> span< T > override
Tries to reserve space for up to count items in the channel to be manually read from.
Definition mpmc.hpp:178