perf: replace EventArray mutex+cv with atomic mask

set() is now lock-free; waiters block via std::atomic::wait.
This commit is contained in:
MatthewBeshay 2026-04-09 11:27:12 +10:00
parent cd4b39cf88
commit b7792622a9
2 changed files with 76 additions and 34 deletions

View file

@ -376,77 +376,120 @@ std::uint32_t C4JThread::Event::waitForSignal(int timeoutMs) {
}
C4JThread::EventArray::EventArray(int size, Mode mode)
: m_size(size), m_mode(mode), m_mutex(), m_condition(), m_signaledMask(0U) {
: m_size(size), m_mode(mode), m_signaledMask(0U) {
assert(m_size > 0 && m_size <= 32);
}
void C4JThread::EventArray::set(int index) {
assert(index >= 0 && index < m_size);
{
std::lock_guard lock(m_mutex);
m_signaledMask |= (1U << static_cast<std::uint32_t>(index));
}
m_condition.notify_all();
const std::uint32_t bit = 1U << static_cast<std::uint32_t>(index);
m_signaledMask.fetch_or(bit, std::memory_order_release);
m_signaledMask.notify_all();
}
void C4JThread::EventArray::clear(int index) {
assert(index >= 0 && index < m_size);
std::lock_guard lock(m_mutex);
m_signaledMask &= ~(1U << static_cast<std::uint32_t>(index));
const std::uint32_t bit = 1U << static_cast<std::uint32_t>(index);
m_signaledMask.fetch_and(~bit, std::memory_order_relaxed);
}
void C4JThread::EventArray::setAll() {
{
std::lock_guard lock(m_mutex);
m_signaledMask |= buildMaskForSize(m_size);
}
m_condition.notify_all();
const std::uint32_t mask = buildMaskForSize(m_size);
m_signaledMask.fetch_or(mask, std::memory_order_release);
m_signaledMask.notify_all();
}
void C4JThread::EventArray::clearAll() {
std::lock_guard lock(m_mutex);
m_signaledMask = 0U;
m_signaledMask.store(0U, std::memory_order_relaxed);
}
namespace {
// Polling fallback for finite-timeout waits; std::atomic::wait has no timed
// overload until C++26.
template <class Predicate>
bool waitForMaskTimed(std::atomic<std::uint32_t>& mask, int timeoutMs,
Predicate&& predicate) {
using clock = std::chrono::steady_clock;
const auto deadline = clock::now() + std::chrono::milliseconds(timeoutMs);
auto interval = std::chrono::microseconds(100);
constexpr auto maxInterval = std::chrono::milliseconds(2);
while (clock::now() < deadline) {
if (predicate(mask.load(std::memory_order_acquire))) return true;
std::this_thread::sleep_for(interval);
if (interval < maxInterval) interval *= 2;
}
return predicate(mask.load(std::memory_order_acquire));
}
} // namespace
std::uint32_t C4JThread::EventArray::waitForSingle(int index, int timeoutMs) {
assert(index >= 0 && index < m_size);
const std::uint32_t bitMask = 1U << static_cast<std::uint32_t>(index);
std::unique_lock lock(m_mutex);
const auto predicate = [bitMask](std::uint32_t cur) {
return (cur & bitMask) != 0U;
};
if (!waitForCondition(m_condition, lock, timeoutMs, [this, bitMask] {
return (m_signaledMask & bitMask) != 0U;
})) {
if (timeoutMs == kInfiniteTimeout) {
std::uint32_t cur = m_signaledMask.load(std::memory_order_acquire);
while (!predicate(cur)) {
m_signaledMask.wait(cur, std::memory_order_relaxed);
cur = m_signaledMask.load(std::memory_order_acquire);
}
} else if (!waitForMaskTimed(m_signaledMask, timeoutMs, predicate)) {
return WaitResult::Timeout;
}
if (m_mode == Mode::AutoClear) m_signaledMask &= ~bitMask;
if (m_mode == Mode::AutoClear) {
m_signaledMask.fetch_and(~bitMask, std::memory_order_release);
}
return WaitResult::Signaled;
}
std::uint32_t C4JThread::EventArray::waitForAll(int timeoutMs) {
const std::uint32_t bitMask = buildMaskForSize(m_size);
std::unique_lock lock(m_mutex);
const auto predicate = [bitMask](std::uint32_t cur) {
return (cur & bitMask) == bitMask;
};
if (!waitForCondition(m_condition, lock, timeoutMs, [this, bitMask] {
return (m_signaledMask & bitMask) == bitMask;
})) {
if (timeoutMs == kInfiniteTimeout) {
std::uint32_t cur = m_signaledMask.load(std::memory_order_acquire);
while (!predicate(cur)) {
m_signaledMask.wait(cur, std::memory_order_relaxed);
cur = m_signaledMask.load(std::memory_order_acquire);
}
} else if (!waitForMaskTimed(m_signaledMask, timeoutMs, predicate)) {
return WaitResult::Timeout;
}
if (m_mode == Mode::AutoClear) m_signaledMask &= ~bitMask;
if (m_mode == Mode::AutoClear) {
m_signaledMask.fetch_and(~bitMask, std::memory_order_release);
}
return WaitResult::Signaled;
}
std::uint32_t C4JThread::EventArray::waitForAny(int timeoutMs) {
const std::uint32_t bitMask = buildMaskForSize(m_size);
std::unique_lock lock(m_mutex);
const auto predicate = [bitMask](std::uint32_t cur) {
return (cur & bitMask) != 0U;
};
if (!waitForCondition(m_condition, lock, timeoutMs, [this, bitMask] {
return (m_signaledMask & bitMask) != 0U;
})) {
if (timeoutMs == kInfiniteTimeout) {
std::uint32_t cur = m_signaledMask.load(std::memory_order_acquire);
while (!predicate(cur)) {
m_signaledMask.wait(cur, std::memory_order_relaxed);
cur = m_signaledMask.load(std::memory_order_acquire);
}
} else if (!waitForMaskTimed(m_signaledMask, timeoutMs, predicate)) {
return WaitResult::Timeout;
}
const std::uint32_t readyIndex = firstSetBitIndex(m_signaledMask & bitMask);
if (m_mode == Mode::AutoClear) m_signaledMask &= ~(1U << readyIndex);
const std::uint32_t cur = m_signaledMask.load(std::memory_order_acquire);
const std::uint32_t readyIndex = firstSetBitIndex(cur & bitMask);
if (m_mode == Mode::AutoClear) {
m_signaledMask.fetch_and(~(1U << readyIndex), std::memory_order_release);
}
return WaitResult::Signaled + readyIndex;
}

View file

@ -51,6 +51,7 @@ public:
bool m_signaled;
};
// Lock-free bitmask of up to 32 events; waiters block via std::atomic::wait.
class EventArray {
public:
enum class Mode { AutoClear, ManualClear };
@ -68,9 +69,7 @@ public:
private:
int m_size;
Mode m_mode;
std::mutex m_mutex;
std::condition_variable m_condition;
std::uint32_t m_signaledMask;
std::atomic<std::uint32_t> m_signaledMask;
};
class EventQueue {