☎️ rtchan
Realtime-safe channels (lock-free queues) for C++20
Loading...
Searching...
No Matches
mpmc.hpp
1/*
2 * rtchan - Realtime-safe channels via lock-free ringbuffer queues
3 * Copyright (c) 2025 Fawn <rubiefawn@gmail.com>
4 *
5 * Permission to use, copy, modify, and distribute this software for any
6 * purpose with or without fee is hereby granted, provided that the above
7 * copyright notice and this permission notice appear in all copies.
8 *
9 * THE SOFTWARE IS PROVIDED “AS IS” AND THE AUTHOR DISCLAIMS ALL WARRANTIES
10 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
11 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
12 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
13 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
14 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
15 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
16 */
17
18#pragma once
19#include "rtchan.hpp"
20#include "internal/busy_wait_hint.h"
21#include <array>
22#include <bit> // std::has_single_bit() to ensure the buffer length is a power of 2
23#include <utility>
24
25namespace rtchan {
26
30template<typename T, size_t N>
31requires (0 < N) && (std::has_single_bit(N))
32class mpmc : public chan<T> {
33public:
34 auto try_send(T&& item) noexcept -> bool override {
35 size_t index, next_index;
36
37 size_t write_claimed_expected = write_claimed.load(std::memory_order_relaxed);
38 do {
39 size_t last_read_idx = read_committed.load(std::memory_order_relaxed);
40 const ptrdiff_t occupied = write_claimed_expected - last_read_idx;
41 if (occupied >= N) { return false; } // No space remaining
42 index = write_claimed_expected; next_index = write_claimed_expected + 1;
43 } while (!write_claimed.compare_exchange_strong(write_claimed_expected, next_index)); // TODO: Memory order for success/failure
44
45 buf[index_wrapped(index)] = std::move(item);
46
47 size_t write_committed_expected = index;
48 while (!write_committed.compare_exchange_strong(write_committed_expected, next_index)) { // TODO: Memory order for success/failure
49 write_committed_expected = index; // Reset this as the CAS will have changed it
50 busy_wait_hint();
51 }
52
53 return true;
54 }
55
56 auto try_send(span<const T>&& items) noexcept -> span<const T> override {
57 size_t count = 0;
58 size_t index, next_index;
59
60 size_t write_claimed_expected = write_claimed.load(std::memory_order_relaxed);
61 do {
62 size_t last_read_idx = read_committed.load(std::memory_order_relaxed);
63 const ptrdiff_t occupied = write_claimed_expected - last_read_idx;
64 if (occupied >= N) { return items; } // No space remaining
65 count = std::min(items.size(), static_cast<size_t>(N - occupied));
66 index = write_claimed_expected; next_index = write_claimed_expected + count;
67 } while (!write_claimed.compare_exchange_strong(write_claimed_expected, next_index)); // TODO: Memory order for success/failure
68
69 const size_t i = index_wrapped(index);
70 const ptrdiff_t spill = i + count - N;
71 if (spill > 0) {
72 const auto fits = static_cast<size_t>(count - spill);
73 std::memcpy(&buf[i], items.data(), sizeof(T) * fits);
74 std::memcpy(buf.data(), &items[fits], sizeof(T) * spill);
75 } else { std::memcpy(&buf[i], items.data(), sizeof(T) * count); }
76
77 size_t write_committed_expected = index;
78 while (!write_committed.compare_exchange_strong(write_committed_expected, next_index)) { // TODO: Memory order for success/failure
79 write_committed_expected = index; // Reset this as the CAS will have changed it
80 busy_wait_hint();
81 }
82
83 return items.last(items.size() - count);
84 }
85
86 void send(T&& item) noexcept override {
87
88 }
89
90 void send(span<const T>&& items) noexcept override {
91
92 }
93
94 auto try_recv(T& dest) noexcept -> bool override {
95 size_t index, next_index;
96
97 size_t read_claimed_expected = read_claimed.load(std::memory_order_relaxed);
98 do {
99 size_t last_written_idx = write_committed.load(std::memory_order_relaxed);
100 const ptrdiff_t ready = last_written_idx - read_claimed_expected;
101 if (0 >= ready) { return false; } // Nothing available to receive
102 index = read_claimed_expected, next_index = read_claimed_expected + 1;
103 } while (!read_claimed.compare_exchange_strong(read_claimed_expected, next_index)); // TODO: Memory order for success/failure
104
105 dest = std::move(&buf[index_wrapped(index)]);
106
107 size_t read_committed_expected = index;
108 while (!read_committed.compare_exchange_strong(read_committed_expected, next_index)) { // TODO: Memory order for success/failure
109 read_committed_expected = index; // Reset this as the CAS will have changed it
110 busy_wait_hint();
111 }
112
113 return true;
114 }
115
116 auto try_recv(span<T>& dest) noexcept -> span<T> override {
117 size_t count = 0;
118 size_t index, next_index;
119
120 size_t read_claimed_expected = read_claimed.load(std::memory_order_relaxed);
121 do {
122 size_t last_written_idx = write_committed.load(std::memory_order_relaxed);
123 const ptrdiff_t ready = last_written_idx - read_claimed_expected;
124 if (0 >= ready) { return 0; } // Nothing available to receive
125 count = std::min(dest.size(), static_cast<size_t>(ready));
126 index = read_claimed_expected, next_index = read_claimed_expected + count;
127 } while (!read_claimed.compare_exchange_strong(read_claimed_expected, next_index)); // TODO: Memory order for success/failure
128
129 const size_t i = index_wrapped(index);
130 const ptrdiff_t spill = i + count - N;
131 if (spill > 0) {
132 const auto fits = static_cast<size_t>(count - spill);
133 std::memcpy(dest.data(), &buf[i], sizeof(T) * fits);
134 std::memcpy(&dest[fits], buf.data(), sizeof(T) * spill);
135 } else { std::memcpy(dest.data(), &buf[i], sizeof(T) * count); }
136
137 size_t read_committed_expected = index;
138 while (!read_committed.compare_exchange_strong(read_committed_expected, next_index)) { // TODO: Memory order for success/failure
139 read_committed_expected = index; // Reset this as the CAS will have changed it
140 busy_wait_hint();
141 }
142
143 return dest.first(count);
144 }
145
146 void recv(T& dest) noexcept override {
147
148 }
149
150 void recv(span<T>& dest) noexcept override {
151
152 }
153
154 auto try_reserve_send(size_t count) noexcept -> span<T> override {
155
156 }
157
158 auto reserve_send_up_to(size_t count) noexcept -> span<T> override {
159
160 }
161
162 auto reserve_send(size_t count) noexcept -> span<T> override {
163
164 }
165
166 auto try_commit_send(span<const T>& reservation) noexcept -> bool override {
167
168 }
169
170 void commit_send(span<const T>& reservation) noexcept override {
171
172 }
173
174 auto try_reserve_recv(size_t count) noexcept -> span<T> override {
175
176 }
177
178 auto reserve_recv_up_to(size_t count) noexcept -> span<T> override {
179
180 }
181
182 auto reserve_recv(size_t count) noexcept -> span<T> override {
183
184 }
185
186 auto try_commit_recv(span<const T>& reservation) noexcept -> bool override {
187
188 }
189
190 void commit_recv(span<const T>& reservation) noexcept override {
191
192 }
193
194private:
195 static constexpr auto index_wrapped(size_t index) noexcept -> size_t { return index & (N - 1); };
196 std::array<T, N> buf = {};
197 alignas(std::hardware_destructive_interference_size) std::atomic_size_t read_committed = 0;
198 alignas(std::hardware_destructive_interference_size) std::atomic_size_t read_claimed = 0;
199 alignas(std::hardware_destructive_interference_size) std::atomic_size_t write_committed = 0;
200 alignas(std::hardware_destructive_interference_size) std::atomic_size_t write_claimed = 0;
201};
202
203} // namespace rtchan
Abstract common interface for all queues.
Definition rtchan.hpp:52
Multiple-producer, multiple-consumer queue.
Definition mpmc.hpp:32
auto try_recv(T &dest) noexcept -> bool override
Tries to receive an item from the channel, placing it into dest.
Definition mpmc.hpp:94
void send(span< const T > &&items) noexcept override
Sends all items in items over the channel. If the channel cannot fit all of items,...
Definition mpmc.hpp:90
void send(T &&item) noexcept override
Sends item over the channel. If the channel is full, this function will block until space is availabl...
Definition mpmc.hpp:86
auto reserve_send(size_t count) noexcept -> span< T > override
Reserves space for count items in the channel to be manually written into. This function may spin unt...
Definition mpmc.hpp:162
auto try_commit_recv(span< const T > &reservation) noexcept -> bool override
Tries to release the items in reservation, indicating that they are finished being received and are s...
Definition mpmc.hpp:186
void commit_send(span< const T > &reservation) noexcept override
Commits the items in reservation, indicating that they are finished being sent and are ready to be re...
Definition mpmc.hpp:170
void commit_recv(span< const T > &reservation) noexcept override
Releases the items in reservation, indicating that they are finished being received and are safe to b...
Definition mpmc.hpp:190
auto reserve_recv(size_t count) noexcept -> span< T > override
Reserves space for count items in the channel to be manually read from.
Definition mpmc.hpp:182
auto try_recv(span< T > &dest) noexcept -> span< T > override
Tries to receive items up to the capacity of dest from the channel into dest.
Definition mpmc.hpp:116
void recv(T &dest) noexcept override
Receives an item from the channel into dest. If the channel is empty, this function will block until ...
Definition mpmc.hpp:146
auto try_send(span< const T > &&items) noexcept -> span< const T > override
Tries to send as many items in items over the channel as will fit.
Definition mpmc.hpp:56
auto try_send(T &&item) noexcept -> bool override
Tries to send item over the channel, aborting if there is not enough room.
Definition mpmc.hpp:34
auto try_reserve_recv(size_t count) noexcept -> span< T > override
Tries to reserve space for count items in the channel to be manually read from.
Definition mpmc.hpp:174
void recv(span< T > &dest) noexcept override
Receives items equal to the capacity of dest from the channel into dest. If the channel does not have...
Definition mpmc.hpp:150
auto try_reserve_send(size_t count) noexcept -> span< T > override
Tries to reserve space for count items in the channel to be manually written into.
Definition mpmc.hpp:154
auto try_commit_send(span< const T > &reservation) noexcept -> bool override
Tries to commit the items in reservation, indicating that they are finished being sent and are ready ...
Definition mpmc.hpp:166
auto reserve_send_up_to(size_t count) noexcept -> span< T > override
Tries to reserve space for up to count items in the channel to be manually written into.
Definition mpmc.hpp:158
auto reserve_recv_up_to(size_t count) noexcept -> span< T > override
Tries to reserve space for up to count items in the channel to be manually read from.
Definition mpmc.hpp:178