fix bugs; optimize (TBD); update performance.xlsx

This commit is contained in:
mutouyun 2019-03-20 18:26:41 +08:00
parent 0d0b1e0a48
commit 4049e78c32
6 changed files with 114 additions and 57 deletions

Binary file not shown.

View File

@ -146,7 +146,7 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size) {
{ que, msg_id, static_cast<int>(size) - offset - static_cast<int>(data_length) }, {} { que, msg_id, static_cast<int>(size) - offset - static_cast<int>(data_length) }, {}
}; };
std::memcpy(msg.data_, static_cast<byte_t const *>(data) + offset, data_length); std::memcpy(msg.data_, static_cast<byte_t const *>(data) + offset, data_length);
if (!que->push(msg)) return false; while (!que->push(msg)) std::this_thread::yield();
} }
// if remain > 0, this is the last message fragment // if remain > 0, this is the last message fragment
int remain = static_cast<int>(size) - offset; int remain = static_cast<int>(size) - offset;
@ -156,7 +156,7 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size) {
}; };
std::memcpy(msg.data_, static_cast<byte_t const *>(data) + offset, std::memcpy(msg.data_, static_cast<byte_t const *>(data) + offset,
static_cast<std::size_t>(remain)); static_cast<std::size_t>(remain));
if (!que->push(msg)) return false; while (!que->push(msg)) std::this_thread::yield();
} }
return true; return true;
} }

View File

