mirror of
https://github.com/mutouyun/cpp-ipc.git
synced 2025-12-06 16:56:45 +08:00
Add a user interface with a custom name prefix.
This commit is contained in:
parent
6d4cf14132
commit
21648c5f47
@ -65,4 +65,9 @@ struct relat_trait<wr<Rp, Rc, Ts>> {
|
|||||||
template <template <typename> class Policy, typename Flag>
|
template <template <typename> class Policy, typename Flag>
|
||||||
struct relat_trait<Policy<Flag>> : relat_trait<Flag> {};
|
struct relat_trait<Policy<Flag>> : relat_trait<Flag> {};
|
||||||
|
|
||||||
|
// the prefix tag of a channel
|
||||||
|
struct prefix {
|
||||||
|
char const *str;
|
||||||
|
};
|
||||||
|
|
||||||
} // namespace ipc
|
} // namespace ipc
|
||||||
|
|||||||
@ -22,6 +22,7 @@ struct IPC_EXPORT chan_impl {
|
|||||||
static ipc::handle_t inited();
|
static ipc::handle_t inited();
|
||||||
|
|
||||||
static bool connect (ipc::handle_t * ph, char const * name, unsigned mode);
|
static bool connect (ipc::handle_t * ph, char const * name, unsigned mode);
|
||||||
|
static bool connect (ipc::handle_t * ph, prefix, char const * name, unsigned mode);
|
||||||
static bool reconnect (ipc::handle_t * ph, unsigned mode);
|
static bool reconnect (ipc::handle_t * ph, unsigned mode);
|
||||||
static void disconnect(ipc::handle_t h);
|
static void disconnect(ipc::handle_t h);
|
||||||
static void destroy (ipc::handle_t h);
|
static void destroy (ipc::handle_t h);
|
||||||
@ -54,6 +55,10 @@ public:
|
|||||||
: connected_{this->connect(name, mode)} {
|
: connected_{this->connect(name, mode)} {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
chan_wrapper(prefix pref, char const * name, unsigned mode = ipc::sender)
|
||||||
|
: connected_{this->connect(pref, name, mode)} {
|
||||||
|
}
|
||||||
|
|
||||||
chan_wrapper(chan_wrapper&& rhs) noexcept
|
chan_wrapper(chan_wrapper&& rhs) noexcept
|
||||||
: chan_wrapper{} {
|
: chan_wrapper{} {
|
||||||
swap(rhs);
|
swap(rhs);
|
||||||
@ -102,6 +107,11 @@ public:
|
|||||||
detail_t::disconnect(h_); // clear old connection
|
detail_t::disconnect(h_); // clear old connection
|
||||||
return connected_ = detail_t::connect(&h_, name, mode_ = mode);
|
return connected_ = detail_t::connect(&h_, name, mode_ = mode);
|
||||||
}
|
}
|
||||||
|
bool connect(prefix pref, char const * name, unsigned mode = ipc::sender | ipc::receiver) {
|
||||||
|
if (name == nullptr || name[0] == '\0') return false;
|
||||||
|
detail_t::disconnect(h_); // clear old connection
|
||||||
|
return connected_ = detail_t::connect(&h_, pref, name, mode_ = mode);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Try connecting with new mode flags.
|
* Try connecting with new mode flags.
|
||||||
|
|||||||
@ -37,8 +37,8 @@ private:
|
|||||||
elem_t block_[elem_max] {};
|
elem_t block_[elem_max] {};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @remarks 'warning C4348: redefinition of default parameter' with MSVC.
|
* \remarks 'warning C4348: redefinition of default parameter' with MSVC.
|
||||||
* @see
|
* \see
|
||||||
* - https://stackoverflow.com/questions/12656239/redefinition-of-default-template-parameter
|
* - https://stackoverflow.com/questions/12656239/redefinition-of-default-template-parameter
|
||||||
* - https://developercommunity.visualstudio.com/content/problem/425978/incorrect-c4348-warning-in-nested-template-declara.html
|
* - https://developercommunity.visualstudio.com/content/problem/425978/incorrect-c4348-warning-in-nested-template-declara.html
|
||||||
*/
|
*/
|
||||||
|
|||||||
@ -69,6 +69,11 @@ ipc::buff_t make_cache(T& data, std::size_t size) {
|
|||||||
return { ptr, size, ipc::mem::free };
|
return { ptr, size, ipc::mem::free };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
acc_t *cc_acc() {
|
||||||
|
static ipc::shm::handle acc_h("__CA_CONN__", sizeof(acc_t));
|
||||||
|
return static_cast<acc_t *>(acc_h.get());
|
||||||
|
}
|
||||||
|
|
||||||
struct cache_t {
|
struct cache_t {
|
||||||
std::size_t fill_;
|
std::size_t fill_;
|
||||||
ipc::buff_t buff_;
|
ipc::buff_t buff_;
|
||||||
@ -85,10 +90,39 @@ struct cache_t {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
auto cc_acc() {
|
struct conn_info_head {
|
||||||
static ipc::shm::handle acc_h("__CA_CONN__", sizeof(acc_t));
|
|
||||||
return static_cast<acc_t*>(acc_h.get());
|
ipc::string prefix_;
|
||||||
}
|
ipc::string name_;
|
||||||
|
msg_id_t cc_id_; // connection-info id
|
||||||
|
ipc::detail::waiter cc_waiter_, wt_waiter_, rd_waiter_;
|
||||||
|
ipc::shm::handle acc_h_;
|
||||||
|
|
||||||
|
conn_info_head(char const * prefix, char const * name)
|
||||||
|
: prefix_ {ipc::make_string(prefix)}
|
||||||
|
, name_ {ipc::make_string(name)}
|
||||||
|
, cc_id_ {(cc_acc() == nullptr) ? 0 : cc_acc()->fetch_add(1, std::memory_order_relaxed)}
|
||||||
|
, cc_waiter_{ipc::make_prefix(prefix_, {"CC_CONN__", name_}).c_str()}
|
||||||
|
, wt_waiter_{ipc::make_prefix(prefix_, {"WT_CONN__", name_}).c_str()}
|
||||||
|
, rd_waiter_{ipc::make_prefix(prefix_, {"RD_CONN__", name_}).c_str()}
|
||||||
|
, acc_h_ {ipc::make_prefix(prefix_, {"AC_CONN__", name_}).c_str(), sizeof(acc_t)} {
|
||||||
|
}
|
||||||
|
|
||||||
|
void quit_waiting() {
|
||||||
|
cc_waiter_.quit_waiting();
|
||||||
|
wt_waiter_.quit_waiting();
|
||||||
|
rd_waiter_.quit_waiting();
|
||||||
|
}
|
||||||
|
|
||||||
|
auto acc() {
|
||||||
|
return static_cast<acc_t*>(acc_h_.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
auto& recv_cache() {
|
||||||
|
thread_local ipc::unordered_map<msg_id_t, cache_t> tls;
|
||||||
|
return tls;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
IPC_CONSTEXPR_ std::size_t align_chunk_size(std::size_t size) noexcept {
|
IPC_CONSTEXPR_ std::size_t align_chunk_size(std::size_t size) noexcept {
|
||||||
return (((size - 1) / ipc::large_msg_align) + 1) * ipc::large_msg_align;
|
return (((size - 1) / ipc::large_msg_align) + 1) * ipc::large_msg_align;
|
||||||
@ -130,17 +164,27 @@ struct chunk_info_t {
|
|||||||
|
|
||||||
auto& chunk_storages() {
|
auto& chunk_storages() {
|
||||||
class chunk_handle_t {
|
class chunk_handle_t {
|
||||||
ipc::shm::handle handle_;
|
ipc::unordered_map<ipc::string, ipc::shm::handle> handles_;
|
||||||
|
|
||||||
|
static bool make_handle(ipc::shm::handle &h, ipc::string const &shm_name, std::size_t chunk_size) {
|
||||||
|
if (!h.valid() &&
|
||||||
|
!h.acquire( shm_name.c_str(),
|
||||||
|
sizeof(chunk_info_t) + chunk_info_t::chunks_mem_size(chunk_size) )) {
|
||||||
|
ipc::error("[chunk_storages] chunk_shm.id_info_.acquire failed: chunk_size = %zd\n", chunk_size);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
public:
|
public:
|
||||||
chunk_info_t *get_info(std::size_t chunk_size) {
|
chunk_info_t *get_info(conn_info_head *inf, std::size_t chunk_size) {
|
||||||
if (!handle_.valid() &&
|
ipc::string pref {(inf == nullptr) ? ipc::string{} : inf->prefix_};
|
||||||
!handle_.acquire( ("__CHUNK_INFO__" + ipc::to_string(chunk_size)).c_str(),
|
ipc::string shm_name {ipc::make_prefix(pref, {"CHUNK_INFO__", ipc::to_string(chunk_size)})};
|
||||||
sizeof(chunk_info_t) + chunk_info_t::chunks_mem_size(chunk_size) )) {
|
ipc::shm::handle &h = handles_[pref];
|
||||||
ipc::error("[chunk_storages] chunk_shm.id_info_.acquire failed: chunk_size = %zd\n", chunk_size);
|
if (!make_handle(h, shm_name, chunk_size)) {
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
auto info = static_cast<chunk_info_t*>(handle_.get());
|
auto *info = static_cast<chunk_info_t*>(h.get());
|
||||||
if (info == nullptr) {
|
if (info == nullptr) {
|
||||||
ipc::error("[chunk_storages] chunk_shm.id_info_.get failed: chunk_size = %zd\n", chunk_size);
|
ipc::error("[chunk_storages] chunk_shm.id_info_.get failed: chunk_size = %zd\n", chunk_size);
|
||||||
return nullptr;
|
return nullptr;
|
||||||
@ -152,7 +196,7 @@ auto& chunk_storages() {
|
|||||||
return chunk_hs;
|
return chunk_hs;
|
||||||
}
|
}
|
||||||
|
|
||||||
chunk_info_t *chunk_storage_info(std::size_t chunk_size) {
|
chunk_info_t *chunk_storage_info(conn_info_head *inf, std::size_t chunk_size) {
|
||||||
auto &storages = chunk_storages();
|
auto &storages = chunk_storages();
|
||||||
std::decay_t<decltype(storages)>::iterator it;
|
std::decay_t<decltype(storages)>::iterator it;
|
||||||
{
|
{
|
||||||
@ -165,12 +209,12 @@ chunk_info_t *chunk_storage_info(std::size_t chunk_size) {
|
|||||||
it = storages.emplace(chunk_size, chunk_handle_t{}).first;
|
it = storages.emplace(chunk_size, chunk_handle_t{}).first;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return it->second.get_info(chunk_size);
|
return it->second.get_info(inf, chunk_size);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::pair<ipc::storage_id_t, void*> acquire_storage(std::size_t size, ipc::circ::cc_t conns) {
|
std::pair<ipc::storage_id_t, void*> acquire_storage(conn_info_head *inf, std::size_t size, ipc::circ::cc_t conns) {
|
||||||
std::size_t chunk_size = calc_chunk_size(size);
|
std::size_t chunk_size = calc_chunk_size(size);
|
||||||
auto info = chunk_storage_info(chunk_size);
|
auto info = chunk_storage_info(inf, chunk_size);
|
||||||
if (info == nullptr) return {};
|
if (info == nullptr) return {};
|
||||||
|
|
||||||
info->lock_.lock();
|
info->lock_.lock();
|
||||||
@ -185,24 +229,24 @@ std::pair<ipc::storage_id_t, void*> acquire_storage(std::size_t size, ipc::circ:
|
|||||||
return { id, chunk->data() };
|
return { id, chunk->data() };
|
||||||
}
|
}
|
||||||
|
|
||||||
void *find_storage(ipc::storage_id_t id, std::size_t size) {
|
void *find_storage(ipc::storage_id_t id, conn_info_head *inf, std::size_t size) {
|
||||||
if (id < 0) {
|
if (id < 0) {
|
||||||
ipc::error("[find_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size);
|
ipc::error("[find_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size);
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
std::size_t chunk_size = calc_chunk_size(size);
|
std::size_t chunk_size = calc_chunk_size(size);
|
||||||
auto info = chunk_storage_info(chunk_size);
|
auto info = chunk_storage_info(inf, chunk_size);
|
||||||
if (info == nullptr) return nullptr;
|
if (info == nullptr) return nullptr;
|
||||||
return info->at(chunk_size, id)->data();
|
return info->at(chunk_size, id)->data();
|
||||||
}
|
}
|
||||||
|
|
||||||
void release_storage(ipc::storage_id_t id, std::size_t size) {
|
void release_storage(ipc::storage_id_t id, conn_info_head *inf, std::size_t size) {
|
||||||
if (id < 0) {
|
if (id < 0) {
|
||||||
ipc::error("[release_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size);
|
ipc::error("[release_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
std::size_t chunk_size = calc_chunk_size(size);
|
std::size_t chunk_size = calc_chunk_size(size);
|
||||||
auto info = chunk_storage_info(chunk_size);
|
auto info = chunk_storage_info(inf, chunk_size);
|
||||||
if (info == nullptr) return;
|
if (info == nullptr) return;
|
||||||
info->lock_.lock();
|
info->lock_.lock();
|
||||||
info->pool_.release(id);
|
info->pool_.release(id);
|
||||||
@ -229,13 +273,13 @@ bool sub_rc(ipc::wr<Rp, Rc, ipc::trans::broadcast>,
|
|||||||
}
|
}
|
||||||
|
|
||||||
template <typename Flag>
|
template <typename Flag>
|
||||||
void recycle_storage(ipc::storage_id_t id, std::size_t size, ipc::circ::cc_t curr_conns, ipc::circ::cc_t conn_id) {
|
void recycle_storage(ipc::storage_id_t id, conn_info_head *inf, std::size_t size, ipc::circ::cc_t curr_conns, ipc::circ::cc_t conn_id) {
|
||||||
if (id < 0) {
|
if (id < 0) {
|
||||||
ipc::error("[recycle_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size);
|
ipc::error("[recycle_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
std::size_t chunk_size = calc_chunk_size(size);
|
std::size_t chunk_size = calc_chunk_size(size);
|
||||||
auto info = chunk_storage_info(chunk_size);
|
auto info = chunk_storage_info(inf, chunk_size);
|
||||||
if (info == nullptr) return;
|
if (info == nullptr) return;
|
||||||
|
|
||||||
auto chunk = info->at(chunk_size, id);
|
auto chunk = info->at(chunk_size, id);
|
||||||
@ -250,7 +294,7 @@ void recycle_storage(ipc::storage_id_t id, std::size_t size, ipc::circ::cc_t cur
|
|||||||
}
|
}
|
||||||
|
|
||||||
template <typename MsgT>
|
template <typename MsgT>
|
||||||
bool clear_message(void* p) {
|
bool clear_message(conn_info_head *inf, void* p) {
|
||||||
auto msg = static_cast<MsgT*>(p);
|
auto msg = static_cast<MsgT*>(p);
|
||||||
if (msg->storage_) {
|
if (msg->storage_) {
|
||||||
std::int32_t r_size = static_cast<std::int32_t>(ipc::data_length) + msg->remain_;
|
std::int32_t r_size = static_cast<std::int32_t>(ipc::data_length) + msg->remain_;
|
||||||
@ -258,45 +302,12 @@ bool clear_message(void* p) {
|
|||||||
ipc::error("[clear_message] invalid msg size: %d\n", (int)r_size);
|
ipc::error("[clear_message] invalid msg size: %d\n", (int)r_size);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
release_storage(
|
release_storage(*reinterpret_cast<ipc::storage_id_t*>(&msg->data_),
|
||||||
*reinterpret_cast<ipc::storage_id_t*>(&msg->data_),
|
inf, static_cast<std::size_t>(r_size));
|
||||||
static_cast<std::size_t>(r_size));
|
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
struct conn_info_head {
|
|
||||||
|
|
||||||
ipc::string name_;
|
|
||||||
msg_id_t cc_id_; // connection-info id
|
|
||||||
ipc::detail::waiter cc_waiter_, wt_waiter_, rd_waiter_;
|
|
||||||
ipc::shm::handle acc_h_;
|
|
||||||
|
|
||||||
conn_info_head(char const * name)
|
|
||||||
: name_ {name}
|
|
||||||
, cc_id_ {(cc_acc() == nullptr) ? 0 : cc_acc()->fetch_add(1, std::memory_order_relaxed)}
|
|
||||||
, cc_waiter_{("__CC_CONN__" + name_).c_str()}
|
|
||||||
, wt_waiter_{("__WT_CONN__" + name_).c_str()}
|
|
||||||
, rd_waiter_{("__RD_CONN__" + name_).c_str()}
|
|
||||||
, acc_h_ {("__AC_CONN__" + name_).c_str(), sizeof(acc_t)} {
|
|
||||||
}
|
|
||||||
|
|
||||||
void quit_waiting() {
|
|
||||||
cc_waiter_.quit_waiting();
|
|
||||||
wt_waiter_.quit_waiting();
|
|
||||||
rd_waiter_.quit_waiting();
|
|
||||||
}
|
|
||||||
|
|
||||||
auto acc() {
|
|
||||||
return static_cast<acc_t*>(acc_h_.get());
|
|
||||||
}
|
|
||||||
|
|
||||||
auto& recv_cache() {
|
|
||||||
thread_local ipc::unordered_map<msg_id_t, cache_t> tls;
|
|
||||||
return tls;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
template <typename W, typename F>
|
template <typename W, typename F>
|
||||||
bool wait_for(W& waiter, F&& pred, std::uint64_t tm) {
|
bool wait_for(W& waiter, F&& pred, std::uint64_t tm) {
|
||||||
if (tm == 0) return !pred();
|
if (tm == 0) return !pred();
|
||||||
@ -322,11 +333,13 @@ struct queue_generator {
|
|||||||
struct conn_info_t : conn_info_head {
|
struct conn_info_t : conn_info_head {
|
||||||
queue_t que_;
|
queue_t que_;
|
||||||
|
|
||||||
conn_info_t(char const * name)
|
conn_info_t(char const * pref, char const * name)
|
||||||
: conn_info_head{name}
|
: conn_info_head{pref, name}
|
||||||
, que_{("__QU_CONN__" +
|
, que_{ipc::make_prefix(prefix_, {
|
||||||
ipc::to_string(DataSize) + "__" +
|
"QU_CONN__",
|
||||||
ipc::to_string(AlignSize) + "__" + name).c_str()} {
|
ipc::to_string(DataSize), "__",
|
||||||
|
ipc::to_string(AlignSize), "__",
|
||||||
|
name}).c_str()} {
|
||||||
}
|
}
|
||||||
|
|
||||||
void disconnect_receiver() {
|
void disconnect_receiver() {
|
||||||
@ -389,14 +402,18 @@ static bool reconnect(ipc::handle_t * ph, bool start_to_recv) {
|
|||||||
return que->ready_sending();
|
return que->ready_sending();
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool connect(ipc::handle_t * ph, char const * name, bool start_to_recv) {
|
static bool connect(ipc::handle_t * ph, ipc::prefix pref, char const * name, bool start_to_recv) {
|
||||||
assert(ph != nullptr);
|
assert(ph != nullptr);
|
||||||
if (*ph == nullptr) {
|
if (*ph == nullptr) {
|
||||||
*ph = ipc::mem::alloc<conn_info_t>(name);
|
*ph = ipc::mem::alloc<conn_info_t>(pref.str, name);
|
||||||
}
|
}
|
||||||
return reconnect(ph, start_to_recv);
|
return reconnect(ph, start_to_recv);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool connect(ipc::handle_t * ph, char const * name, bool start_to_recv) {
|
||||||
|
return connect(ph, {nullptr}, name, start_to_recv);
|
||||||
|
}
|
||||||
|
|
||||||
static void destroy(ipc::handle_t h) {
|
static void destroy(ipc::handle_t h) {
|
||||||
disconnect(h);
|
disconnect(h);
|
||||||
ipc::mem::free(info_of(h));
|
ipc::mem::free(info_of(h));
|
||||||
@ -445,15 +462,16 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
// calc a new message id
|
// calc a new message id
|
||||||
auto acc = info_of(h)->acc();
|
conn_info_t *inf = info_of(h);
|
||||||
|
auto acc = inf->acc();
|
||||||
if (acc == nullptr) {
|
if (acc == nullptr) {
|
||||||
ipc::error("fail: send, info_of(h)->acc() == nullptr\n");
|
ipc::error("fail: send, info_of(h)->acc() == nullptr\n");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
auto msg_id = acc->fetch_add(1, std::memory_order_relaxed);
|
auto msg_id = acc->fetch_add(1, std::memory_order_relaxed);
|
||||||
auto try_push = std::forward<F>(gen_push)(info_of(h), que, msg_id);
|
auto try_push = std::forward<F>(gen_push)(inf, que, msg_id);
|
||||||
if (size > ipc::large_msg_limit) {
|
if (size > ipc::large_msg_limit) {
|
||||||
auto dat = acquire_storage(size, conns);
|
auto dat = acquire_storage(inf, size, conns);
|
||||||
void * buf = dat.second;
|
void * buf = dat.second;
|
||||||
if (buf != nullptr) {
|
if (buf != nullptr) {
|
||||||
std::memcpy(buf, data, size);
|
std::memcpy(buf, data, size);
|
||||||
@ -484,7 +502,7 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s
|
|||||||
}
|
}
|
||||||
|
|
||||||
static bool send(ipc::handle_t h, void const * data, std::size_t size, std::uint64_t tm) {
|
static bool send(ipc::handle_t h, void const * data, std::size_t size, std::uint64_t tm) {
|
||||||
return send([tm](auto info, auto que, auto msg_id) {
|
return send([tm](auto *info, auto *que, auto msg_id) {
|
||||||
return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) {
|
return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) {
|
||||||
if (!wait_for(info->wt_waiter_, [&] {
|
if (!wait_for(info->wt_waiter_, [&] {
|
||||||
return !que->push(
|
return !que->push(
|
||||||
@ -493,7 +511,7 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size, std::uint
|
|||||||
}, tm)) {
|
}, tm)) {
|
||||||
ipc::log("force_push: msg_id = %zd, remain = %d, size = %zd\n", msg_id, remain, size);
|
ipc::log("force_push: msg_id = %zd, remain = %d, size = %zd\n", msg_id, remain, size);
|
||||||
if (!que->force_push(
|
if (!que->force_push(
|
||||||
clear_message<typename queue_t::value_t>,
|
[info](void* p) { return clear_message<typename queue_t::value_t>(info, p); },
|
||||||
info->cc_id_, msg_id, remain, data, size)) {
|
info->cc_id_, msg_id, remain, data, size)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@ -505,7 +523,7 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size, std::uint
|
|||||||
}
|
}
|
||||||
|
|
||||||
static bool try_send(ipc::handle_t h, void const * data, std::size_t size, std::uint64_t tm) {
|
static bool try_send(ipc::handle_t h, void const * data, std::size_t size, std::uint64_t tm) {
|
||||||
return send([tm](auto info, auto que, auto msg_id) {
|
return send([tm](auto *info, auto *que, auto msg_id) {
|
||||||
return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) {
|
return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) {
|
||||||
if (!wait_for(info->wt_waiter_, [&] {
|
if (!wait_for(info->wt_waiter_, [&] {
|
||||||
return !que->push(
|
return !que->push(
|
||||||
@ -530,18 +548,19 @@ static ipc::buff_t recv(ipc::handle_t h, std::uint64_t tm) {
|
|||||||
// hasn't connected yet, just return.
|
// hasn't connected yet, just return.
|
||||||
return {};
|
return {};
|
||||||
}
|
}
|
||||||
auto& rc = info_of(h)->recv_cache();
|
conn_info_t *inf = info_of(h);
|
||||||
|
auto& rc = inf->recv_cache();
|
||||||
for (;;) {
|
for (;;) {
|
||||||
// pop a new message
|
// pop a new message
|
||||||
typename queue_t::value_t msg {};
|
typename queue_t::value_t msg {};
|
||||||
if (!wait_for(info_of(h)->rd_waiter_, [que, &msg] {
|
if (!wait_for(inf->rd_waiter_, [que, &msg] {
|
||||||
return !que->pop(msg);
|
return !que->pop(msg);
|
||||||
}, tm)) {
|
}, tm)) {
|
||||||
// pop failed, just return.
|
// pop failed, just return.
|
||||||
return {};
|
return {};
|
||||||
}
|
}
|
||||||
info_of(h)->wt_waiter_.broadcast();
|
inf->wt_waiter_.broadcast();
|
||||||
if ((info_of(h)->acc() != nullptr) && (msg.cc_id_ == info_of(h)->cc_id_)) {
|
if ((inf->acc() != nullptr) && (msg.cc_id_ == inf->cc_id_)) {
|
||||||
continue; // ignore message to self
|
continue; // ignore message to self
|
||||||
}
|
}
|
||||||
// msg.remain_ may minus & abs(msg.remain_) < data_length
|
// msg.remain_ may minus & abs(msg.remain_) < data_length
|
||||||
@ -554,14 +573,18 @@ static ipc::buff_t recv(ipc::handle_t h, std::uint64_t tm) {
|
|||||||
// large message
|
// large message
|
||||||
if (msg.storage_) {
|
if (msg.storage_) {
|
||||||
ipc::storage_id_t buf_id = *reinterpret_cast<ipc::storage_id_t*>(&msg.data_);
|
ipc::storage_id_t buf_id = *reinterpret_cast<ipc::storage_id_t*>(&msg.data_);
|
||||||
void* buf = find_storage(buf_id, msg_size);
|
void* buf = find_storage(buf_id, inf, msg_size);
|
||||||
if (buf != nullptr) {
|
if (buf != nullptr) {
|
||||||
struct recycle_t {
|
struct recycle_t {
|
||||||
ipc::storage_id_t storage_id;
|
ipc::storage_id_t storage_id;
|
||||||
|
conn_info_t * inf;
|
||||||
ipc::circ::cc_t curr_conns;
|
ipc::circ::cc_t curr_conns;
|
||||||
ipc::circ::cc_t conn_id;
|
ipc::circ::cc_t conn_id;
|
||||||
} *r_info = ipc::mem::alloc<recycle_t>(recycle_t{
|
} *r_info = ipc::mem::alloc<recycle_t>(recycle_t{
|
||||||
buf_id, que->elems()->connections(std::memory_order_relaxed), que->connected_id()
|
buf_id,
|
||||||
|
inf,
|
||||||
|
que->elems()->connections(std::memory_order_relaxed),
|
||||||
|
que->connected_id()
|
||||||
});
|
});
|
||||||
if (r_info == nullptr) {
|
if (r_info == nullptr) {
|
||||||
ipc::log("fail: ipc::mem::alloc<recycle_t>.\n");
|
ipc::log("fail: ipc::mem::alloc<recycle_t>.\n");
|
||||||
@ -572,7 +595,11 @@ static ipc::buff_t recv(ipc::handle_t h, std::uint64_t tm) {
|
|||||||
IPC_UNUSED_ auto finally = ipc::guard([r_info] {
|
IPC_UNUSED_ auto finally = ipc::guard([r_info] {
|
||||||
ipc::mem::free(r_info);
|
ipc::mem::free(r_info);
|
||||||
});
|
});
|
||||||
recycle_storage<flag_t>(r_info->storage_id, size, r_info->curr_conns, r_info->conn_id);
|
recycle_storage<flag_t>(r_info->storage_id,
|
||||||
|
r_info->inf,
|
||||||
|
size,
|
||||||
|
r_info->curr_conns,
|
||||||
|
r_info->conn_id);
|
||||||
}, r_info};
|
}, r_info};
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -641,6 +668,11 @@ bool chan_impl<Flag>::connect(ipc::handle_t * ph, char const * name, unsigned mo
|
|||||||
return detail_impl<policy_t<Flag>>::connect(ph, name, mode & receiver);
|
return detail_impl<policy_t<Flag>>::connect(ph, name, mode & receiver);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template <typename Flag>
|
||||||
|
bool chan_impl<Flag>::connect(ipc::handle_t * ph, prefix pref, char const * name, unsigned mode) {
|
||||||
|
return detail_impl<policy_t<Flag>>::connect(ph, pref, name, mode & receiver);
|
||||||
|
}
|
||||||
|
|
||||||
template <typename Flag>
|
template <typename Flag>
|
||||||
bool chan_impl<Flag>::reconnect(ipc::handle_t * ph, unsigned mode) {
|
bool chan_impl<Flag>::reconnect(ipc::handle_t * ph, unsigned mode) {
|
||||||
return detail_impl<policy_t<Flag>>::reconnect(ph, mode & receiver);
|
return detail_impl<policy_t<Flag>>::reconnect(ph, mode & receiver);
|
||||||
@ -658,7 +690,7 @@ void chan_impl<Flag>::destroy(ipc::handle_t h) {
|
|||||||
|
|
||||||
template <typename Flag>
|
template <typename Flag>
|
||||||
char const * chan_impl<Flag>::name(ipc::handle_t h) {
|
char const * chan_impl<Flag>::name(ipc::handle_t h) {
|
||||||
auto info = detail_impl<policy_t<Flag>>::info_of(h);
|
auto *info = detail_impl<policy_t<Flag>>::info_of(h);
|
||||||
return (info == nullptr) ? nullptr : info->name_.c_str();
|
return (info == nullptr) ? nullptr : info->name_.c_str();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -87,4 +87,24 @@ ipc::string to_string(T val) {
|
|||||||
return {};
|
return {};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// \brief Check string validity.
|
||||||
|
constexpr bool is_valid_string(char const *str) noexcept {
|
||||||
|
return (str != nullptr) && (str[0] != '\0');
|
||||||
|
}
|
||||||
|
|
||||||
|
/// \brief Make a valid string.
|
||||||
|
inline ipc::string make_string(char const *str) {
|
||||||
|
return is_valid_string(str) ? ipc::string{str} : ipc::string{};
|
||||||
|
}
|
||||||
|
|
||||||
|
/// \brief Combine prefix from a list of strings.
|
||||||
|
inline ipc::string make_prefix(ipc::string prefix, std::initializer_list<ipc::string> args) {
|
||||||
|
prefix += "__IPC_SHM__";
|
||||||
|
for (auto const &txt: args) {
|
||||||
|
if (txt.empty()) continue;
|
||||||
|
prefix += txt;
|
||||||
|
}
|
||||||
|
return prefix;
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace ipc
|
} // namespace ipc
|
||||||
|
|||||||
@ -24,6 +24,7 @@
|
|||||||
#include <type_traits>
|
#include <type_traits>
|
||||||
#include <tuple>
|
#include <tuple>
|
||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
|
#include <initializer_list>
|
||||||
|
|
||||||
// pre-defined
|
// pre-defined
|
||||||
|
|
||||||
@ -133,4 +134,4 @@ constexpr const T& (min)(const T& a, const T& b) {
|
|||||||
} // namespace ipc
|
} // namespace ipc
|
||||||
|
|
||||||
#endif // defined(__cplusplus)
|
#endif // defined(__cplusplus)
|
||||||
#endif // LIBIPC_SRC_PLATFORM_DETAIL_H_
|
#endif // LIBIPC_SRC_PLATFORM_DETAIL_H_
|
||||||
|
|||||||
@ -49,9 +49,6 @@ id_t acquire(char const * name, std::size_t size, unsigned mode) {
|
|||||||
ipc::error("fail acquire: name is empty\n");
|
ipc::error("fail acquire: name is empty\n");
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
// For portable use, a shared memory object should be identified by name of the form /somename.
|
|
||||||
// see: https://man7.org/linux/man-pages/man3/shm_open.3.html
|
|
||||||
ipc::string op_name = ipc::string{"/__IPC_SHM__"} + name;
|
|
||||||
// Open the object for read-write access.
|
// Open the object for read-write access.
|
||||||
int flag = O_RDWR;
|
int flag = O_RDWR;
|
||||||
switch (mode) {
|
switch (mode) {
|
||||||
@ -68,9 +65,9 @@ id_t acquire(char const * name, std::size_t size, unsigned mode) {
|
|||||||
flag |= O_CREAT;
|
flag |= O_CREAT;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
int fd = ::shm_open(op_name.c_str(), flag, S_IRUSR | S_IWUSR |
|
int fd = ::shm_open(name, flag, S_IRUSR | S_IWUSR |
|
||||||
S_IRGRP | S_IWGRP |
|
S_IRGRP | S_IWGRP |
|
||||||
S_IROTH | S_IWOTH);
|
S_IROTH | S_IWOTH);
|
||||||
if (fd == -1) {
|
if (fd == -1) {
|
||||||
ipc::error("fail shm_open[%d]: %s\n", errno, name);
|
ipc::error("fail shm_open[%d]: %s\n", errno, name);
|
||||||
return nullptr;
|
return nullptr;
|
||||||
@ -78,7 +75,7 @@ id_t acquire(char const * name, std::size_t size, unsigned mode) {
|
|||||||
auto ii = mem::alloc<id_info_t>();
|
auto ii = mem::alloc<id_info_t>();
|
||||||
ii->fd_ = fd;
|
ii->fd_ = fd;
|
||||||
ii->size_ = size;
|
ii->size_ = size;
|
||||||
ii->name_ = std::move(op_name);
|
ii->name_ = name;
|
||||||
return ii;
|
return ii;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -192,7 +189,7 @@ void remove(char const * name) {
|
|||||||
ipc::error("fail remove: name is empty\n");
|
ipc::error("fail remove: name is empty\n");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
::shm_unlink((ipc::string{"__IPC_SHM__"} + name).c_str());
|
::shm_unlink(name);
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace shm
|
} // namespace shm
|
||||||
|
|||||||
@ -1,44 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
#include <Windows.h>
|
|
||||||
|
|
||||||
#include "libipc/memory/resource.h"
|
|
||||||
|
|
||||||
namespace ipc {
|
|
||||||
namespace detail {
|
|
||||||
|
|
||||||
/// \brief This routine returns `true` if the caller's process is a member of the Administrators local group.
|
|
||||||
/// Caller is NOT expected to be impersonating anyone and is expected to be able to open its own process and process token.
|
|
||||||
/// \return true - Caller has Administrators local group.
|
|
||||||
/// false - Caller does not have Administrators local group.
|
|
||||||
/// \see https://learn.microsoft.com/en-us/windows/win32/api/securitybaseapi/nf-securitybaseapi-checktokenmembership
|
|
||||||
inline bool is_user_admin() {
|
|
||||||
SID_IDENTIFIER_AUTHORITY NtAuthority = SECURITY_NT_AUTHORITY;
|
|
||||||
PSID AdministratorsGroup;
|
|
||||||
BOOL b = AllocateAndInitializeSid(&NtAuthority,
|
|
||||||
2,
|
|
||||||
SECURITY_BUILTIN_DOMAIN_RID,
|
|
||||||
DOMAIN_ALIAS_RID_ADMINS,
|
|
||||||
0, 0, 0, 0, 0, 0,
|
|
||||||
&AdministratorsGroup);
|
|
||||||
if (b) {
|
|
||||||
if (!CheckTokenMembership(NULL, AdministratorsGroup, &b)) {
|
|
||||||
b = FALSE;
|
|
||||||
}
|
|
||||||
FreeSid(AdministratorsGroup);
|
|
||||||
}
|
|
||||||
return !!(b);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// \brief Identify the user group and add the appropriate prefix to the string.
|
|
||||||
/// \see http://msdn.microsoft.com/en-us/library/aa366551(v=VS.85).aspx
|
|
||||||
/// https://stackoverflow.com/questions/3999157/system-error-0x5-createfilemapping
|
|
||||||
inline ipc::string make_comfortable_prefix(ipc::string &&txt) {
|
|
||||||
if (is_user_admin()) {
|
|
||||||
return ipc::string{"Global\\"} + txt;
|
|
||||||
}
|
|
||||||
return txt;
|
|
||||||
}
|
|
||||||
|
|
||||||
} // namespace detail
|
|
||||||
} // namespace ipc
|
|
||||||
@ -76,7 +76,7 @@ public:
|
|||||||
}
|
}
|
||||||
DWORD ms = (tm == invalid_value) ? INFINITE : static_cast<DWORD>(tm);
|
DWORD ms = (tm == invalid_value) ? INFINITE : static_cast<DWORD>(tm);
|
||||||
/**
|
/**
|
||||||
* @see
|
* \see
|
||||||
* - https://www.microsoft.com/en-us/research/wp-content/uploads/2004/12/ImplementingCVs.pdf
|
* - https://www.microsoft.com/en-us/research/wp-content/uploads/2004/12/ImplementingCVs.pdf
|
||||||
* - https://docs.microsoft.com/en-us/windows/win32/api/synchapi/nf-synchapi-signalobjectandwait
|
* - https://docs.microsoft.com/en-us/windows/win32/api/synchapi/nf-synchapi-signalobjectandwait
|
||||||
*/
|
*/
|
||||||
|
|||||||
@ -9,7 +9,6 @@
|
|||||||
|
|
||||||
#include "to_tchar.h"
|
#include "to_tchar.h"
|
||||||
#include "get_sa.h"
|
#include "get_sa.h"
|
||||||
#include "comfortable_prefix.h"
|
|
||||||
|
|
||||||
namespace ipc {
|
namespace ipc {
|
||||||
namespace detail {
|
namespace detail {
|
||||||
@ -34,8 +33,7 @@ public:
|
|||||||
|
|
||||||
bool open(char const *name) noexcept {
|
bool open(char const *name) noexcept {
|
||||||
close();
|
close();
|
||||||
h_ = ::CreateMutex(detail::get_sa(), FALSE,
|
h_ = ::CreateMutex(detail::get_sa(), FALSE, detail::to_tchar(name).c_str());
|
||||||
detail::to_tchar(detail::make_comfortable_prefix(name)).c_str());
|
|
||||||
if (h_ == NULL) {
|
if (h_ == NULL) {
|
||||||
ipc::error("fail CreateMutex[%lu]: %s\n", ::GetLastError(), name);
|
ipc::error("fail CreateMutex[%lu]: %s\n", ::GetLastError(), name);
|
||||||
return false;
|
return false;
|
||||||
|
|||||||
@ -8,7 +8,6 @@
|
|||||||
|
|
||||||
#include "to_tchar.h"
|
#include "to_tchar.h"
|
||||||
#include "get_sa.h"
|
#include "get_sa.h"
|
||||||
#include "comfortable_prefix.h"
|
|
||||||
|
|
||||||
namespace ipc {
|
namespace ipc {
|
||||||
namespace detail {
|
namespace detail {
|
||||||
@ -33,7 +32,7 @@ public:
|
|||||||
close();
|
close();
|
||||||
h_ = ::CreateSemaphore(detail::get_sa(),
|
h_ = ::CreateSemaphore(detail::get_sa(),
|
||||||
static_cast<LONG>(count), LONG_MAX,
|
static_cast<LONG>(count), LONG_MAX,
|
||||||
detail::to_tchar(detail::make_comfortable_prefix(name)).c_str());
|
detail::to_tchar(name).c_str());
|
||||||
if (h_ == NULL) {
|
if (h_ == NULL) {
|
||||||
ipc::error("fail CreateSemaphore[%lu]: %s\n", ::GetLastError(), name);
|
ipc::error("fail CreateSemaphore[%lu]: %s\n", ::GetLastError(), name);
|
||||||
return false;
|
return false;
|
||||||
|
|||||||
@ -13,7 +13,6 @@
|
|||||||
|
|
||||||
#include "to_tchar.h"
|
#include "to_tchar.h"
|
||||||
#include "get_sa.h"
|
#include "get_sa.h"
|
||||||
#include "comfortable_prefix.h"
|
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
@ -34,7 +33,7 @@ id_t acquire(char const * name, std::size_t size, unsigned mode) {
|
|||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
HANDLE h;
|
HANDLE h;
|
||||||
auto fmt_name = ipc::detail::to_tchar(detail::make_comfortable_prefix("__IPC_SHM__") + name);
|
auto fmt_name = ipc::detail::to_tchar(name);
|
||||||
// Opens a named file mapping object.
|
// Opens a named file mapping object.
|
||||||
if (mode == open) {
|
if (mode == open) {
|
||||||
h = ::OpenFileMapping(FILE_MAP_ALL_ACCESS, FALSE, fmt_name.c_str());
|
h = ::OpenFileMapping(FILE_MAP_ALL_ACCESS, FALSE, fmt_name.c_str());
|
||||||
|
|||||||
@ -41,8 +41,8 @@ constexpr auto to_tchar(ipc::string &&str) -> IsSameChar<T, ipc::string, ipc::st
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @remarks codecvt_utf8_utf16/std::wstring_convert is deprecated
|
* \remarks codecvt_utf8_utf16/std::wstring_convert is deprecated
|
||||||
* @see https://codingtidbit.com/2020/02/09/c17-codecvt_utf8-is-deprecated/
|
* \see https://codingtidbit.com/2020/02/09/c17-codecvt_utf8-is-deprecated/
|
||||||
* https://stackoverflow.com/questions/42946335/deprecated-header-codecvt-replacement
|
* https://stackoverflow.com/questions/42946335/deprecated-header-codecvt-replacement
|
||||||
* https://en.cppreference.com/w/cpp/locale/codecvt/in
|
* https://en.cppreference.com/w/cpp/locale/codecvt/in
|
||||||
* https://docs.microsoft.com/en-us/windows/win32/api/stringapiset/nf-stringapiset-multibytetowidechar
|
* https://docs.microsoft.com/en-us/windows/win32/api/stringapiset/nf-stringapiset-multibytetowidechar
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user