adjust file naming; supplement test cases

This commit is contained in:
mutouyun 2018-12-24 11:56:42 +08:00
parent f962dc4cb5
commit 0eb61b8745
6 changed files with 57 additions and 13 deletions

View File

@ -24,9 +24,9 @@ HEADERS += \
../include/def.h \ ../include/def.h \
../include/rw_lock.h \ ../include/rw_lock.h \
../include/tls_pointer.h \ ../include/tls_pointer.h \
../src/route.hpp \ ../src/channel.inc \
../src/channel.hpp \ ../src/route.inc \
../src/id_pool.hpp ../src/id_pool.inc
SOURCES += \ SOURCES += \
../src/shm.cpp \ ../src/shm.cpp \

View File

@ -7,12 +7,13 @@
#include <shared_mutex> #include <shared_mutex>
#include <mutex> #include <mutex>
#include <unordered_map> #include <unordered_map>
#include <thread>
#include "def.h" #include "def.h"
#include "shm.h" #include "shm.h"
#include "rw_lock.h" #include "rw_lock.h"
#include "id_pool.hpp" #include "id_pool.inc"
namespace { namespace {
@ -131,23 +132,31 @@ std::size_t channel::recv_count() const {
return impl(p_)->r_.recv_count(); return impl(p_)->r_.recv_count();
} }
template <typename... P>
inline bool channel_send(route& rt, P&&... params) {
while (rt.recv_count() == 0) {
std::this_thread::yield();
}
return rt.send(params...); // no need std::forward
}
bool channel::send(void const * data, std::size_t size) { bool channel::send(void const * data, std::size_t size) {
return impl(p_)->r_.send(data, size); return channel_send(impl(p_)->r_, data, size);
} }
bool channel::send(buff_t const & buff) { bool channel::send(buff_t const & buff) {
return impl(p_)->r_.send(buff); return channel_send(impl(p_)->r_, buff);
} }
bool channel::send(std::string const & str) { bool channel::send(std::string const & str) {
return impl(p_)->r_.send(str); return channel_send(impl(p_)->r_, str);
} }
buff_t channel::recv() { buff_t channel::recv() {
if (!valid()) return {}; if (!valid()) return {};
std::array<queue_t*, id_pool::max_count> ques; std::array<queue_t*, id_pool::max_count> ques;
return ipc::multi_recv([&] { return ipc::multi_recv([&] {
std::array<std::size_t, id_pool::max_count> acqeds; std::array<std::size_t, id_pool::max_count> acqs;
std::size_t counter = 0; std::size_t counter = 0;
std::unordered_map<std::size_t, route> cache; std::unordered_map<std::size_t, route> cache;
// get all acquired ids // get all acquired ids
@ -156,13 +165,13 @@ buff_t channel::recv() {
impl(p_)->acc().for_each([&](std::size_t id, bool acquired) { impl(p_)->acc().for_each([&](std::size_t id, bool acquired) {
if (id == impl(p_)->id_) return; if (id == impl(p_)->id_) return;
if (acquired) { if (acquired) {
acqeds[counter++] = id; acqs[counter++] = id;
} }
}); });
} }
// populate route cache & ques // populate route cache & ques
for (std::size_t i = 0; i < counter; ++i) { for (std::size_t i = 0; i < counter; ++i) {
auto id = acqeds[i]; auto id = acqs[i];
auto it = impl(p_)->rts_.find(id); auto it = impl(p_)->rts_.find(id);
// it's a new id // it's a new id
if (it == impl(p_)->rts_.end()) { if (it == impl(p_)->rts_.end()) {

View File

@ -193,5 +193,5 @@ buff_t recv(handle_t h) {
} // namespace ipc } // namespace ipc
#include "route.hpp" #include "route.inc"
#include "channel.hpp" #include "channel.inc"

View File

@ -65,8 +65,10 @@ bool route::connect(char const * name) {
} }
void route::disconnect() { void route::disconnect() {
if (!valid()) return;
ipc::disconnect(impl(p_)->h_); ipc::disconnect(impl(p_)->h_);
impl(p_)->h_ = nullptr; impl(p_)->h_ = nullptr;
impl(p_)->n_.clear();
} }
std::size_t route::recv_count() const { std::size_t route::recv_count() const {

View File

@ -123,6 +123,7 @@ private slots:
void test_route(); void test_route();
void test_route_performance(); void test_route_performance();
void test_channel(); void test_channel();
void test_channel_rtt();
} unit__; } unit__;
#include "test_ipc.moc" #include "test_ipc.moc"
@ -158,7 +159,7 @@ struct lc_wrapper : Mutex {
void unlock_shared() { Mutex::unlock(); } void unlock_shared() { Mutex::unlock(); }
}; };
template <typename Lc, int W, int R, int Loops = 100000> template <typename Lc, int W, int R, int Loops = LoopCount>
void benchmark_lc() { void benchmark_lc() {
std::thread w_trd[W]; std::thread w_trd[W];
std::thread r_trd[R]; std::thread r_trd[R];
@ -415,4 +416,36 @@ void Unit::test_channel() {
t2.join(); t2.join();
} }
void Unit::test_channel_rtt() {
test_stopwatch sw;
std::thread t1 {[&] {
ipc::channel cc { "my-ipc-channel" };
for (std::size_t i = 0;; ++i) {
auto dd = cc.recv();
if (dd.size() < 2) return;
// std::cout << "recving: " << i << "-[" << dd.size() << "]" << std::endl;
cc.send(ipc::buff_t { 'a' });
}
}};
std::thread t2 {[&] {
ipc::channel cc { "my-ipc-channel" };
sw.start();
for (std::size_t i = 0; i < LoopCount; ++i) {
// std::cout << "sending: " << i << "-[" << datas__[i].size() << "]" << std::endl;
cc.send(datas__[i]);
/*auto dd = */cc.recv();
// if (dd.size() != 1 || dd[0] != 'a') {
// QVERIFY(false);
// }
}
cc.send(ipc::buff_t { '\0' });
t1.join();
sw.print_elapsed(DataMin, DataMax, LoopCount);
}};
t2.join();
}
} // internal-linkage } // internal-linkage