@ -182,17 +182,12 @@ struct prod_cons_impl<wr<relat::single, relat::multi, trans::broadcast>> {
bool push(W* wrapper, F&& f, E<DataSize>* elems) { bool push(W* wrapper, F&& f, E<DataSize>* elems) {
auto conn_cnt = wrapper->conn_count(std::memory_order_relaxed); auto conn_cnt = wrapper->conn_count(std::memory_order_relaxed);
if (conn_cnt == 0) return false; if (conn_cnt == 0) return false;
auto el = elems + circ::index_of(wt_.load(std::memory_order_acquire)); auto* el = elems + circ::index_of(wt_.load(std::memory_order_acquire));
// check all consumers have finished reading this element // check all consumers have finished reading this element
while (1) { rc_t expected = 0;
rc_t expected = 0; if (!el->rc_.compare_exchange_strong(
if (el->rc_.compare_exchange_weak( expected, static_cast<rc_t>(conn_cnt), std::memory_order_release)) {
expected, static_cast<rc_t>(conn_cnt), std::memory_order_release)) { return false; // full
break;
}
std::this_thread::yield();
conn_cnt = wrapper->conn_count(); // acquire
if (conn_cnt == 0) return false;
} }
std::forward<F>(f)(&(el->data_)); std::forward<F>(f)(&(el->data_));
wt_.fetch_add(1, std::memory_order_release); wt_.fetch_add(1, std::memory_order_release);
@ -202,7 +197,7 @@ struct prod_cons_impl<wr<relat::single, relat::multi, trans::broadcast>> {
template <typename W, typename F, template <std::size_t> class E, std::size_t DataSize> template <typename W, typename F, template <std::size_t> class E, std::size_t DataSize>
bool pop(W* /*wrapper*/, circ::u2_t& cur, F&& f, E<DataSize>* elems) { bool pop(W* /*wrapper*/, circ::u2_t& cur, F&& f, E<DataSize>* elems) {
if (cur == cursor()) return false; // acquire if (cur == cursor()) return false; // acquire
auto el = elems + circ::index_of(cur++); auto* el = elems + circ::index_of(cur++);
std::forward<F>(f)(&(el->data_)); std::forward<F>(f)(&(el->data_));
for (unsigned k = 0;;) { for (unsigned k = 0;;) {
rc_t cur_rc = el->rc_.load(std::memory_order_acquire); rc_t cur_rc = el->rc_.load(std::memory_order_acquire);
@ -219,38 +214,89 @@ struct prod_cons_impl<wr<relat::single, relat::multi, trans::broadcast>> {
}; };
template <> template <>
struct prod_cons_impl<wr<relat::multi , relat::multi, trans::broadcast>> struct prod_cons_impl<wr<relat::multi , relat::multi, trans::broadcast>> {
: prod_cons_impl<wr<relat::single, relat::multi, trans::broadcast>> {
using rc_t = std::uint64_t;
using flag_t = std::uint64_t;
enum : rc_t {
rc_mask = 0x00000000ffffffffull,
rc_incr = 0x0000000100000000ull
};
template <std::size_t DataSize>
struct elem_t {
byte_t data_[DataSize] {};
std::atomic<rc_t > rc_ { 0 }; // read-counter
std::atomic<flag_t> f_ct_ { 0 }; // commit flag
};
alignas(circ::cache_line_size) std::atomic<circ::u2_t> ct_; // commit index alignas(circ::cache_line_size) std::atomic<circ::u2_t> ct_; // commit index
circ::u2_t cursor() const noexcept {
return ct_.load(std::memory_order_acquire);
}
template <typename W, typename F, template <std::size_t> class E, std::size_t DataSize> template <typename W, typename F, template <std::size_t> class E, std::size_t DataSize>
bool push(W* wrapper, F&& f, E<DataSize>* elems) { bool push(W* wrapper, F&& f, E<DataSize>* elems) {
auto conn_cnt = wrapper->conn_count(std::memory_order_relaxed); E<DataSize>* el;
if (conn_cnt == 0) return false; circ::u2_t cur_ct, nxt_ct;
circ::u2_t cur_ct = ct_.fetch_add(1, std::memory_order_acquire), for (unsigned k = 0;;) {
nxt_ct = cur_ct + 1; auto cc = wrapper->conn_count(std::memory_order_relaxed);
auto el = elems + circ::index_of(cur_ct); if (cc == 0) {
// check all consumers have finished reading this element return false; // no reader
while (1) { }
rc_t expected = 0; el = elems + circ::index_of(cur_ct = ct_.load(std::memory_order_relaxed));
auto cur_rc = el->rc_.load(std::memory_order_acquire);
if (cur_rc & rc_mask) {
return false; // full
}
auto cur_fl = el->f_ct_.load(std::memory_order_acquire);
if ((cur_fl != cur_ct) && cur_fl) {
return false; // full
}
// (cur_rc & rc_mask) should == 0 here
if (el->rc_.compare_exchange_weak( if (el->rc_.compare_exchange_weak(
expected, static_cast<rc_t>(conn_cnt), std::memory_order_release)) { cur_rc, static_cast<rc_t>(cc) | ((cur_rc & ~rc_mask) + rc_incr), std::memory_order_release)) {
break; break;
} }
std::this_thread::yield(); ipc::yield(k);
conn_cnt = wrapper->conn_count(); // acquire
if (conn_cnt == 0) return false;
} }
// only one thread/process would touch here at one time
ct_.store(nxt_ct = cur_ct + 1, std::memory_order_release);
std::forward<F>(f)(&(el->data_)); std::forward<F>(f)(&(el->data_));
while (1) { // set flag & try update wt
auto exp_wt = cur_ct; el->f_ct_.store(~static_cast<flag_t>(cur_ct));
if (wt_.compare_exchange_weak(exp_wt, nxt_ct, std::memory_order_release)) { return true;
}
template <typename W, typename F, template <std::size_t> class E, std::size_t DataSize, std::size_t N>
bool pop(W* /*wrapper*/, circ::u2_t& cur, F&& f, E<DataSize>(& elems)[N]) {
auto* el = elems + circ::index_of(cur);
auto cur_fl = el->f_ct_.load(std::memory_order_acquire);
if (cur_fl != ~static_cast<flag_t>(cur)) {
return false; // empty
}
++cur;
std::forward<F>(f)(&(el->data_));
for (unsigned k = 0;;) {
auto cur_rc = el->rc_.load(std::memory_order_acquire);
switch (cur_rc & rc_mask) {
case 0:
el->f_ct_.store(cur + N - 1, std::memory_order_release);
return true;
case 1:
el->f_ct_.store(cur + N - 1, std::memory_order_release);
[[fallthrough]];
default:
if (el->rc_.compare_exchange_weak(
cur_rc, cur_rc + rc_incr - 1, std::memory_order_release)) {
return true;
}
break; break;
} }
std::this_thread::yield(); ipc::yield(k);
} }
return true;
} }
}; };

View File

@ -81,29 +81,31 @@ public:
} }
template <typename Elems> template <typename Elems>
std::size_t connect(Elems* elems) { auto connect(Elems* elems)
if (elems == nullptr) return invalid_value; -> std::tuple<bool, decltype(std::declval<Elems>().cursor())> {
if (elems == nullptr) return {};
if (connected_) { if (connected_) {
// if it's already connected, just return an error count // if it's already connected, just return false
return invalid_value; return {};
} }
connected_ = true; connected_ = true;
auto ret = elems->connect(); elems->connect();
auto ret = std::make_tuple(true, elems->cursor());
cc_waiter_.broadcast(); cc_waiter_.broadcast();
return ret; return ret;
} }
template <typename Elems> template <typename Elems>
std::size_t disconnect(Elems* elems) { bool disconnect(Elems* elems) {
if (elems == nullptr) return invalid_value; if (elems == nullptr) return false;
if (!connected_) { if (!connected_) {
// if it's already disconnected, just return an error count // if it's already disconnected, just return false
return invalid_value; return false;
} }
connected_ = false; connected_ = false;
auto ret = elems->disconnect(); elems->disconnect();
cc_waiter_.broadcast(); cc_waiter_.broadcast();
return ret; return true;
} }
template <typename Elems> template <typename Elems>
@ -155,11 +157,16 @@ public:
return elems_; return elems_;
} }
std::size_t connect() { bool connect() {
return base_t::connect(elems_); auto tp = base_t::connect(elems_);
if (std::get<0>(tp)) {
cursor_ = std::get<1>(tp);
return true;
}
return false;
} }
std::size_t disconnect() { bool disconnect() {
return base_t::disconnect(elems_); return base_t::disconnect(elems_);
} }
@ -186,9 +193,6 @@ public:
base_t::close(old); base_t::close(old);
} }
else base_t::open(elems_, name); else base_t::open(elems_, name);
if (elems_ != nullptr) {
cursor_ = elems_->cursor();
}
return old; return old;
} }

