diff --git a/targets/platform/C4JThread.cpp b/targets/platform/C4JThread.cpp index a0838a2fe..4a577ff02 100644 --- a/targets/platform/C4JThread.cpp +++ b/targets/platform/C4JThread.cpp @@ -376,77 +376,120 @@ std::uint32_t C4JThread::Event::waitForSignal(int timeoutMs) { } C4JThread::EventArray::EventArray(int size, Mode mode) - : m_size(size), m_mode(mode), m_mutex(), m_condition(), m_signaledMask(0U) { + : m_size(size), m_mode(mode), m_signaledMask(0U) { assert(m_size > 0 && m_size <= 32); } void C4JThread::EventArray::set(int index) { assert(index >= 0 && index < m_size); - { - std::lock_guard lock(m_mutex); - m_signaledMask |= (1U << static_cast(index)); - } - m_condition.notify_all(); + const std::uint32_t bit = 1U << static_cast(index); + m_signaledMask.fetch_or(bit, std::memory_order_release); + m_signaledMask.notify_all(); } void C4JThread::EventArray::clear(int index) { assert(index >= 0 && index < m_size); - std::lock_guard lock(m_mutex); - m_signaledMask &= ~(1U << static_cast(index)); + const std::uint32_t bit = 1U << static_cast(index); + m_signaledMask.fetch_and(~bit, std::memory_order_relaxed); } void C4JThread::EventArray::setAll() { - { - std::lock_guard lock(m_mutex); - m_signaledMask |= buildMaskForSize(m_size); - } - m_condition.notify_all(); + const std::uint32_t mask = buildMaskForSize(m_size); + m_signaledMask.fetch_or(mask, std::memory_order_release); + m_signaledMask.notify_all(); } void C4JThread::EventArray::clearAll() { - std::lock_guard lock(m_mutex); - m_signaledMask = 0U; + m_signaledMask.store(0U, std::memory_order_relaxed); } +namespace { + +// Polling fallback for finite-timeout waits; std::atomic::wait has no timed +// overload until C++26. +template +bool waitForMaskTimed(std::atomic& mask, int timeoutMs, + Predicate&& predicate) { + using clock = std::chrono::steady_clock; + const auto deadline = clock::now() + std::chrono::milliseconds(timeoutMs); + auto interval = std::chrono::microseconds(100); + constexpr auto maxInterval = std::chrono::milliseconds(2); + while (clock::now() < deadline) { + if (predicate(mask.load(std::memory_order_acquire))) return true; + std::this_thread::sleep_for(interval); + if (interval < maxInterval) interval *= 2; + } + return predicate(mask.load(std::memory_order_acquire)); +} + +} // namespace + std::uint32_t C4JThread::EventArray::waitForSingle(int index, int timeoutMs) { assert(index >= 0 && index < m_size); const std::uint32_t bitMask = 1U << static_cast(index); - std::unique_lock lock(m_mutex); + const auto predicate = [bitMask](std::uint32_t cur) { + return (cur & bitMask) != 0U; + }; - if (!waitForCondition(m_condition, lock, timeoutMs, [this, bitMask] { - return (m_signaledMask & bitMask) != 0U; - })) { + if (timeoutMs == kInfiniteTimeout) { + std::uint32_t cur = m_signaledMask.load(std::memory_order_acquire); + while (!predicate(cur)) { + m_signaledMask.wait(cur, std::memory_order_relaxed); + cur = m_signaledMask.load(std::memory_order_acquire); + } + } else if (!waitForMaskTimed(m_signaledMask, timeoutMs, predicate)) { return WaitResult::Timeout; } - if (m_mode == Mode::AutoClear) m_signaledMask &= ~bitMask; + + if (m_mode == Mode::AutoClear) { + m_signaledMask.fetch_and(~bitMask, std::memory_order_release); + } return WaitResult::Signaled; } std::uint32_t C4JThread::EventArray::waitForAll(int timeoutMs) { const std::uint32_t bitMask = buildMaskForSize(m_size); - std::unique_lock lock(m_mutex); + const auto predicate = [bitMask](std::uint32_t cur) { + return (cur & bitMask) == bitMask; + }; - if (!waitForCondition(m_condition, lock, timeoutMs, [this, bitMask] { - return (m_signaledMask & bitMask) == bitMask; - })) { + if (timeoutMs == kInfiniteTimeout) { + std::uint32_t cur = m_signaledMask.load(std::memory_order_acquire); + while (!predicate(cur)) { + m_signaledMask.wait(cur, std::memory_order_relaxed); + cur = m_signaledMask.load(std::memory_order_acquire); + } + } else if (!waitForMaskTimed(m_signaledMask, timeoutMs, predicate)) { return WaitResult::Timeout; } - if (m_mode == Mode::AutoClear) m_signaledMask &= ~bitMask; + + if (m_mode == Mode::AutoClear) { + m_signaledMask.fetch_and(~bitMask, std::memory_order_release); + } return WaitResult::Signaled; } std::uint32_t C4JThread::EventArray::waitForAny(int timeoutMs) { const std::uint32_t bitMask = buildMaskForSize(m_size); - std::unique_lock lock(m_mutex); + const auto predicate = [bitMask](std::uint32_t cur) { + return (cur & bitMask) != 0U; + }; - if (!waitForCondition(m_condition, lock, timeoutMs, [this, bitMask] { - return (m_signaledMask & bitMask) != 0U; - })) { + if (timeoutMs == kInfiniteTimeout) { + std::uint32_t cur = m_signaledMask.load(std::memory_order_acquire); + while (!predicate(cur)) { + m_signaledMask.wait(cur, std::memory_order_relaxed); + cur = m_signaledMask.load(std::memory_order_acquire); + } + } else if (!waitForMaskTimed(m_signaledMask, timeoutMs, predicate)) { return WaitResult::Timeout; } - const std::uint32_t readyIndex = firstSetBitIndex(m_signaledMask & bitMask); - if (m_mode == Mode::AutoClear) m_signaledMask &= ~(1U << readyIndex); + const std::uint32_t cur = m_signaledMask.load(std::memory_order_acquire); + const std::uint32_t readyIndex = firstSetBitIndex(cur & bitMask); + if (m_mode == Mode::AutoClear) { + m_signaledMask.fetch_and(~(1U << readyIndex), std::memory_order_release); + } return WaitResult::Signaled + readyIndex; } diff --git a/targets/platform/C4JThread.h b/targets/platform/C4JThread.h index 54e08f0f2..ee6d3ded8 100644 --- a/targets/platform/C4JThread.h +++ b/targets/platform/C4JThread.h @@ -51,6 +51,7 @@ public: bool m_signaled; }; + // Lock-free bitmask of up to 32 events; waiters block via std::atomic::wait. class EventArray { public: enum class Mode { AutoClear, ManualClear }; @@ -68,9 +69,7 @@ public: private: int m_size; Mode m_mode; - std::mutex m_mutex; - std::condition_variable m_condition; - std::uint32_t m_signaledMask; + std::atomic m_signaledMask; }; class EventQueue {