not support N:M (prod-cons)

This commit is contained in:
mutouyun 2018-11-27 10:47:23 +08:00
parent db35146542
commit eefea828f0
2 changed files with 26 additions and 82 deletions

View File

@ -18,12 +18,10 @@ namespace circ {
struct alignas(std::max_align_t) elem_array_head { struct alignas(std::max_align_t) elem_array_head {
using ui_t = std::uint8_t; using ui_t = std::uint8_t;
using uc_t = std::uint16_t; using uc_t = std::uint16_t;
using ai_t = std::atomic<ui_t>;
using ac_t = std::atomic<uc_t>; using ac_t = std::atomic<uc_t>;
ac_t cc_ { 0 }; // connection counter, using for broadcast ac_t cc_ { 0 }; // connection counter, using for broadcast
ac_t cr_ { 0 }; // cursor ac_t wt_ { 0 }; // write index
ai_t wt_ { 0 }; // write index
}; };
enum : std::size_t { enum : std::size_t {
@ -36,9 +34,7 @@ enum : std::size_t {
template <std::size_t DataSize> template <std::size_t DataSize>
class elem_array : private elem_array_head { class elem_array : private elem_array_head {
struct head_t { struct head_t {
ac_t rf_; // read flag std::atomic<std::size_t> rc_ { 0 }; // read counter
std::atomic_bool wf_; // write flag
std::atomic_flag acq_; // acquire flag
}; };
public: public:
@ -56,6 +52,7 @@ private:
struct elem_t { struct elem_t {
head_t head_; head_t head_;
byte_t data_[data_size]; byte_t data_[data_size];
elem_t(void) { ::memset(data_, 0, sizeof(data_)); }
}; };
elem_t block_[elem_max]; elem_t block_[elem_max];
@ -80,9 +77,7 @@ private:
} }
public: public:
elem_array(void) { elem_array(void) = default;
::memset(block_, 0, sizeof(block_));
}
~elem_array(void) = delete; ~elem_array(void) = delete;
elem_array(const elem_array&) = delete; elem_array(const elem_array&) = delete;
@ -103,73 +98,28 @@ public:
} }
void* acquire(void) { void* acquire(void) {
elem_t* el; elem_t* el = elem(wt_.load(std::memory_order_acquire));
while (1) {
// searching an available element
el = elem(wt_.fetch_add(1, std::memory_order_acquire));
if (el->head_.acq_.test_and_set(std::memory_order_release)) {
std::this_thread::yield();
continue;
}
// check read finished by all consumers // check read finished by all consumers
while(1) { while(1) {
uc_t expected = 0; std::size_t expected = 0;
std::atomic_thread_fence(std::memory_order_acquire); if (el->head_.rc_.compare_exchange_weak(
if (el->head_.rf_.compare_exchange_weak( expected,
expected, cc_.load(std::memory_order_relaxed), std::memory_order_release)) { static_cast<std::size_t>(cc_.load(std::memory_order_relaxed)),
std::memory_order_release)) {
break; break;
} }
std::this_thread::yield(); std::this_thread::yield();
} std::atomic_thread_fence(std::memory_order_acquire);
el->head_.acq_.clear(std::memory_order_release);
break;
} }
return el->data_; return el->data_;
} }
void commit(void* ptr) { void commit(void* /*ptr*/) {
auto el = elem(ptr); // get the commit element wt_.fetch_add(1, std::memory_order_release);
ui_t wi = index_of(el); // get the index of this element (the write index)
do {
bool no_next, cas;
uc_t curr = cr_.load(std::memory_order_consume), next;
do {
next = curr;
if ((no_next = (index_of(curr) != wi)) /* assign & judge */) {
/*
* commit is not the current commit
* set wf_ for the other producer thread which is commiting
* the element matches cr_ could see it has commited
*/
el->head_.wf_.store(true, std::memory_order_release);
}
else {
/*
* commit is the current commit
* so we just increase the cursor & go check the next
*/
++next;
el->head_.wf_.store(false, std::memory_order_release);
}
/*
* it needs to go back and judge again
* when cr_ has been changed by the other producer thread
*/
} while(!(cas = cr_.compare_exchange_weak(curr, next, std::memory_order_acq_rel)) && no_next);
/*
* if compare_exchange failed & !no_next,
* means there is another producer thread updated this commit,
* so in this case we could just return
*/
if (no_next || (!cas/* && !no_next*/)) return;
/*
* check next element has commited or not
*/
} while(el = elem(++wi), el->head_.wf_.exchange(false, std::memory_order_acq_rel));
} }
uc_t cursor(void) const { uc_t cursor(void) const {
return cr_.load(std::memory_order_consume); return wt_.load(std::memory_order_consume);
} }
void* take(uc_t cursor) { void* take(uc_t cursor) {
@ -177,7 +127,7 @@ public:
} }
void put(void* ptr) { void put(void* ptr) {
elem(ptr)->head_.rf_.fetch_sub(1, std::memory_order_release); elem(ptr)->head_.rc_.fetch_sub(1, std::memory_order_release);
} }
}; };

