Merge pull request #38 from mutouyun/issue-17

Issue 17
This commit is contained in:
木头云 2021-05-08 10:55:49 +08:00 committed by GitHub
commit 1a39118c1e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 92 additions and 88 deletions

View File

@ -29,6 +29,7 @@ enum : std::size_t {
invalid_value = (std::numeric_limits<std::size_t>::max)(), invalid_value = (std::numeric_limits<std::size_t>::max)(),
data_length = 64, data_length = 64,
large_msg_limit = data_length, large_msg_limit = data_length,
large_msg_align = 512,
large_msg_cache = 32, large_msg_cache = 32,
default_timeout = 100 // ms default_timeout = 100 // ms
}; };

View File

@ -21,6 +21,7 @@
#include "libipc/utility/log.h" #include "libipc/utility/log.h"
#include "libipc/utility/id_pool.h" #include "libipc/utility/id_pool.h"
#include "libipc/utility/utility.h"
#include "libipc/memory/resource.h" #include "libipc/memory/resource.h"
@ -90,77 +91,69 @@ auto cc_acc() {
return static_cast<acc_t*>(acc_h.get()); return static_cast<acc_t*>(acc_h.get());
} }
auto& cls_storages() { struct chunk_info_t {
struct cls_t {
ipc::shm::handle id_info_;
std::array<ipc::shm::handle, ipc::id_pool<>::max_count> mems_;
};
static ipc::unordered_map<std::size_t, cls_t> cls_s;
return cls_s;
}
auto& cls_lock() {
static ipc::spin_lock cls_l;
return cls_l;
}
struct cls_info_t {
ipc::id_pool<> pool_; ipc::id_pool<> pool_;
ipc::spin_lock lock_; ipc::spin_lock lock_;
IPC_CONSTEXPR_ static std::size_t chunks_elem_size(std::size_t chunk_size) noexcept {
return ipc::make_align(alignof(std::max_align_t), chunk_size + sizeof(acc_t));
}
IPC_CONSTEXPR_ static std::size_t chunks_mem_size(std::size_t chunk_size) noexcept {
return ipc::id_pool<>::max_count * chunks_elem_size(chunk_size);
}
ipc::byte_t *at(std::size_t chunk_size, std::size_t id) noexcept {
if (id == ipc::invalid_value) return nullptr;
return reinterpret_cast<ipc::byte_t *>(this + 1) + (chunks_elem_size(chunk_size) * id);
}
}; };
constexpr std::size_t calc_cls_size(std::size_t size) noexcept { auto& chunk_storages() {
return (((size - 1) / ipc::large_msg_limit) + 1) * ipc::large_msg_limit; class chunk_t {
ipc::shm::handle handle_;
public:
chunk_info_t *get_info(std::size_t chunk_size) {
if (!handle_.valid() &&
!handle_.acquire( ("__CHUNK_INFO__" + ipc::to_string(chunk_size)).c_str(),
sizeof(chunk_info_t) + chunk_info_t::chunks_mem_size(chunk_size) )) {
ipc::error("[chunk_storages] chunk_shm.id_info_.acquire failed: chunk_size = %zd\n", chunk_size);
return nullptr;
}
auto info = static_cast<chunk_info_t*>(handle_.get());
if (info == nullptr) {
ipc::error("[chunk_storages] chunk_shm.id_info_.get failed: chunk_size = %zd\n", chunk_size);
return nullptr;
}
return info;
}
};
static ipc::unordered_map<std::size_t, chunk_t> chunk_s;
return chunk_s;
} }
auto& cls_storage(std::size_t cls_size) { auto& chunk_lock() {
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(cls_lock()); static ipc::spin_lock chunk_l;
return cls_storages()[cls_size]; return chunk_l;
} }
template <typename T> constexpr std::size_t calc_chunk_size(std::size_t size) noexcept {
cls_info_t* cls_storage_info(const char* func, T& cls_shm, std::size_t cls_size) { return ( ((size - 1) / ipc::large_msg_align) + 1 ) * ipc::large_msg_align;
if (!cls_shm.id_info_.valid() &&
!cls_shm.id_info_.acquire(("__CLS_INFO__" + ipc::to_string(cls_size)).c_str(), sizeof(cls_info_t))) {
ipc::error("[%s] cls_shm.id_info_.acquire failed: cls_size = %zd\n", func, cls_size);
return nullptr;
}
auto info = static_cast<cls_info_t*>(cls_shm.id_info_.get());
if (info == nullptr) {
ipc::error("[%s] cls_shm.id_info_.get failed: cls_size = %zd\n", func, cls_size);
return nullptr;
}
return info;
} }
template <typename T> auto& chunk_storage(std::size_t chunk_size) {
ipc::byte_t* cls_storage_mem(const char* func, T& cls_shm, std::size_t cls_size, std::size_t id) { IPC_UNUSED_ auto guard = ipc::detail::unique_lock(chunk_lock());
if (id == ipc::invalid_value) { return chunk_storages()[chunk_size];
return nullptr;
}
if (!cls_shm.mems_[id].valid() &&
!cls_shm.mems_[id].acquire(("__CLS_MEM_BLOCK__" + ipc::to_string(cls_size) +
"__" + ipc::to_string(id)).c_str(),
cls_size + sizeof(acc_t))) {
ipc::error("[%s] cls_shm.mems_[id].acquire failed: id = %zd, cls_size = %zd\n", func, id, cls_size);
return nullptr;
}
auto ptr = static_cast<ipc::byte_t *>(cls_shm.mems_[id].get());
if (ptr == nullptr) {
ipc::error("[%s] cls_shm.mems_[id].get failed: id = %zd, cls_size = %zd\n", func, id, cls_size);
return nullptr;
}
return ptr;
} }
std::pair<std::size_t, void*> apply_storage(std::size_t conn_count, std::size_t size) { std::pair<std::size_t, void*> apply_storage(std::size_t conn_count, std::size_t size) {
if (conn_count == 0) return {}; if (conn_count == 0) return {};
std::size_t cls_size = calc_cls_size(size); std::size_t chunk_size = calc_chunk_size(size);
auto & cls_shm = cls_storage(cls_size); auto & chunk_shm = chunk_storage(chunk_size);
auto info = cls_storage_info("apply_storage", cls_shm, cls_size); auto info = chunk_shm.get_info(chunk_size);
if (info == nullptr) return {}; if (info == nullptr) return {};
info->lock_.lock(); info->lock_.lock();
@ -169,20 +162,28 @@ std::pair<std::size_t, void*> apply_storage(std::size_t conn_count, std::size_t
auto id = info->pool_.acquire(); auto id = info->pool_.acquire();
info->lock_.unlock(); info->lock_.unlock();
auto ptr = cls_storage_mem("apply_storage", cls_shm, cls_size, id); auto ptr = info->at(chunk_size, id);
if (ptr == nullptr) return {}; if (ptr == nullptr) return {};
reinterpret_cast<acc_t*>(ptr + cls_size)->store(static_cast<msg_id_t>(conn_count), std::memory_order_release); reinterpret_cast<acc_t*>(ptr + chunk_size)->store(static_cast<msg_id_t>(conn_count), std::memory_order_release);
return { id, ptr }; return { id, ptr };
} }
void* find_storage(std::size_t id, std::size_t size) { void *find_storage(std::size_t id, std::size_t size) {
std::size_t cls_size = calc_cls_size(size); if (id == ipc::invalid_value) {
auto & cls_shm = cls_storage(cls_size); ipc::error("[find_storage] id is invalid: id = %zd, size = %zd\n", id, size);
return nullptr;
}
auto ptr = cls_storage_mem("find_storage", cls_shm, cls_size, id); std::size_t chunk_size = calc_chunk_size(size);
auto & chunk_shm = chunk_storage(chunk_size);
auto info = chunk_shm.get_info(chunk_size);
if (info == nullptr) return nullptr;
auto ptr = info->at(chunk_size, id);
if (ptr == nullptr) return nullptr; if (ptr == nullptr) return nullptr;
if (reinterpret_cast<acc_t*>(ptr + cls_size)->load(std::memory_order_acquire) == 0) { if (reinterpret_cast<acc_t*>(ptr + chunk_size)->load(std::memory_order_acquire) == 0) {
ipc::error("[find_storage] cc test failed: id = %zd, cls_size = %zd\n", id, cls_size); ipc::error("[find_storage] cc test failed: id = %zd, chunk_size = %zd\n", id, chunk_size);
return nullptr; return nullptr;
} }
return ptr; return ptr;
@ -194,26 +195,22 @@ void recycle_storage(std::size_t id, std::size_t size) {
return; return;
} }
std::size_t cls_size = calc_cls_size(size); std::size_t chunk_size = calc_chunk_size(size);
auto & cls_shm = cls_storage(cls_size); auto & chunk_shm = chunk_storage(chunk_size);
if (!cls_shm.mems_[id].valid()) { auto info = chunk_shm.get_info(chunk_size);
ipc::error("[recycle_storage] should find storage first: id = %zd, cls_size = %zd\n", id, cls_size); if (info == nullptr) return;
return;
} auto ptr = info->at(chunk_size, id);
auto ptr = static_cast<ipc::byte_t*>(cls_shm.mems_[id].get());
if (ptr == nullptr) { if (ptr == nullptr) {
ipc::error("[recycle_storage] cls_shm.mems_[id].get failed: id = %zd, cls_size = %zd\n", id, cls_size); ipc::error("[recycle_storage] chunk_shm.mems[%zd] failed: chunk_size = %zd\n", id, chunk_size);
return; return;
} }
if (reinterpret_cast<acc_t*>(ptr + cls_size)->fetch_sub(1, std::memory_order_acq_rel) > 1) { if (reinterpret_cast<acc_t*>(ptr + chunk_size)->fetch_sub(1, std::memory_order_acq_rel) > 1) {
// not the last receiver, just return // not the last receiver, just return
return; return;
} }
auto info = cls_storage_info("recycle_storage", cls_shm, cls_size);
if (info == nullptr) return;
info->lock_.lock(); info->lock_.lock();
info->pool_.release(id); info->pool_.release(id);
info->lock_.unlock(); info->lock_.unlock();
@ -225,13 +222,16 @@ void clear_storage(std::size_t id, std::size_t size) {
return; return;
} }
std::size_t cls_size = calc_cls_size(size); std::size_t chunk_size = calc_chunk_size(size);
auto & cls_shm = cls_storage(cls_size); auto & chunk_shm = chunk_storage(chunk_size);
auto ptr = cls_storage_mem("clear_storage", cls_shm, cls_size, id); auto info = chunk_shm.get_info(chunk_size);
if (info == nullptr) return;
auto ptr = info->at(chunk_size, id);
if (ptr == nullptr) return; if (ptr == nullptr) return;
auto cc_flag = reinterpret_cast<acc_t*>(ptr + cls_size); auto cc_flag = reinterpret_cast<acc_t*>(ptr + chunk_size);
for (unsigned k = 0;;) { for (unsigned k = 0;;) {
auto cc_curr = cc_flag->load(std::memory_order_acquire); auto cc_curr = cc_flag->load(std::memory_order_acquire);
if (cc_curr == 0) return; // means this id has been cleared if (cc_curr == 0) return; // means this id has been cleared
@ -241,9 +241,6 @@ void clear_storage(std::size_t id, std::size_t size) {
ipc::yield(k); ipc::yield(k);
} }
auto info = cls_storage_info("clear_storage", cls_shm, cls_size);
if (info == nullptr) return;
info->lock_.lock(); info->lock_.lock();
info->pool_.release(id); info->pool_.release(id);
info->lock_.unlock(); info->lock_.unlock();

View File

@ -17,12 +17,13 @@
namespace ipc { namespace ipc {
namespace mem { namespace mem {
using async_pool_alloc = static_wrapper<variable_wrapper<async_wrapper< //using async_pool_alloc = static_wrapper<variable_wrapper<async_wrapper<
detail::fixed_alloc< // detail::fixed_alloc<
variable_alloc <sizeof(void*) * 1024 * 256>, // variable_alloc <sizeof(void*) * 1024 * 256>,
fixed_expand_policy<sizeof(void*) * 1024, sizeof(void*) * 1024 * 256> // fixed_expand_policy<sizeof(void*) * 1024, sizeof(void*) * 1024 * 256>
>, // >,
default_recycler >>>; // default_recycler >>>;
using async_pool_alloc = ipc::mem::static_alloc;
template <typename T> template <typename T>
using allocator = allocator_wrapper<T, async_pool_alloc>; using allocator = allocator_wrapper<T, async_pool_alloc>;

View File

@ -53,4 +53,9 @@ T horrible_cast(U val) {
return u.out; return u.out;
} }
IPC_CONSTEXPR_ std::size_t make_align(std::size_t align, std::size_t size) {
// align must be 2^n
return (size + align - 1) & ~(align - 1);
}
} // namespace ipc } // namespace ipc