fix some bugs; use thread_local

This commit is contained in:
mutouyun 2018-12-17 22:44:37 +08:00
parent cca70b018c
commit 09c3c557ba
5 changed files with 112 additions and 72 deletions

View File

@ -20,7 +20,7 @@ buff_t make_buff(byte_t const (& data)[N]) { return make_buff(data, N); }
IPC_EXPORT handle_t connect (char const * name); IPC_EXPORT handle_t connect (char const * name);
IPC_EXPORT void disconnect(handle_t h); IPC_EXPORT void disconnect(handle_t h);
IPC_EXPORT std::size_t conn_count(handle_t h); IPC_EXPORT std::size_t recv_count(handle_t h);
IPC_EXPORT bool send(handle_t h, void const * data, std::size_t size); IPC_EXPORT bool send(handle_t h, void const * data, std::size_t size);
IPC_EXPORT buff_t recv(handle_t h); IPC_EXPORT buff_t recv(handle_t h);
@ -43,7 +43,8 @@ public:
bool connect(char const * name); bool connect(char const * name);
void disconnect(); void disconnect();
std::size_t conn_count() const;
std::size_t recv_count() const;
bool send(void const * data, std::size_t size); bool send(void const * data, std::size_t size);
bool send(buff_t const & buff); bool send(buff_t const & buff);

View File

