use big message cache

This commit is contained in:
zhangyi 2019-10-23 16:23:07 +08:00
parent 8da0b32d0b
commit d4bf94c2a3
4 changed files with 232 additions and 73 deletions

View File

@ -28,7 +28,7 @@ using uint_t = typename uint<N>::type;
enum : std::size_t {
invalid_value = (std::numeric_limits<std::size_t>::max)(),
data_length = 64,
small_msg_limit = data_length * 64 - 1, // 4095
small_msg_limit = data_length,
default_timeut = 100 // ms
};

104
src/id_pool.h Normal file
View File

@ -0,0 +1,104 @@
#pragma once
#include <type_traits> // std::aligned_storage_t
#include <cstring> // std::memcmp
#include "def.h"
#include "platform/detail.h"
namespace ipc {
template <std::size_t DataSize, std::size_t AlignSize>
struct id_type {
uint_t<8> id_;
std::aligned_storage_t<DataSize, AlignSize> data_;
id_type& operator=(uint_t<8> val) {
id_ = val;
return (*this);
}
operator uint_t<8>() const {
return id_;
}
};
template <std::size_t AlignSize>
struct id_type<0, AlignSize> {
uint_t<8> id_;
id_type& operator=(uint_t<8> val) {
id_ = val;
return (*this);
}
operator uint_t<8>() const {
return id_;
}
};
template <std::size_t DataSize = 0,
std::size_t AlignSize = (ipc::detail::min)(DataSize, alignof(std::max_align_t))>
class id_pool {
public:
enum : std::size_t {
max_count = (std::numeric_limits<uint_t<8>>::max)() // 255
};
private:
id_type<DataSize, AlignSize> next_[max_count];
uint_t<8> cursor_ = 0;
bool prepared_ = false;
public:
void prepare() {
if (!prepared_ && this->invalid()) this->init();
prepared_ = true;
}
void init() {
for (std::size_t i = 0; i < max_count;) {
i = next_[i] = static_cast<uint_t<8>>(i + 1);
}
}
bool invalid() const {
static id_pool inv;
return std::memcmp(this, &inv, sizeof(id_pool)) == 0;
}
bool empty() const {
return cursor_ == max_count;
}
std::size_t acquire() {
if (empty()) {
return invalid_value;
}
std::size_t id = cursor_;
cursor_ = next_[id]; // point to next
return id;
}
bool release(std::size_t id) {
if (id == invalid_value) return false;
next_[id] = cursor_;
cursor_ = static_cast<uint_t<8>>(id); // put it back
return true;
}
void * at(std::size_t id) { return &(next_[id].data_); }
void const * at(std::size_t id) const { return &(next_[id].data_); }
};
template <typename T>
class obj_pool : public id_pool<sizeof(T), alignof(T)> {
using base_t = id_pool<sizeof(T), alignof(T)>;
public:
T * at(std::size_t id) { return reinterpret_cast<T *>(base_t::at(id)); }
T const * at(std::size_t id) const { return reinterpret_cast<T const *>(base_t::at(id)); }
};
} // namespace ipc

View File

