use spin lock for multi-producers

This commit is contained in:
mutouyun 2018-11-30 11:38:50 +08:00
parent 9540842ba7
commit b3e2c80fc0
2 changed files with 52 additions and 21 deletions

View File

@ -18,6 +18,8 @@ struct alignas(std::max_align_t) elem_array_head {
ac_t cc_ { 0 }; // connection counter, using for broadcast ac_t cc_ { 0 }; // connection counter, using for broadcast
ac_t wt_ { 0 }; // write index ac_t wt_ { 0 }; // write index
std::atomic_flag lc_ ATOMIC_FLAG_INIT; // write spin lock flag
}; };
enum : std::size_t { enum : std::size_t {
@ -91,7 +93,10 @@ public:
} }
void* acquire(void) { void* acquire(void) {
elem_t* el = elem(wt_.load(std::memory_order_acquire)); while (lc_.test_and_set(std::memory_order_acquire)) {
std::this_thread::yield();
}
elem_t* el = elem(wt_.load(std::memory_order_relaxed));
// check all consumers have finished reading // check all consumers have finished reading
while(1) { while(1) {
std::uint32_t expected = 0; std::uint32_t expected = 0;
@ -108,7 +113,8 @@ public:
} }
void commit(void* /*ptr*/) { void commit(void* /*ptr*/) {
wt_.fetch_add(1, std::memory_order_release); wt_.fetch_add(1, std::memory_order_relaxed);
lc_.clear(std::memory_order_release);
} }
uc_t cursor(void) const { uc_t cursor(void) const {

View File

@ -24,6 +24,7 @@ private slots:
void test_inst(void); void test_inst(void);
void test_prod_cons_1v1(void); void test_prod_cons_1v1(void);
void test_prod_cons_1v3(void); void test_prod_cons_1v3(void);
void test_prod_cons_3v1(void);
void test_prod_cons_performance(void); void test_prod_cons_performance(void);
void test_queue(void); void test_queue(void);
@ -82,18 +83,18 @@ struct test_stopwatch {
} }
}; };
template <bool Confirmation> template <bool V>
struct test_confirm { struct test_verify {
std::unordered_map<int, std::vector<int>>* list_; std::unordered_map<int, std::vector<int>>* list_;
int lcount_; int lcount_;
test_confirm(int M) { test_verify(int M) {
list_ = new std::remove_reference_t<decltype(*list_)>[ list_ = new std::remove_reference_t<decltype(*list_)>[
static_cast<std::size_t>(lcount_ = M) static_cast<std::size_t>(lcount_ = M)
]; ];
} }
~test_confirm(void) { ~test_verify(void) {
delete [] list_; delete [] list_;
} }
@ -106,7 +107,7 @@ struct test_confirm {
} }
void verify(int N, int Loops) { void verify(int N, int Loops) {
std::cout << "confirming..." << std::endl; std::cout << "verifying..." << std::endl;
for (int m = 0; m < lcount_; ++m) { for (int m = 0; m < lcount_; ++m) {
auto& cons_vec = list_[m]; auto& cons_vec = list_[m];
for (int n = 0; n < N; ++n) { for (int n = 0; n < N; ++n) {
@ -123,8 +124,8 @@ struct test_confirm {
}; };
template <> template <>
struct test_confirm<false> { struct test_verify<false> {
test_confirm (int) {} test_verify (int) {}
void prepare (void*) {} void prepare (void*) {}
void push_data(int, msg_t const &) {} void push_data(int, msg_t const &) {}
void verify (int, int) {} void verify (int, int) {}
@ -227,7 +228,7 @@ struct test_cq<ipc::circ::queue<T>> {
} }
}; };
template <int N, int M, bool C = true, int Loops = 1000000, typename T> template <int N, int M, bool V = true, int Loops = 1000000, typename T>
void test_prod_cons(T* cq) { void test_prod_cons(T* cq) {
test_cq<T> tcq { cq }; test_cq<T> tcq { cq };
@ -236,21 +237,21 @@ void test_prod_cons(T* cq) {
std::atomic_int fini { 0 }; std::atomic_int fini { 0 };
test_stopwatch sw; test_stopwatch sw;
test_confirm<C> cf { M }; test_verify<V> vf { M };
int cid = 0; int cid = 0;
for (auto& t : consumers) { for (auto& t : consumers) {
t = std::thread{[&, cid] { t = std::thread{[&, cid] {
cf.prepare(&t); vf.prepare(&t);
auto cn = tcq.connect(); auto cn = tcq.connect();
using namespace std::placeholders; using namespace std::placeholders;
tcq.recv(cn, std::bind(&test_confirm<C>::push_data, &cf, cid, _1)); tcq.recv(cn, std::bind(&test_verify<V>::push_data, &vf, cid, _1));
tcq.disconnect(cn); tcq.disconnect(cn);
if (++fini != std::extent<decltype(consumers)>::value) return; if (++fini != std::extent<decltype(consumers)>::value) return;
sw.print_elapsed(N, M, Loops); sw.print_elapsed(N, M, Loops);
cf.verify(N, Loops); vf.verify(N, Loops);
}}; }};
++cid; ++cid;
} }
@ -275,9 +276,9 @@ void test_prod_cons(T* cq) {
for (auto& t : consumers) t.join(); for (auto& t : consumers) t.join();
} }
template <int N, int M, bool C = true, int Loops = 1000000> template <int N, int M, bool V = true, int Loops = 1000000>
void test_prod_cons(void) { void test_prod_cons(void) {
test_prod_cons<N, M, C, Loops>(cq__); test_prod_cons<N, M, V, Loops>(cq__);
} }
void Unit::test_prod_cons_1v1(void) { void Unit::test_prod_cons_1v1(void) {
@ -288,23 +289,45 @@ void Unit::test_prod_cons_1v3(void) {
test_prod_cons<1, 3>(); test_prod_cons<1, 3>();
} }
void Unit::test_prod_cons_3v1(void) {
test_prod_cons<3, 1>();
}
template <int B, int E> template <int B, int E>
struct test_performance { struct test_cons_performance {
static void start(void) { static void start(void) {
test_prod_cons<1, B, false>(); test_prod_cons<1, B, false>();
test_performance<B + 1, E>::start(); test_cons_performance<B + 1, E>::start();
} }
}; };
template <int E> template <int E>
struct test_performance<E, E> { struct test_cons_performance<E, E> {
static void start(void) { static void start(void) {
test_prod_cons<1, E, false>(); test_prod_cons<1, E, false>();
} }
}; };
template <int B, int E>
struct test_prod_performance {
static void start(void) {
test_prod_cons<B, 1, false>();
test_prod_performance<B + 1, E>::start();
}
};
template <int E>
struct test_prod_performance<E, E> {
static void start(void) {
test_prod_cons<E, 1, false>();
}
};
void Unit::test_prod_cons_performance(void) { void Unit::test_prod_cons_performance(void) {
test_performance<1, 10>::start(); test_cons_performance<1, 10>::start();
test_prod_performance<1, 10>::start();
test_prod_cons<3, 3, false>(); // just test
test_prod_cons<5, 5>(); // test & verify
} }
void Unit::test_queue(void) { void Unit::test_queue(void) {
@ -318,6 +341,8 @@ void Unit::test_queue(void) {
QVERIFY(queue.detach() != nullptr); QVERIFY(queue.detach() != nullptr);
test_prod_cons<1, 3>((ipc::circ::queue<msg_t>*)nullptr); test_prod_cons<1, 3>((ipc::circ::queue<msg_t>*)nullptr);
test_prod_cons<3, 1>((ipc::circ::queue<msg_t>*)nullptr);
test_prod_cons<3, 3>((ipc::circ::queue<msg_t>*)nullptr);
} }
} // internal-linkage } // internal-linkage