add prod_cons<relat::multi, relat::multi, trans::unicast>

This commit is contained in:
mutouyun 2019-01-05 16:09:56 +08:00
parent ea52920b7d
commit 8e104ded0f
2 changed files with 48 additions and 6 deletions

View File

@ -73,7 +73,7 @@ struct prod_cons<relat::single, relat::single, trans::unicast> {
bool push(E* /*elems*/, F&& f, detail::elem_t<S>* elem_start) { bool push(E* /*elems*/, F&& f, detail::elem_t<S>* elem_start) {
auto cur_wt = detail::index_of(wt_.load(std::memory_order_acquire)); auto cur_wt = detail::index_of(wt_.load(std::memory_order_acquire));
if (cur_wt == detail::index_of(rd_.load(std::memory_order_relaxed) - 1)) { if (cur_wt == detail::index_of(rd_.load(std::memory_order_relaxed) - 1)) {
return false; return false; // full
} }
std::forward<F>(f)(elem_start + cur_wt); std::forward<F>(f)(elem_start + cur_wt);
wt_.fetch_add(1, std::memory_order_release); wt_.fetch_add(1, std::memory_order_release);
@ -84,7 +84,7 @@ struct prod_cons<relat::single, relat::single, trans::unicast> {
bool pop(E* /*elems*/, detail::u2_t& /*cur*/, F&& f, detail::elem_t<S>* elem_start) noexcept { bool pop(E* /*elems*/, detail::u2_t& /*cur*/, F&& f, detail::elem_t<S>* elem_start) noexcept {
auto cur_rd = detail::index_of(rd_.load(std::memory_order_acquire)); auto cur_rd = detail::index_of(rd_.load(std::memory_order_acquire));
if (cur_rd == detail::index_of(wt_.load(std::memory_order_relaxed))) { if (cur_rd == detail::index_of(wt_.load(std::memory_order_relaxed))) {
return false; return false; // empty
} }
std::forward<F>(f)(elem_start + cur_rd); std::forward<F>(f)(elem_start + cur_rd);
rd_.fetch_add(1, std::memory_order_release); rd_.fetch_add(1, std::memory_order_release);
@ -103,7 +103,7 @@ struct prod_cons<relat::single, relat::multi, trans::unicast>
auto cur_rd = rd_.load(std::memory_order_acquire); auto cur_rd = rd_.load(std::memory_order_acquire);
if (detail::index_of(cur_rd) == if (detail::index_of(cur_rd) ==
detail::index_of(wt_.load(std::memory_order_relaxed))) { detail::index_of(wt_.load(std::memory_order_relaxed))) {
return false; return false; // empty
} }
std::memcpy(buff, elem_start + detail::index_of(cur_rd), sizeof(buff)); std::memcpy(buff, elem_start + detail::index_of(cur_rd), sizeof(buff));
if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) { if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) {
@ -115,6 +115,38 @@ struct prod_cons<relat::single, relat::multi, trans::unicast>
} }
}; };
template <>
struct prod_cons<relat::multi, relat::multi, trans::unicast>
: prod_cons<relat::single, relat::multi, trans::unicast> {
std::atomic<detail::u2_t> ct_ { 0 }; // commit index
template <typename E, typename F, std::size_t S>
bool push(E* /*elems*/, F&& f, detail::elem_t<S>* elem_start) {
detail::u2_t cur_ct, nxt_ct;
while (1) {
cur_ct = ct_.load(std::memory_order_acquire);
if (detail::index_of(nxt_ct = cur_ct + 1) ==
detail::index_of(rd_.load(std::memory_order_relaxed))) {
return false; // full
}
if (ct_.compare_exchange_weak(cur_ct, nxt_ct, std::memory_order_relaxed)) {
break;
}
std::this_thread::yield();
}
std::forward<F>(f)(elem_start + detail::index_of(cur_ct));
while (1) {
auto exp_wt = cur_ct;
if (wt_.compare_exchange_weak(exp_wt, nxt_ct, std::memory_order_release)) {
break;
}
std::this_thread::yield();
}
return true;
}
};
template <> template <>
struct prod_cons<relat::single, relat::multi, trans::broadcast> { struct prod_cons<relat::single, relat::multi, trans::broadcast> {
std::atomic<detail::u2_t> wt_ { 0 }; // write index std::atomic<detail::u2_t> wt_ { 0 }; // write index

View File

@ -64,9 +64,8 @@ struct test_verify<cq_t> {
} }
}; };
template <> template <ipc::circ::relat Rp>
struct test_verify<ipc::circ::prod_cons< struct test_verify<ipc::circ::prod_cons<Rp,
ipc::circ::relat::single,
ipc::circ::relat::multi, ipc::circ::relat::multi,
ipc::circ::trans::unicast> ipc::circ::trans::unicast>
> : test_verify<cq_t> { > : test_verify<cq_t> {
@ -344,6 +343,17 @@ void Unit::test_prod_cons_1v3() {
benchmark_prod_cons<1, 3, LoopCount, decltype(el_arr_smn)::policy_t>(&el_arr_smn); benchmark_prod_cons<1, 3, LoopCount, decltype(el_arr_smn)::policy_t>(&el_arr_smn);
benchmark_prod_cons<1, 3, LoopCount, void>(&el_arr_smn); benchmark_prod_cons<1, 3, LoopCount, void>(&el_arr_smn);
ipc::circ::elems_array<
sizeof(msg_t),
ipc::circ::prod_cons<ipc::circ::relat::multi,
ipc::circ::relat::multi,
ipc::circ::trans::unicast>
> el_arr_mmn;
benchmark_prod_cons<1, 3, LoopCount, decltype(el_arr_mmn)::policy_t>(&el_arr_mmn);
benchmark_prod_cons<1, 3, LoopCount, void>(&el_arr_mmn);
benchmark_prod_cons<3, 3, LoopCount, decltype(el_arr_mmn)::policy_t>(&el_arr_mmn);
benchmark_prod_cons<3, 3, LoopCount, void>(&el_arr_mmn);
ipc::circ::elems_array< ipc::circ::elems_array<
sizeof(msg_t), sizeof(msg_t),
ipc::circ::prod_cons<ipc::circ::relat::single, ipc::circ::prod_cons<ipc::circ::relat::single,