View File

@ -19,7 +19,6 @@ private slots:
void test_inst(void); void test_inst(void);
void test_prod_cons_1v1(void); void test_prod_cons_1v1(void);
void test_prod_cons_1vN(void); void test_prod_cons_1vN(void);
void test_prod_cons_Nv1(void);
} unit__; } unit__;
#include "test_circ_elem_array.moc" #include "test_circ_elem_array.moc"
@ -34,17 +33,16 @@ void Unit::test_inst(void) {
std::cout << "cq_t::block_size = " << cq_t::block_size << std::endl; std::cout << "cq_t::block_size = " << cq_t::block_size << std::endl;
QCOMPARE(static_cast<std::size_t>(cq_t::data_size) , static_cast<std::size_t>(12)); QCOMPARE(static_cast<std::size_t>(cq_t::data_size) , static_cast<std::size_t>(12));
QCOMPARE(static_cast<std::size_t>(cq_t::block_size), static_cast<std::size_t>(4096));
QCOMPARE(sizeof(cq_t), static_cast<std::size_t>(cq_t::block_size + cq_t::head_size)); QCOMPARE(sizeof(cq_t), static_cast<std::size_t>(cq_t::block_size + cq_t::head_size));
cq__ = new cq_t; cq__ = new cq_t;
std::cout << "sizeof(ipc::circ::elem_array<4096>) = " << sizeof(*cq__) << std::endl; std::cout << "sizeof(ipc::circ::elem_array<4096>) = " << sizeof(*cq__) << std::endl;
// auto a = cq__->take(1); auto a = cq__->take(1);
// auto b = cq__->take(2); auto b = cq__->take(2);
// QCOMPARE(static_cast<std::size_t>(static_cast<ipc::byte_t*>(b) - QCOMPARE(static_cast<std::size_t>(static_cast<ipc::byte_t*>(b) -
// static_cast<ipc::byte_t*>(a)), static_cast<ipc::byte_t*>(a)),
// static_cast<std::size_t>(cq_t::elem_size)); static_cast<std::size_t>(cq_t::elem_size));
} }
template <int N, int M, int Loops = 1000000> template <int N, int M, int Loops = 1000000>
@ -129,15 +127,11 @@ void test_prod_cons(void) {
} }
void Unit::test_prod_cons_1v1(void) { void Unit::test_prod_cons_1v1(void) {
// test_prod_cons<1, 1>(); test_prod_cons<1, 1>();
} }
void Unit::test_prod_cons_1vN(void) { void Unit::test_prod_cons_1vN(void) {
// test_prod_cons<1, 3>(); test_prod_cons<1, 3>();
}
void Unit::test_prod_cons_Nv1(void) {
test_prod_cons<2, 1>();
} }
} // internal-linkage } // internal-linkage