|
|
|
|
@@ -50,17 +50,17 @@ template <class T> struct ThreadPipeline {
|
|
|
|
|
// threadsPerStage: number of threads for each stage (e.g., {1, 4, 2} = 1
|
|
|
|
|
// stage-0 worker, 4 stage-1 workers, 2 stage-2 workers)
|
|
|
|
|
ThreadPipeline(int lgSlotCount, const std::vector<int> &threadsPerStage)
|
|
|
|
|
: slotCount(1 << lgSlotCount), slotCountMask(slotCount - 1),
|
|
|
|
|
threadState(threadsPerStage.size()), ring(slotCount) {
|
|
|
|
|
: slot_count(1 << lgSlotCount), slot_count_mask(slot_count - 1),
|
|
|
|
|
threadState(threadsPerStage.size()), ring(slot_count) {
|
|
|
|
|
// Otherwise we can't tell the difference between full and empty.
|
|
|
|
|
assert(!(slotCountMask & 0x80000000));
|
|
|
|
|
assert(!(slot_count_mask & 0x80000000));
|
|
|
|
|
for (size_t i = 0; i < threadsPerStage.size(); ++i) {
|
|
|
|
|
threadState[i] = std::vector<ThreadState>(threadsPerStage[i]);
|
|
|
|
|
for (auto &t : threadState[i]) {
|
|
|
|
|
if (i == 0) {
|
|
|
|
|
t.lastPushRead = std::vector<uint32_t>(1);
|
|
|
|
|
t.last_push_read = std::vector<uint32_t>(1);
|
|
|
|
|
} else {
|
|
|
|
|
t.lastPushRead = std::vector<uint32_t>(threadsPerStage[i - 1]);
|
|
|
|
|
t.last_push_read = std::vector<uint32_t>(threadsPerStage[i - 1]);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@@ -187,7 +187,7 @@ template <class T> struct ThreadPipeline {
|
|
|
|
|
|
|
|
|
|
private:
|
|
|
|
|
Batch acquireHelper(int stage, int thread, uint32_t maxBatch, bool mayBlock) {
|
|
|
|
|
uint32_t begin = threadState[stage][thread].localPops & slotCountMask;
|
|
|
|
|
uint32_t begin = threadState[stage][thread].local_pops & slot_count_mask;
|
|
|
|
|
uint32_t len = getSafeLen(stage, thread, mayBlock);
|
|
|
|
|
if (maxBatch != 0) {
|
|
|
|
|
len = std::min(len, maxBatch);
|
|
|
|
|
@@ -196,7 +196,7 @@ private:
|
|
|
|
|
return Batch{};
|
|
|
|
|
}
|
|
|
|
|
auto result = Batch{&ring, begin, begin + len};
|
|
|
|
|
threadState[stage][thread].localPops += len;
|
|
|
|
|
threadState[stage][thread].local_pops += len;
|
|
|
|
|
return result;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -205,8 +205,8 @@ private:
|
|
|
|
|
// Used for producers to publish
|
|
|
|
|
alignas(128) std::atomic<uint32_t> pushes{0};
|
|
|
|
|
|
|
|
|
|
const uint32_t slotCount;
|
|
|
|
|
const uint32_t slotCountMask;
|
|
|
|
|
const uint32_t slot_count;
|
|
|
|
|
const uint32_t slot_count_mask;
|
|
|
|
|
|
|
|
|
|
// We can safely acquire this many items
|
|
|
|
|
uint32_t getSafeLen(int stage, int threadIndex, bool mayBlock) {
|
|
|
|
|
@@ -214,21 +214,21 @@ private:
|
|
|
|
|
auto &thread = threadState[stage][threadIndex];
|
|
|
|
|
// See if we can determine that there are entries we can acquire entirely
|
|
|
|
|
// from state local to the thread
|
|
|
|
|
for (int i = 0; i < int(thread.lastPushRead.size()); ++i) {
|
|
|
|
|
for (int i = 0; i < int(thread.last_push_read.size()); ++i) {
|
|
|
|
|
auto &lastPush = stage == 0 ? pushes : threadState[stage - 1][i].pops;
|
|
|
|
|
if (thread.lastPushRead[i] == thread.localPops) {
|
|
|
|
|
if (thread.last_push_read[i] == thread.local_pops) {
|
|
|
|
|
// Re-read lastPush with memory order and try again
|
|
|
|
|
thread.lastPushRead[i] = lastPush.load(std::memory_order_acquire);
|
|
|
|
|
if (thread.lastPushRead[i] == thread.localPops) {
|
|
|
|
|
thread.last_push_read[i] = lastPush.load(std::memory_order_acquire);
|
|
|
|
|
if (thread.last_push_read[i] == thread.local_pops) {
|
|
|
|
|
if (!mayBlock) {
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
// Wait for lastPush to change and try again
|
|
|
|
|
lastPush.wait(thread.lastPushRead[i], std::memory_order_relaxed);
|
|
|
|
|
thread.lastPushRead[i] = lastPush.load(std::memory_order_acquire);
|
|
|
|
|
lastPush.wait(thread.last_push_read[i], std::memory_order_relaxed);
|
|
|
|
|
thread.last_push_read[i] = lastPush.load(std::memory_order_acquire);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
safeLen = std::min(safeLen, thread.lastPushRead[i] - thread.localPops);
|
|
|
|
|
safeLen = std::min(safeLen, thread.last_push_read[i] - thread.local_pops);
|
|
|
|
|
}
|
|
|
|
|
return safeLen;
|
|
|
|
|
}
|
|
|
|
|
@@ -237,9 +237,9 @@ private:
|
|
|
|
|
// Where this thread has published up to
|
|
|
|
|
alignas(128) std::atomic<uint32_t> pops{0};
|
|
|
|
|
// Where this thread will publish to the next time it publishes
|
|
|
|
|
uint32_t localPops{0};
|
|
|
|
|
uint32_t local_pops{0};
|
|
|
|
|
// Where the previous stage's threads have published up to last we checked
|
|
|
|
|
std::vector<uint32_t> lastPushRead;
|
|
|
|
|
std::vector<uint32_t> last_push_read;
|
|
|
|
|
};
|
|
|
|
|
// threadState[i][j] is the state for thread j in stage i
|
|
|
|
|
std::vector<std::vector<ThreadState>> threadState;
|
|
|
|
|
@@ -252,7 +252,7 @@ public:
|
|
|
|
|
~StageGuard() {
|
|
|
|
|
if (ts != nullptr) {
|
|
|
|
|
// seq_cst so that the notify can't be ordered before the store
|
|
|
|
|
ts->pops.store(localPops, std::memory_order_seq_cst);
|
|
|
|
|
ts->pops.store(local_pops, std::memory_order_seq_cst);
|
|
|
|
|
ts->pops.notify_all();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@@ -260,20 +260,20 @@ public:
|
|
|
|
|
StageGuard(StageGuard const &) = delete;
|
|
|
|
|
StageGuard &operator=(StageGuard const &) = delete;
|
|
|
|
|
StageGuard(StageGuard &&other)
|
|
|
|
|
: batch(other.batch), localPops(other.localPops),
|
|
|
|
|
: batch(other.batch), local_pops(other.local_pops),
|
|
|
|
|
ts(std::exchange(other.ts, nullptr)) {}
|
|
|
|
|
StageGuard &operator=(StageGuard &&other) {
|
|
|
|
|
batch = other.batch;
|
|
|
|
|
localPops = other.localPops;
|
|
|
|
|
local_pops = other.local_pops;
|
|
|
|
|
ts = std::exchange(other.ts, nullptr);
|
|
|
|
|
return *this;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private:
|
|
|
|
|
uint32_t localPops;
|
|
|
|
|
uint32_t local_pops;
|
|
|
|
|
friend struct ThreadPipeline;
|
|
|
|
|
StageGuard(Batch batch, ThreadState *ts)
|
|
|
|
|
: batch(batch), localPops(ts->localPops),
|
|
|
|
|
: batch(batch), local_pops(ts->local_pops),
|
|
|
|
|
ts(batch.empty() ? nullptr : ts) {}
|
|
|
|
|
ThreadState *ts;
|
|
|
|
|
};
|
|
|
|
|
@@ -289,13 +289,13 @@ public:
|
|
|
|
|
// implies that all previous slots were also published.
|
|
|
|
|
for (;;) {
|
|
|
|
|
uint32_t p = tp->pushes.load(std::memory_order_acquire);
|
|
|
|
|
if (p == oldSlot) {
|
|
|
|
|
if (p == old_slot) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
tp->pushes.wait(p, std::memory_order_relaxed);
|
|
|
|
|
}
|
|
|
|
|
// Publish. seq_cst so that the notify can't be ordered before the store
|
|
|
|
|
tp->pushes.store(newSlot, std::memory_order_seq_cst);
|
|
|
|
|
tp->pushes.store(new_slot, std::memory_order_seq_cst);
|
|
|
|
|
// We have to notify every time, since we don't know if this is the last
|
|
|
|
|
// push ever
|
|
|
|
|
tp->pushes.notify_all();
|
|
|
|
|
@@ -304,12 +304,12 @@ public:
|
|
|
|
|
private:
|
|
|
|
|
friend struct ThreadPipeline;
|
|
|
|
|
ProducerGuard() : batch(), tp() {}
|
|
|
|
|
ProducerGuard(Batch batch, ThreadPipeline<T> *tp, uint32_t oldSlot,
|
|
|
|
|
uint32_t newSlot)
|
|
|
|
|
: batch(batch), tp(tp), oldSlot(oldSlot), newSlot(newSlot) {}
|
|
|
|
|
ProducerGuard(Batch batch, ThreadPipeline<T> *tp, uint32_t old_slot,
|
|
|
|
|
uint32_t new_slot)
|
|
|
|
|
: batch(batch), tp(tp), old_slot(old_slot), new_slot(new_slot) {}
|
|
|
|
|
ThreadPipeline<T> *const tp;
|
|
|
|
|
uint32_t oldSlot;
|
|
|
|
|
uint32_t newSlot;
|
|
|
|
|
uint32_t old_slot;
|
|
|
|
|
uint32_t new_slot;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// Acquire a batch of items for processing by a consumer thread.
|
|
|
|
|
@@ -346,7 +346,7 @@ public:
|
|
|
|
|
if (size == 0) {
|
|
|
|
|
abort();
|
|
|
|
|
}
|
|
|
|
|
if (size > slotCount) {
|
|
|
|
|
if (size > slot_count) {
|
|
|
|
|
abort();
|
|
|
|
|
}
|
|
|
|
|
// Reserve a slot to construct an item, but don't publish to consumer yet
|
|
|
|
|
@@ -355,11 +355,11 @@ public:
|
|
|
|
|
for (;;) {
|
|
|
|
|
begin_loop:
|
|
|
|
|
slot = slots.load(std::memory_order_relaxed);
|
|
|
|
|
begin = slot & slotCountMask;
|
|
|
|
|
begin = slot & slot_count_mask;
|
|
|
|
|
// Make sure we won't stomp the back of the ring buffer
|
|
|
|
|
for (auto &thread : threadState.back()) {
|
|
|
|
|
uint32_t pops = thread.pops.load(std::memory_order_acquire);
|
|
|
|
|
if (slot + size - pops > slotCount) {
|
|
|
|
|
if (slot + size - pops > slot_count) {
|
|
|
|
|
if (!block) {
|
|
|
|
|
return ProducerGuard{};
|
|
|
|
|
}
|
|
|
|
|
|