diff --git a/src/circ_elem_array.h b/src/circ_elem_array.h index 2756955..79ec151 100644 --- a/src/circ_elem_array.h +++ b/src/circ_elem_array.h @@ -36,8 +36,9 @@ enum : std::size_t { template class elem_array : private elem_array_head { struct head_t { - ac_t rf_; // read flag - ac_t wf_; // write flag + ac_t rf_; // read flag + std::atomic_bool wf_; // write flag + std::atomic_flag acq_; // acquire flag }; public: @@ -102,18 +103,27 @@ public: } void* acquire(void) { - auto el = elem(wt_.load(std::memory_order_consume)); - // check read finished by all consumers - do { - uc_t expected = 0; - std::atomic_thread_fence(std::memory_order_acquire); - if (el->head_.rf_.compare_exchange_weak( - expected, cc_.load(std::memory_order_relaxed), std::memory_order_release)) { - break; + elem_t* el; + while (1) { + // searching an available element + el = elem(wt_.fetch_add(1, std::memory_order_acquire)); + if (el->head_.acq_.test_and_set(std::memory_order_release)) { + std::this_thread::yield(); + continue; } - std::this_thread::yield(); - } while(1); - wt_.fetch_add(1, std::memory_order_release); + // check read finished by all consumers + while(1) { + uc_t expected = 0; + std::atomic_thread_fence(std::memory_order_acquire); + if (el->head_.rf_.compare_exchange_weak( + expected, cc_.load(std::memory_order_relaxed), std::memory_order_release)) { + break; + } + std::this_thread::yield(); + } + el->head_.acq_.clear(std::memory_order_release); + break; + } return el->data_; } @@ -131,7 +141,7 @@ public: * set wf_ for the other producer thread which is commiting * the element matches cr_ could see it has commited */ - el->head_.wf_.store(1, std::memory_order_release); + el->head_.wf_.store(true, std::memory_order_release); } else { /* @@ -139,7 +149,7 @@ public: * so we just increase the cursor & go check the next */ ++next; - el->head_.wf_.store(0, std::memory_order_release); + el->head_.wf_.store(false, std::memory_order_release); } /* * it needs to go back and judge again @@ -155,7 +165,7 @@ public: /* * check next element has commited or not */ - } while(el = elem(++wi), el->head_.wf_.exchange(0, std::memory_order_acq_rel)); + } while(el = elem(++wi), el->head_.wf_.exchange(false, std::memory_order_acq_rel)); } uc_t cursor(void) const {