optimize structure

This commit is contained in:
zhangyi 2019-03-25 20:14:59 +08:00
parent 0c911fb0a4
commit 731f61a3c1
7 changed files with 137 additions and 167 deletions

View File

@ -7,7 +7,6 @@
#include "rw_lock.h"
#include "platform/waiter_wrapper.h"
#include "platform/detail.h"
namespace ipc {
@ -25,7 +24,6 @@ constexpr u1_t index_of(u2_t c) noexcept {
}
class conn_head {
ipc::detail::waiter cc_waiter_, waiter_;
std::atomic<std::size_t> cc_ { 0 }; // connection counter
ipc::spin_lock lc_;
@ -47,12 +45,6 @@ public:
conn_head(const conn_head&) = delete;
conn_head& operator=(const conn_head&) = delete;
auto & waiter() noexcept { return this->waiter_; }
auto const & waiter() const noexcept { return this->waiter_; }
auto & conn_waiter() noexcept { return this->cc_waiter_; }
auto const & conn_waiter() const noexcept { return this->cc_waiter_; }
std::size_t connect() noexcept {
return cc_.fetch_add(1, std::memory_order_release);
}

View File

@ -6,6 +6,7 @@
#include <utility>
#include <atomic>
#include <type_traits>
#include <string>
#include "def.h"
#include "shm.h"
@ -13,9 +14,12 @@
#include "pool_alloc.h"
#include "queue.h"
#include "policy.h"
#include "rw_lock.h"
#include "memory/resource.h"
#include "platform/detail.h"
#include "platform/waiter_wrapper.h"
namespace {
@ -76,17 +80,32 @@ struct detail_impl {
using queue_t = ipc::queue<msg_t<data_length>, Policy>;
struct conn_info_t {
queue_t que_;
waiter cc_waiter_, wt_waiter_, rd_waiter_;
conn_info_t(char const * name)
: que_ ((std::string{ "__QU_CONN__" } + name).c_str()) {
cc_waiter_.open((std::string{ "__CC_CONN__" } + name).c_str());
wt_waiter_.open((std::string{ "__WT_CONN__" } + name).c_str());
rd_waiter_.open((std::string{ "__RD_CONN__" } + name).c_str());
}
};
constexpr static void* head_of(queue_t* que) {
return static_cast<void*>(que->elems());
}
constexpr static queue_t* queue_of(ipc::handle_t h) {
return static_cast<queue_t*>(h);
constexpr static conn_info_t* info_of(ipc::handle_t h) {
return static_cast<conn_info_t*>(h);
}
static auto& queues_cache() {
static tls::pointer<mem::vector<queue_t*>> qc;
return *qc.create();
constexpr static queue_t* queue_of(ipc::handle_t h) {
auto info = info_of(h);
if (info == nullptr) {
return nullptr;
}
return &(info->que_);
}
static auto& recv_cache() {
@ -106,7 +125,7 @@ static auto& recv_cache() {
/* API implementations */
static ipc::handle_t connect(char const * name) {
return mem::alloc<queue_t>(name);
return mem::alloc<conn_info_t>(name);
}
static void disconnect(ipc::handle_t h) {
@ -114,8 +133,10 @@ static void disconnect(ipc::handle_t h) {
if (que == nullptr) {
return;
}
que->disconnect(); // needn't to detach, cause it will be deleted soon.
mem::free(que);
if (que->disconnect()) {
info_of(h)->cc_waiter_.broadcast();
}
mem::free(info_of(h));
}
static std::size_t recv_count(ipc::handle_t h) {
@ -131,7 +152,14 @@ static bool wait_for_recv(ipc::handle_t h, std::size_t r_count) {
if (que == nullptr) {
return false;
}
return que->wait_for_connect(r_count);
for (unsigned k = 0; que->conn_count() < r_count;) {
ipc::sleep(k, [h, que, r_count] {
return info_of(h)->cc_waiter_.wait_if([que, r_count] {
return que->conn_count() < r_count;
});
});
}
return true;
}
static bool send(ipc::handle_t h, void const * data, std::size_t size) {
@ -154,6 +182,7 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size) {
static_cast<byte_t const *>(data) + offset, data_length)) {
std::this_thread::yield();
}
info_of(h)->rd_waiter_.broadcast();
}
// if remain > 0, this is the last message fragment
int remain = static_cast<int>(size) - offset;
@ -162,6 +191,7 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size) {
static_cast<byte_t const *>(data) + offset, static_cast<std::size_t>(remain))) {
std::this_thread::yield();
}
info_of(h)->rd_waiter_.broadcast();
}
return true;
}
@ -169,11 +199,22 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size) {
static buff_t recv(ipc::handle_t h) {
auto que = queue_of(h);
if (que == nullptr) return {};
que->connect(); // wouldn't connect twice
if (que->connect()) { // wouldn't connect twice
info_of(h)->cc_waiter_.broadcast();
}
auto& rc = recv_cache();
while (1) {
// pop a new message
auto msg = que->pop();
typename queue_t::value_t msg;
for (unsigned k = 0; !que->pop(msg);) {
bool succ = false;
ipc::sleep(k, [h, que, &msg, &succ] {
return info_of(h)->rd_waiter_.wait_if([que, &msg, &succ] {
return !(succ = que->pop(msg));
});
});
if (succ) break;
}
if (msg.head_.que_ == nullptr) return {};
if (msg.head_.que_ == que) continue; // pop next
// msg.head_.remain_ may minus & abs(msg.head_.remain_) < data_length
@ -217,7 +258,7 @@ namespace ipc {
namespace detail {
std::size_t calc_unique_id() {
static shm::handle g_shm { "__GLOBAL_ACC_STORAGE__", sizeof(std::atomic<std::size_t>) };
static shm::handle g_shm { "__IPC_GLOBAL_ACC_STORAGE__", sizeof(std::atomic<std::size_t>) };
return static_cast<std::atomic<std::size_t>*>(g_shm.get())->fetch_add(1, std::memory_order_relaxed);
}

View File

@ -219,4 +219,34 @@ public:
};
} // namespace detail
class waiter : public detail::waiter_wrapper {
shm::handle shm_;
using detail::waiter_wrapper::attach;
public:
~waiter() {
close();
}
bool open(char const * name) {
if (name == nullptr || name[0] == '\0') {
return false;
}
close();
if (!shm_.acquire((std::string{ "__SHM_WAITER__" } + name).c_str(), sizeof(waiter_t))) {
return false;
}
attach(static_cast<waiter_t*>(shm_.get()));
return detail::waiter_wrapper::open((std::string{ "__IMP_WAITER__" } + name).c_str());
}
void close() {
detail::waiter_wrapper::close();
shm_.release();
}
};
} // namespace ipc

View File

@ -9,71 +9,48 @@
#include <thread>
#include <chrono>
#include <string>
#include <cassert>
#include "def.h"
#include "shm.h"
#include "log.h"
#include "rw_lock.h"
#include "platform/waiter_wrapper.h"
#include "platform/detail.h"
namespace ipc {
namespace detail {
class queue_waiter {
class queue_conn {
protected:
ipc::detail::waiter_wrapper waiter_;
ipc::detail::waiter_wrapper cc_waiter_;
bool connected_ = false;
shm::handle elems_h_;
template <typename Elems>
Elems* open(char const * name) {
if (name == nullptr || name[0] == '\0') {
ipc::error("fail open waiter: name is empty!\n");
return nullptr;
}
if (!elems_h_.acquire(name, sizeof(Elems))) {
return nullptr;
}
auto elems = static_cast<Elems*>(elems_h_.get());
if (elems == nullptr) {
ipc::error("fail acquire elems: %s\n", name);
return nullptr;
}
elems->init();
return elems;
}
template <typename Elems>
void open(Elems*(& elems), char const * name) {
assert(name != nullptr && name[0] != '\0');
if (elems == nullptr) {
elems = open<Elems>(name);
}
assert(elems != nullptr);
waiter_.attach(&(elems->waiter()));
if (!waiter_.open((std::string{ "__IPC_WAITER__" } + name).c_str())) {
return;
}
cc_waiter_.attach(&(elems->conn_waiter()));
cc_waiter_.open((std::string{ "__IPC_CC_WAITER__" } + name).c_str());
}
void close() {
waiter_.close();
waiter_.attach(nullptr);
cc_waiter_.close();
cc_waiter_.attach(nullptr);
}
template <typename Elems>
void close(Elems* /*elems*/) {
close();
elems_h_.release();
}
public:
queue_waiter() = default;
queue_waiter(const queue_waiter&) = delete;
queue_waiter& operator=(const queue_waiter&) = delete;
queue_conn() = default;
queue_conn(const queue_conn&) = delete;
queue_conn& operator=(const queue_conn&) = delete;
bool connected() const noexcept {
return connected_;
@ -89,9 +66,7 @@ public:
}
connected_ = true;
elems->connect();
auto ret = std::make_tuple(true, elems->cursor());
cc_waiter_.broadcast();
return ret;
return std::make_tuple(true, elems->cursor());
}
template <typename Elems>
@ -103,27 +78,13 @@ public:
}
connected_ = false;
elems->disconnect();
cc_waiter_.broadcast();
return true;
}
template <typename Elems>
bool wait_for_connect(Elems* elems, std::size_t count) {
if (elems == nullptr) return false;
for (unsigned k = 0; elems->conn_count() < count;) {
ipc::sleep(k, [this, elems, count] {
return cc_waiter_.wait_if([elems, count] {
return elems->conn_count() < count;
});
});
}
return true;
}
};
template <typename Elems>
class queue_base : public queue_waiter {
using base_t = queue_waiter;
class queue_base : public queue_conn {
using base_t = queue_conn;
public:
using elems_t = Elems;
@ -140,16 +101,11 @@ public:
explicit queue_base(char const * name)
: queue_base() {
attach(nullptr, name);
elems_ = open<elems_t>(name);
}
explicit queue_base(elems_t* els, char const * name = nullptr)
: queue_base() {
attach(els, name);
}
/* not virtual */ ~queue_base(void) {
base_t::close(elems_);
/* not virtual */ ~queue_base() {
base_t::close();
}
constexpr elems_t * elems() const noexcept {
@ -173,10 +129,6 @@ public:
return (elems_ == nullptr) ? invalid_value : elems_->conn_count();
}
bool wait_for_connect(std::size_t count) {
return base_t::wait_for_connect(elems_, count);
}
bool valid() const noexcept {
return elems_ != nullptr;
}
@ -185,57 +137,22 @@ public:
return (elems_ == nullptr) ? true : (cursor_ == elems_->cursor());
}
elems_t* attach(elems_t* els, char const * name = nullptr) noexcept {
auto old = elems_;
elems_ = els;
if (name == nullptr || name[0] == '\0') {
base_t::close(old);
}
else base_t::open(elems_, name);
return old;
}
elems_t* detach() noexcept {
if (elems_ == nullptr) return nullptr;
base_t::close<elems_t>(nullptr); // not release shm
auto old = elems_;
elems_ = nullptr;
return old;
}
template <typename T, typename... P>
auto push(P&&... params) {
if (elems_ == nullptr) return false;
if (elems_->push([&](void* p) {
return elems_->push([&](void* p) {
::new (p) T(std::forward<P>(params)...);
})) {
this->waiter_.broadcast();
return true;
}
return false;
});
}
template <typename T>
T pop() {
bool pop(T& item) {
if (elems_ == nullptr) {
return {};
return false;
}
T item;
auto pop_item = [this, &item] {
return elems_->pop(&this->cursor_, [&item](void* p) {
return elems_->pop(&(this->cursor_), [&item](void* p) {
::new (&item) T(std::move(*static_cast<T*>(p)));
});
};
for (unsigned k = 0;;) {
if (pop_item()) return item;
bool succ = false;
ipc::sleep(k, [this, &succ, &pop_item] {
return this->waiter_.wait_if([&succ, &pop_item] {
return !(succ = pop_item());
});
});
if (succ) return item;
}
}
};
@ -246,6 +163,8 @@ class queue : public detail::queue_base<typename Policy::template elems_t<sizeof
using base_t = detail::queue_base<typename Policy::template elems_t<sizeof(T), alignof(T)>>;
public:
using value_t = T;
using base_t::base_t;
using base_t::push;
using base_t::pop;
@ -255,8 +174,8 @@ public:
return base_t::template push<T>(std::forward<P>(params)...);
}
T pop() {
return base_t::template pop<T>();
bool pop(T& item) {
return base_t::pop(item);
}
};

View File

@ -146,6 +146,7 @@ void benchmark_prod_cons(T* cq) {
}
// quit
tcq.send(cn, { -1, -1 });
tcq.disconnect(cn);
}};
++pid;
}

View File

@ -133,6 +133,9 @@ struct test_cq<ea_t<D, P>> {
ca_->disconnect();
}
void disconnect(ca_t*) {
}
void wait_start(int M) {
while (ca_->conn_count() != static_cast<std::size_t>(M)) {
std::this_thread::yield();
@ -173,28 +176,23 @@ struct test_cq<ea_t<D, P>> {
template <typename... T>
struct test_cq<ipc::queue<T...>> {
using cn_t = ipc::queue<T...>;
using ca_t = typename cn_t::elems_t;
ca_t* ca_;
test_cq(void*) : ca_(reinterpret_cast<ca_t*>(cq__)) {
::new (ca_) ca_t;
}
test_cq(void*) {}
cn_t* connect() {
cn_t* queue = new cn_t { ca_ };
cn_t* queue = new cn_t { "test-ipc-queue" };
[&] { QVERIFY(queue->connect()); } ();
return queue;
}
void disconnect(cn_t* queue) {
QVERIFY(queue->disconnect());
QVERIFY(queue->detach() != nullptr);
queue->disconnect();
delete queue;
}
void wait_start(int M) {
while (ca_->conn_count() != static_cast<std::size_t>(M)) {
cn_t que("test-ipc-queue");
while (que.conn_count() != static_cast<std::size_t>(M)) {
std::this_thread::yield();
}
}
@ -202,18 +200,21 @@ struct test_cq<ipc::queue<T...>> {
template <typename F>
void recv(cn_t* queue, F&& proc) {
while(1) {
auto msg = queue->pop();
typename cn_t::value_t msg;
while (!queue->pop(msg)) {
std::this_thread::yield();
}
if (msg.pid_ < 0) return;
proc(msg);
}
}
cn_t* connect_send() {
return nullptr;
return new cn_t { "test-ipc-queue" };
}
void send(cn_t* /*cn*/, msg_t const & msg) {
while (!cn_t{ ca_ }.push(msg)) {
void send(cn_t* cn, msg_t const & msg) {
while (!cn->push(msg)) {
std::this_thread::yield();
}
}
@ -385,15 +386,12 @@ void Unit::test_queue() {
>>;
queue_t queue;
queue.push(msg_t { 1, 2 });
QCOMPARE(queue.pop(), msg_t{});
QVERIFY(!queue.push(msg_t { 1, 2 }));
msg_t msg {};
QVERIFY(!queue.pop(msg));
QCOMPARE(msg, (msg_t {}));
QVERIFY(sizeof(decltype(queue)::elems_t) <= sizeof(*cq__));
std::memset(cq__, 0, sizeof(decltype(queue)::elems_t));
auto cq = reinterpret_cast<decltype(queue)::elems_t*>(cq__);
queue.attach(cq);
QVERIFY(queue.detach() != nullptr);
ipc::detail::static_for(std::make_index_sequence<16>{}, [](auto index) {
benchmark_prod_cons<1, decltype(index)::value + 1, LoopCount>((queue_t*)nullptr);
});

View File

@ -113,17 +113,11 @@ struct test_cq<ipc::channel> {
std::string conn_name_;
int m_ = 0;
std::vector<cn_t*> s_cns_;
ipc::rw_lock lc_;
test_cq(void*)
: conn_name_("test-ipc-channel") {
}
~test_cq() {
for (auto p : s_cns_) delete p;
}
cn_t connect() {
return cn_t { conn_name_.c_str() };
}
@ -146,27 +140,22 @@ struct test_cq<ipc::channel> {
} while(1);
}
cn_t* connect_send() {
auto p = new cn_t { conn_name_.c_str() };
{
std::unique_lock<ipc::rw_lock> guard { lc_ };
s_cns_.push_back(p);
}
return p;
cn_t connect_send() {
return connect();
}
void send(cn_t* cn, const std::array<int, 2>& info) {
void send(cn_t& cn, const std::array<int, 2>& info) {
thread_local struct s_dummy {
s_dummy(cn_t* cn, int m) {
cn->wait_for_recv(static_cast<std::size_t>(m));
s_dummy(cn_t& cn, int m) {
cn.wait_for_recv(static_cast<std::size_t>(m));
// std::printf("start to send: %d.\n", m);
}
} _(cn, m_);
int n = info[1];
if (n < 0) {
/*QVERIFY*/(cn->send(ipc::buff_t('\0')));
/*QVERIFY*/(cn.send(ipc::buff_t('\0')));
}
else /*QVERIFY*/(cn->send(datas__[static_cast<decltype(datas__)::size_type>(n)]));
else /*QVERIFY*/(cn.send(datas__[static_cast<decltype(datas__)::size_type>(n)]));
}
};