diff --git a/src/ThreadPipeline.h b/src/ThreadPipeline.h index a1c45ad..422a6c9 100644 --- a/src/ThreadPipeline.h +++ b/src/ThreadPipeline.h @@ -116,7 +116,7 @@ private: // We can safely acquire this many items uint32_t getSafeLen(int stage, int threadIndex, bool mayBlock) { - uint32_t safeLen = -1; + uint32_t safeLen = UINT32_MAX; auto &thread = threadState[stage][threadIndex]; // See if we can determine that there are entries we can acquire entirely // from state local to the thread @@ -141,9 +141,9 @@ private: struct alignas(128) ThreadState { // Where this thread has published up to - std::atomic pops; + std::atomic pops{0}; // Where this thread will publish to the next time it publishes - uint32_t localPops; + uint32_t localPops{0}; // Where the previous stage's threads have published up to last we checked std::vector lastPushRead; }; @@ -230,6 +230,11 @@ public: // `block` is true, then this call will block if the queue is full, until the // queue is not full. Otherwise it will return an empty batch if the queue is // full. + // + // Preconditions: + // - size > 0 (must request at least one slot) + // - size <= slotCount (cannot request more slots than ring buffer capacity) + // Violating preconditions results in program termination via abort(). [[nodiscard]] ProducerGuard push(uint32_t const size, bool block) { if (size == 0) { abort();