mirror of
https://github.com/mutouyun/cpp-ipc.git
synced 2025-12-06 16:56:45 +08:00
commit
564f511a95
@ -23,7 +23,7 @@ constexpr std::size_t const max_sz = 1024 * 16;
|
|||||||
std::atomic<bool> is_quit__{ false };
|
std::atomic<bool> is_quit__{ false };
|
||||||
std::atomic<std::size_t> size_counter__{ 0 };
|
std::atomic<std::size_t> size_counter__{ 0 };
|
||||||
|
|
||||||
using msg_que_t = ipc::chan<ipc::relat::single, ipc::relat::single, ipc::trans::unicast>;
|
using msg_que_t = ipc::chan<ipc::relat::single, ipc::relat::multi, ipc::trans::broadcast>;
|
||||||
|
|
||||||
msg_que_t que__{ name__ };
|
msg_que_t que__{ name__ };
|
||||||
ipc::byte_t buff__[max_sz];
|
ipc::byte_t buff__[max_sz];
|
||||||
|
|||||||
@ -29,7 +29,7 @@ enum : std::size_t {
|
|||||||
invalid_value = (std::numeric_limits<std::size_t>::max)(),
|
invalid_value = (std::numeric_limits<std::size_t>::max)(),
|
||||||
data_length = 64,
|
data_length = 64,
|
||||||
large_msg_limit = data_length,
|
large_msg_limit = data_length,
|
||||||
large_msg_align = 512,
|
large_msg_align = 1024,
|
||||||
large_msg_cache = 32,
|
large_msg_cache = 32,
|
||||||
default_timeout = 100 // ms
|
default_timeout = 100 // ms
|
||||||
};
|
};
|
||||||
|
|||||||
301
src/ipc.cpp
301
src/ipc.cpp
@ -20,6 +20,7 @@
|
|||||||
|
|
||||||
#include "libipc/utility/log.h"
|
#include "libipc/utility/log.h"
|
||||||
#include "libipc/utility/id_pool.h"
|
#include "libipc/utility/id_pool.h"
|
||||||
|
#include "libipc/utility/scope_guard.h"
|
||||||
#include "libipc/utility/utility.h"
|
#include "libipc/utility/utility.h"
|
||||||
|
|
||||||
#include "libipc/memory/resource.h"
|
#include "libipc/memory/resource.h"
|
||||||
@ -39,7 +40,7 @@ struct msg_t;
|
|||||||
|
|
||||||
template <std::size_t AlignSize>
|
template <std::size_t AlignSize>
|
||||||
struct msg_t<0, AlignSize> {
|
struct msg_t<0, AlignSize> {
|
||||||
msg_id_t conn_;
|
msg_id_t cc_id_;
|
||||||
msg_id_t id_;
|
msg_id_t id_;
|
||||||
std::int32_t remain_;
|
std::int32_t remain_;
|
||||||
bool storage_;
|
bool storage_;
|
||||||
@ -50,15 +51,16 @@ struct msg_t : msg_t<0, AlignSize> {
|
|||||||
std::aligned_storage_t<DataSize, AlignSize> data_ {};
|
std::aligned_storage_t<DataSize, AlignSize> data_ {};
|
||||||
|
|
||||||
msg_t() = default;
|
msg_t() = default;
|
||||||
msg_t(msg_id_t c, msg_id_t i, std::int32_t r, void const * d, std::size_t s)
|
msg_t(msg_id_t cc_id, msg_id_t id, std::int32_t remain, void const * data, std::size_t size)
|
||||||
: msg_t<0, AlignSize> { c, i, r, (d == nullptr) || (s == 0) } {
|
: msg_t<0, AlignSize> {cc_id, id, remain, (data == nullptr) || (size == 0)} {
|
||||||
if (this->storage_) {
|
if (this->storage_) {
|
||||||
if (d != nullptr) {
|
if (data != nullptr) {
|
||||||
// copy storage-id
|
// copy storage-id
|
||||||
*reinterpret_cast<std::size_t*>(&data_) = *static_cast<std::size_t const *>(d);
|
*reinterpret_cast<ipc::storage_id_t*>(&data_) =
|
||||||
|
*static_cast<ipc::storage_id_t const *>(data);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else std::memcpy(&data_, d, s);
|
else std::memcpy(&data_, data, size);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -90,26 +92,46 @@ auto cc_acc() {
|
|||||||
return static_cast<acc_t*>(acc_h.get());
|
return static_cast<acc_t*>(acc_h.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
IPC_CONSTEXPR_ std::size_t align_chunk_size(std::size_t size) noexcept {
|
||||||
|
return (((size - 1) / ipc::large_msg_align) + 1) * ipc::large_msg_align;
|
||||||
|
}
|
||||||
|
|
||||||
|
IPC_CONSTEXPR_ std::size_t calc_chunk_size(std::size_t size) noexcept {
|
||||||
|
return ipc::make_align(alignof(std::max_align_t), align_chunk_size(
|
||||||
|
ipc::make_align(alignof(std::max_align_t), sizeof(std::atomic<ipc::circ::cc_t>)) + size));
|
||||||
|
}
|
||||||
|
|
||||||
|
struct chunk_t {
|
||||||
|
std::atomic<ipc::circ::cc_t> &conns() noexcept {
|
||||||
|
return *reinterpret_cast<std::atomic<ipc::circ::cc_t> *>(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
void *data() noexcept {
|
||||||
|
return reinterpret_cast<ipc::byte_t *>(this)
|
||||||
|
+ ipc::make_align(alignof(std::max_align_t), sizeof(std::atomic<ipc::circ::cc_t>));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
struct chunk_info_t {
|
struct chunk_info_t {
|
||||||
ipc::id_pool<> pool_;
|
ipc::id_pool<> pool_;
|
||||||
ipc::spin_lock lock_;
|
ipc::spin_lock lock_;
|
||||||
|
|
||||||
IPC_CONSTEXPR_ static std::size_t chunks_elem_size(std::size_t chunk_size) noexcept {
|
|
||||||
return ipc::make_align(alignof(std::max_align_t), chunk_size + sizeof(acc_t));
|
|
||||||
}
|
|
||||||
|
|
||||||
IPC_CONSTEXPR_ static std::size_t chunks_mem_size(std::size_t chunk_size) noexcept {
|
IPC_CONSTEXPR_ static std::size_t chunks_mem_size(std::size_t chunk_size) noexcept {
|
||||||
return ipc::id_pool<>::max_count * chunks_elem_size(chunk_size);
|
return ipc::id_pool<>::max_count * chunk_size;
|
||||||
}
|
}
|
||||||
|
|
||||||
ipc::byte_t *at(std::size_t chunk_size, std::size_t id) noexcept {
|
ipc::byte_t *chunks_mem() noexcept {
|
||||||
if (id == ipc::invalid_value) return nullptr;
|
return reinterpret_cast<ipc::byte_t *>(this + 1);
|
||||||
return reinterpret_cast<ipc::byte_t *>(this + 1) + (chunks_elem_size(chunk_size) * id);
|
}
|
||||||
|
|
||||||
|
chunk_t *at(std::size_t chunk_size, ipc::storage_id_t id) noexcept {
|
||||||
|
if (id < 0) return nullptr;
|
||||||
|
return reinterpret_cast<chunk_t *>(chunks_mem() + (chunk_size * id));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
auto& chunk_storages() {
|
auto& chunk_storages() {
|
||||||
class chunk_t {
|
class chunk_handle_t {
|
||||||
ipc::shm::handle handle_;
|
ipc::shm::handle handle_;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
@ -128,31 +150,29 @@ auto& chunk_storages() {
|
|||||||
return info;
|
return info;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
static ipc::unordered_map<std::size_t, chunk_t> chunk_s;
|
static ipc::map<std::size_t, chunk_handle_t> chunk_hs;
|
||||||
return chunk_s;
|
return chunk_hs;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto& chunk_lock() {
|
chunk_info_t *chunk_storage_info(std::size_t chunk_size) {
|
||||||
static ipc::spin_lock chunk_l;
|
auto &storages = chunk_storages();
|
||||||
return chunk_l;
|
std::decay_t<decltype(storages)>::iterator it;
|
||||||
|
{
|
||||||
|
static ipc::rw_lock lock;
|
||||||
|
IPC_UNUSED_ std::shared_lock<ipc::rw_lock> guard {lock};
|
||||||
|
if ((it = storages.find(chunk_size)) == storages.end()) {
|
||||||
|
using chunk_handle_t = std::decay_t<decltype(storages)>::value_type::second_type;
|
||||||
|
guard.unlock();
|
||||||
|
IPC_UNUSED_ std::lock_guard<ipc::rw_lock> guard {lock};
|
||||||
|
it = storages.emplace(chunk_size, chunk_handle_t{}).first;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return it->second.get_info(chunk_size);
|
||||||
}
|
}
|
||||||
|
|
||||||
constexpr std::size_t calc_chunk_size(std::size_t size) noexcept {
|
std::pair<ipc::storage_id_t, void*> acquire_storage(std::size_t size, ipc::circ::cc_t conns) {
|
||||||
return ( ((size - 1) / ipc::large_msg_align) + 1 ) * ipc::large_msg_align;
|
|
||||||
}
|
|
||||||
|
|
||||||
auto& chunk_storage(std::size_t chunk_size) {
|
|
||||||
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(chunk_lock());
|
|
||||||
return chunk_storages()[chunk_size];
|
|
||||||
}
|
|
||||||
|
|
||||||
std::pair<std::size_t, void*> apply_storage(std::size_t conn_count, std::size_t size) {
|
|
||||||
if (conn_count == 0) return {};
|
|
||||||
|
|
||||||
std::size_t chunk_size = calc_chunk_size(size);
|
std::size_t chunk_size = calc_chunk_size(size);
|
||||||
auto & chunk_shm = chunk_storage(chunk_size);
|
auto info = chunk_storage_info(chunk_size);
|
||||||
|
|
||||||
auto info = chunk_shm.get_info(chunk_size);
|
|
||||||
if (info == nullptr) return {};
|
if (info == nullptr) return {};
|
||||||
|
|
||||||
info->lock_.lock();
|
info->lock_.lock();
|
||||||
@ -161,90 +181,92 @@ std::pair<std::size_t, void*> apply_storage(std::size_t conn_count, std::size_t
|
|||||||
auto id = info->pool_.acquire();
|
auto id = info->pool_.acquire();
|
||||||
info->lock_.unlock();
|
info->lock_.unlock();
|
||||||
|
|
||||||
auto ptr = info->at(chunk_size, id);
|
auto chunk = info->at(chunk_size, id);
|
||||||
if (ptr == nullptr) return {};
|
if (chunk == nullptr) return {};
|
||||||
reinterpret_cast<acc_t*>(ptr + chunk_size)->store(static_cast<msg_id_t>(conn_count), std::memory_order_release);
|
chunk->conns().store(conns, std::memory_order_relaxed);
|
||||||
return { id, ptr };
|
return { id, chunk->data() };
|
||||||
}
|
}
|
||||||
|
|
||||||
void *find_storage(std::size_t id, std::size_t size) {
|
void *find_storage(ipc::storage_id_t id, std::size_t size) {
|
||||||
if (id == ipc::invalid_value) {
|
if (id < 0) {
|
||||||
ipc::error("[find_storage] id is invalid: id = %zd, size = %zd\n", id, size);
|
ipc::error("[find_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size);
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::size_t chunk_size = calc_chunk_size(size);
|
std::size_t chunk_size = calc_chunk_size(size);
|
||||||
auto & chunk_shm = chunk_storage(chunk_size);
|
auto info = chunk_storage_info(chunk_size);
|
||||||
|
|
||||||
auto info = chunk_shm.get_info(chunk_size);
|
|
||||||
if (info == nullptr) return nullptr;
|
if (info == nullptr) return nullptr;
|
||||||
|
return info->at(chunk_size, id)->data();
|
||||||
auto ptr = info->at(chunk_size, id);
|
|
||||||
if (ptr == nullptr) return nullptr;
|
|
||||||
if (reinterpret_cast<acc_t*>(ptr + chunk_size)->load(std::memory_order_acquire) == 0) {
|
|
||||||
ipc::error("[find_storage] cc test failed: id = %zd, chunk_size = %zd\n", id, chunk_size);
|
|
||||||
return nullptr;
|
|
||||||
}
|
|
||||||
return ptr;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void recycle_storage(std::size_t id, std::size_t size) {
|
void release_storage(ipc::storage_id_t id, std::size_t size) {
|
||||||
if (id == ipc::invalid_value) {
|
if (id < 0) {
|
||||||
ipc::error("[recycle_storage] id is invalid: id = %zd, size = %zd\n", id, size);
|
ipc::error("[release_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::size_t chunk_size = calc_chunk_size(size);
|
std::size_t chunk_size = calc_chunk_size(size);
|
||||||
auto & chunk_shm = chunk_storage(chunk_size);
|
auto info = chunk_storage_info(chunk_size);
|
||||||
|
|
||||||
auto info = chunk_shm.get_info(chunk_size);
|
|
||||||
if (info == nullptr) return;
|
if (info == nullptr) return;
|
||||||
|
|
||||||
auto ptr = info->at(chunk_size, id);
|
|
||||||
if (ptr == nullptr) {
|
|
||||||
ipc::error("[recycle_storage] chunk_shm.mems[%zd] failed: chunk_size = %zd\n", id, chunk_size);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (reinterpret_cast<acc_t*>(ptr + chunk_size)->fetch_sub(1, std::memory_order_acq_rel) > 1) {
|
|
||||||
// not the last receiver, just return
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
info->lock_.lock();
|
info->lock_.lock();
|
||||||
info->pool_.release(id);
|
info->pool_.release(id);
|
||||||
info->lock_.unlock();
|
info->lock_.unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
void clear_storage(std::size_t id, std::size_t size) {
|
template <ipc::relat Rp, ipc::relat Rc>
|
||||||
if (id == ipc::invalid_value) {
|
bool sub_rc(ipc::wr<Rp, Rc, ipc::trans::unicast>,
|
||||||
ipc::error("[clear_storage] id is invalid: id = %zd, size = %zd\n", id, size);
|
std::atomic<ipc::circ::cc_t> &/*conns*/, ipc::circ::cc_t /*curr_conns*/, ipc::circ::cc_t /*conn_id*/) noexcept {
|
||||||
return;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::size_t chunk_size = calc_chunk_size(size);
|
template <ipc::relat Rp, ipc::relat Rc>
|
||||||
auto & chunk_shm = chunk_storage(chunk_size);
|
bool sub_rc(ipc::wr<Rp, Rc, ipc::trans::broadcast>,
|
||||||
|
std::atomic<ipc::circ::cc_t> &conns, ipc::circ::cc_t curr_conns, ipc::circ::cc_t conn_id) noexcept {
|
||||||
auto info = chunk_shm.get_info(chunk_size);
|
auto last_conns = curr_conns & ~conn_id;
|
||||||
if (info == nullptr) return;
|
|
||||||
|
|
||||||
auto ptr = info->at(chunk_size, id);
|
|
||||||
if (ptr == nullptr) return;
|
|
||||||
|
|
||||||
auto cc_flag = reinterpret_cast<acc_t*>(ptr + chunk_size);
|
|
||||||
for (unsigned k = 0;;) {
|
for (unsigned k = 0;;) {
|
||||||
auto cc_curr = cc_flag->load(std::memory_order_acquire);
|
auto chunk_conns = conns.load(std::memory_order_acquire);
|
||||||
if (cc_curr == 0) return; // means this id has been cleared
|
if (conns.compare_exchange_weak(chunk_conns, chunk_conns & last_conns, std::memory_order_release)) {
|
||||||
if (cc_flag->compare_exchange_weak(cc_curr, 0, std::memory_order_release)) {
|
return (chunk_conns & last_conns) == 0;
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
ipc::yield(k);
|
ipc::yield(k);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename Flag>
|
||||||
|
void recycle_storage(ipc::storage_id_t id, std::size_t size, ipc::circ::cc_t curr_conns, ipc::circ::cc_t conn_id) {
|
||||||
|
if (id < 0) {
|
||||||
|
ipc::error("[recycle_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
std::size_t chunk_size = calc_chunk_size(size);
|
||||||
|
auto info = chunk_storage_info(chunk_size);
|
||||||
|
if (info == nullptr) return;
|
||||||
|
|
||||||
|
auto chunk = info->at(chunk_size, id);
|
||||||
|
if (chunk == nullptr) return;
|
||||||
|
|
||||||
|
if (!sub_rc(Flag{}, chunk->conns(), curr_conns, conn_id)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
info->lock_.lock();
|
info->lock_.lock();
|
||||||
info->pool_.release(id);
|
info->pool_.release(id);
|
||||||
info->lock_.unlock();
|
info->lock_.unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template <typename MsgT>
|
||||||
|
bool clear_message(void* p) {
|
||||||
|
auto msg = static_cast<MsgT*>(p);
|
||||||
|
if (msg->storage_) {
|
||||||
|
std::int32_t r_size = static_cast<std::int32_t>(ipc::data_length) + msg->remain_;
|
||||||
|
if (r_size <= 0) {
|
||||||
|
ipc::error("[clear_message] invalid msg size: %d\n", (int)r_size);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
release_storage(
|
||||||
|
*reinterpret_cast<ipc::storage_id_t*>(&msg->data_),
|
||||||
|
static_cast<std::size_t>(r_size));
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
struct conn_info_head {
|
struct conn_info_head {
|
||||||
|
|
||||||
ipc::string name_;
|
ipc::string name_;
|
||||||
@ -324,8 +346,10 @@ struct queue_generator {
|
|||||||
template <typename Policy>
|
template <typename Policy>
|
||||||
struct detail_impl {
|
struct detail_impl {
|
||||||
|
|
||||||
using queue_t = typename queue_generator<Policy>::queue_t;
|
using policy_t = Policy;
|
||||||
using conn_info_t = typename queue_generator<Policy>::conn_info_t;
|
using flag_t = typename policy_t::flag_t;
|
||||||
|
using queue_t = typename queue_generator<policy_t>::queue_t;
|
||||||
|
using conn_info_t = typename queue_generator<policy_t>::conn_info_t;
|
||||||
|
|
||||||
constexpr static conn_info_t* info_of(ipc::handle_t h) noexcept {
|
constexpr static conn_info_t* info_of(ipc::handle_t h) noexcept {
|
||||||
return static_cast<conn_info_t*>(h);
|
return static_cast<conn_info_t*>(h);
|
||||||
@ -419,7 +443,8 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s
|
|||||||
ipc::error("fail: send, que->ready_sending() == false\n");
|
ipc::error("fail: send, que->ready_sending() == false\n");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (que->elems()->connections(std::memory_order_relaxed) == 0) {
|
ipc::circ::cc_t conns = que->elems()->connections(std::memory_order_relaxed);
|
||||||
|
if (conns == 0) {
|
||||||
ipc::error("fail: send, there is no receiver on this connection.\n");
|
ipc::error("fail: send, there is no receiver on this connection.\n");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@ -432,7 +457,7 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s
|
|||||||
auto msg_id = acc->fetch_add(1, std::memory_order_relaxed);
|
auto msg_id = acc->fetch_add(1, std::memory_order_relaxed);
|
||||||
auto try_push = std::forward<F>(gen_push)(info_of(h), que, msg_id);
|
auto try_push = std::forward<F>(gen_push)(info_of(h), que, msg_id);
|
||||||
if (size > ipc::large_msg_limit) {
|
if (size > ipc::large_msg_limit) {
|
||||||
auto dat = apply_storage(que->conn_count(), size);
|
auto dat = acquire_storage(size, conns);
|
||||||
void * buf = dat.second;
|
void * buf = dat.second;
|
||||||
if (buf != nullptr) {
|
if (buf != nullptr) {
|
||||||
std::memcpy(buf, data, size);
|
std::memcpy(buf, data, size);
|
||||||
@ -440,11 +465,11 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s
|
|||||||
static_cast<std::int32_t>(ipc::data_length), &(dat.first), 0);
|
static_cast<std::int32_t>(ipc::data_length), &(dat.first), 0);
|
||||||
}
|
}
|
||||||
// try using message fragment
|
// try using message fragment
|
||||||
// ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd\n", msg_id, size);
|
//ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd\n", msg_id, size);
|
||||||
}
|
}
|
||||||
// push message fragment
|
// push message fragment
|
||||||
std::int32_t offset = 0;
|
std::int32_t offset = 0;
|
||||||
for (int i = 0; i < static_cast<int>(size / ipc::data_length); ++i, offset += ipc::data_length) {
|
for (std::int32_t i = 0; i < static_cast<std::int32_t>(size / ipc::data_length); ++i, offset += ipc::data_length) {
|
||||||
if (!try_push(static_cast<std::int32_t>(size) - offset - static_cast<std::int32_t>(ipc::data_length),
|
if (!try_push(static_cast<std::int32_t>(size) - offset - static_cast<std::int32_t>(ipc::data_length),
|
||||||
static_cast<ipc::byte_t const *>(data) + offset, ipc::data_length)) {
|
static_cast<ipc::byte_t const *>(data) + offset, ipc::data_length)) {
|
||||||
return false;
|
return false;
|
||||||
@ -466,18 +491,14 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size, std::size
|
|||||||
return send([tm](auto info, auto que, auto msg_id) {
|
return send([tm](auto info, auto que, auto msg_id) {
|
||||||
return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) {
|
return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) {
|
||||||
if (!wait_for(info->wt_waiter_, [&] {
|
if (!wait_for(info->wt_waiter_, [&] {
|
||||||
return !que->push(info->cc_id_, msg_id, remain, data, size);
|
return !que->push(
|
||||||
|
[](void*) { return true; },
|
||||||
|
info->cc_id_, msg_id, remain, data, size);
|
||||||
}, tm)) {
|
}, tm)) {
|
||||||
ipc::log("force_push: msg_id = %zd, remain = %d, size = %zd\n", msg_id, remain, size);
|
ipc::log("force_push: msg_id = %zd, remain = %d, size = %zd\n", msg_id, remain, size);
|
||||||
if (!que->force_push([](void* p) {
|
if (!que->force_push(
|
||||||
auto tmp_msg = static_cast<typename queue_t::value_t*>(p);
|
clear_message<typename queue_t::value_t>,
|
||||||
if (tmp_msg->storage_) {
|
info->cc_id_, msg_id, remain, data, size)) {
|
||||||
clear_storage(
|
|
||||||
*reinterpret_cast<std::size_t*>(&tmp_msg->data_),
|
|
||||||
static_cast<std::int32_t>(ipc::data_length) + tmp_msg->remain_);
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}, info->cc_id_, msg_id, remain, data, size)) {
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -491,7 +512,9 @@ static bool try_send(ipc::handle_t h, void const * data, std::size_t size, std::
|
|||||||
return send([tm](auto info, auto que, auto msg_id) {
|
return send([tm](auto info, auto que, auto msg_id) {
|
||||||
return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) {
|
return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) {
|
||||||
if (!wait_for(info->wt_waiter_, [&] {
|
if (!wait_for(info->wt_waiter_, [&] {
|
||||||
return !que->push(info->cc_id_, msg_id, remain, data, size);
|
return !que->push(
|
||||||
|
[](void*) { return true; },
|
||||||
|
info->cc_id_, msg_id, remain, data, size);
|
||||||
}, tm)) {
|
}, tm)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@ -512,34 +535,60 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) {
|
|||||||
return {};
|
return {};
|
||||||
}
|
}
|
||||||
auto& rc = info_of(h)->recv_cache();
|
auto& rc = info_of(h)->recv_cache();
|
||||||
while (1) {
|
for (;;) {
|
||||||
// pop a new message
|
// pop a new message
|
||||||
typename queue_t::value_t msg;
|
typename queue_t::value_t msg;
|
||||||
if (!wait_for(info_of(h)->rd_waiter_, [que, &msg] { return !que->pop(msg); }, tm)) {
|
if (!wait_for(info_of(h)->rd_waiter_, [que, &msg] {
|
||||||
|
return !que->pop(msg);
|
||||||
|
}, tm)) {
|
||||||
// pop failed, just return.
|
// pop failed, just return.
|
||||||
return {};
|
return {};
|
||||||
}
|
}
|
||||||
info_of(h)->wt_waiter_.broadcast();
|
info_of(h)->wt_waiter_.broadcast();
|
||||||
if ((info_of(h)->acc() != nullptr) && (msg.conn_ == info_of(h)->cc_id_)) {
|
if ((info_of(h)->acc() != nullptr) && (msg.cc_id_ == info_of(h)->cc_id_)) {
|
||||||
continue; // ignore message to self
|
continue; // ignore message to self
|
||||||
}
|
}
|
||||||
// msg.remain_ may minus & abs(msg.remain_) < data_length
|
// msg.remain_ may minus & abs(msg.remain_) < data_length
|
||||||
std::size_t remain = static_cast<std::int32_t>(ipc::data_length) + msg.remain_;
|
std::int32_t r_size = static_cast<std::int32_t>(ipc::data_length) + msg.remain_;
|
||||||
|
if (r_size <= 0) {
|
||||||
|
ipc::error("fail: recv, r_size = %d\n", (int)r_size);
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
std::size_t msg_size = static_cast<std::size_t>(r_size);
|
||||||
|
// large message
|
||||||
|
if (msg.storage_) {
|
||||||
|
ipc::storage_id_t buf_id = *reinterpret_cast<ipc::storage_id_t*>(&msg.data_);
|
||||||
|
void* buf = find_storage(buf_id, msg_size);
|
||||||
|
if (buf != nullptr) {
|
||||||
|
struct recycle_t {
|
||||||
|
ipc::storage_id_t storage_id;
|
||||||
|
ipc::circ::cc_t curr_conns;
|
||||||
|
ipc::circ::cc_t conn_id;
|
||||||
|
} *r_info = ipc::mem::alloc<recycle_t>(recycle_t{
|
||||||
|
buf_id, que->elems()->connections(std::memory_order_relaxed), que->connected_id()
|
||||||
|
});
|
||||||
|
if (r_info == nullptr) {
|
||||||
|
ipc::log("fail: ipc::mem::alloc<recycle_t>.\n");
|
||||||
|
return ipc::buff_t{buf, msg_size}; // no recycle
|
||||||
|
} else {
|
||||||
|
return ipc::buff_t{buf, msg_size, [](void* p_info, std::size_t size) {
|
||||||
|
auto r_info = static_cast<recycle_t *>(p_info);
|
||||||
|
IPC_UNUSED_ auto finally = ipc::guard([r_info] {
|
||||||
|
ipc::mem::free(r_info);
|
||||||
|
});
|
||||||
|
recycle_storage<flag_t>(r_info->storage_id, size, r_info->curr_conns, r_info->conn_id);
|
||||||
|
}, r_info};
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ipc::log("fail: shm::handle for large message. msg_id: %zd, buf_id: %zd, size: %zd\n", msg.id_, buf_id, msg_size);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
// find cache with msg.id_
|
// find cache with msg.id_
|
||||||
auto cac_it = rc.find(msg.id_);
|
auto cac_it = rc.find(msg.id_);
|
||||||
if (cac_it == rc.end()) {
|
if (cac_it == rc.end()) {
|
||||||
if (remain <= ipc::data_length) {
|
if (msg_size <= ipc::data_length) {
|
||||||
return make_cache(msg.data_, remain);
|
return make_cache(msg.data_, msg_size);
|
||||||
}
|
|
||||||
if (msg.storage_) {
|
|
||||||
std::size_t buf_id = *reinterpret_cast<std::size_t*>(&msg.data_);
|
|
||||||
void * buf = find_storage(buf_id, remain);
|
|
||||||
if (buf != nullptr) {
|
|
||||||
return ipc::buff_t { buf, remain, [](void* ptr, std::size_t size) {
|
|
||||||
recycle_storage(reinterpret_cast<std::size_t>(ptr) - 1, size);
|
|
||||||
}, reinterpret_cast<void*>(buf_id + 1) };
|
|
||||||
}
|
|
||||||
else ipc::log("fail: shm::handle for big message. msg_id: %zd, buf_id: %zd, size: %zd\n", msg.id_, buf_id, remain);
|
|
||||||
}
|
}
|
||||||
// gc
|
// gc
|
||||||
if (rc.size() > 1024) {
|
if (rc.size() > 1024) {
|
||||||
@ -553,14 +602,14 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) {
|
|||||||
for (auto id : need_del) rc.erase(id);
|
for (auto id : need_del) rc.erase(id);
|
||||||
}
|
}
|
||||||
// cache the first message fragment
|
// cache the first message fragment
|
||||||
rc.emplace(msg.id_, cache_t { ipc::data_length, make_cache(msg.data_, remain) });
|
rc.emplace(msg.id_, cache_t { ipc::data_length, make_cache(msg.data_, msg_size) });
|
||||||
}
|
}
|
||||||
// has cached before this message
|
// has cached before this message
|
||||||
else {
|
else {
|
||||||
auto& cac = cac_it->second;
|
auto& cac = cac_it->second;
|
||||||
// this is the last message fragment
|
// this is the last message fragment
|
||||||
if (msg.remain_ <= 0) {
|
if (msg.remain_ <= 0) {
|
||||||
cac.append(&(msg.data_), remain);
|
cac.append(&(msg.data_), msg_size);
|
||||||
// finish this message, erase it from cache
|
// finish this message, erase it from cache
|
||||||
auto buff = std::move(cac.buff_);
|
auto buff = std::move(cac.buff_);
|
||||||
rc.erase(cac_it);
|
rc.erase(cac_it);
|
||||||
|
|||||||
@ -130,10 +130,10 @@ public:
|
|||||||
return head_.force_push(que, std::forward<F>(f), block_);
|
return head_.force_push(que, std::forward<F>(f), block_);
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename Q, typename F>
|
template <typename Q, typename F, typename R>
|
||||||
bool pop(Q* que, cursor_t* cur, F&& f) {
|
bool pop(Q* que, cursor_t* cur, F&& f, R&& out) {
|
||||||
if (cur == nullptr) return false;
|
if (cur == nullptr) return false;
|
||||||
return head_.pop(que, *cur, std::forward<F>(f), block_);
|
return head_.pop(que, *cur, std::forward<F>(f), std::forward<R>(out), block_);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@ -16,7 +16,7 @@ namespace circ {
|
|||||||
using u1_t = ipc::uint_t<8>;
|
using u1_t = ipc::uint_t<8>;
|
||||||
using u2_t = ipc::uint_t<32>;
|
using u2_t = ipc::uint_t<32>;
|
||||||
|
|
||||||
/** only supports max 32 connections */
|
/** only supports max 32 connections in broadcast mode */
|
||||||
using cc_t = u2_t;
|
using cc_t = u2_t;
|
||||||
|
|
||||||
constexpr u1_t index_of(u2_t c) noexcept {
|
constexpr u1_t index_of(u2_t c) noexcept {
|
||||||
|
|||||||
@ -5,6 +5,7 @@
|
|||||||
#include <utility>
|
#include <utility>
|
||||||
#include <functional>
|
#include <functional>
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
|
#include <map>
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <cstdio>
|
#include <cstdio>
|
||||||
|
|
||||||
@ -49,6 +50,11 @@ using unordered_map = std::unordered_map<
|
|||||||
Key, T, std::hash<Key>, std::equal_to<Key>, ipc::mem::allocator<std::pair<const Key, T>>
|
Key, T, std::hash<Key>, std::equal_to<Key>, ipc::mem::allocator<std::pair<const Key, T>>
|
||||||
>;
|
>;
|
||||||
|
|
||||||
|
template <typename Key, typename T>
|
||||||
|
using map = std::map<
|
||||||
|
Key, T, std::less<Key>, ipc::mem::allocator<std::pair<const Key, T>>
|
||||||
|
>;
|
||||||
|
|
||||||
template <typename Char>
|
template <typename Char>
|
||||||
using basic_string = std::basic_string<
|
using basic_string = std::basic_string<
|
||||||
Char, std::char_traits<Char>, ipc::mem::allocator<Char>
|
Char, std::char_traits<Char>, ipc::mem::allocator<Char>
|
||||||
|
|||||||
@ -111,5 +111,17 @@ constexpr const T& (min)(const T& a, const T& b) {
|
|||||||
|
|
||||||
#endif/*__cplusplus < 201703L*/
|
#endif/*__cplusplus < 201703L*/
|
||||||
|
|
||||||
|
template <typename T, typename U>
|
||||||
|
auto horrible_cast(U rhs) noexcept
|
||||||
|
-> typename std::enable_if<std::is_trivially_copyable<T>::value
|
||||||
|
&& std::is_trivially_copyable<U>::value, T>::type {
|
||||||
|
union {
|
||||||
|
T t;
|
||||||
|
U u;
|
||||||
|
} r = {};
|
||||||
|
r.u = rhs;
|
||||||
|
return r.t;
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace detail
|
} // namespace detail
|
||||||
} // namespace ipc
|
} // namespace ipc
|
||||||
|
|||||||
@ -15,8 +15,10 @@ struct choose;
|
|||||||
|
|
||||||
template <typename Flag>
|
template <typename Flag>
|
||||||
struct choose<circ::elem_array, Flag> {
|
struct choose<circ::elem_array, Flag> {
|
||||||
|
using flag_t = Flag;
|
||||||
|
|
||||||
template <std::size_t DataSize, std::size_t AlignSize>
|
template <std::size_t DataSize, std::size_t AlignSize>
|
||||||
using elems_t = circ::elem_array<ipc::prod_cons_impl<Flag>, DataSize, AlignSize>;
|
using elems_t = circ::elem_array<ipc::prod_cons_impl<flag_t>, DataSize, AlignSize>;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace policy
|
} // namespace policy
|
||||||
|
|||||||
@ -58,13 +58,14 @@ struct prod_cons_impl<wr<relat::single, relat::single, trans::unicast>> {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename W, typename F, typename E>
|
template <typename W, typename F, typename R, typename E>
|
||||||
bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E* elems) {
|
bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, R&& out, E* elems) {
|
||||||
auto cur_rd = circ::index_of(rd_.load(std::memory_order_relaxed));
|
auto cur_rd = circ::index_of(rd_.load(std::memory_order_relaxed));
|
||||||
if (cur_rd == circ::index_of(wt_.load(std::memory_order_acquire))) {
|
if (cur_rd == circ::index_of(wt_.load(std::memory_order_acquire))) {
|
||||||
return false; // empty
|
return false; // empty
|
||||||
}
|
}
|
||||||
std::forward<F>(f)(&(elems[cur_rd].data_));
|
std::forward<F>(f)(&(elems[cur_rd].data_));
|
||||||
|
std::forward<R>(out)(true);
|
||||||
rd_.fetch_add(1, std::memory_order_release);
|
rd_.fetch_add(1, std::memory_order_release);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -80,8 +81,9 @@ struct prod_cons_impl<wr<relat::single, relat::multi , trans::unicast>>
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename W, typename F, template <std::size_t, std::size_t> class E, std::size_t DS, std::size_t AS>
|
template <typename W, typename F, typename R,
|
||||||
bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E<DS, AS>* elems) {
|
template <std::size_t, std::size_t> class E, std::size_t DS, std::size_t AS>
|
||||||
|
bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, R&& out, E<DS, AS>* elems) {
|
||||||
byte_t buff[DS];
|
byte_t buff[DS];
|
||||||
for (unsigned k = 0;;) {
|
for (unsigned k = 0;;) {
|
||||||
auto cur_rd = rd_.load(std::memory_order_relaxed);
|
auto cur_rd = rd_.load(std::memory_order_relaxed);
|
||||||
@ -92,6 +94,7 @@ struct prod_cons_impl<wr<relat::single, relat::multi , trans::unicast>>
|
|||||||
std::memcpy(buff, &(elems[circ::index_of(cur_rd)].data_), sizeof(buff));
|
std::memcpy(buff, &(elems[circ::index_of(cur_rd)].data_), sizeof(buff));
|
||||||
if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) {
|
if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) {
|
||||||
std::forward<F>(f)(buff);
|
std::forward<F>(f)(buff);
|
||||||
|
std::forward<R>(out)(true);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
ipc::yield(k);
|
ipc::yield(k);
|
||||||
@ -156,8 +159,9 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::unicast>>
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename W, typename F, template <std::size_t, std::size_t> class E, std::size_t DS, std::size_t AS>
|
template <typename W, typename F, typename R,
|
||||||
bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E<DS, AS>* elems) {
|
template <std::size_t, std::size_t> class E, std::size_t DS, std::size_t AS>
|
||||||
|
bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, R&& out, E<DS, AS>* elems) {
|
||||||
byte_t buff[DS];
|
byte_t buff[DS];
|
||||||
for (unsigned k = 0;;) {
|
for (unsigned k = 0;;) {
|
||||||
auto cur_rd = rd_.load(std::memory_order_relaxed);
|
auto cur_rd = rd_.load(std::memory_order_relaxed);
|
||||||
@ -179,6 +183,7 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::unicast>>
|
|||||||
std::memcpy(buff, &(elems[circ::index_of(cur_rd)].data_), sizeof(buff));
|
std::memcpy(buff, &(elems[circ::index_of(cur_rd)].data_), sizeof(buff));
|
||||||
if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) {
|
if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) {
|
||||||
std::forward<F>(f)(buff);
|
std::forward<F>(f)(buff);
|
||||||
|
std::forward<R>(out)(true);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
ipc::yield(k);
|
ipc::yield(k);
|
||||||
@ -263,20 +268,20 @@ struct prod_cons_impl<wr<relat::single, relat::multi, trans::broadcast>> {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename W, typename F, typename E>
|
template <typename W, typename F, typename R, typename E>
|
||||||
bool pop(W* wrapper, circ::u2_t& cur, F&& f, E* elems) {
|
bool pop(W* wrapper, circ::u2_t& cur, F&& f, R&& out, E* elems) {
|
||||||
if (cur == cursor()) return false; // acquire
|
if (cur == cursor()) return false; // acquire
|
||||||
auto* el = elems + circ::index_of(cur++);
|
auto* el = elems + circ::index_of(cur++);
|
||||||
std::forward<F>(f)(&(el->data_));
|
std::forward<F>(f)(&(el->data_));
|
||||||
for (unsigned k = 0;;) {
|
for (unsigned k = 0;;) {
|
||||||
auto cur_rc = el->rc_.load(std::memory_order_acquire);
|
auto cur_rc = el->rc_.load(std::memory_order_acquire);
|
||||||
circ::cc_t rem_cc = cur_rc & ep_mask;
|
if ((cur_rc & ep_mask) == 0) {
|
||||||
if (rem_cc == 0) {
|
std::forward<R>(out)(true);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
if (el->rc_.compare_exchange_weak(cur_rc,
|
auto nxt_rc = cur_rc & ~static_cast<rc_t>(wrapper->connected_id());
|
||||||
cur_rc & ~static_cast<rc_t>(wrapper->connected_id()),
|
if (el->rc_.compare_exchange_weak(cur_rc, nxt_rc, std::memory_order_release)) {
|
||||||
std::memory_order_release)) {
|
std::forward<R>(out)((nxt_rc & ep_mask) == 0);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
ipc::yield(k);
|
ipc::yield(k);
|
||||||
@ -395,8 +400,8 @@ struct prod_cons_impl<wr<relat::multi, relat::multi, trans::broadcast>> {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename W, typename F, typename E, std::size_t N>
|
template <typename W, typename F, typename R, typename E, std::size_t N>
|
||||||
bool pop(W* wrapper, circ::u2_t& cur, F&& f, E(& elems)[N]) {
|
bool pop(W* wrapper, circ::u2_t& cur, F&& f, R&& out, E(& elems)[N]) {
|
||||||
auto* el = elems + circ::index_of(cur);
|
auto* el = elems + circ::index_of(cur);
|
||||||
auto cur_fl = el->f_ct_.load(std::memory_order_acquire);
|
auto cur_fl = el->f_ct_.load(std::memory_order_acquire);
|
||||||
if (cur_fl != ~static_cast<flag_t>(cur)) {
|
if (cur_fl != ~static_cast<flag_t>(cur)) {
|
||||||
@ -406,17 +411,18 @@ struct prod_cons_impl<wr<relat::multi, relat::multi, trans::broadcast>> {
|
|||||||
std::forward<F>(f)(&(el->data_));
|
std::forward<F>(f)(&(el->data_));
|
||||||
for (unsigned k = 0;;) {
|
for (unsigned k = 0;;) {
|
||||||
auto cur_rc = el->rc_.load(std::memory_order_acquire);
|
auto cur_rc = el->rc_.load(std::memory_order_acquire);
|
||||||
circ::cc_t rem_cc = cur_rc & rc_mask;
|
if ((cur_rc & rc_mask) == 0) {
|
||||||
if (rem_cc == 0) {
|
std::forward<R>(out)(true);
|
||||||
el->f_ct_.store(cur + N - 1, std::memory_order_release);
|
el->f_ct_.store(cur + N - 1, std::memory_order_release);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
if ((rem_cc & ~wrapper->connected_id()) == 0) {
|
auto nxt_rc = inc_rc(cur_rc) & ~static_cast<rc_t>(wrapper->connected_id());
|
||||||
|
bool last_one = false;
|
||||||
|
if ((last_one = (nxt_rc & rc_mask) == 0)) {
|
||||||
el->f_ct_.store(cur + N - 1, std::memory_order_release);
|
el->f_ct_.store(cur + N - 1, std::memory_order_release);
|
||||||
}
|
}
|
||||||
if (el->rc_.compare_exchange_weak(cur_rc,
|
if (el->rc_.compare_exchange_weak(cur_rc, nxt_rc, std::memory_order_release)) {
|
||||||
inc_rc(cur_rc) & ~static_cast<rc_t>(wrapper->connected_id()),
|
std::forward<R>(out)(last_one);
|
||||||
std::memory_order_release)) {
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
ipc::yield(k);
|
ipc::yield(k);
|
||||||
|
|||||||
@ -155,11 +155,11 @@ public:
|
|||||||
return !valid() || (cursor_ == elems_->cursor());
|
return !valid() || (cursor_ == elems_->cursor());
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename T, typename... P>
|
template <typename T, typename F, typename... P>
|
||||||
bool push(P&&... params) {
|
bool push(F&& prep, P&&... params) {
|
||||||
if (elems_ == nullptr) return false;
|
if (elems_ == nullptr) return false;
|
||||||
return elems_->push(this, [&](void* p) {
|
return elems_->push(this, [&](void* p) {
|
||||||
::new (p) T(std::forward<P>(params)...);
|
if (prep(p)) ::new (p) T(std::forward<P>(params)...);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -171,14 +171,14 @@ public:
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename T>
|
template <typename T, typename F>
|
||||||
bool pop(T& item) {
|
bool pop(T& item, F&& out) {
|
||||||
if (elems_ == nullptr) {
|
if (elems_ == nullptr) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return elems_->pop(this, &(this->cursor_), [&item](void* p) {
|
return elems_->pop(this, &(this->cursor_), [&item](void* p) {
|
||||||
::new (&item) T(std::move(*static_cast<T*>(p)));
|
::new (&item) T(std::move(*static_cast<T*>(p)));
|
||||||
});
|
}, std::forward<F>(out));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -204,7 +204,12 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
bool pop(T& item) {
|
bool pop(T& item) {
|
||||||
return base_t::pop(item);
|
return base_t::pop(item, [](bool) {});
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename F>
|
||||||
|
bool pop(T& item, F&& out) {
|
||||||
|
return base_t::pop(item, std::forward<F>(out));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@ -2,13 +2,15 @@
|
|||||||
|
|
||||||
#include <type_traits> // std::aligned_storage_t
|
#include <type_traits> // std::aligned_storage_t
|
||||||
#include <cstring> // std::memcmp
|
#include <cstring> // std::memcmp
|
||||||
|
#include <cstdint>
|
||||||
|
|
||||||
#include "libipc/def.h"
|
#include "libipc/def.h"
|
||||||
|
|
||||||
#include "libipc/platform/detail.h"
|
#include "libipc/platform/detail.h"
|
||||||
|
|
||||||
namespace ipc {
|
namespace ipc {
|
||||||
|
|
||||||
|
using storage_id_t = std::int32_t;
|
||||||
|
|
||||||
template <std::size_t DataSize, std::size_t AlignSize>
|
template <std::size_t DataSize, std::size_t AlignSize>
|
||||||
struct id_type;
|
struct id_type;
|
||||||
|
|
||||||
@ -16,7 +18,7 @@ template <std::size_t AlignSize>
|
|||||||
struct id_type<0, AlignSize> {
|
struct id_type<0, AlignSize> {
|
||||||
uint_t<8> id_;
|
uint_t<8> id_;
|
||||||
|
|
||||||
id_type& operator=(std::size_t val) {
|
id_type& operator=(storage_id_t val) {
|
||||||
id_ = static_cast<uint_t<8>>(val);
|
id_ = static_cast<uint_t<8>>(val);
|
||||||
return (*this);
|
return (*this);
|
||||||
}
|
}
|
||||||
@ -57,7 +59,7 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
void init() {
|
void init() {
|
||||||
for (std::size_t i = 0; i < max_count;) {
|
for (storage_id_t i = 0; i < max_count;) {
|
||||||
i = next_[i] = (i + 1);
|
i = next_[i] = (i + 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -71,22 +73,22 @@ public:
|
|||||||
return cursor_ == max_count;
|
return cursor_ == max_count;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::size_t acquire() {
|
storage_id_t acquire() {
|
||||||
if (empty()) return invalid_value;
|
if (empty()) return -1;
|
||||||
std::size_t id = cursor_;
|
storage_id_t id = cursor_;
|
||||||
cursor_ = next_[id]; // point to next
|
cursor_ = next_[id]; // point to next
|
||||||
return id;
|
return id;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool release(std::size_t id) {
|
bool release(storage_id_t id) {
|
||||||
if (id == invalid_value) return false;
|
if (id < 0) return false;
|
||||||
next_[id] = cursor_;
|
next_[id] = cursor_;
|
||||||
cursor_ = static_cast<uint_t<8>>(id); // put it back
|
cursor_ = static_cast<uint_t<8>>(id); // put it back
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void * at(std::size_t id) { return &(next_[id].data_); }
|
void * at(storage_id_t id) { return &(next_[id].data_); }
|
||||||
void const * at(std::size_t id) const { return &(next_[id].data_); }
|
void const * at(storage_id_t id) const { return &(next_[id].data_); }
|
||||||
};
|
};
|
||||||
|
|
||||||
template <typename T>
|
template <typename T>
|
||||||
@ -94,8 +96,8 @@ class obj_pool : public id_pool<sizeof(T), alignof(T)> {
|
|||||||
using base_t = id_pool<sizeof(T), alignof(T)>;
|
using base_t = id_pool<sizeof(T), alignof(T)>;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
T * at(std::size_t id) { return reinterpret_cast<T *>(base_t::at(id)); }
|
T * at(storage_id_t id) { return reinterpret_cast<T *>(base_t::at(id)); }
|
||||||
T const * at(std::size_t id) const { return reinterpret_cast<T const *>(base_t::at(id)); }
|
T const * at(storage_id_t id) const { return reinterpret_cast<T const *>(base_t::at(id)); }
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace ipc
|
} // namespace ipc
|
||||||
|
|||||||
@ -1,6 +1,8 @@
|
|||||||
|
|
||||||
#include <vector>
|
#include <vector>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
|
#include <mutex>
|
||||||
|
#include <atomic>
|
||||||
#include <cstring>
|
#include <cstring>
|
||||||
|
|
||||||
#include "libipc/ipc.h"
|
#include "libipc/ipc.h"
|
||||||
@ -16,15 +18,28 @@ using namespace ipc;
|
|||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
|
constexpr int LoopCount = 10000;
|
||||||
|
constexpr int MultiMax = 8;
|
||||||
|
|
||||||
|
struct msg_head {
|
||||||
|
int id_;
|
||||||
|
};
|
||||||
|
|
||||||
class rand_buf : public buffer {
|
class rand_buf : public buffer {
|
||||||
public:
|
public:
|
||||||
rand_buf() {
|
rand_buf() {
|
||||||
int size = capo::random<>{1, 65536}();
|
int size = capo::random<>{sizeof(msg_head), 65536}();
|
||||||
*this = buffer(new char[size], size, [](void * p, std::size_t) {
|
*this = buffer(new char[size], size, [](void * p, std::size_t) {
|
||||||
delete [] static_cast<char *>(p);
|
delete [] static_cast<char *>(p);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rand_buf(msg_head const &msg) {
|
||||||
|
*this = buffer(new msg_head{msg}, sizeof(msg), [](void * p, std::size_t) {
|
||||||
|
delete static_cast<msg_head *>(p);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
rand_buf(rand_buf &&) = default;
|
rand_buf(rand_buf &&) = default;
|
||||||
rand_buf(rand_buf const & rhs) {
|
rand_buf(rand_buf const & rhs) {
|
||||||
if (rhs.empty()) return;
|
if (rhs.empty()) return;
|
||||||
@ -40,11 +55,11 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
void set_id(int k) noexcept {
|
void set_id(int k) noexcept {
|
||||||
*get<char *>() = static_cast<char>(k);
|
get<msg_head *>()->id_ = k;
|
||||||
}
|
}
|
||||||
|
|
||||||
int get_id() const noexcept {
|
int get_id() const noexcept {
|
||||||
return static_cast<int>(*get<char *>());
|
return get<msg_head *>()->id_;
|
||||||
}
|
}
|
||||||
|
|
||||||
using buffer::operator=;
|
using buffer::operator=;
|
||||||
@ -60,64 +75,79 @@ void test_basic(char const * name) {
|
|||||||
EXPECT_FALSE(que1.try_send(test2));
|
EXPECT_FALSE(que1.try_send(test2));
|
||||||
|
|
||||||
que_t que2 { que1.name(), ipc::receiver };
|
que_t que2 { que1.name(), ipc::receiver };
|
||||||
EXPECT_TRUE(que1.send(test1));
|
ASSERT_TRUE(que1.send(test1));
|
||||||
EXPECT_TRUE(que1.try_send(test2));
|
ASSERT_TRUE(que1.try_send(test2));
|
||||||
|
|
||||||
EXPECT_EQ(que2.recv(), test1);
|
EXPECT_EQ(que2.recv(), test1);
|
||||||
EXPECT_EQ(que2.recv(), test2);
|
EXPECT_EQ(que2.recv(), test2);
|
||||||
}
|
}
|
||||||
|
|
||||||
template <relat Rp, relat Rc, trans Ts>
|
class data_set {
|
||||||
void test_sr(char const * name, int size, int s_cnt, int r_cnt) {
|
std::vector<rand_buf> datas_;
|
||||||
using que_t = chan<Rp, Rc, Ts>;
|
|
||||||
|
|
||||||
|
public:
|
||||||
|
data_set() {
|
||||||
|
datas_.resize(LoopCount);
|
||||||
|
for (int i = 0; i < LoopCount; ++i) {
|
||||||
|
datas_[i].set_id(i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
std::vector<rand_buf> const &get() const noexcept {
|
||||||
|
return datas_;
|
||||||
|
}
|
||||||
|
} const data_set__;
|
||||||
|
|
||||||
|
template <relat Rp, relat Rc, trans Ts, typename Que = chan<Rp, Rc, Ts>>
|
||||||
|
void test_sr(char const * name, int s_cnt, int r_cnt) {
|
||||||
ipc_ut::sender().start(static_cast<std::size_t>(s_cnt));
|
ipc_ut::sender().start(static_cast<std::size_t>(s_cnt));
|
||||||
ipc_ut::reader().start(static_cast<std::size_t>(r_cnt));
|
ipc_ut::reader().start(static_cast<std::size_t>(r_cnt));
|
||||||
|
|
||||||
|
std::atomic_thread_fence(std::memory_order_seq_cst);
|
||||||
ipc_ut::test_stopwatch sw;
|
ipc_ut::test_stopwatch sw;
|
||||||
std::vector<rand_buf> tests(size);
|
|
||||||
|
|
||||||
for (int k = 0; k < s_cnt; ++k) {
|
for (int k = 0; k < s_cnt; ++k) {
|
||||||
ipc_ut::sender() << [name, &tests, &sw, r_cnt, k] {
|
ipc_ut::sender() << [name, &sw, r_cnt, k] {
|
||||||
que_t que1 { name };
|
Que que { name, ipc::sender };
|
||||||
EXPECT_TRUE(que1.wait_for_recv(r_cnt));
|
EXPECT_TRUE(que.wait_for_recv(r_cnt));
|
||||||
sw.start();
|
sw.start();
|
||||||
for (auto & buf : tests) {
|
for (int i = 0; i < (int)data_set__.get().size(); ++i) {
|
||||||
rand_buf data { buf };
|
EXPECT_TRUE(que.send(data_set__.get()[i]));
|
||||||
data.set_id(k);
|
|
||||||
EXPECT_TRUE(que1.send(data));
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int k = 0; k < r_cnt; ++k) {
|
for (int k = 0; k < r_cnt; ++k) {
|
||||||
ipc_ut::reader() << [name, &tests, s_cnt] {
|
ipc_ut::reader() << [name] {
|
||||||
que_t que2 { name, ipc::receiver };
|
Que que { name, ipc::receiver };
|
||||||
std::vector<int> cursors(s_cnt);
|
|
||||||
for (;;) {
|
for (;;) {
|
||||||
rand_buf got { que2.recv() };
|
rand_buf got { que.recv() };
|
||||||
ASSERT_FALSE(got.empty());
|
ASSERT_FALSE(got.empty());
|
||||||
int & cur = cursors.at(got.get_id());
|
int i = got.get_id();
|
||||||
ASSERT_TRUE((cur >= 0) && (cur < static_cast<int>(tests.size())));
|
if (i == -1) {
|
||||||
rand_buf buf { tests.at(cur++) };
|
return;
|
||||||
buf.set_id(got.get_id());
|
}
|
||||||
EXPECT_EQ(got, buf);
|
ASSERT_TRUE((i >= 0) && (i < (int)data_set__.get().size()));
|
||||||
int n = 0;
|
auto const &data_set = data_set__.get()[i];
|
||||||
for (; n < static_cast<int>(cursors.size()); ++n) {
|
if (data_set != got) {
|
||||||
if (cursors[n] < static_cast<int>(tests.size())) break;
|
printf("data_set__.get()[%d] != got, size = %zd/%zd\n",
|
||||||
|
i, data_set.size(), got.size());
|
||||||
|
EXPECT_TRUE(false);
|
||||||
}
|
}
|
||||||
if (n == static_cast<int>(cursors.size())) break;
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
ipc_ut::sender().wait_for_done();
|
ipc_ut::sender().wait_for_done();
|
||||||
|
Que que { name };
|
||||||
|
EXPECT_TRUE(que.wait_for_recv(r_cnt));
|
||||||
|
for (int k = 0; k < r_cnt; ++k) {
|
||||||
|
que.send(rand_buf{msg_head{-1}});
|
||||||
|
}
|
||||||
ipc_ut::reader().wait_for_done();
|
ipc_ut::reader().wait_for_done();
|
||||||
sw.print_elapsed<std::chrono::microseconds>(s_cnt, r_cnt, size, name);
|
sw.print_elapsed<std::chrono::microseconds>(s_cnt, r_cnt, (int)data_set__.get().size(), name);
|
||||||
}
|
}
|
||||||
|
|
||||||
constexpr int LoopCount = 10000;
|
|
||||||
constexpr int MultiMax = 8;
|
|
||||||
|
|
||||||
} // internal-linkage
|
} // internal-linkage
|
||||||
|
|
||||||
TEST(IPC, basic) {
|
TEST(IPC, basic) {
|
||||||
@ -129,22 +159,26 @@ TEST(IPC, basic) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
TEST(IPC, 1v1) {
|
TEST(IPC, 1v1) {
|
||||||
test_sr<relat::single, relat::single, trans::unicast >("ssu", LoopCount, 1, 1);
|
test_sr<relat::single, relat::single, trans::unicast >("ssu", 1, 1);
|
||||||
test_sr<relat::single, relat::multi , trans::unicast >("smu", LoopCount, 1, 1);
|
test_sr<relat::single, relat::multi , trans::unicast >("smu", 1, 1);
|
||||||
test_sr<relat::multi , relat::multi , trans::unicast >("mmu", LoopCount, 1, 1);
|
test_sr<relat::multi , relat::multi , trans::unicast >("mmu", 1, 1);
|
||||||
test_sr<relat::single, relat::multi , trans::broadcast>("smb", LoopCount, 1, 1);
|
test_sr<relat::single, relat::multi , trans::broadcast>("smb", 1, 1);
|
||||||
test_sr<relat::multi , relat::multi , trans::broadcast>("mmb", LoopCount, 1, 1);
|
test_sr<relat::multi , relat::multi , trans::broadcast>("mmb", 1, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(IPC, 1vN) {
|
TEST(IPC, 1vN) {
|
||||||
test_sr<relat::single, relat::multi , trans::broadcast>("smb", LoopCount, 1, MultiMax);
|
//test_sr<relat::single, relat::multi , trans::unicast >("smu", 1, MultiMax);
|
||||||
test_sr<relat::multi , relat::multi , trans::broadcast>("mmb", LoopCount, 1, MultiMax);
|
//test_sr<relat::multi , relat::multi , trans::unicast >("mmu", 1, MultiMax);
|
||||||
|
test_sr<relat::single, relat::multi , trans::broadcast>("smb", 1, MultiMax);
|
||||||
|
test_sr<relat::multi , relat::multi , trans::broadcast>("mmb", 1, MultiMax);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(IPC, Nv1) {
|
TEST(IPC, Nv1) {
|
||||||
test_sr<relat::multi , relat::multi , trans::broadcast>("mmb", LoopCount, MultiMax, 1);
|
//test_sr<relat::multi , relat::multi , trans::unicast >("mmu", MultiMax, 1);
|
||||||
|
test_sr<relat::multi , relat::multi , trans::broadcast>("mmb", MultiMax, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(IPC, NvN) {
|
TEST(IPC, NvN) {
|
||||||
test_sr<relat::multi , relat::multi , trans::broadcast>("mmb", LoopCount, MultiMax, MultiMax);
|
//test_sr<relat::multi , relat::multi , trans::unicast >("mmu", MultiMax, MultiMax);
|
||||||
|
test_sr<relat::multi , relat::multi , trans::broadcast>("mmb", MultiMax, MultiMax);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -44,7 +44,7 @@ constexpr int ThreadMax = 8;
|
|||||||
|
|
||||||
template <typename Que>
|
template <typename Que>
|
||||||
void push(Que & que, int p, int d) {
|
void push(Que & que, int p, int d) {
|
||||||
for (int n = 0; !que.push(p, d); ++n) {
|
for (int n = 0; !que.push([](void*) { return true; }, p, d); ++n) {
|
||||||
ASSERT_NE(n, PushRetry);
|
ASSERT_NE(n, PushRetry);
|
||||||
std::this_thread::yield();
|
std::this_thread::yield();
|
||||||
}
|
}
|
||||||
|
|||||||
@ -9,6 +9,8 @@
|
|||||||
#include <cstddef> // std::size_t
|
#include <cstddef> // std::size_t
|
||||||
#include <cassert> // assert
|
#include <cassert> // assert
|
||||||
|
|
||||||
|
#include "capo/scope_guard.hpp"
|
||||||
|
|
||||||
namespace ipc_ut {
|
namespace ipc_ut {
|
||||||
|
|
||||||
class thread_pool final {
|
class thread_pool final {
|
||||||
@ -32,6 +34,9 @@ class thread_pool final {
|
|||||||
if (pool->quit_) return;
|
if (pool->quit_) return;
|
||||||
if (pool->jobs_.empty()) {
|
if (pool->jobs_.empty()) {
|
||||||
pool->waiting_cnt_ += 1;
|
pool->waiting_cnt_ += 1;
|
||||||
|
CAPO_SCOPE_GUARD_ = [pool] {
|
||||||
|
pool->waiting_cnt_ -= 1;
|
||||||
|
};
|
||||||
|
|
||||||
if (pool->waiting_cnt_ == pool->workers_.size()) {
|
if (pool->waiting_cnt_ == pool->workers_.size()) {
|
||||||
pool->cv_empty_.notify_all();
|
pool->cv_empty_.notify_all();
|
||||||
@ -41,8 +46,6 @@ class thread_pool final {
|
|||||||
pool->cv_jobs_.wait(guard);
|
pool->cv_jobs_.wait(guard);
|
||||||
if (pool->quit_) return;
|
if (pool->quit_) return;
|
||||||
} while (pool->jobs_.empty());
|
} while (pool->jobs_.empty());
|
||||||
|
|
||||||
pool->waiting_cnt_ -= 1;
|
|
||||||
}
|
}
|
||||||
assert(!pool->jobs_.empty());
|
assert(!pool->jobs_.empty());
|
||||||
job = std::move(pool->jobs_.front());
|
job = std::move(pool->jobs_.front());
|
||||||
@ -71,6 +74,7 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
void start(std::size_t n) {
|
void start(std::size_t n) {
|
||||||
|
std::unique_lock<std::mutex> guard { lock_ };
|
||||||
if (n <= workers_.size()) return;
|
if (n <= workers_.size()) return;
|
||||||
for (std::size_t i = workers_.size(); i < n; ++i) {
|
for (std::size_t i = workers_.size(); i < n; ++i) {
|
||||||
workers_.push_back(std::thread { &thread_pool::proc, this });
|
workers_.push_back(std::thread { &thread_pool::proc, this });
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user