View File

@ -183,12 +183,12 @@ struct test_cq<ipc::queue<T...>> {
cn_t* connect() { cn_t* connect() {
cn_t* queue = new cn_t { ca_ }; cn_t* queue = new cn_t { ca_ };
[&] { QVERIFY(queue->connect() != ipc::invalid_value); } (); [&] { QVERIFY(queue->connect()); } ();
return queue; return queue;
} }
void disconnect(cn_t* queue) { void disconnect(cn_t* queue) {
QVERIFY(queue->disconnect() != ipc::invalid_value); QVERIFY(queue->disconnect());
QVERIFY(queue->detach() != nullptr); QVERIFY(queue->detach() != nullptr);
delete queue; delete queue;
} }
@ -241,7 +241,7 @@ private slots:
#include "test_circ.moc" #include "test_circ.moc"
constexpr int LoopCount = 1000000; constexpr int LoopCount = 10000000;
//constexpr int LoopCount = 1000/*0000*/; //constexpr int LoopCount = 1000/*0000*/;
void Unit::initTestCase() { void Unit::initTestCase() {
@ -270,6 +270,13 @@ void test_prod_cons() {
} }
void Unit::test_prod_cons_1v1() { void Unit::test_prod_cons_1v1() {
// ea_t<
// sizeof(msg_t),
// pc_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::broadcast>
// > el_arr_mmb;
// benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_mmb);
// benchmark_prod_cons<2, 1, LoopCount, void>(&el_arr_mmb);
ea_t< ea_t<
sizeof(msg_t), sizeof(msg_t),
pc_t<ipc::relat::single, ipc::relat::single, ipc::trans::unicast> pc_t<ipc::relat::single, ipc::relat::single, ipc::trans::unicast>
@ -387,7 +394,7 @@ void Unit::test_queue() {
queue.attach(cq); queue.attach(cq);
QVERIFY(queue.detach() != nullptr); QVERIFY(queue.detach() != nullptr);
ipc::detail::static_for(std::make_index_sequence<8>{}, [](auto index) { ipc::detail::static_for(std::make_index_sequence<16>{}, [](auto index) {
benchmark_prod_cons<1, decltype(index)::value + 1, LoopCount>((queue_t*)nullptr); benchmark_prod_cons<1, decltype(index)::value + 1, LoopCount>((queue_t*)nullptr);
}); });
} }

View File

@ -402,7 +402,7 @@ void Unit::test_route_rtt() {
void Unit::test_route_performance() { void Unit::test_route_performance() {
//return; //return;
ipc::detail::static_for(std::make_index_sequence<8>{}, [](auto index) { ipc::detail::static_for(std::make_index_sequence<16>{}, [](auto index) {
test_prod_cons<ipc::route, 1, decltype(index)::value + 1, false>(); test_prod_cons<ipc::route, 1, decltype(index)::value + 1, false>();
}); });
test_prod_cons<ipc::route, 1, 8>(); // test & verify test_prod_cons<ipc::route, 1, 8>(); // test & verify
@ -476,13 +476,13 @@ void Unit::test_channel_rtt() {
} }
void Unit::test_channel_performance() { void Unit::test_channel_performance() {
ipc::detail::static_for(std::make_index_sequence<8>{}, [](auto index) { ipc::detail::static_for(std::make_index_sequence<16>{}, [](auto index) {
test_prod_cons<ipc::channel, 1, decltype(index)::value + 1, false>(); test_prod_cons<ipc::channel, 1, decltype(index)::value + 1, false>();
}); });
ipc::detail::static_for(std::make_index_sequence<8>{}, [](auto index) { ipc::detail::static_for(std::make_index_sequence<16>{}, [](auto index) {
test_prod_cons<ipc::channel, decltype(index)::value + 1, 1, false>(); test_prod_cons<ipc::channel, decltype(index)::value + 1, 1, false>();
}); });
ipc::detail::static_for(std::make_index_sequence<8>{}, [](auto index) { ipc::detail::static_for(std::make_index_sequence<16>{}, [](auto index) {
test_prod_cons<ipc::channel, decltype(index)::value + 1, test_prod_cons<ipc::channel, decltype(index)::value + 1,
decltype(index)::value + 1, false>(); decltype(index)::value + 1, false>();
}); });