mirror of
https://github.com/mutouyun/cpp-ipc.git
synced 2025-12-07 01:06:45 +08:00
use storage-flag, simplify codes, use one acc handle for all connections & messages.
This commit is contained in:
parent
50e29703dc
commit
b3c577988b
69
src/ipc.cpp
69
src/ipc.cpp
@ -35,9 +35,10 @@ struct msg_t;
|
|||||||
|
|
||||||
template <std::size_t AlignSize>
|
template <std::size_t AlignSize>
|
||||||
struct msg_t<0, AlignSize> {
|
struct msg_t<0, AlignSize> {
|
||||||
void* que_;
|
msg_id_t conn_;
|
||||||
msg_id_t id_;
|
msg_id_t id_;
|
||||||
int remain_;
|
int remain_;
|
||||||
|
bool storage_;
|
||||||
};
|
};
|
||||||
|
|
||||||
template <std::size_t DataSize, std::size_t AlignSize>
|
template <std::size_t DataSize, std::size_t AlignSize>
|
||||||
@ -46,11 +47,17 @@ struct msg_t {
|
|||||||
std::aligned_storage_t<DataSize, AlignSize> data_ {};
|
std::aligned_storage_t<DataSize, AlignSize> data_ {};
|
||||||
|
|
||||||
msg_t() = default;
|
msg_t() = default;
|
||||||
msg_t(void* q, msg_id_t i, int r, void const * d, std::size_t s)
|
msg_t(msg_id_t c, msg_id_t i, int r, void const * d, std::size_t s)
|
||||||
: head_ { q, i, r } {
|
: head_ { c, i, r, false } {
|
||||||
if ((d != nullptr) && (s > 0)) {
|
if ((d != nullptr) && (s > 0)) {
|
||||||
std::memcpy(&data_, d, s);
|
std::memcpy(&data_, d, s);
|
||||||
}
|
}
|
||||||
|
else {
|
||||||
|
head_.storage_ = true;
|
||||||
|
if (d != nullptr) {
|
||||||
|
std::memcpy(&data_, d, sizeof(msg_id_t));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -80,9 +87,13 @@ struct cache_t {
|
|||||||
struct conn_info_head {
|
struct conn_info_head {
|
||||||
using acc_t = std::atomic<msg_id_t>;
|
using acc_t = std::atomic<msg_id_t>;
|
||||||
|
|
||||||
std::string name_;
|
static auto acc() {
|
||||||
|
static shm::handle acc_h("__AC_CONN__", sizeof(acc_t));
|
||||||
|
return static_cast<acc_t*>(acc_h.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
msg_id_t cc_id_; // connection-info id
|
||||||
waiter cc_waiter_, wt_waiter_, rd_waiter_;
|
waiter cc_waiter_, wt_waiter_, rd_waiter_;
|
||||||
shm::handle acc_h_;
|
|
||||||
|
|
||||||
struct simple_push {
|
struct simple_push {
|
||||||
|
|
||||||
@ -101,34 +112,23 @@ struct conn_info_head {
|
|||||||
++ wt_;
|
++ wt_;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename W, typename F, typename E>
|
|
||||||
bool force_push(W* /*wrapper*/, F&& /*f*/, E* /*elems*/) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
template <typename W, typename F, typename E>
|
|
||||||
bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& /*f*/, E* /*elems*/) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
circ::elem_array<simple_push, sizeof(shm::handle), 0> msg_datas_;
|
circ::elem_array<simple_push, sizeof(shm::handle), 0> msg_datas_;
|
||||||
|
|
||||||
conn_info_head(char const * name)
|
conn_info_head(char const * name)
|
||||||
: name_(name)
|
: cc_id_ ((acc() == nullptr) ? 0 : acc()->fetch_add(1, std::memory_order_relaxed))
|
||||||
, cc_waiter_(("__CC_CONN__" + name_).c_str())
|
, cc_waiter_((std::string { "__CC_CONN__" } + name).c_str())
|
||||||
, wt_waiter_(("__WT_CONN__" + name_).c_str())
|
, wt_waiter_((std::string { "__WT_CONN__" } + name).c_str())
|
||||||
, rd_waiter_(("__RD_CONN__" + name_).c_str())
|
, rd_waiter_((std::string { "__RD_CONN__" } + name).c_str()) {
|
||||||
, acc_h_ (("__AC_CONN__" + name_).c_str(), sizeof(acc_t)) {
|
|
||||||
}
|
}
|
||||||
|
|
||||||
auto acc() {
|
static shm::handle apply_storage(msg_id_t msg_id, std::size_t size) {
|
||||||
return static_cast<acc_t*>(acc_h_.get());
|
return { ("__ST_CONN__" + std::to_string(msg_id)).c_str(), size, shm::create };
|
||||||
}
|
}
|
||||||
|
|
||||||
shm::handle apply_storage(msg_id_t msg_id, std::size_t size, unsigned mode) {
|
static shm::handle apply_storage(msg_id_t msg_id) {
|
||||||
return { ("__IPC_DAT_STORAGE__" + name_ + "__" + std::to_string(msg_id)).c_str(), size, mode };
|
return { ("__ST_CONN__" + std::to_string(msg_id)).c_str(), 0, shm::open };
|
||||||
}
|
}
|
||||||
|
|
||||||
void store(shm::handle && dat) {
|
void store(shm::handle && dat) {
|
||||||
@ -270,7 +270,7 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s
|
|||||||
auto msg_id = acc->fetch_add(1, std::memory_order_relaxed);
|
auto msg_id = acc->fetch_add(1, std::memory_order_relaxed);
|
||||||
auto try_push = std::forward<F>(gen_push)(info_of(h), que, msg_id);
|
auto try_push = std::forward<F>(gen_push)(info_of(h), que, msg_id);
|
||||||
if (size > small_msg_limit) {
|
if (size > small_msg_limit) {
|
||||||
auto dat = info_of(h)->apply_storage(msg_id, size, shm::create);
|
auto dat = info_of(h)->apply_storage(msg_id, size);
|
||||||
void * buf = dat.get();
|
void * buf = dat.get();
|
||||||
if (buf != nullptr) {
|
if (buf != nullptr) {
|
||||||
std::memcpy(buf, data, size);
|
std::memcpy(buf, data, size);
|
||||||
@ -305,10 +305,10 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size) {
|
|||||||
return send([](auto info, auto que, auto msg_id) {
|
return send([](auto info, auto que, auto msg_id) {
|
||||||
return [info, que, msg_id](int remain, void const * data, std::size_t size) {
|
return [info, que, msg_id](int remain, void const * data, std::size_t size) {
|
||||||
if (!wait_for(info->wt_waiter_, [&] {
|
if (!wait_for(info->wt_waiter_, [&] {
|
||||||
return !que->push(que, msg_id, remain, data, size);
|
return !que->push(info->cc_id_, msg_id, remain, data, size);
|
||||||
}, que->dis_flag() ? 0 : static_cast<std::size_t>(default_timeut))) {
|
}, que->dis_flag() ? 0 : static_cast<std::size_t>(default_timeut))) {
|
||||||
ipc::log("force_push: msg_id = %zd, remain = %d, size = %zd\n", msg_id, remain, size);
|
ipc::log("force_push: msg_id = %zd, remain = %d, size = %zd\n", msg_id, remain, size);
|
||||||
if (!que->force_push(que, msg_id, remain, data, size)) {
|
if (!que->force_push(info->cc_id_, msg_id, remain, data, size)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -322,7 +322,7 @@ static bool try_send(ipc::handle_t h, void const * data, std::size_t size) {
|
|||||||
return send([](auto info, auto que, auto msg_id) {
|
return send([](auto info, auto que, auto msg_id) {
|
||||||
return [info, que, msg_id](int remain, void const * data, std::size_t size) {
|
return [info, que, msg_id](int remain, void const * data, std::size_t size) {
|
||||||
if (!wait_for(info->wt_waiter_, [&] {
|
if (!wait_for(info->wt_waiter_, [&] {
|
||||||
return !que->push(que, msg_id, remain, data, size);
|
return !que->push(info->cc_id_, msg_id, remain, data, size);
|
||||||
}, 0)) {
|
}, 0)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@ -345,16 +345,13 @@ static buff_t recv(ipc::handle_t h, std::size_t tm) {
|
|||||||
while (1) {
|
while (1) {
|
||||||
// pop a new message
|
// pop a new message
|
||||||
typename queue_t::value_t msg;
|
typename queue_t::value_t msg;
|
||||||
if (!wait_for(info_of(h)->rd_waiter_,
|
if (!wait_for(info_of(h)->rd_waiter_, [que, &msg] { return !que->pop(msg); }, tm)) {
|
||||||
[que, &msg] { return !que->pop(msg); }, tm)) {
|
|
||||||
return {};
|
return {};
|
||||||
}
|
}
|
||||||
info_of(h)->wt_waiter_.broadcast();
|
info_of(h)->wt_waiter_.broadcast();
|
||||||
if (msg.head_.que_ == nullptr) {
|
if ((info_of(h)->acc() != nullptr) && (msg.head_.conn_ == info_of(h)->cc_id_)) {
|
||||||
ipc::error("fail: recv, msg.head_.que_ == nullptr\n");
|
continue; // ignore message to self
|
||||||
return {};
|
|
||||||
}
|
}
|
||||||
if (msg.head_.que_ == que) continue; // pop next
|
|
||||||
// msg.head_.remain_ may minus & abs(msg.head_.remain_) < data_length
|
// msg.head_.remain_ may minus & abs(msg.head_.remain_) < data_length
|
||||||
auto remain = static_cast<std::size_t>(static_cast<int>(data_length) + msg.head_.remain_);
|
auto remain = static_cast<std::size_t>(static_cast<int>(data_length) + msg.head_.remain_);
|
||||||
// find cache with msg.head_.id_
|
// find cache with msg.head_.id_
|
||||||
@ -363,8 +360,8 @@ static buff_t recv(ipc::handle_t h, std::size_t tm) {
|
|||||||
if (remain <= data_length) {
|
if (remain <= data_length) {
|
||||||
return make_cache(msg.data_, remain);
|
return make_cache(msg.data_, remain);
|
||||||
}
|
}
|
||||||
if (remain > small_msg_limit) {
|
if (msg.head_.storage_) {
|
||||||
auto dat = info_of(h)->apply_storage(msg.head_.id_, 0, shm::open);
|
auto dat = info_of(h)->apply_storage(msg.head_.id_);
|
||||||
void * buf = dat.get();
|
void * buf = dat.get();
|
||||||
if (buf != nullptr && remain <= dat.size()) {
|
if (buf != nullptr && remain <= dat.size()) {
|
||||||
auto id = dat.detach();
|
auto id = dat.detach();
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user