@ -13,7 +13,7 @@
#include "def.h" #include "def.h"
#include "circ_queue.h" #include "circ_queue.h"
#include "rw_lock.h" #include "rw_lock.h"
#include "tls_pointer.h" //#include "tls_pointer.h"
namespace { namespace {
@ -36,13 +36,13 @@ struct shm_info_t {
queue_t::array_t elems_; // the circ_elem_array in shm queue_t::array_t elems_; // the circ_elem_array in shm
}; };
/* ///*
* thread_local stl object's destructor causing crash // * thread_local stl object's destructor causing crash
* See: https://sourceforge.net/p/mingw-w64/bugs/527/ // * See: https://sourceforge.net/p/mingw-w64/bugs/527/
* https://sourceforge.net/p/mingw-w64/bugs/727/ // * https://sourceforge.net/p/mingw-w64/bugs/727/
*/ //*/
/*thread_local*/ ///*thread_local*/
tls::pointer<std::unordered_map<decltype(msg_t::id_), buff_t>> recv_caches__; //tls::pointer<std::unordered_map<decltype(msg_t::id_), buff_t>> recv_caches__;
std::unordered_map<handle_t, queue_t> h2q__; std::unordered_map<handle_t, queue_t> h2q__;
rw_lock h2q_lc__; rw_lock h2q_lc__;
@ -90,15 +90,9 @@ handle_t connect(char const * name) {
if (mem == nullptr) { if (mem == nullptr) {
return nullptr; return nullptr;
} }
queue_t* queue;
{ {
std::unique_lock<rw_lock> guard { h2q_lc__ }; std::unique_lock<rw_lock> guard { h2q_lc__ };
queue = &(h2q__[h]); h2q__[h].attach(&(static_cast<shm_info_t*>(mem)->elems_));
if (queue == nullptr) {
return nullptr;
}
queue->attach(&(static_cast<shm_info_t*>(mem)->elems_));
queue->connect();
} }
h_guard.release(); h_guard.release();
return h; return h;
@ -121,7 +115,7 @@ void disconnect(handle_t h) {
shm::release(h, sizeof(queue_t)); shm::release(h, sizeof(queue_t));
} }
std::size_t conn_count(handle_t h) { std::size_t recv_count(handle_t h) {
auto queue = queue_of(h); auto queue = queue_of(h);
if (queue == nullptr) { if (queue == nullptr) {
return error_count; return error_count;
@ -174,7 +168,7 @@ buff_t recv(handle_t h) {
if (!queue->connected()) { if (!queue->connected()) {
queue->connect(); queue->connect();
} }
auto rcs = recv_caches__.create(); thread_local std::unordered_map<decltype(msg_t::id_), buff_t> rcs;
while(1) { while(1) {
// pop a new message // pop a new message
auto msg = queue->pop(); auto msg = queue->pop();
@ -182,13 +176,13 @@ buff_t recv(handle_t h) {
std::size_t remain = static_cast<std::size_t>( std::size_t remain = static_cast<std::size_t>(
static_cast<int>(data_length) + msg.remain_); static_cast<int>(data_length) + msg.remain_);
// find cache with msg.id_ // find cache with msg.id_
auto cache_it = rcs->find(msg.id_); auto cache_it = rcs.find(msg.id_);
if (cache_it == rcs->end()) { if (cache_it == rcs.end()) {
if (remain <= data_length) { if (remain <= data_length) {
return make_buff(msg.data_, remain); return make_buff(msg.data_, remain);
} }
// cache the first message fragment // cache the first message fragment
else rcs->emplace(msg.id_, make_buff(msg.data_)); else rcs.emplace(msg.id_, make_buff(msg.data_));
} }
// has cached before this message // has cached before this message
else { else {
@ -198,7 +192,7 @@ buff_t recv(handle_t h) {
cache.insert(cache.end(), msg.data_, msg.data_ + remain); cache.insert(cache.end(), msg.data_, msg.data_ + remain);
// finish this message, erase it from cache // finish this message, erase it from cache
auto buf = std::move(cache); auto buf = std::move(cache);
rcs->erase(cache_it); rcs.erase(cache_it);
return buf; return buf;
} }
// there are remain datas after this message // there are remain datas after this message
@ -263,8 +257,8 @@ void channel::disconnect() {
ipc::disconnect(impl(p_)->h_); ipc::disconnect(impl(p_)->h_);
} }
std::size_t channel::conn_count() const { std::size_t channel::recv_count() const {
return ipc::conn_count(impl(p_)->h_); return ipc::recv_count(impl(p_)->h_);
} }
bool channel::send(void const *data, std::size_t size) { bool channel::send(void const *data, std::size_t size) {

View File

@ -40,11 +40,11 @@ struct test_stopwatch {
} }
}; };
template <bool V> template <typename V>
struct test_verify; struct test_verify;
template <> template <>
struct test_verify<false> { struct test_verify<void> {
test_verify (int) {} test_verify (int) {}
void prepare (void*) {} void prepare (void*) {}
void push_data(int, ...) {} void push_data(int, ...) {}
@ -54,7 +54,7 @@ struct test_verify<false> {
template <typename T> template <typename T>
struct test_cq; struct test_cq;
template <int N, int M, int Loops, bool V = true, typename T> template <int N, int M, int Loops, typename V = void, typename T>
void benchmark_prod_cons(T* cq) { void benchmark_prod_cons(T* cq) {
test_cq<T> tcq { cq }; test_cq<T> tcq { cq };

View File

@ -23,8 +23,8 @@ struct msg_t {
} // internal-linkage } // internal-linkage
template <bool V> template <>
struct test_verify { struct test_verify<cq_t> {
std::unordered_map<int, std::vector<int>>* list_; std::unordered_map<int, std::vector<int>>* list_;
int lcount_; int lcount_;
@ -212,7 +212,7 @@ void Unit::test_inst() {
template <int N, int M, bool V = true, int Loops = LoopCount> template <int N, int M, bool V = true, int Loops = LoopCount>
void test_prod_cons() { void test_prod_cons() {
benchmark_prod_cons<N, M, Loops, V>(cq__); benchmark_prod_cons<N, M, Loops, std::conditional_t<V, cq_t, void>>(cq__);
} }
void Unit::test_prod_cons_1v1() { void Unit::test_prod_cons_1v1() {

View File

@ -17,11 +17,13 @@
# include <cxxabi.h> // abi::__cxa_demangle # include <cxxabi.h> // abi::__cxa_demangle
#endif/*__GNUC__*/ #endif/*__GNUC__*/
#include "ipc.h"
#include "rw_lock.h"
#include "stopwatch.hpp" #include "stopwatch.hpp"
#include "spin_lock.hpp" #include "spin_lock.hpp"
#include "random.hpp" #include "random.hpp"
#include "ipc.h"
#include "rw_lock.h"
#include "test.h" #include "test.h"
namespace { namespace {
@ -29,8 +31,8 @@ namespace {
std::vector<ipc::buff_t> datas__; std::vector<ipc::buff_t> datas__;
constexpr int DataMin = 2; constexpr int DataMin = 2;
constexpr int DataMax = 512; constexpr int DataMax = 256;
constexpr int LoopCount = 100/*0*//*000*/; constexpr int LoopCount = 100000;
} // internal-linkage } // internal-linkage
@ -39,22 +41,24 @@ struct test_cq<ipc::channel> {
using cn_t = ipc::channel; using cn_t = ipc::channel;
std::string conn_name_; std::string conn_name_;
std::size_t conn_count_ = 0;
test_cq(void*) test_cq(void*)
: conn_name_("test-ipc-channel") : conn_name_("test-ipc-channel") {
{} auto watcher = connect();
QCOMPARE(watcher.recv_count(), 0);
cn_t connect() {
cn_t cn { conn_name_.c_str() };
conn_count_ = cn.conn_count();
return cn;
} }
void disconnect(cn_t&) {} cn_t connect() {
return { conn_name_.c_str() };
}
void disconnect(cn_t& cn) {
cn.disconnect();
}
void wait_start(int M) { void wait_start(int M) {
while (conn_count_ != static_cast<std::size_t>(M)) { auto watcher = connect();
while (watcher.recv_count() != static_cast<std::size_t>(M)) {
std::this_thread::yield(); std::this_thread::yield();
} }
} }
@ -74,32 +78,32 @@ struct test_cq<ipc::channel> {
if (n < 0) { if (n < 0) {
cn.send(ipc::buff_t { '\0' }); cn.send(ipc::buff_t { '\0' });
} }
else cn.send(datas__[n]); else cn.send(datas__[static_cast<decltype(datas__)::size_type>(n)]);
} }
}; };
//template <> template <>
//struct test_verify<true> { struct test_verify<ipc::channel> {
// std::unordered_map<int, std::vector<ipc::buff_t>> list_; std::unordered_map<int, std::vector<ipc::buff_t>> list_;
// int lcount_; int lcount_;
// test_verify(int M) : lcount_{ M } {} test_verify(int M) : lcount_{ M } {}
// void prepare(void* pt) { void prepare(void* pt) {
// std::cout << "start consumer: " << pt << std::endl; std::cout << "start consumer: " << pt << std::endl;
// } }
// void push_data(int cid, ipc::buff_t const & msg) { void push_data(int cid, ipc::buff_t const & msg) {
// list_[cid].emplace_back(std::move(msg)); list_[cid].emplace_back(std::move(msg));
// } }
// void verify(int /*N*/, int /*Loops*/) { void verify(int /*N*/, int /*Loops*/) {
// std::cout << "verifying..." << std::endl; std::cout << "verifying..." << std::endl;
// for (int m = 0; m < lcount_; ++m) { for (int m = 0; m < lcount_; ++m) {
// QCOMPARE(datas__, list_[m]); QCOMPARE(datas__, list_[m]);
// } }
// } }
//}; };
namespace { namespace {
@ -201,7 +205,7 @@ void benchmark_lc() {
sw.print_elapsed(W, R, Loops); sw.print_elapsed(W, R, Loops);
} }
std::uint64_t sum = 0; std::uint64_t sum = 0;
for (int i : seq) sum += i; for (int i : seq) sum += static_cast<std::uint64_t>(i);
QCOMPARE(sum, acc<std::uint64_t>(1, Loops) * std::extent<decltype(w_trd)>::value); QCOMPARE(sum, acc<std::uint64_t>(1, Loops) * std::extent<decltype(w_trd)>::value);
}); });
} }
@ -230,10 +234,10 @@ void benchmark_lc() {
} }
template <int W, int R> template <int W, int R>
void test_performance() { void test_lock_performance() {
std::cout << std::endl std::cout << std::endl
<< "test_performance: [" << W << ":" << R << "]" << "test_lock_performance: [" << W << ":" << R << "]"
<< std::endl; << std::endl;
benchmark_lc<ipc::rw_lock , W, R>(); benchmark_lc<ipc::rw_lock , W, R>();
@ -243,10 +247,10 @@ void test_performance() {
} }
void Unit::test_rw_lock() { void Unit::test_rw_lock() {
test_performance<1, 1>(); test_lock_performance<1, 1>();
test_performance<4, 4>(); test_lock_performance<4, 4>();
test_performance<1, 8>(); test_lock_performance<1, 8>();
test_performance<8, 1>(); test_lock_performance<8, 1>();
} }
void Unit::test_send_recv() { void Unit::test_send_recv() {
@ -340,8 +344,49 @@ void Unit::test_channel() {
t2.join(); t2.join();
} }
template <int N, int M, bool V = true, int Loops = LoopCount>
void test_prod_cons() {
benchmark_prod_cons<N, M, Loops, std::conditional_t<V, ipc::channel, void>>((ipc::channel*)nullptr);
}
template <int P, int C>
struct test_performance {
static void start() {
test_performance<P - 1, C - 1>::start();
test_prod_cons<P, C, false>();
}
};
template <int C>
struct test_performance<1, C> {
static void start() {
test_performance<1, C - 1>::start();
test_prod_cons<1, C, false>();
}
};
template <int P>
struct test_performance<P, 1> {
static void start() {
test_performance<P - 1, 1>::start();
test_prod_cons<P, 1, false>();
}
};
template <>
struct test_performance<1, 1> {
static void start() {
test_prod_cons<1, 1, false>();
}
};
void Unit::test_channel_performance() { void Unit::test_channel_performance() {
benchmark_prod_cons<1, 1, LoopCount, false>((ipc::channel*)nullptr); test_prod_cons<1, 1>();
test_prod_cons<1, 8>();
test_performance<1 , 10>::start();
test_performance<10, 1 >::start();
test_performance<10, 10>::start();
} }
} // internal-linkage } // internal-linkage