@ -3,11 +3,12 @@
#include <type_traits>
#include <cstring>
#include <algorithm>
#include <utility>
#include <utility> // std::pair, std::move, std::forward
#include <atomic>
#include <type_traits> // aligned_storage_t
#include <string>
#include <vector>
#include <array>
#include "def.h"
#include "shm.h"
@ -17,6 +18,7 @@
#include "policy.h"
#include "rw_lock.h"
#include "log.h"
#include "id_pool.h"
#include "memory/resource.h"
@ -28,7 +30,9 @@
namespace {
using namespace ipc;
using msg_id_t = std::uint32_t;
using acc_t = std::atomic<msg_id_t>;
template <std::size_t DataSize, std::size_t AlignSize>
struct msg_t;
@ -48,10 +52,14 @@ struct msg_t : msg_t<0, AlignSize> {
msg_t() = default;
msg_t(msg_id_t c, msg_id_t i, std::int32_t r, void const * d, std::size_t s)
: msg_t<0, AlignSize> { c, i, r, (d == nullptr) || (s == 0) } {
if (!this->storage_) {
std::memcpy(&data_, d, s);
if (this->storage_) {
if (d != nullptr) {
// copy storage-id
*reinterpret_cast<std::size_t*>(&data_) = *static_cast<std::size_t const *>(d);
}
}
else std::memcpy(&data_, d, s);
}
};
template <typename T>
@ -77,13 +85,107 @@ struct cache_t {
}
};
struct conn_info_head {
using acc_t = std::atomic<msg_id_t>;
static auto cc_acc() {
auto cc_acc() {
static shm::handle acc_h("__CA_CONN__", sizeof(acc_t));
return static_cast<acc_t*>(acc_h.get());
}
auto& cls_storages() {
struct cls_t {
shm::handle id_info_;
std::array<shm::handle, id_pool<>::max_count> mems_;
};
static ipc::unordered_map<std::size_t, cls_t> cls_s;
return cls_s;
}
auto& cls_lock() {
static spin_lock cls_l;
return cls_l;
}
struct cls_info_t {
id_pool<> pool_;
spin_lock lock_;
};
constexpr std::size_t calc_cls_size(std::size_t size) noexcept {
return (((size - 1) / small_msg_limit) + 1) * small_msg_limit;
}
std::pair<std::size_t, void*> apply_storage(std::size_t size) {
std::size_t cls_size = calc_cls_size(size);
cls_lock().lock();
auto& cls_shm = cls_storages()[cls_size];
cls_lock().unlock();
if (!cls_shm.id_info_.valid() &&
!cls_shm.id_info_.acquire(("__CLS_INFO__" + ipc::to_string(cls_size)).c_str(), sizeof(cls_info_t))) {
return {};
}
auto info = static_cast<cls_info_t*>(cls_shm.id_info_.get());
if (info == nullptr) {
return {};
}
info->lock_.lock();
info->pool_.prepare();
// got an unique id
auto id = info->pool_.acquire();
info->lock_.unlock();
if (id == invalid_value) {
return {};
}
if (!cls_shm.mems_[id].valid() &&
!cls_shm.mems_[id].acquire(("__CLS_MEM_BLOCK__" + ipc::to_string(cls_size) +
"__" + ipc::to_string(id)).c_str(), cls_size)) {
return {};
}
return { id, cls_shm.mems_[id].get() };
}
void* find_storage(msg_id_t id, std::size_t size) {
std::size_t cls_size = calc_cls_size(size);
cls_lock().lock();
auto& cls_shm = cls_storages()[cls_size];
cls_lock().unlock();
if (id == invalid_value) {
return nullptr;
}
if (!cls_shm.mems_[id].valid() &&
!cls_shm.mems_[id].acquire(("__CLS_MEM_BLOCK__" + ipc::to_string(cls_size) +
"__" + ipc::to_string(id)).c_str(), cls_size)) {
return nullptr;
}
return cls_shm.mems_[id].get();
}
void recycle_storage(msg_id_t id, std::size_t size) {
std::size_t cls_size = calc_cls_size(size);
cls_lock().lock();
auto& cls_shm = cls_storages()[cls_size];
cls_lock().unlock();
if (!cls_shm.id_info_.valid() &&
!cls_shm.id_info_.acquire(("__CLS_INFO__" + ipc::to_string(cls_size)).c_str(), sizeof(cls_info_t))) {
return;
}
auto info = static_cast<cls_info_t*>(cls_shm.id_info_.get());
if (info == nullptr) {
return;
}
info->lock_.lock();
info->pool_.release(id);
info->lock_.unlock();
}
struct conn_info_head {
ipc::string name_;
msg_id_t cc_id_; // connection-info id
@ -103,27 +205,6 @@ struct conn_info_head {
*/
tls::pointer<ipc::unordered_map<msg_id_t, cache_t>> recv_cache_;
struct simple_push {
template <std::size_t, std::size_t>
using elem_t = shm::id_t;
circ::u2_t wt_; // write index
constexpr circ::u2_t cursor() const noexcept {
return 0;
}
template <typename W, typename F, typename E>
bool push(W* /*wrapper*/, F&& f, E* elems) {
std::forward<F>(f)(&(elems[circ::index_of(wt_)]));
++ wt_;
return true;
}
};
circ::elem_array<simple_push, sizeof(shm::id_t)> msg_datas_;
conn_info_head(char const * name)
: name_ (name)
, cc_id_ ((cc_acc() == nullptr) ? 0 : cc_acc()->fetch_add(1, std::memory_order_relaxed))
@ -140,32 +221,6 @@ struct conn_info_head {
auto& recv_cache() {
return *recv_cache_.create();
}
shm::id_t apply_storage(msg_id_t msg_id, std::size_t size) {
return shm::acquire(
("__ST_CONN__" + ipc::to_string(cc_id_) +
"__" + ipc::to_string(msg_id)).c_str(), size, shm::create);
}
static shm::id_t acquire_storage(msg_id_t cc_id, msg_id_t msg_id) {
return shm::acquire(
("__ST_CONN__" + ipc::to_string(cc_id) +
"__" + ipc::to_string(msg_id)).c_str(), 0, shm::open);
}
void store(shm::id_t dat) {
msg_datas_.push([dat](shm::id_t * id) {
(*id) = dat;
});
}
void clear_store() {
msg_datas_.push([](shm::id_t * id) {
if (*id == nullptr) return;
shm::remove(*id);
(*id) = nullptr;
});
}
};
template <typename W, typename F>
@ -284,15 +339,14 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s
auto msg_id = acc->fetch_add(1, std::memory_order_relaxed);
auto try_push = std::forward<F>(gen_push)(info_of(h), que, msg_id);
if (size > small_msg_limit) {
auto dat = info_of(h)->apply_storage(msg_id, size);
void * buf = shm::get_mem(dat, nullptr);
auto dat = apply_storage(size);
void * buf = dat.second;
if (buf != nullptr) {
std::memcpy(buf, data, size);
info_of(h)->store(dat);
return try_push(static_cast<int>(size) - static_cast<int>(data_length), nullptr, 0);
return try_push(static_cast<int>(size) - static_cast<int>(data_length), &(dat.first), 0);
}
// try using message fragment
ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd\n", msg_id, size);
// ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd\n", msg_id, size);
}
// push message fragment
int offset = 0;
@ -301,7 +355,6 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s
static_cast<byte_t const *>(data) + offset, data_length)) {
return false;
}
info_of(h)->clear_store();
}
// if remain > 0, this is the last message fragment
int remain = static_cast<int>(size) - offset;
@ -310,7 +363,6 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s
static_cast<byte_t const *>(data) + offset, static_cast<std::size_t>(remain))) {
return false;
}
info_of(h)->clear_store();
}
return true;
}
@ -375,16 +427,14 @@ static buff_t recv(ipc::handle_t h, std::size_t tm) {
return make_cache(msg.data_, remain);
}
if (msg.storage_) {
auto dat = info_of(h)->acquire_storage(msg.conn_, msg.id_);
std::size_t dat_sz = 0;
void * buf = shm::get_mem(dat, &dat_sz);
if (buf != nullptr && remain <= dat_sz) {
return buff_t { buf, remain, [](void * dat, std::size_t) {
shm::release(dat);
}, dat };
std::size_t buf_id = *reinterpret_cast<std::size_t*>(&msg.data_);
void * buf = find_storage(buf_id, remain);
if (buf != nullptr) {
return buff_t { buf, remain, [](void* ptr, std::size_t size) {
recycle_storage(reinterpret_cast<std::size_t>(ptr) - 1, size);
}, reinterpret_cast<void*>(buf_id + 1) };
}
else ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd, shm.size: %zd\n",
msg.id_, remain, dat_sz);
else ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd\n", msg.id_, remain);
}
// gc
if (rc.size() > 1024) {

View File

@ -353,6 +353,7 @@ void Unit::test_route() {
}
void Unit::test_route_rtt() {
// return;
test_stopwatch sw;
std::thread t1 {[&] {
@ -392,6 +393,7 @@ void Unit::test_route_rtt() {
}
void Unit::test_route_performance() {
// return;
ipc::detail::static_for<8>([](auto index) {
test_prod_cons<ipc::route, 1, decltype(index)::value + 1, false>();
});
@ -399,6 +401,7 @@ void Unit::test_route_performance() {
}
void Unit::test_channel() {
// return;
std::thread t1 {[&] {
ipc::channel cc { "my-ipc-channel" };
for (std::size_t i = 0;; ++i) {
@ -423,6 +426,7 @@ void Unit::test_channel() {
}
void Unit::test_channel_rtt() {
// return;
test_stopwatch sw;
std::thread t1 {[&] {
@ -465,6 +469,7 @@ void Unit::test_channel_rtt() {
}
void Unit::test_channel_performance() {
// return;
ipc::detail::static_for<8>([](auto index) {
test_prod_cons<ipc::channel, 1, decltype(index)::value + 1, false>();
});