fix bugs of large message buffer cache & recycle

This commit is contained in:
zhangyi 2019-10-24 12:23:52 +08:00
parent 80452b574b
commit dbe6d6d3c6
8 changed files with 164 additions and 92 deletions

View File

@ -37,27 +37,15 @@ public:
void * data() noexcept;
void const * data() const noexcept;
# if __cplusplus >= 201703L
template <typename T>
auto data() noexcept -> std::enable_if_t<!std::is_const_v<T>, T*> {
auto data() noexcept -> typename std::enable_if<!std::is_const<T>::value, T*>::type {
return static_cast<T*>(data());
}
template <typename T>
auto data() const noexcept -> std::enable_if_t<std::is_const_v<T>, T*> {
auto data() const noexcept -> typename std::enable_if<std::is_const<T>::value, T*>::type {
return static_cast<T*>(data());
}
# else /*__cplusplus < 201703L*/
template <typename T>
auto data() noexcept -> std::enable_if_t<!std::is_const<T>::value, T*> {
return static_cast<T*>(data());
}
template <typename T>
auto data() const noexcept -> std::enable_if_t<std::is_const<T>::value, T*> {
return static_cast<T*>(data());
}
# endif/*__cplusplus < 201703L*/
std::size_t size() const noexcept;
@ -84,6 +72,7 @@ public:
}
friend IPC_EXPORT bool operator==(buffer const & b1, buffer const & b2);
friend IPC_EXPORT bool operator!=(buffer const & b1, buffer const & b2);
private:
class buffer_;

View File

@ -29,6 +29,7 @@ enum : std::size_t {
invalid_value = (std::numeric_limits<std::size_t>::max)(),
data_length = 64,
small_msg_limit = data_length,
large_msg_cache = 32,
default_timeut = 100 // ms
};

View File

@ -9,6 +9,10 @@ bool operator==(buffer const & b1, buffer const & b2) {
return (b1.size() == b2.size()) && (std::memcmp(b1.data(), b2.data(), b1.size()) == 0);
}
bool operator!=(buffer const & b1, buffer const & b2) {
return !(b1 == b2);
}
class buffer::buffer_ : public pimpl<buffer_> {
public:
void* p_;

View File

@ -10,12 +10,14 @@
namespace ipc {
template <std::size_t DataSize, std::size_t AlignSize>
struct id_type {
uint_t<8> id_;
std::aligned_storage_t<DataSize, AlignSize> data_;
struct id_type;
id_type& operator=(uint_t<8> val) {
id_ = val;
template <std::size_t AlignSize>
struct id_type<0, AlignSize> {
uint_t<8> id_;
id_type& operator=(std::size_t val) {
id_ = static_cast<uint_t<8>>(val);
return (*this);
}
@ -24,18 +26,9 @@ struct id_type {
}
};
template <std::size_t AlignSize>
struct id_type<0, AlignSize> {
uint_t<8> id_;
id_type& operator=(uint_t<8> val) {
id_ = val;
return (*this);
}
operator uint_t<8>() const {
return id_;
}
template <std::size_t DataSize, std::size_t AlignSize>
struct id_type : id_type<0, AlignSize> {
std::aligned_storage_t<DataSize, AlignSize> data_;
};
template <std::size_t DataSize = 0,
@ -43,7 +36,7 @@ template <std::size_t DataSize = 0,
class id_pool {
public:
enum : std::size_t {
max_count = (std::numeric_limits<uint_t<8>>::max)() // 255
max_count = ipc::detail::min<std::size_t>(large_msg_cache, (std::numeric_limits<uint_t<8>>::max)())
};
private:
@ -59,7 +52,7 @@ public:
void init() {
for (std::size_t i = 0; i < max_count;) {
i = next_[i] = static_cast<uint_t<8>>(i + 1);
i = next_[i] = (i + 1);
}
}
@ -73,9 +66,7 @@ public:
}
std::size_t acquire() {
if (empty()) {
return invalid_value;
}
if (empty()) return invalid_value;
std::size_t id = cursor_;
cursor_ = next_[id]; // point to next
return id;

View File

@ -113,21 +113,55 @@ constexpr std::size_t calc_cls_size(std::size_t size) noexcept {
return (((size - 1) / small_msg_limit) + 1) * small_msg_limit;
}
std::pair<std::size_t, void*> apply_storage(std::size_t size) {
std::size_t cls_size = calc_cls_size(size);
cls_lock().lock();
auto& cls_shm = cls_storages()[cls_size];
cls_lock().unlock();
auto& cls_storage(std::size_t cls_size) {
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(cls_lock());
return cls_storages()[cls_size];
}
template <typename T>
cls_info_t* cls_storage_info(const char* func, T& cls_shm, std::size_t cls_size) {
if (!cls_shm.id_info_.valid() &&
!cls_shm.id_info_.acquire(("__CLS_INFO__" + ipc::to_string(cls_size)).c_str(), sizeof(cls_info_t))) {
return {};
ipc::error("[%s] cls_shm.id_info_.acquire failed: cls_size = %zd\n", func, cls_size);
return nullptr;
}
auto info = static_cast<cls_info_t*>(cls_shm.id_info_.get());
if (info == nullptr) {
return {};
ipc::error("[%s] cls_shm.id_info_.get failed: cls_size = %zd\n", func, cls_size);
return nullptr;
}
return info;
}
template <typename T>
byte_t* cls_storage_mem(const char* func, T& cls_shm, std::size_t cls_size, std::size_t id) {
if (id == invalid_value) {
return nullptr;
}
if (!cls_shm.mems_[id].valid() &&
!cls_shm.mems_[id].acquire(("__CLS_MEM_BLOCK__" + ipc::to_string(cls_size) +
"__" + ipc::to_string(id)).c_str(),
cls_size + sizeof(acc_t))) {
ipc::error("[%s] cls_shm.mems_[id].acquire failed: id = %zd, cls_size = %zd\n", func, id, cls_size);
return nullptr;
}
byte_t* ptr = static_cast<byte_t*>(cls_shm.mems_[id].get());
if (ptr == nullptr) {
ipc::error("[%s] cls_shm.mems_[id].get failed: id = %zd, cls_size = %zd\n", func, id, cls_size);
return nullptr;
}
return ptr;
}
std::pair<std::size_t, void*> apply_storage(std::size_t conn_count, std::size_t size) {
if (conn_count == 0) return {};
std::size_t cls_size = calc_cls_size(size);
auto & cls_shm = cls_storage(cls_size);
auto info = cls_storage_info("apply_storage", cls_shm, cls_size);
if (info == nullptr) return {};
info->lock_.lock();
info->pool_.prepare();
@ -135,50 +169,80 @@ std::pair<std::size_t, void*> apply_storage(std::size_t size) {
auto id = info->pool_.acquire();
info->lock_.unlock();
if (id == invalid_value) {
return {};
}
if (!cls_shm.mems_[id].valid() &&
!cls_shm.mems_[id].acquire(("__CLS_MEM_BLOCK__" + ipc::to_string(cls_size) +
"__" + ipc::to_string(id)).c_str(), cls_size)) {
return {};
}
return { id, cls_shm.mems_[id].get() };
auto ptr = cls_storage_mem("apply_storage", cls_shm, cls_size, id);
if (ptr == nullptr) return {};
reinterpret_cast<acc_t*>(ptr + cls_size)->store(static_cast<msg_id_t>(conn_count), std::memory_order_release);
return { id, ptr };
}
void* find_storage(std::size_t id, std::size_t size) {
std::size_t cls_size = calc_cls_size(size);
auto & cls_shm = cls_storage(cls_size);
cls_lock().lock();
auto& cls_shm = cls_storages()[cls_size];
cls_lock().unlock();
if (id == invalid_value) {
auto ptr = cls_storage_mem("find_storage", cls_shm, cls_size, id);
if (ptr == nullptr) return nullptr;
if (reinterpret_cast<acc_t*>(ptr + cls_size)->load(std::memory_order_acquire) == 0) {
ipc::error("[find_storage] cc test failed: id = %zd, cls_size = %zd\n", id, cls_size);
return nullptr;
}
if (!cls_shm.mems_[id].valid() &&
!cls_shm.mems_[id].acquire(("__CLS_MEM_BLOCK__" + ipc::to_string(cls_size) +
"__" + ipc::to_string(id)).c_str(), cls_size)) {
return nullptr;
}
return cls_shm.mems_[id].get();
return ptr;
}
void recycle_storage(std::size_t id, std::size_t size) {
if (id == invalid_value) {
ipc::error("[recycle_storage] id is invalid: id = %zd, size = %zd\n", id, size);
return;
}
std::size_t cls_size = calc_cls_size(size);
auto & cls_shm = cls_storage(cls_size);
cls_lock().lock();
auto& cls_shm = cls_storages()[cls_size];
cls_lock().unlock();
if (!cls_shm.id_info_.valid() &&
!cls_shm.id_info_.acquire(("__CLS_INFO__" + ipc::to_string(cls_size)).c_str(), sizeof(cls_info_t))) {
if (!cls_shm.mems_[id].valid()) {
ipc::error("[recycle_storage] should find storage first: id = %zd, cls_size = %zd\n", id, cls_size);
return;
}
auto info = static_cast<cls_info_t*>(cls_shm.id_info_.get());
if (info == nullptr) {
byte_t* ptr = static_cast<byte_t*>(cls_shm.mems_[id].get());
if (ptr == nullptr) {
ipc::error("[recycle_storage] cls_shm.mems_[id].get failed: id = %zd, cls_size = %zd\n", id, cls_size);
return;
}
if (reinterpret_cast<acc_t*>(ptr + cls_size)->fetch_sub(1, std::memory_order_acq_rel) > 1) {
// not the last receiver, just return
return;
}
auto info = cls_storage_info("recycle_storage", cls_shm, cls_size);
if (info == nullptr) return;
info->lock_.lock();
info->pool_.release(id);
info->lock_.unlock();
}
void clear_storage(std::size_t id, std::size_t size) {
if (id == invalid_value) {
ipc::error("[clear_storage] id is invalid: id = %zd, size = %zd\n", id, size);
return;
}
std::size_t cls_size = calc_cls_size(size);
auto & cls_shm = cls_storage(cls_size);
auto ptr = cls_storage_mem("clear_storage", cls_shm, cls_size, id);
if (ptr == nullptr) return;
auto cc_flag = reinterpret_cast<acc_t*>(ptr + cls_size);
for (unsigned k = 0;;) {
auto cc_curr = cc_flag->load(std::memory_order_acquire);
if (cc_curr == 0) return; // means this id has been cleared
if (cc_flag->compare_exchange_weak(cc_curr, 0, std::memory_order_release)) {
break;
}
ipc::yield(k);
}
auto info = cls_storage_info("clear_storage", cls_shm, cls_size);
if (info == nullptr) return;
info->lock_.lock();
info->pool_.release(id);
@ -339,27 +403,28 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s
auto msg_id = acc->fetch_add(1, std::memory_order_relaxed);
auto try_push = std::forward<F>(gen_push)(info_of(h), que, msg_id);
if (size > small_msg_limit) {
auto dat = apply_storage(size);
auto dat = apply_storage(que->conn_count(), size);
void * buf = dat.second;
if (buf != nullptr) {
std::memcpy(buf, data, size);
return try_push(static_cast<int>(size) - static_cast<int>(data_length), &(dat.first), 0);
return try_push(static_cast<std::int32_t>(size) -
static_cast<std::int32_t>(data_length), &(dat.first), 0);
}
// try using message fragment
// ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd\n", msg_id, size);
}
// push message fragment
int offset = 0;
std::int32_t offset = 0;
for (int i = 0; i < static_cast<int>(size / data_length); ++i, offset += data_length) {
if (!try_push(static_cast<int>(size) - offset - static_cast<int>(data_length),
if (!try_push(static_cast<std::int32_t>(size) - offset - static_cast<std::int32_t>(data_length),
static_cast<byte_t const *>(data) + offset, data_length)) {
return false;
}
}
// if remain > 0, this is the last message fragment
int remain = static_cast<int>(size) - offset;
std::int32_t remain = static_cast<std::int32_t>(size) - offset;
if (remain > 0) {
if (!try_push(remain - static_cast<int>(data_length),
if (!try_push(remain - static_cast<std::int32_t>(data_length),
static_cast<byte_t const *>(data) + offset, static_cast<std::size_t>(remain))) {
return false;
}
@ -369,12 +434,19 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s
static bool send(ipc::handle_t h, void const * data, std::size_t size) {
return send([](auto info, auto que, auto msg_id) {
return [info, que, msg_id](int remain, void const * data, std::size_t size) {
return [info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) {
if (!wait_for(info->wt_waiter_, [&] {
return !que->push(info->cc_id_, msg_id, remain, data, size);
}, que->dis_flag() ? 0 : static_cast<std::size_t>(default_timeut))) {
ipc::log("force_push: msg_id = %zd, remain = %d, size = %zd\n", msg_id, remain, size);
if (!que->force_push(info->cc_id_, msg_id, remain, data, size)) {
if (!que->force_push([](void* p) {
auto tmp_msg = static_cast<typename queue_t::value_t*>(p);
if (tmp_msg->storage_) {
clear_storage(*reinterpret_cast<std::size_t*>(&tmp_msg->data_),
static_cast<std::int32_t>(data_length) + tmp_msg->remain_);
}
return true;
}, info->cc_id_, msg_id, remain, data, size)) {
return false;
}
}
@ -386,7 +458,7 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size) {
static bool try_send(ipc::handle_t h, void const * data, std::size_t size) {
return send([](auto info, auto que, auto msg_id) {
return [info, que, msg_id](int remain, void const * data, std::size_t size) {
return [info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) {
if (!wait_for(info->wt_waiter_, [&] {
return !que->push(info->cc_id_, msg_id, remain, data, size);
}, 0)) {
@ -419,7 +491,7 @@ static buff_t recv(ipc::handle_t h, std::size_t tm) {
continue; // ignore message to self
}
// msg.remain_ may minus & abs(msg.remain_) < data_length
auto remain = static_cast<std::size_t>(static_cast<std::int32_t>(data_length) + msg.remain_);
std::size_t remain = static_cast<std::int32_t>(data_length) + msg.remain_;
// find cache with msg.id_
auto cac_it = rc.find(msg.id_);
if (cac_it == rc.end()) {

View File

@ -149,11 +149,11 @@ public:
});
}
template <typename T, typename... P>
auto force_push(P&&... params) {
template <typename T, typename F, typename... P>
auto force_push(F&& prep, P&&... params) {
if (elems_ == nullptr) return false;
return elems_->force_push([&](void* p) {
::new (p) T(std::forward<P>(params)...);
if (prep(p)) ::new (p) T(std::forward<P>(params)...);
});
}

View File

@ -8,6 +8,7 @@
#include <string>
#include <memory>
#include <mutex>
#include <utility>
#if defined(__GNUC__)
# include <cxxabi.h> // abi::__cxa_demangle
@ -106,7 +107,7 @@ void benchmark_prod_cons(T* cq) {
// std::unique_lock<capo::spin_lock> guard { lc };
// std::cout << cid << "-recving: " << (i * 100) / (Loops * N) << "%" << std::endl;
// }
vf.push_data(cid, msg);
vf.push_data(cid, std::forward<decltype(msg)>(msg));
++i;
});
// {

View File

@ -44,7 +44,7 @@ struct test_verify {
void prepare(void* /*pt*/) {}
void push_data(int cid, ipc::buff_t & msg) {
void push_data(int cid, ipc::buff_t && msg) {
list_[cid].emplace_back(std::move(msg));
}
@ -90,7 +90,7 @@ struct test_cq<ipc::route> {
QCOMPARE(msg, ipc::buff_t('\0'));
return;
}
proc(msg);
proc(std::move(msg));
} while(1);
}
@ -136,7 +136,7 @@ struct test_cq<ipc::channel> {
QCOMPARE(msg, ipc::buff_t('\0'));
return;
}
proc(msg);
proc(std::move(msg));
} while(1);
}
@ -313,7 +313,7 @@ void test_prod_cons() {
}
void Unit::test_route() {
//return;
// return;
std::vector<char const *> const datas = {
"hello!",
"foo",
@ -402,12 +402,24 @@ void Unit::test_route_performance() {
void Unit::test_channel() {
// return;
int fail_v = 0;
std::thread t1 {[&] {
ipc::channel cc { "my-ipc-channel" };
for (std::size_t i = 0;; ++i) {
ipc::buff_t dd = cc.recv();
if (dd.size() < 2) return;
QCOMPARE(dd, datas__[i]);
if (dd != datas__[i]) {
std::printf("fail recv: %zd-[%zd, %zd]\n", i, dd.size(), datas__[i].size());
// for (std::size_t k = 0; k < dd.size(); ++k) {
// if (dd.data<ipc::byte_t>()[k] != datas__[i].data<ipc::byte_t>()[k]) {
// std::printf("fail check: %zd-%zd, %02x != %02x\n",
// i, k, (unsigned)dd .data<ipc::byte_t>()[k],
// (unsigned)datas__[i].data<ipc::byte_t>()[k]);
// }
// }
++fail_v;
}
}
}};
@ -415,7 +427,7 @@ void Unit::test_channel() {
ipc::channel cc { "my-ipc-channel" };
cc.wait_for_recv(1);
for (std::size_t i = 0; i < static_cast<std::size_t>((std::min)(100, LoopCount)); ++i) {
std::cout << "sending: " << i << "-[" << datas__[i].size() << "]" << std::endl;
std::printf("sending: %zd-[%zd]\n", i, datas__[i].size());
cc.send(datas__[i]);
}
cc.send(ipc::buff_t('\0'));
@ -423,6 +435,8 @@ void Unit::test_channel() {
}};
t2.join();
QCOMPARE(fail_v, 0);
}
void Unit::test_channel_rtt() {