add prod_cons<relat::multi, relat::multi, trans::broadcast>

This commit is contained in:
mutouyun 2019-01-07 19:10:56 +08:00
parent 7813e20a5b
commit 164402b21b
3 changed files with 125 additions and 53 deletions

View File

@ -125,7 +125,7 @@ struct prod_cons<relat::multi, relat::multi, trans::unicast>
template <typename E, typename F, std::size_t S>
bool push(E* /*elems*/, F&& f, detail::elem_t<S>* elem_start) {
detail::u2_t cur_ct, nxt_ct;
for (unsigned k = 0;;) {
while(1) {
cur_ct = ct_.load(std::memory_order_acquire);
if (detail::index_of(nxt_ct = cur_ct + 1) ==
detail::index_of(rd_.load(std::memory_order_relaxed))) {
@ -134,15 +134,15 @@ struct prod_cons<relat::multi, relat::multi, trans::unicast>
if (ct_.compare_exchange_weak(cur_ct, nxt_ct, std::memory_order_relaxed)) {
break;
}
ipc::sleep(k);
std::this_thread::yield();
}
std::forward<F>(f)(elem_start + detail::index_of(cur_ct));
for (unsigned k = 0;;) {
while(1) {
auto exp_wt = cur_ct;
if (wt_.compare_exchange_weak(exp_wt, nxt_ct, std::memory_order_release)) {
break;
}
ipc::sleep(k);
std::this_thread::yield();
}
return true;
}
@ -171,13 +171,13 @@ struct prod_cons<relat::single, relat::multi, trans::broadcast> {
if (conn_cnt == 0) return false;
auto el = elem_start + detail::index_of(wt_.load(std::memory_order_relaxed));
// check all consumers have finished reading this element
for (unsigned k = 0;;) {
while(1) {
rc_t expected = 0;
if (el->head_.rc_.compare_exchange_weak(
expected, static_cast<rc_t>(conn_cnt), std::memory_order_relaxed)) {
break;
}
ipc::sleep(k);
std::this_thread::yield();
conn_cnt = elems->conn_count(); // acquire
if (conn_cnt == 0) return false;
}
@ -205,6 +205,42 @@ struct prod_cons<relat::single, relat::multi, trans::broadcast> {
}
};
template <>
struct prod_cons<relat::multi, relat::multi, trans::broadcast>
: prod_cons<relat::single, relat::multi, trans::broadcast> {
std::atomic<detail::u2_t> ct_ { 0 }; // commit index
template <typename E, typename F, std::size_t S>
bool push(E* elems, F&& f, detail::elem_t<S>* elem_start) {
auto conn_cnt = elems->conn_count(); // acquire
if (conn_cnt == 0) return false;
detail::u2_t cur_ct = ct_.fetch_add(1, std::memory_order_relaxed),
nxt_ct = cur_ct + 1;
auto el = elem_start + detail::index_of(cur_ct);
// check all consumers have finished reading this element
while(1) {
rc_t expected = 0;
if (el->head_.rc_.compare_exchange_weak(
expected, static_cast<rc_t>(conn_cnt), std::memory_order_relaxed)) {
break;
}
std::this_thread::yield();
conn_cnt = elems->conn_count(); // acquire
if (conn_cnt == 0) return false;
}
std::forward<F>(f)(el->data_);
while(1) {
auto exp_wt = cur_ct;
if (wt_.compare_exchange_weak(exp_wt, nxt_ct, std::memory_order_release)) {
break;
}
std::this_thread::yield();
}
return true;
}
};
////////////////////////////////////////////////////////////////
/// element-array implementation
////////////////////////////////////////////////////////////////

View File

@ -57,7 +57,19 @@
namespace ipc {
template <std::size_t N = 8192, typename K>
template <typename K>
inline void yield(K& k) noexcept {
if (k < 4) { /* Do nothing */ }
else
if (k < 16) { IPC_LOCK_PAUSE_(); }
else {
std::this_thread::yield();
return;
}
++k;
}
template <std::size_t N = 4096, typename K>
inline void sleep(K& k) noexcept {
if (k < static_cast<K>(N)) {
std::this_thread::yield();
@ -69,18 +81,6 @@ inline void sleep(K& k) noexcept {
++k;
}
template <std::size_t N = 32, typename K>
inline void yield(K& k) noexcept {
if (k < 4) { /* Do nothing */ }
else
if (k < 16) { IPC_LOCK_PAUSE_(); }
else {
ipc::sleep<N>(k);
return;
}
++k;
}
} // namespace ipc
#pragma pop_macro("IPC_LOCK_PAUSE_")

View File

@ -266,29 +266,39 @@ void Unit::test_prod_cons_1v1() {
ipc::circ::prod_cons<ipc::circ::relat::single,
ipc::circ::relat::single,
ipc::circ::trans::unicast>
> el_arr_ss;
benchmark_prod_cons<1, 1, LoopCount, cq_t>(&el_arr_ss);
benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_ss);
> el_arr_ssu;
benchmark_prod_cons<1, 1, LoopCount, cq_t>(&el_arr_ssu);
benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_ssu);
ipc::circ::elem_array<
sizeof(msg_t),
ipc::circ::prod_cons<ipc::circ::relat::single,
ipc::circ::relat::multi,
ipc::circ::trans::unicast>
> el_arr_smn;
benchmark_prod_cons<1, 1, LoopCount, decltype(el_arr_smn)::policy_t>(&el_arr_smn);
benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_smn);
> el_arr_smu;
benchmark_prod_cons<1, 1, LoopCount, decltype(el_arr_smu)::policy_t>(&el_arr_smu);
benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_smu);
ipc::circ::elem_array<
sizeof(msg_t),
ipc::circ::prod_cons<ipc::circ::relat::multi,
ipc::circ::relat::multi,
ipc::circ::trans::unicast>
> el_arr_mmn;
benchmark_prod_cons<1, 1, LoopCount, decltype(el_arr_mmn)::policy_t>(&el_arr_mmn);
benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_mmn);
> el_arr_mmu;
benchmark_prod_cons<1, 1, LoopCount, decltype(el_arr_mmu)::policy_t>(&el_arr_mmu);
benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_mmu);
test_prod_cons<1, 1>();
test_prod_cons<1, 1, false>();
ipc::circ::elem_array<
sizeof(msg_t),
ipc::circ::prod_cons<ipc::circ::relat::multi,
ipc::circ::relat::multi,
ipc::circ::trans::broadcast>
> el_arr_mmb;
benchmark_prod_cons<1, 1, LoopCount, cq_t>(&el_arr_mmb);
benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_mmb);
}
void Unit::test_prod_cons_1v3() {
@ -297,20 +307,30 @@ void Unit::test_prod_cons_1v3() {
ipc::circ::prod_cons<ipc::circ::relat::single,
ipc::circ::relat::multi,
ipc::circ::trans::unicast>
> el_arr_smn;
benchmark_prod_cons<1, 3, LoopCount, decltype(el_arr_smn)::policy_t>(&el_arr_smn);
benchmark_prod_cons<1, 3, LoopCount, void>(&el_arr_smn);
> el_arr_smu;
benchmark_prod_cons<1, 3, LoopCount, decltype(el_arr_smu)::policy_t>(&el_arr_smu);
benchmark_prod_cons<1, 3, LoopCount, void>(&el_arr_smu);
ipc::circ::elem_array<
sizeof(msg_t),
ipc::circ::prod_cons<ipc::circ::relat::multi,
ipc::circ::relat::multi,
ipc::circ::trans::unicast>
> el_arr_mmn;
benchmark_prod_cons<1, 3, LoopCount, decltype(el_arr_mmn)::policy_t>(&el_arr_mmn);
benchmark_prod_cons<1, 3, LoopCount, void>(&el_arr_mmn);
> el_arr_mmu;
benchmark_prod_cons<1, 3, LoopCount, decltype(el_arr_mmu)::policy_t>(&el_arr_mmu);
benchmark_prod_cons<1, 3, LoopCount, void>(&el_arr_mmu);
test_prod_cons<1, 3>();
test_prod_cons<1, 3, false>();
ipc::circ::elem_array<
sizeof(msg_t),
ipc::circ::prod_cons<ipc::circ::relat::multi,
ipc::circ::relat::multi,
ipc::circ::trans::broadcast>
> el_arr_mmb;
benchmark_prod_cons<1, 3, LoopCount, cq_t>(&el_arr_mmb);
benchmark_prod_cons<1, 3, LoopCount, void>(&el_arr_mmb);
}
void Unit::test_prod_cons_performance() {
@ -319,31 +339,47 @@ void Unit::test_prod_cons_performance() {
ipc::circ::prod_cons<ipc::circ::relat::single,
ipc::circ::relat::multi,
ipc::circ::trans::unicast>
> el_arr_smn;
ipc::mem::detail::static_for(std::make_index_sequence<10>{}, [&el_arr_smn](auto index) {
benchmark_prod_cons<1, decltype(index)::value + 1, LoopCount, void>(&el_arr_smn);
});
ipc::circ::elem_array<
sizeof(msg_t),
ipc::circ::prod_cons<ipc::circ::relat::multi,
ipc::circ::relat::multi,
ipc::circ::trans::unicast>
> el_arr_mmn;
ipc::mem::detail::static_for(std::make_index_sequence<10>{}, [&el_arr_mmn](auto index) {
benchmark_prod_cons<1, decltype(index)::value + 1, LoopCount, void>(&el_arr_mmn);
});
ipc::mem::detail::static_for(std::make_index_sequence<10>{}, [&el_arr_mmn](auto index) {
benchmark_prod_cons<decltype(index)::value + 1, 1, LoopCount, void>(&el_arr_mmn);
});
ipc::mem::detail::static_for(std::make_index_sequence<10>{}, [&el_arr_mmn](auto index) {
benchmark_prod_cons<decltype(index)::value + 1, decltype(index)::value + 1, LoopCount, void>(&el_arr_mmn);
> el_arr_smu;
ipc::mem::detail::static_for(std::make_index_sequence<10>{}, [&el_arr_smu](auto index) {
benchmark_prod_cons<1, decltype(index)::value + 1, LoopCount, void>(&el_arr_smu);
});
ipc::mem::detail::static_for(std::make_index_sequence<10>{}, [](auto index) {
test_prod_cons<1, decltype(index)::value + 1, false>();
});
test_prod_cons<1, 10>(); // test & verify
ipc::circ::elem_array<
sizeof(msg_t),
ipc::circ::prod_cons<ipc::circ::relat::multi,
ipc::circ::relat::multi,
ipc::circ::trans::unicast>
> el_arr_mmu;
ipc::mem::detail::static_for(std::make_index_sequence<10>{}, [&el_arr_mmu](auto index) {
benchmark_prod_cons<1, decltype(index)::value + 1, LoopCount, void>(&el_arr_mmu);
});
ipc::mem::detail::static_for(std::make_index_sequence<10>{}, [&el_arr_mmu](auto index) {
benchmark_prod_cons<decltype(index)::value + 1, 1, LoopCount, void>(&el_arr_mmu);
});
ipc::mem::detail::static_for(std::make_index_sequence<10>{}, [&el_arr_mmu](auto index) {
benchmark_prod_cons<decltype(index)::value + 1, decltype(index)::value + 1, LoopCount, void>(&el_arr_mmu);
});
ipc::circ::elem_array<
sizeof(msg_t),
ipc::circ::prod_cons<ipc::circ::relat::multi,
ipc::circ::relat::multi,
ipc::circ::trans::broadcast>
> el_arr_mmb;
ipc::mem::detail::static_for(std::make_index_sequence<10>{}, [&el_arr_mmb](auto index) {
benchmark_prod_cons<1, decltype(index)::value + 1, LoopCount, void>(&el_arr_mmb);
});
ipc::mem::detail::static_for(std::make_index_sequence<10>{}, [&el_arr_mmb](auto index) {
benchmark_prod_cons<decltype(index)::value + 1, 1, LoopCount, void>(&el_arr_mmb);
});
ipc::mem::detail::static_for(std::make_index_sequence<10>{}, [&el_arr_mmb](auto index) {
benchmark_prod_cons<decltype(index)::value + 1, decltype(index)::value + 1, LoopCount, void>(&el_arr_mmb);
});
}
void Unit::test_queue() {