summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/common/bounded_threadsafe_queue.h202
1 files changed, 62 insertions, 140 deletions
diff --git a/src/common/bounded_threadsafe_queue.h b/src/common/bounded_threadsafe_queue.h
index 0fb2f42d1..bd87aa09b 100644
--- a/src/common/bounded_threadsafe_queue.h
+++ b/src/common/bounded_threadsafe_queue.h
@@ -22,52 +22,38 @@ class SPSCQueue {
static_assert((Capacity & (Capacity - 1)) == 0, "Capacity must be a power of two.");
public:
- bool TryPush(T&& t) {
- return Push<PushMode::Try>(std::move(t));
- }
-
template <typename... Args>
bool TryEmplace(Args&&... args) {
return Emplace<PushMode::Try>(std::forward<Args>(args)...);
}
- void PushWait(T&& t) {
- Push<PushMode::Wait>(std::move(t));
- }
-
template <typename... Args>
void EmplaceWait(Args&&... args) {
Emplace<PushMode::Wait>(std::forward<Args>(args)...);
}
bool TryPop(T& t) {
- return Pop(t);
+ return Pop<PopMode::Try>(t);
+ }
+
+ void PopWait(T& t) {
+ Pop<PopMode::Wait>(t);
}
void PopWait(T& t, std::stop_token stop_token) {
- ConsumerWait(stop_token);
- Pop(t);
+ Pop<PopMode::WaitWithStopToken>(t, stop_token);
}
- T PopWait(std::stop_token stop_token) {
- ConsumerWait(stop_token);
+ T PopWait() {
T t;
- Pop(t);
+ Pop<PopMode::Wait>(t);
return t;
}
- void Clear() {
- while (!Empty()) {
- Pop();
- }
- }
-
- bool Empty() const {
- return m_read_index.load() == m_write_index.load();
- }
-
- size_t Size() const {
- return m_write_index.load() - m_read_index.load();
+ T PopWait(std::stop_token stop_token) {
+ T t;
+ Pop<PopMode::WaitWithStopToken>(t, stop_token);
+ return t;
}
private:
@@ -77,55 +63,27 @@ private:
Count,
};
- template <PushMode Mode>
- bool Push(T&& t) {
- const size_t write_index = m_write_index.load();
-
- if constexpr (Mode == PushMode::Try) {
- // Check if we have free slots to write to.
- if ((write_index - m_read_index.load()) == Capacity) {
- return false;
- }
- } else if constexpr (Mode == PushMode::Wait) {
- // Wait until we have free slots to write to.
- std::unique_lock lock{producer_cv_mutex};
- producer_cv.wait(lock, [this, write_index] {
- return (write_index - m_read_index.load()) < Capacity;
- });
- } else {
- static_assert(Mode < PushMode::Count, "Invalid PushMode.");
- }
-
- // Determine the position to write to.
- const size_t pos = write_index % Capacity;
-
- // Push into the queue.
- m_data[pos] = std::move(t);
-
- // Increment the write index.
- ++m_write_index;
-
- // Notify the consumer that we have pushed into the queue.
- std::scoped_lock lock{consumer_cv_mutex};
- consumer_cv.notify_one();
-
- return true;
- }
+ enum class PopMode {
+ Try,
+ Wait,
+ WaitWithStopToken,
+ Count,
+ };
template <PushMode Mode, typename... Args>
bool Emplace(Args&&... args) {
- const size_t write_index = m_write_index.load();
+ const size_t write_index = m_write_index.load(std::memory_order::relaxed);
if constexpr (Mode == PushMode::Try) {
// Check if we have free slots to write to.
- if ((write_index - m_read_index.load()) == Capacity) {
+ if ((write_index - m_read_index.load(std::memory_order::acquire)) == Capacity) {
return false;
}
} else if constexpr (Mode == PushMode::Wait) {
// Wait until we have free slots to write to.
std::unique_lock lock{producer_cv_mutex};
producer_cv.wait(lock, [this, write_index] {
- return (write_index - m_read_index.load()) < Capacity;
+ return (write_index - m_read_index.load(std::memory_order::acquire)) < Capacity;
});
} else {
static_assert(Mode < PushMode::Count, "Invalid PushMode.");
@@ -147,34 +105,32 @@ private:
return true;
}
- void Pop() {
- const size_t read_index = m_read_index.load();
-
- // Check if the queue is empty.
- if (read_index == m_write_index.load()) {
- return;
- }
-
- // Determine the position to read from.
- const size_t pos = read_index % Capacity;
-
- // Pop the data off the queue, deleting it.
- std::destroy_at(std::addressof(m_data[pos]));
-
- // Increment the read index.
- ++m_read_index;
-
- // Notify the producer that we have popped off the queue.
- std::unique_lock lock{producer_cv_mutex};
- producer_cv.notify_one();
- }
-
- bool Pop(T& t) {
- const size_t read_index = m_read_index.load();
+ template <PopMode Mode>
+ bool Pop(T& t, [[maybe_unused]] std::stop_token stop_token = {}) {
+ const size_t read_index = m_read_index.load(std::memory_order::relaxed);
- // Check if the queue is empty.
- if (read_index == m_write_index.load()) {
- return false;
+ if constexpr (Mode == PopMode::Try) {
+ // Check if the queue is empty.
+ if (read_index == m_write_index.load(std::memory_order::acquire)) {
+ return false;
+ }
+ } else if constexpr (Mode == PopMode::Wait) {
+ // Wait until the queue is not empty.
+ std::unique_lock lock{consumer_cv_mutex};
+ consumer_cv.wait(lock, [this, read_index] {
+ return read_index != m_write_index.load(std::memory_order::acquire);
+ });
+ } else if constexpr (Mode == PopMode::WaitWithStopToken) {
+ // Wait until the queue is not empty.
+ std::unique_lock lock{consumer_cv_mutex};
+ Common::CondvarWait(consumer_cv, lock, stop_token, [this, read_index] {
+ return read_index != m_write_index.load(std::memory_order::acquire);
+ });
+ if (stop_token.stop_requested()) {
+ return false;
+ }
+ } else {
+ static_assert(Mode < PopMode::Count, "Invalid PopMode.");
}
// Determine the position to read from.
@@ -193,11 +149,6 @@ private:
return true;
}
- void ConsumerWait(std::stop_token stop_token) {
- std::unique_lock lock{consumer_cv_mutex};
- Common::CondvarWait(consumer_cv, lock, stop_token, [this] { return !Empty(); });
- }
-
alignas(128) std::atomic_size_t m_read_index{0};
alignas(128) std::atomic_size_t m_write_index{0};
@@ -212,22 +163,12 @@ private:
template <typename T, size_t Capacity = detail::DefaultCapacity>
class MPSCQueue {
public:
- bool TryPush(T&& t) {
- std::scoped_lock lock{write_mutex};
- return spsc_queue.TryPush(std::move(t));
- }
-
template <typename... Args>
bool TryEmplace(Args&&... args) {
std::scoped_lock lock{write_mutex};
return spsc_queue.TryEmplace(std::forward<Args>(args)...);
}
- void PushWait(T&& t) {
- std::scoped_lock lock{write_mutex};
- spsc_queue.PushWait(std::move(t));
- }
-
template <typename... Args>
void EmplaceWait(Args&&... args) {
std::scoped_lock lock{write_mutex};
@@ -238,24 +179,20 @@ public:
return spsc_queue.TryPop(t);
}
- void PopWait(T& t, std::stop_token stop_token) {
- spsc_queue.PopWait(t, stop_token);
+ void PopWait(T& t) {
+ spsc_queue.PopWait(t);
}
- T PopWait(std::stop_token stop_token) {
- return spsc_queue.PopWait(stop_token);
- }
-
- void Clear() {
- spsc_queue.Clear();
+ void PopWait(T& t, std::stop_token stop_token) {
+ spsc_queue.PopWait(t, stop_token);
}
- bool Empty() {
- return spsc_queue.Empty();
+ T PopWait() {
+ return spsc_queue.PopWait();
}
- size_t Size() {
- return spsc_queue.Size();
+ T PopWait(std::stop_token stop_token) {
+ return spsc_queue.PopWait(stop_token);
}
private:
@@ -266,22 +203,12 @@ private:
template <typename T, size_t Capacity = detail::DefaultCapacity>
class MPMCQueue {
public:
- bool TryPush(T&& t) {
- std::scoped_lock lock{write_mutex};
- return spsc_queue.TryPush(std::move(t));
- }
-
template <typename... Args>
bool TryEmplace(Args&&... args) {
std::scoped_lock lock{write_mutex};
return spsc_queue.TryEmplace(std::forward<Args>(args)...);
}
- void PushWait(T&& t) {
- std::scoped_lock lock{write_mutex};
- spsc_queue.PushWait(std::move(t));
- }
-
template <typename... Args>
void EmplaceWait(Args&&... args) {
std::scoped_lock lock{write_mutex};
@@ -293,29 +220,24 @@ public:
return spsc_queue.TryPop(t);
}
- void PopWait(T& t, std::stop_token stop_token) {
- std::scoped_lock lock{read_mutex};
- spsc_queue.PopWait(t, stop_token);
- }
-
- T PopWait(std::stop_token stop_token) {
+ void PopWait(T& t) {
std::scoped_lock lock{read_mutex};
- return spsc_queue.PopWait(stop_token);
+ spsc_queue.PopWait(t);
}
- void Clear() {
+ void PopWait(T& t, std::stop_token stop_token) {
std::scoped_lock lock{read_mutex};
- spsc_queue.Clear();
+ spsc_queue.PopWait(t, stop_token);
}
- bool Empty() {
+ T PopWait() {
std::scoped_lock lock{read_mutex};
- return spsc_queue.Empty();
+ return spsc_queue.PopWait();
}
- size_t Size() {
+ T PopWait(std::stop_token stop_token) {
std::scoped_lock lock{read_mutex};
- return spsc_queue.Size();
+ return spsc_queue.PopWait(stop_token);
}
private: