add impl of force_push for single-single-unicast; make test_circ works

This commit is contained in:
zhangyi 2020-03-29 14:28:53 +08:00
parent 6850b47e3a
commit d762634380
2 changed files with 64 additions and 50 deletions

View File

@ -45,8 +45,22 @@ struct prod_cons_impl<wr<relat::single, relat::single, trans::unicast>> {
}
template <typename W, typename F, typename E>
bool force_push(W* wrapper, F&& f, E* elems) {
return push(wrapper, std::forward<F>(f), elems);
bool force_push(W* /*wrapper*/, F&& f, E* elems) {
auto cur_wt = circ::index_of(wt_.load(std::memory_order_relaxed));
for (unsigned k = 0;;) {
auto cur_rd = rd_.load(std::memory_order_acquire);
if (cur_wt != circ::index_of(cur_rd - 1)) {
break;
}
// full
if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_acq_rel)) {
break;
}
ipc::yield(k);
}
std::forward<F>(f)(&(elems[cur_wt].data_));
wt_.fetch_add(1, std::memory_order_release);
return true;
}
template <typename W, typename F, typename E>
@ -107,7 +121,7 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::unicast>>
circ::index_of(rd_.load(std::memory_order_acquire))) {
return false; // full
}
if (ct_.compare_exchange_weak(cur_ct, nxt_ct, std::memory_order_release)) {
if (ct_.compare_exchange_weak(cur_ct, nxt_ct, std::memory_order_acq_rel)) {
break;
}
ipc::yield(k);
@ -118,7 +132,7 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::unicast>>
el->f_ct_.store(~static_cast<flag_t>(cur_ct), std::memory_order_release);
while (1) {
auto cac_ct = el->f_ct_.load(std::memory_order_acquire);
if (cur_ct != wt_.load(std::memory_order_acquire)) {
if (cur_ct != wt_.load(std::memory_order_relaxed)) {
return true;
}
if ((~cac_ct) != cur_ct) {

View File

@ -32,7 +32,6 @@ using cq_t = ea_t<
sizeof(msg_t),
pc_t<ipc::relat::single, ipc::relat::multi, ipc::trans::broadcast>
>;
cq_t* cq__;
bool operator==(msg_t const & m1, msg_t const & m2) {
return (m1.pid_ == m2.pid_) && (m1.dat_ == m2.dat_);
@ -117,28 +116,28 @@ template <std::size_t D, typename P>
struct test_cq<ea_t<D, P>> {
using ca_t = ea_t<D, P>;
using cn_t = decltype(std::declval<ca_t>().connect());
using cr_t = decltype(std::declval<ca_t>().cursor());
typename quit_mode<P>::type quit_ = false;
ca_t* ca_;
cn_t cc_id_;
std::unordered_map<cn_t, cr_t> conns_;
test_cq(ca_t* ca) : ca_(ca) {}
cn_t connect() {
return cc_id_ = ca_->connect();
auto it = conns_.emplace(ca_->connect(), ca_->cursor());
return it.first->first;
}
ca_t* connect_send() {
return ca_;
}
void disconnect(cn_t cc_id) {
ca_->disconnect(cc_id);
}
void disconnect(ca_t* ca) {
ca->disconnect(cc_id_);
}
cn_t connected_id() const noexcept {
return cc_id_;
}
void disconnect(ca_t*) {}
ca_t * elems() noexcept { return ca_; }
ca_t const * elems() const noexcept { return ca_; }
@ -150,10 +149,18 @@ struct test_cq<ea_t<D, P>> {
}
template <typename F>
void recv(cn_t cur, F&& proc) {
void recv(cn_t cn, F&& proc) {
struct que {
cn_t cn_;
// for ca_->pop
cn_t connected_id() const noexcept { return cn_; }
} q { cn };
cr_t& cur = conns_[cn];
while (1) {
msg_t msg;
while (ca_->pop(this, &cur, [&msg](void* p) {
while (ca_->pop(&q, &cur, [&msg](void* p) {
msg = *static_cast<msg_t*>(p);
})) {
if (msg.pid_ < 0) {
@ -167,10 +174,6 @@ struct test_cq<ea_t<D, P>> {
}
}
ca_t* connect_send() {
return ca_;
}
void send(ca_t* ca, msg_t const & msg) {
while (!ca->push(this, [&msg](void* p) {
(*static_cast<msg_t*>(p)) = msg;
@ -238,15 +241,13 @@ class Unit : public TestSuite {
private slots:
void initTestCase();
void cleanupTestCase();
void test_inst();
void test_prod_cons_1v1();
void test_prod_cons_1v3();
void test_prod_cons_performance();
void test_queue();
};
// } unit__;
//};
} unit__;
#include "test_circ.moc"
@ -255,11 +256,6 @@ constexpr int LoopCount = 1000000;
void Unit::initTestCase() {
TestSuite::initTestCase();
cq__ = new cq_t;
}
void Unit::cleanupTestCase() {
delete cq__;
}
void Unit::test_inst() {
@ -270,22 +266,10 @@ void Unit::test_inst() {
QCOMPARE(static_cast<std::size_t>(cq_t::data_size), sizeof(msg_t));
std::cout << "sizeof(ea_t<sizeof(msg_t)>) = " << sizeof(*cq__) << std::endl;
}
template <int N, int M, bool V = true, int Loops = LoopCount>
void test_prod_cons() {
benchmark_prod_cons<N, M, Loops, std::conditional_t<V, cq_t, void>>(cq__);
std::cout << "sizeof(ea_t<sizeof(msg_t)>) = " << sizeof(cq_t) << std::endl;
}
void Unit::test_prod_cons_1v1() {
// ea_t<
// sizeof(msg_t),
// pc_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::broadcast>
// > el_arr_mmb;
// benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_mmb);
// benchmark_prod_cons<2, 1, LoopCount, void>(&el_arr_mmb);
ea_t<
sizeof(msg_t),
pc_t<ipc::relat::single, ipc::relat::single, ipc::trans::unicast>
@ -307,8 +291,12 @@ void Unit::test_prod_cons_1v1() {
benchmark_prod_cons<1, 1, LoopCount, decltype(el_arr_mmu)::policy_t>(&el_arr_mmu);
benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_mmu);
test_prod_cons<1, 1>();
test_prod_cons<1, 1, false>();
ea_t<
sizeof(msg_t),
pc_t<ipc::relat::single, ipc::relat::multi, ipc::trans::broadcast>
> el_arr_smb;
benchmark_prod_cons<1, 1, LoopCount, cq_t>(&el_arr_smb);
benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_smb);
ea_t<
sizeof(msg_t),
@ -316,6 +304,7 @@ void Unit::test_prod_cons_1v1() {
> el_arr_mmb;
benchmark_prod_cons<1, 1, LoopCount, cq_t>(&el_arr_mmb);
benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_mmb);
benchmark_prod_cons<2, 1, LoopCount, void>(&el_arr_mmb);
}
void Unit::test_prod_cons_1v3() {
@ -333,8 +322,12 @@ void Unit::test_prod_cons_1v3() {
benchmark_prod_cons<1, 3, LoopCount, decltype(el_arr_mmu)::policy_t>(&el_arr_mmu);
benchmark_prod_cons<1, 3, LoopCount, void>(&el_arr_mmu);
test_prod_cons<1, 3>();
test_prod_cons<1, 3, false>();
ea_t<
sizeof(msg_t),
pc_t<ipc::relat::single, ipc::relat::multi, ipc::trans::broadcast>
> el_arr_smb;
benchmark_prod_cons<1, 3, LoopCount, cq_t>(&el_arr_smb);
benchmark_prod_cons<1, 3, LoopCount, void>(&el_arr_smb);
ea_t<
sizeof(msg_t),
@ -353,10 +346,17 @@ void Unit::test_prod_cons_performance() {
benchmark_prod_cons<1, decltype(index)::value + 1, LoopCount, void>(&el_arr_smu);
});
ipc::detail::static_for<8>([](auto index) {
test_prod_cons<1, decltype(index)::value + 1, false>();
ea_t<
sizeof(msg_t),
pc_t<ipc::relat::single, ipc::relat::multi, ipc::trans::broadcast>
> el_arr_smb;
ipc::detail::static_for<8>([&el_arr_smb](auto index) {
benchmark_prod_cons<1, decltype(index)::value + 1, LoopCount, void>(&el_arr_smb);
});
// test & verify
ipc::detail::static_for<8>([&el_arr_smb](auto index) {
benchmark_prod_cons<1, decltype(index)::value + 1, LoopCount, cq_t>(&el_arr_smb);
});
test_prod_cons<1, 8>(); // test & verify
ea_t<
sizeof(msg_t),
@ -398,7 +398,7 @@ void Unit::test_queue() {
msg_t msg {};
QVERIFY(!queue.pop(msg));
QCOMPARE(msg, (msg_t {}));
QVERIFY(sizeof(decltype(queue)::elems_t) <= sizeof(*cq__));
QVERIFY(sizeof(decltype(queue)::elems_t) <= sizeof(cq_t));
ipc::detail::static_for<16>([](auto index) {
benchmark_prod_cons<1, decltype(index)::value + 1, LoopCount>((queue_t*)nullptr);