Adding new mpmc queue#187
Conversation
MaciejKaszynski
commented
Apr 28, 2026
- Adding new queue
License Check Results🚀 The license check job ran with the Bazel command: bazel run --lockfile_mode=error //:license-checkStatus: Click to expand output |
e1bcf7a to
50f3cec
Compare
|
The created documentation from the pull request is available at: docu-html |
There was a problem hiding this comment.
Pull request overview
This PR replaces the existing JobQueue with a new MPMCConcurrentQueue implementation and wires it into the Launch Manager daemon’s process-group worker execution flow.
Changes:
- Introduces
MPMCConcurrentQueue(MPMC ring buffer + semaphore blocking) plus Helgrind annotations and Bazel targets/tests. - Updates
WorkerThread,ProcessGroupManager,Graph, andProcessInfoNodeto enqueue/dequeue work via the new queue API (push/pop/stop). - Removes the old
JobQueueimplementation and adjusts Bazel coverage/test filtering and OSAL source selection.
Reviewed changes
Copilot reviewed 16 out of 16 changed files in this pull request and generated 10 comments.
Show a summary per file
| File | Description |
|---|---|
src/launch_manager_daemon/src/process_group_manager/workerthread.hpp |
Switches worker thread pool to use MPMCConcurrentQueue and adds stop() API. |
src/launch_manager_daemon/src/process_group_manager/workerthread.cpp |
Implements new pop-driven worker loop and queue stop behavior. |
src/launch_manager_daemon/src/process_group_manager/processinfonode.cpp |
Migrates successor job enqueueing to push(..., kMaxQueueDelay). |
src/launch_manager_daemon/src/process_group_manager/processgroupmanager.hpp |
Replaces JobQueue type with WorkerQueue alias over MPMCConcurrentQueue. |
src/launch_manager_daemon/src/process_group_manager/processgroupmanager.cpp |
Constructs the new queue and updates shutdown timeout behavior to stop workers. |
src/launch_manager_daemon/src/process_group_manager/jobqueue.hpp |
Removes old job queue header. |
src/launch_manager_daemon/src/process_group_manager/jobqueue.cpp |
Removes old job queue implementation. |
src/launch_manager_daemon/src/process_group_manager/graph.cpp |
Updates node queuing to push(..., kMaxQueueDelay) and logging. |
src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue.hpp |
Adds the new concurrent queue implementation. |
src/launch_manager_daemon/src/concurrency/helgrind_annotations.hpp |
Adds optional Helgrind annotation macros. |
src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue_test.cpp |
Adds unit tests for queue correctness and concurrency behaviors. |
src/launch_manager_daemon/src/concurrency/BUILD |
Adds Bazel targets for the queue library and tests (including helgrind/tsan variants). |
src/launch_manager_daemon/health_monitor_lib/src/score/lcm/saf/supervision/ProcessStateTracker.cpp |
Minor iterator declaration tweak. |
src/launch_manager_daemon/common/BUILD |
Refactors OSAL source globs to include posix + OS-specific sources. |
src/launch_manager_daemon/BUILD |
Adds dependency on the new concurrency queue library. |
.bazelrc |
Excludes no-coverage tagged tests from coverage runs. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 19 out of 19 changed files in this pull request and generated 5 comments.
Comments suppressed due to low confidence (1)
src/launch_manager_daemon/src/process_group_manager/processinfonode.cpp:195
- In
queueTerminationSuccessorJobs(), the retry loop keeps running as long as the graph is in transition, but it doesn't handle non-timeout failures frompush()(e.g.,ConcurrencyErrc::kStopped/kOsError). If the worker queue has been stopped,push()will fail immediately and this loop can spin indefinitely. Handle the error code (retry on timeout, break/abort on stopped/OS error) to avoid a potential hang during shutdown/timeout paths.
if (successor_node->is_included_ && successor_node->dependencies_ > 0U && --successor_node->dependencies_ == 0U)
{
while (graph_->getState() == GraphState::kInTransition)
{
if (graph_->getProcessGroupManager()->getWorkerJobs()->push(successor_node, kMaxQueueDelay))
{
graph_->markNodeInFlight();
break;
}
}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| [[nodiscard]] score::Result<void> stop() noexcept | ||
| { | ||
| m_stopped.store(true, std::memory_order_relaxed); | ||
|
|
||
| // signal to consumers and publishers to wakeup | ||
| if (m_items.post() != osal::OsalReturnType::kSuccess) | ||
| { | ||
| return score::MakeUnexpected(ConcurrencyErrc::kOsError); | ||
| } | ||
|
|
||
| if (m_spaces.post() != osal::OsalReturnType::kSuccess) | ||
| { | ||
| return score::MakeUnexpected(ConcurrencyErrc::kOsError); | ||
| } | ||
|
|
||
| return {}; | ||
| } | ||
|
|
||
| /// @brief Blocks until an item is available or stop() is called. | ||
| /// @details Consumers claim slots via fetch_add on m_head and sleep | ||
| /// inside m_items.wait() when the queue is empty. | ||
| /// When stopped returns std::nullopt. | ||
| /// @param timeout Maximum time to wait for an item. Zero means wait forever. | ||
| /// @return The next item, or error. | ||
| [[nodiscard]] score::Result<T> pop(std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) | ||
| { | ||
| const auto wait_result = | ||
| (timeout == std::chrono::milliseconds{0}) ? m_items.wait() : m_items.timedWait(timeout); | ||
|
|
||
| if(wait_result == osal::OsalReturnType::kTimeout) | ||
| { | ||
| return score::MakeUnexpected(ConcurrencyErrc::kTimeout); | ||
| } | ||
| else if (wait_result != osal::OsalReturnType::kSuccess) | ||
| { | ||
| return score::MakeUnexpected(ConcurrencyErrc::kOsError); | ||
| } | ||
|
|
||
| if (m_stopped.load(std::memory_order_relaxed)) | ||
| { | ||
| static_cast<void>(m_items.post()); | ||
| return score::MakeUnexpected(ConcurrencyErrc::kStopped); | ||
| } |
There was a problem hiding this comment.
stop() sets m_stopped using memory_order_relaxed, and push()/pop() also read it with memory_order_relaxed. Because the semaphore wait/post calls are outside the C++ memory model, relaxed ordering can allow the compiler/CPU to reorder such that a woken thread still observes m_stopped == false and continues consuming/producing after stop(). Use at least release on the store in stop() and acquire on the loads in push()/pop() (or an explicit fence) to make the stop flag reliably observable before/after the wakeups.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 19 out of 19 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (1)
src/launch_manager_daemon/src/process_group_manager/processinfonode.cpp:195
- In
queueTerminationSuccessorJobs(), thewhile (graph_->getState() == GraphState::kInTransition)loop retriespush()on any failure. If the worker queue has been stopped (push returnsConcurrencyErrc::kStopped) or another non-timeout error occurs,push()can fail immediately and this becomes a tight infinite loop, potentially deadlocking shutdown/abort paths. Handle thepush()result (e.g., continue only onkTimeout, and break/return onkStoppedor other errors, optionally logging once).
if (successor_node->is_included_ && successor_node->dependencies_ > 0U && --successor_node->dependencies_ == 0U)
{
while (graph_->getState() == GraphState::kInTransition)
{
if (graph_->getProcessGroupManager()->getWorkerJobs()->push(successor_node, kMaxQueueDelay))
{
graph_->markNodeInFlight();
break;
}
}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| /// @brief Lock-free MPMC ring buffer with semaphore-based blocking. | ||
| /// Producers and consumers each atomically claim independent slots via | ||
| /// fetch_add, so multiple producers and multiple consumers run concurrently. | ||
| /// @warning T must be default-constructible. | ||
| template <typename T, std::size_t Capacity> | ||
| class MPMCConcurrentQueue | ||
| { | ||
| static_assert(Capacity > 0U, "Capacity must be at least 1"); | ||
|
|
||
| static_assert(Capacity <= std::numeric_limits<std::uint32_t>::max(), | ||
| "Capacity exceeds uint32_t range used by the semaphore"); | ||
|
|
||
| static_assert(std::is_default_constructible_v<T>, "T must be default-constructible for in-place slot storage"); | ||
|
|
||
| static_assert(std::is_nothrow_destructible_v<T>, | ||
| "T must be nothrow-destructible to allow consume_slot to be noexcept"); | ||
|
|
||
| static_assert(std::is_nothrow_move_constructible_v<T>, | ||
| "T must be nothrow-move-constructible to wrap into std::optional in pop()"); | ||
|
|
There was a problem hiding this comment.
The template constraints/documentation say only that T must be default-constructible (and nothrow-move-constructible), but push_impl() assigns into an already-constructed slot.item (slot.item = ...). That also requires T to be move-assignable (and copy-assignable for the push(const T&) overload). Consider either tightening the static_asserts / docs to reflect the real requirements, or switching slot storage to placement-new / std::optional<T> so non-assignable-but-move-constructible types work as well.