still has some bugs

This commit is contained in:
mutouyun 2018-12-26 18:22:34 +08:00
parent 6a2c7671db
commit 0ea8e27446
10 changed files with 411 additions and 245 deletions

View File

@ -100,16 +100,18 @@ public:
using base_t::cursor;
void* acquire() noexcept {
uint_t<32> conn_cnt = static_cast<uint_t<32>>(conn_count()); // acquire
if (conn_cnt == 0) return nullptr;
elem_t* el = elem(base_t::acquire());
// check all consumers have finished reading
while(1) {
uint_t<32> expected = 0,
conn_cnt = static_cast<uint_t<32>>(conn_count()); // acquire
uint_t<32> expected = 0;
if (el->head_.rc_.compare_exchange_weak(
expected, conn_cnt, std::memory_order_relaxed)) {
break;
}
std::this_thread::yield();
conn_cnt = static_cast<uint_t<32>>(conn_count()); // acquire
}
return el->data_;
}

View File

@ -43,7 +43,7 @@ public:
std::size_t connect() noexcept {
if (elems_ == nullptr) return invalid_value;
if (connected_.exchange(true, std::memory_order_relaxed)) {
if (connected_.exchange(true, std::memory_order_acq_rel)) {
// if it's already connected, just return an error count
return invalid_value;
}
@ -52,7 +52,7 @@ public:
std::size_t disconnect() noexcept {
if (elems_ == nullptr) return invalid_value;
if (!connected_.exchange(false, std::memory_order_relaxed)) {
if (!connected_.exchange(false, std::memory_order_acq_rel)) {
// if it's already disconnected, just return an error count
return invalid_value;
}
@ -63,8 +63,12 @@ public:
return (elems_ == nullptr) ? invalid_value : elems_->conn_count();
}
bool empty() const noexcept {
return (elems_ == nullptr) ? true : (cursor_ == elems_->cursor());
}
bool connected() const noexcept {
return connected_.load(std::memory_order_relaxed);
return connected_.load(std::memory_order_acquire);
}
array_t* attach(array_t* arr) noexcept {
@ -82,30 +86,12 @@ public:
return old;
}
bool push(T const & item) noexcept {
if (elems_ == nullptr) return false;
auto ptr = elems_->acquire();
::new (ptr) T(item);
elems_->commit(ptr);
return true;
}
template <typename P>
auto push(P&& param) noexcept // disable this if P is as same as T
-> Requires<!std::is_same<std::remove_reference_t<P>, T>::value, bool> {
if (elems_ == nullptr) return false;
auto ptr = elems_->acquire();
::new (ptr) T { std::forward<P>(param) };
elems_->commit(ptr);
return true;
}
template <typename... P>
auto push(P&&... params) noexcept // some compilers are not support this well
-> Requires<(sizeof...(P) != 1), bool> {
auto push(P&&... params) noexcept {
if (elems_ == nullptr) return false;
auto ptr = elems_->acquire();
::new (ptr) T { std::forward<P>(params)... };
if (ptr == nullptr) return false;
::new (ptr) T(std::forward<P>(params)...);
elems_->commit(ptr);
return true;
}
@ -116,9 +102,9 @@ public:
auto [ques, size] = upd();
for (std::size_t i = 0; i < static_cast<std::size_t>(size); ++i) {
queue* que = ques[i];
if (que == nullptr) continue;
if (que->elems_ == nullptr) return nullptr;
if (que->cursor_ != que->elems_->cursor()) {
if ((que != nullptr) && (que->elems_ == nullptr ||
que->elems_->cursor() != que->cursor_)) {
// may que->elems_ == nullptr
return que;
}
}
@ -129,8 +115,12 @@ public:
}
static T pop(queue* que) noexcept {
if (que == nullptr) return {};
if (que->elems_ == nullptr) return {};
if (que == nullptr) {
return {};
}
if (que->elems_ == nullptr) {
return {};
}
auto item_ptr = static_cast<T*>(que->elems_->take(que->cursor_++));
T item = std::move(*item_ptr);
que->elems_->put(item_ptr);

View File

@ -49,7 +49,7 @@ buff_t recv(handle_t const (& hs)[N]) { return recv(hs, N); }
class IPC_EXPORT route {
public:
route();
route(char const * name);
explicit route(char const * name);
route(route&& rhs);
~route();
@ -89,7 +89,7 @@ private:
class IPC_EXPORT channel {
public:
channel();
channel(char const * name);
explicit channel(char const * name);
channel(channel&& rhs);
~channel();
@ -111,6 +111,13 @@ public:
bool send(buff_t const & buff);
bool send(std::string const & str);
bool wait_for_recv(std::size_t r_count, std::size_t until) const;
bool wait_for_recv(std::size_t r_count) const;
bool send_for(std::size_t r_count, void const * data, std::size_t size);
bool send_for(std::size_t r_count, buff_t const & buff);
bool send_for(std::size_t r_count, std::string const & str);
buff_t recv();
private:

View File

@ -6,8 +6,9 @@
#include <limits>
#include <shared_mutex>
#include <mutex>
#include <unordered_map>
#include <thread>
#include <memory>
#include <utility>
#include "def.h"
#include "shm.h"
@ -26,6 +27,20 @@ struct ch_info_t {
};
#pragma pack()
inline bool wait_for_recv(route const & rt, std::size_t r_count, std::size_t until) {
for (unsigned k = 0; rt.recv_count() < r_count; ++k) {
if (k > until) return false;
std::this_thread::yield();
}
return true;
}
template <typename... P>
inline bool channel_send(route& rt, std::size_t r_count, P&&... params) {
if (!wait_for_recv(rt, r_count, 1024)) return false;
return rt.send(params...); // no need std::forward
}
} // internal-linkage
////////////////////////////////////////////////////////////////
@ -42,9 +57,7 @@ public:
std::string n_;
std::size_t id_;
std::unordered_map<std::size_t, route> rts_;
~channel_(void) { rts_.clear(); }
std::array<route, id_pool::max_count> rts_;
ch_info_t& info() {
return *static_cast<ch_info_t*>(h_.get());
@ -92,7 +105,7 @@ char const * channel::name() const {
}
channel channel::clone() const {
return { name() };
return channel { name() };
}
bool channel::connect(char const * name) {
@ -123,7 +136,9 @@ void channel::disconnect() {
std::unique_lock<rw_lock> guard { impl(p_)->info().lc_ };
impl(p_)->acc().release(impl(p_)->id_);
}
impl(p_)->rts_.clear();
for (auto& rt : impl(p_)->rts_) {
rt.disconnect();
}
impl(p_)->r_.disconnect();
impl(p_)->h_.release();
}
@ -132,25 +147,36 @@ std::size_t channel::recv_count() const {
return impl(p_)->r_.recv_count();
}
template <typename... P>
inline bool channel_send(route& rt, P&&... params) {
for (unsigned k = 0; rt.recv_count() == 0; ++k) {
if (k >= 1024) return false;
std::this_thread::yield();
}
return rt.send(params...); // no need std::forward
bool channel::wait_for_recv(std::size_t r_count, std::size_t until) const {
return ::wait_for_recv(impl(p_)->r_, r_count, until);
}
bool channel::wait_for_recv(std::size_t r_count) const {
return wait_for_recv(r_count, (std::numeric_limits<std::size_t>::max)());
}
bool channel::send(void const * data, std::size_t size) {
return channel_send(impl(p_)->r_, data, size);
return impl(p_)->r_.send(data, size);
}
bool channel::send(buff_t const & buff) {
return channel_send(impl(p_)->r_, buff);
return impl(p_)->r_.send(buff);
}
bool channel::send(std::string const & str) {
return channel_send(impl(p_)->r_, str);
return impl(p_)->r_.send(str);
}
bool channel::send_for(std::size_t r_count, void const * data, std::size_t size) {
return ::channel_send(impl(p_)->r_, r_count, data, size);
}
bool channel::send_for(std::size_t r_count, buff_t const & buff) {
return ::channel_send(impl(p_)->r_, r_count, buff);
}
bool channel::send_for(std::size_t r_count, std::string const & str) {
return ::channel_send(impl(p_)->r_, r_count, str);
}
buff_t channel::recv() {
@ -159,33 +185,26 @@ buff_t channel::recv() {
return ipc::multi_recv([&] {
std::array<std::size_t, id_pool::max_count> acqs;
std::size_t counter = 0;
std::unordered_map<std::size_t, route> cache;
// get all acquired ids
{
std::shared_lock<rw_lock> guard { impl(p_)->info().lc_ };
impl(p_)->acc().for_each([&](std::size_t id, bool acquired) {
impl(p_)->acc().for_acquired([this, &acqs, &counter](std::size_t id) {
if (id == impl(p_)->id_) return;
if (acquired) {
acqs[counter++] = id;
}
acqs[counter++] = id;
});
}
// populate route cache & ques
for (std::size_t i = 0; i < counter; ++i) {
auto id = acqs[i];
auto it = impl(p_)->rts_.find(id);
for (std::size_t k = 0; k < counter; ++k) {
std::size_t id = acqs[k];
auto& it = impl(p_)->rts_[id];
// it's a new id
if (it == impl(p_)->rts_.end()) {
it = cache.emplace(id, (impl(p_)->n_ + std::to_string(id)).c_str()).first;
queue_of(it->second.handle())->connect();
if (!it.valid()) {
it.connect((impl(p_)->n_ + std::to_string(id)).c_str());
queue_of(it.handle())->connect();
}
// it's an existing id
else it = cache.insert(impl(p_)->rts_.extract(it)).position;
// get queue of this route
ques[i] = queue_of(it->second.handle());
ques[k] = queue_of(it.handle());
}
// update rts mapping
impl(p_)->rts_.swap(cache);
return std::make_tuple(ques.data(), counter);
});
}

View File

@ -11,13 +11,15 @@ public:
};
private:
uint_t<8> acquir_ = 0;
uint_t<8> cursor_ = 0;
uint_t<8> block_[max_count] {};
uint_t<8> next_[max_count] {};
public:
void init() {
acquir_ = max_count;
for (std::size_t i = 0; i < max_count;) {
i = block_[i] = static_cast<uint_t<8>>(i + 1);
i = next_[i] = static_cast<uint_t<8>>(i + 1);
}
}
@ -35,20 +37,42 @@ public:
return invalid_value;
}
std::size_t id = cursor_;
cursor_ = block_[id]; // point to next
block_[id] = 0; // clear flag
cursor_ = next_[id]; // point to next
next_[id] = acquir_;
acquir_ = static_cast<uint_t<8>>(id); // put it in acquired list
return id;
}
void release(std::size_t id) {
block_[id] = cursor_;
bool release(std::size_t id) {
if (acquir_ == max_count) return false;
if (acquir_ == id) {
acquir_ = next_[id]; // point to next
}
else {
auto a = next_[acquir_], l = acquir_;
while (1) {
if (a == max_count) {
return false; // found nothing
}
if (a == id) {
next_[l] = next_[a];
break;
}
l = a;
a = next_[a];
}
}
next_[id] = cursor_;
cursor_ = static_cast<uint_t<8>>(id); // put it back
return true;
}
template <typename F>
void for_each(F&& fr) {
for (std::size_t i = 0; i < max_count; ++i) {
fr(i, block_[i] == 0);
void for_acquired(F&& fr) {
auto a = acquir_;
while (a != max_count) {
fr(a);
a = next_[a];
}
}
};

View File

@ -17,25 +17,26 @@ using data_t = byte_t[data_length];
#pragma pack(1)
struct msg_t {
int remain_;
std::size_t id_;
int remain_;
data_t data_;
};
#pragma pack()
using queue_t = circ::queue<msg_t>;
using queue_t = circ::queue<msg_t>;
using msg_id_t = decltype(msg_t::id_);
struct shm_info_t {
std::atomic_size_t id_acc_; // message id accumulator
queue_t::array_t elems_; // the circ_elem_array in shm
std::atomic<msg_id_t> id_acc_; // message id accumulator
queue_t::array_t elems_; // the circ_elem_array in shm
};
constexpr queue_t* queue_of(handle_t h) {
return static_cast<queue_t*>(h);
}
inline std::atomic_size_t* acc_of(queue_t* que) {
return reinterpret_cast<std::atomic_size_t*>(que->elems()) - 1;
inline std::atomic<msg_id_t>* acc_of(queue_t* que) {
return reinterpret_cast<std::atomic<msg_id_t>*>(que->elems()) - 1;
}
inline auto& recv_cache() {
@ -43,7 +44,7 @@ inline auto& recv_cache() {
* the performance of tls::pointer is not good enough
* so regardless of the mingw-crash-problem for the moment
*/
thread_local std::unordered_map<decltype(msg_t::id_), buff_t> rc;
thread_local std::unordered_map<msg_id_t, buff_t> rc;
return rc;
}
@ -109,28 +110,26 @@ bool send(handle_t h, void const * data, std::size_t size) {
if (que == nullptr) {
return false;
}
// calc a new message id
auto msg_id = acc_of(que)->fetch_add(1, std::memory_order_relaxed);
// calc a new message id, start with 1
auto msg_id = acc_of(que)->fetch_add(1, std::memory_order_relaxed) + 1;
// push message fragment, one fragment size is data_length
int offset = 0;
for (int i = 0; i < static_cast<int>(size / data_length); ++i, offset += data_length) {
msg_t msg {
static_cast<int>(size) - offset - static_cast<int>(data_length),
msg_id, { 0 }
msg_id, static_cast<int>(size) - offset - static_cast<int>(data_length), {}
};
std::memcpy(msg.data_, static_cast<byte_t const *>(data) + offset, data_length);
que->push(msg);
if (!que->push(msg)) return false;
}
// if remain > 0, this is the last message fragment
int remain = static_cast<int>(size) - offset;
if (remain > 0) {
msg_t msg {
remain - static_cast<int>(data_length),
msg_id, { 0 }
msg_id, remain - static_cast<int>(data_length), {}
};
std::memcpy(msg.data_, static_cast<byte_t const *>(data) + offset,
static_cast<std::size_t>(remain));
que->push(msg);
if (!que->push(msg)) return false;
}
return true;
}
@ -141,6 +140,7 @@ buff_t multi_recv(F&& upd) {
while(1) {
// pop a new message
auto msg = queue_t::pop(queue_t::multi_wait_for(upd));
if (msg.id_ == 0) return {};
// msg.remain_ may minus & abs(msg.remain_) < data_length
std::size_t remain = static_cast<std::size_t>(
static_cast<int>(data_length) + msg.remain_);

View File

@ -54,7 +54,7 @@ handle_t route::handle() const {
}
route route::clone() const {
return { name() };
return route { name() };
}
bool route::connect(char const * name) {

View File

@ -5,6 +5,12 @@
#include <iostream>
#include <atomic>
#include <thread>
#include <string>
#include <cstdio>
#if defined(__GNUC__)
# include <cxxabi.h> // abi::__cxa_demangle
#endif/*__GNUC__*/
#include "stopwatch.hpp"
@ -56,13 +62,27 @@ struct test_verify<void> {
template <typename T>
struct test_cq;
template <typename T>
std::string type_name() {
#if defined(__GNUC__)
const char* typeid_name = typeid(T).name();
const char* real_name = abi::__cxa_demangle(typeid_name, nullptr, nullptr, nullptr);
std::unique_ptr<void, decltype(::free)*> guard { (void*)real_name, ::free };
if (real_name == nullptr) real_name = typeid_name;
return real_name;
#else
return typeid(T).name();
#endif/*__GNUC__*/
}
template <int N, int M, int Loops, typename V = void, typename T>
void benchmark_prod_cons(T* cq) {
std::cout << "benchmark_prod_cons " << type_name<T>() << " [" << N << ":" << M << ", " << Loops << "]" << std::endl;
test_cq<T> tcq { cq };
std::thread producers[N];
std::thread consumers[M];
std::atomic_int fini { 0 };
std::atomic_int fini_p { 0 }, fini_c { 0 };
test_stopwatch sw;
test_verify<V> vf { M };
@ -72,9 +92,20 @@ void benchmark_prod_cons(T* cq) {
t = std::thread{[&, cid] {
vf.prepare(&t);
auto cn = tcq.connect();
tcq.recv(cn, [&](auto&& msg) { vf.push_data(cid, msg); });
int i = 0;
tcq.recv(cn, [&](auto&& msg) {
if (i % ((Loops * N) / 10) == 0) {
std::printf("%d-recving: %d%%\n", cid, (i * 100) / (Loops * N));
}
vf.push_data(cid, msg);
++i;
});
std::printf("%d-consumer-disconnect\n", cid);
tcq.disconnect(cn);
if (++fini != std::extent<decltype(consumers)>::value) return;
if (++fini_c != std::extent<decltype(consumers)>::value) {
std::printf("%d-consumer-end\n", cid);
return;
}
sw.print_elapsed(N, M, Loops);
vf.verify(N, Loops);
}};
@ -87,16 +118,20 @@ void benchmark_prod_cons(T* cq) {
int pid = 0;
for (auto& t : producers) {
t = std::thread{[&, pid] {
auto cn = tcq.connect_send();
sw.start();
for (int i = 0; i < Loops; ++i) {
tcq.send({ pid, i });
tcq.send(cn, { pid, i });
if (i % (Loops / 10) == 0) {
std::printf("%d-sending: %d%%\n", pid, i * 100 / Loops);
}
}
if (++fini_p != std::extent<decltype(producers)>::value) return;
// quit
tcq.send(cn, { -1, -1 });
}};
++pid;
}
for (auto& t : producers) t.join();
// quit
tcq.send({ -1, -1 });
for (auto& t : consumers) t.join();
}

View File

@ -108,10 +108,14 @@ struct test_cq<ipc::circ::elem_array<D>> {
} while(1);
}
void send(msg_t const & msg) {
msg_t* pmsg = static_cast<msg_t*>(ca_->acquire());
ca_t* connect_send() {
return ca_;
}
void send(ca_t* ca, msg_t const & msg) {
msg_t* pmsg = static_cast<msg_t*>(ca->acquire());
(*pmsg) = msg;
ca_->commit(pmsg);
ca->commit(pmsg);
}
};
@ -153,8 +157,12 @@ struct test_cq<ipc::circ::queue<T>> {
} while(1);
}
void send(msg_t const & msg) {
cn_t{ ca_ }.push(msg);
cn_t connect_send() {
return cn_t{ ca_ };
}
void send(cn_t& cn, msg_t const & msg) {
cn.push(msg);
}
};
@ -260,7 +268,7 @@ void Unit::test_prod_cons_performance() {
void Unit::test_queue() {
ipc::circ::queue<msg_t> queue;
queue.push(1, 2);
queue.push(msg_t { 1, 2 });
QCOMPARE(queue.pop(), msg_t{});
QVERIFY(sizeof(decltype(queue)::array_t) <= sizeof(*cq__));

View File

@ -14,10 +14,6 @@
#include <utility>
#include <unordered_map>
#if defined(__GNUC__)
# include <cxxabi.h> // abi::__cxa_demangle
#endif/*__GNUC__*/
#include "stopwatch.hpp"
#include "spin_lock.hpp"
#include "random.hpp"
@ -37,53 +33,8 @@ constexpr int LoopCount = 100000;
} // internal-linkage
template <>
struct test_cq<ipc::route> {
using cn_t = ipc::route;
std::string conn_name_;
test_cq(void*)
: conn_name_("test-ipc-route") {
ipc::clear_recv(conn_name_.c_str());
}
cn_t connect() {
return { conn_name_.c_str() };
}
void disconnect(cn_t& cn) {
cn.disconnect();
}
void wait_start(int M) {
auto watcher = ipc::connect(conn_name_.c_str());
while (ipc::recv_count(watcher) != static_cast<std::size_t>(M)) {
std::this_thread::yield();
}
}
template <typename F>
void recv(cn_t& cn, F&& proc) {
do {
auto msg = cn.recv();
if (msg.size() < 2) return;
proc(msg);
} while(1);
}
void send(const std::array<int, 2>& info) {
thread_local auto cn = connect();
int n = info[1];
if (n < 0) {
cn.send(ipc::buff_t { '\0' });
}
else cn.send(datas__[static_cast<decltype(datas__)::size_type>(n)]);
}
};
template <>
struct test_verify<ipc::route> {
template <typename T>
struct test_verify {
std::unordered_map<int, std::vector<ipc::buff_t>> list_;
int lcount_;
@ -105,6 +56,120 @@ struct test_verify<ipc::route> {
}
};
template <>
struct test_cq<ipc::route> {
using cn_t = ipc::route;
std::string conn_name_;
test_cq(void*)
: conn_name_("test-ipc-route") {
ipc::clear_recv(conn_name_.c_str());
}
cn_t connect() {
return cn_t { conn_name_.c_str() };
}
void disconnect(cn_t& cn) {
cn.disconnect();
}
void wait_start(int M) {
auto watcher = ipc::connect(conn_name_.c_str());
while (ipc::recv_count(watcher) != static_cast<std::size_t>(M)) {
std::this_thread::yield();
}
}
template <typename F>
void recv(cn_t& cn, F&& proc) {
do {
auto msg = cn.recv();
if (msg.size() < 2) {
QCOMPARE(msg, ipc::buff_t { '\0' });
return;
}
proc(msg);
} while(1);
}
cn_t connect_send() {
return connect();
}
void send(cn_t& cn, const std::array<int, 2>& info) {
int n = info[1];
if (n < 0) {
cn.send(ipc::buff_t { '\0' });
}
else cn.send(datas__[static_cast<decltype(datas__)::size_type>(n)]);
}
};
template <>
struct test_cq<ipc::channel> {
using cn_t = ipc::channel;
std::string conn_name_;
int m_ = 0;
std::vector<cn_t*> s_cns_;
ipc::rw_lock lc_;
test_cq(void*)
: conn_name_("test-ipc-channel") {
}
~test_cq() {
for (auto p : s_cns_) delete p;
}
cn_t connect() {
return cn_t { conn_name_.c_str() };
}
void disconnect(cn_t& cn) {
cn.disconnect();
}
void wait_start(int M) { m_ = M; }
template <typename F>
void recv(cn_t& cn, F&& proc) {
do {
auto msg = cn.recv();
if (msg.size() < 2) {
QCOMPARE(msg, ipc::buff_t { '\0' });
return;
}
proc(msg);
} while(1);
}
cn_t* connect_send() {
auto p = new cn_t { conn_name_.c_str() };
{
std::unique_lock<ipc::rw_lock> guard { lc_ };
s_cns_.push_back(p);
}
return p;
}
void send(cn_t* cn, const std::array<int, 2>& info) {
thread_local struct s_dummy {
s_dummy(cn_t* cn, int m) {
cn->wait_for_recv(m);
std::printf("start to send: %d.\n", m);
}
} _(cn, m_);
int n = info[1];
if (n < 0) {
cn->send(ipc::buff_t { '\0' });
}
else cn->send(datas__[static_cast<decltype(datas__)::size_type>(n)]);
}
};
namespace {
class Unit : public TestSuite {
@ -121,9 +186,13 @@ private slots:
void test_rw_lock();
void test_send_recv();
void test_route();
void test_route_rtt();
void test_route_performance();
void test_channel();
void test_channel_rtt();
void test_channel_performance_1vN();
void test_channel_performance_Nv1();
void test_channel_performance_NvN();
} unit__;
#include "test_ipc.moc"
@ -170,17 +239,7 @@ void benchmark_lc() {
Lc lc;
test_stopwatch sw;
#if defined(__GNUC__)
{
const char* typeid_name = typeid(Lc).name();
const char* real_name = abi::__cxa_demangle(typeid_name, nullptr, nullptr, nullptr);
std::unique_ptr<void, decltype(::free)*> guard { (void*)real_name, ::free };
if (real_name == nullptr) real_name = typeid_name;
std::cout << std::endl << real_name << std::endl;
}
#else
std::cout << std::endl << typeid(Lc).name() << std::endl;
#endif/*__GNUC__*/
std::cout << std::endl << type_name<Lc>() << std::endl;
for (auto& t : r_trd) {
t = std::thread([&] {
@ -260,56 +319,25 @@ void Unit::test_send_recv() {
QVERIFY(h != nullptr);
ipc::clear_recv(h);
char data[] = "hello ipc!";
QVERIFY(ipc::send(h, data, sizeof(data)));
auto got = ipc::recv(h);
QCOMPARE((char*)got.data(), data);
std::thread xx([h, data] {
auto got = ipc::recv(h);
QCOMPARE((char*)got.data(), data);
});
while (!ipc::send(h, data, sizeof(data))) {
std::this_thread::yield();
}
xx.join();
ipc::disconnect(h);
}
template <typename T, int N, int M, bool V = true, int Loops = LoopCount>
void test_prod_cons() {
benchmark_prod_cons<N, M, Loops, std::conditional_t<V, T, void>>((T*)nullptr);
}
void Unit::test_route() {
ipc::clear_recv("my-ipc-route");
auto wait_for_handshake = [](int id) {
ipc::route cc { "my-ipc-route" };
std::string cfm = "copy:" + std::to_string(id), ack = "re-" + cfm;
std::atomic_bool unmatched { true };
std::thread re {[&] {
bool has_re = false;
do {
auto dd = cc.recv();
QVERIFY(!dd.empty());
std::string got { reinterpret_cast<char*>(dd.data()), dd.size() - 1 };
if (cfm == got) continue;
std::cout << id << "-recv: " << got << "[" << dd.size() << "]" << std::endl;
if (ack != got) {
char const cp[] = "copy:";
// check header
if (std::memcmp(dd.data(), cp, sizeof(cp) - 1) == 0) {
std::cout << id << "-re: " << got << std::endl;
QVERIFY(has_re = cc.send(
std::string{ "re-" }.append(
reinterpret_cast<char*>(dd.data()), dd.size() - 1)));
}
}
else if (unmatched.load(std::memory_order_relaxed)) {
unmatched.store(false, std::memory_order_release);
std::cout << id << "-matched!" << std::endl;
}
} while (!has_re || unmatched.load(std::memory_order_relaxed));
}};
while (unmatched.load(std::memory_order_acquire)) {
if (!cc.send(cfm)) {
std::cout << id << "-send failed!" << std::endl;
unmatched = false;
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
re.join();
std::cout << id << "-fini handshake!" << std::endl;
return cc;
};
std::vector<char const *> const datas = {
"hello!",
"foo",
@ -322,71 +350,106 @@ void Unit::test_route() {
};
std::thread t1 {[&] {
auto cc = wait_for_handshake(1);
const char cp[] = "copy:", re[] = "re-copy:";
bool unchecked = true;
for (std::size_t i = 0; i < datas.size(); ++i, unchecked = false) {
ipc::buff_t dd;
do {
dd = cc.recv();
} while (unchecked &&
(((dd.size() > sizeof(cp)) && std::memcmp(dd.data(), cp, sizeof(cp) - 1) == 0) ||
((dd.size() > sizeof(re)) && std::memcmp(dd.data(), re, sizeof(re) - 1) == 0)));
ipc::route cc { "my-ipc-route" };
for (std::size_t i = 0; i < datas.size(); ++i) {
ipc::buff_t dd = cc.recv();
QCOMPARE(dd.size(), std::strlen(datas[i]) + 1);
QVERIFY(std::memcmp(dd.data(), datas[i], dd.size()) == 0);
}
}};
std::thread t2 {[&] {
auto cc = wait_for_handshake(2);
ipc::route cc { "my-ipc-route" };
while (cc.recv_count() == 0) {
std::this_thread::yield();
}
for (std::size_t i = 0; i < datas.size(); ++i) {
std::cout << "sending: " << datas[i] << std::endl;
cc.send(datas[i]);
QVERIFY(cc.send(datas[i]));
}
}};
t1.join();
t2.join();
test_prod_cons<ipc::route, 1, 1>();
}
template <int N, int M, bool V = true, int Loops = LoopCount>
void test_prod_cons() {
benchmark_prod_cons<N, M, Loops, std::conditional_t<V, ipc::route, void>>((ipc::route*)nullptr);
void Unit::test_route_rtt() {
test_stopwatch sw;
std::thread t1 {[&] {
ipc::route cc { "my-ipc-route-1" };
ipc::route cr { "my-ipc-route-2" };
for (std::size_t i = 0;; ++i) {
auto dd = cc.recv();
if (dd.size() < 2) return;
// std::cout << "recving: " << i << "-[" << dd.size() << "]" << std::endl;
while (!cr.send(ipc::buff_t { 'a' })) {
std::this_thread::yield();
}
}
}};
std::thread t2 {[&] {
ipc::route cc { "my-ipc-route-1" };
ipc::route cr { "my-ipc-route-2" };
while (cc.recv_count() == 0) {
std::this_thread::yield();
}
sw.start();
for (std::size_t i = 0; i < LoopCount; ++i) {
// std::cout << "sending: " << i << "-[" << datas__[i].size() << "]" << std::endl;
cc.send(datas__[i]);
/*auto dd = */cr.recv();
// if (dd.size() != 1 || dd[0] != 'a') {
// QVERIFY(false);
// }
}
cc.send(ipc::buff_t { '\0' });
t1.join();
sw.print_elapsed(1, 1, LoopCount);
}};
t2.join();
}
template <int P, int C>
template <int P, int C, bool V = false>
struct test_performance {
template <typename T = ipc::route>
static void start() {
test_performance<P - 1, C - 1>::start();
test_prod_cons<P, C, false>();
test_performance<P - 1, C - 1, V>::template start<T>();
test_prod_cons<T, P, C, V>();
}
};
template <int C>
struct test_performance<1, C> {
template <int C, bool V>
struct test_performance<1, C, V> {
template <typename T = ipc::route>
static void start() {
test_performance<1, C - 1>::start();
test_prod_cons<1, C, false>();
test_performance<1, C - 1, V>::template start<T>();
test_prod_cons<T, 1, C, V>();
}
};
template <int P>
struct test_performance<P, 1> {
template <int P, bool V>
struct test_performance<P, 1, V> {
template <typename T = ipc::route>
static void start() {
test_performance<P - 1, 1>::start();
test_prod_cons<P, 1, false>();
test_performance<P - 1, 1, V>::template start<T>();
test_prod_cons<T, P, 1, V>();
}
};
template <>
struct test_performance<1, 1> {
template <bool V>
struct test_performance<1, 1, V> {
template <typename T = ipc::route>
static void start() {
test_prod_cons<1, 1, false>();
test_prod_cons<T, 1, 1, V>();
}
};
void Unit::test_route_performance() {
test_prod_cons<1, 1>();
test_performance<1, 10>::start();
}
@ -402,9 +465,7 @@ void Unit::test_channel() {
std::thread t2 {[&] {
ipc::channel cc { "my-ipc-channel" };
while (cc.recv_count() == 0) {
std::this_thread::yield();
}
cc.wait_for_recv(1);
for (std::size_t i = 0; i < (std::min)(100, LoopCount); ++i) {
std::cout << "sending: " << i << "-[" << datas__[i].size() << "]" << std::endl;
cc.send(datas__[i]);
@ -425,16 +486,19 @@ void Unit::test_channel_rtt() {
auto dd = cc.recv();
if (dd.size() < 2) return;
// std::cout << "recving: " << i << "-[" << dd.size() << "]" << std::endl;
while (!cc.send(ipc::buff_t { 'a' })) {}
while (!cc.send(ipc::buff_t { 'a' })) {
cc.wait_for_recv(1);
}
}
}};
std::thread t2 {[&] {
ipc::channel cc { "my-ipc-channel" };
cc.wait_for_recv(1);
sw.start();
for (std::size_t i = 0; i < LoopCount; ++i) {
// std::cout << "sending: " << i << "-[" << datas__[i].size() << "]" << std::endl;
while (!cc.send(datas__[i])) {}
cc.send(datas__[i]);
/*auto dd = */cc.recv();
// if (dd.size() != 1 || dd[0] != 'a') {
// QVERIFY(false);
@ -442,10 +506,27 @@ void Unit::test_channel_rtt() {
}
cc.send(ipc::buff_t { '\0' });
t1.join();
sw.print_elapsed(DataMin, DataMax, LoopCount);
sw.print_elapsed(1, 1, LoopCount);
}};
t2.join();
}
void Unit::test_channel_performance_1vN() {
test_performance<1, 10, true>::start<ipc::channel>();
// test_prod_cons<ipc::channel, 1, 2, false>();
}
void Unit::test_channel_performance_Nv1() {
test_performance<10, 1>::start<ipc::channel>();
// test_prod_cons<ipc::channel, 1, 1, false>();
// test_prod_cons<ipc::channel, 2, 1, false>();
// test_prod_cons<ipc::channel, 3, 1, false>();
// test_prod_cons<ipc::channel, 4, 1, false>();
}
void Unit::test_channel_performance_NvN() {
test_performance<10, 10>::start<ipc::channel>();
}
} // internal-linkage