remove is_fixed (TBD)

This commit is contained in:
mutouyun 2019-01-24 17:55:23 +08:00
parent 949d338f31
commit 5c9a8e0311
14 changed files with 232 additions and 110 deletions

View File

@ -32,9 +32,9 @@ HEADERS += \
../src/platform/waiter.h \
../src/circ/elem_def.h \
../src/circ/elem_array.h \
../src/circ/elem_chan.h \
../src/prod_cons.h \
../src/policy.h
../src/policy.h \
../src/queue.h
SOURCES += \
../src/shm.cpp \

View File

@ -46,7 +46,8 @@ using uint_t = typename uint<N>::type;
enum : std::size_t {
invalid_value = (std::numeric_limits<std::size_t>::max)(),
data_length = 16
data_length = 16,
name_length = 64
};
enum class relat { // multiplicity of the relationship

View File

@ -1,5 +1,8 @@
#pragma once
#include <new>
#include <utility>
#include "export.h"
#include "def.h"
@ -13,5 +16,88 @@ public:
static void free(void* p, std::size_t size);
};
////////////////////////////////////////////////////////////////
/// construct/destruct an object
////////////////////////////////////////////////////////////////
namespace detail {
template <typename T>
struct impl {
template <typename... P>
static T* construct(T* p, P&&... params) {
::new (p) T(std::forward<P>(params)...);
return p;
}
static void destruct(T* p) {
reinterpret_cast<T*>(p)->~T();
}
};
template <typename T, size_t N>
struct impl<T[N]> {
using type = T[N];
template <typename... P>
static type* construct(type* p, P&&... params) {
for (size_t i = 0; i < N; ++i) {
impl<T>::construct(&((*p)[i]), std::forward<P>(params)...);
}
return p;
}
static void destruct(type* p) {
for (size_t i = 0; i < N; ++i) {
impl<T>::destruct(&((*p)[i]));
}
}
};
} // namespace detail
template <typename T, typename... P>
T* construct(T* p, P&&... params) {
return detail::impl<T>::construct(p, std::forward<P>(params)...);
}
template <typename T, typename... P>
T* construct(void* p, P&&... params) {
return construct(static_cast<T*>(p), std::forward<P>(params)...);
}
template <typename T>
void destruct(T* p) {
return detail::impl<T>::destruct(p);
}
template <typename T>
void destruct(void* p) {
destruct(static_cast<T*>(p));
}
////////////////////////////////////////////////////////////////
/// general alloc/free
////////////////////////////////////////////////////////////////
inline void* alloc(std::size_t size) {
return pool_alloc::alloc(size);
}
template <typename T, typename... P>
T* alloc(P&&... params) {
return construct<T>(pool_alloc::alloc(sizeof(T)), std::forward<P>(params)...);
}
inline void free(void* p, std::size_t size) {
pool_alloc::free(p, size);
}
template <typename T>
void free(T* p) {
destruct(p);
pool_alloc::free(p, sizeof(T));
}
} // namespace mem
} // namespace ipc

View File

@ -113,6 +113,7 @@ public:
class rw_lock {
using lc_ui_t = unsigned;
std::atomic<lc_ui_t> lc_ { 0 };
enum : lc_ui_t {

View File

@ -1,37 +1,31 @@
#pragma once
#include <atomic>
#include <thread>
#include <cstring>
#include <limits>
#include <utility>
#include <type_traits>
#include "def.h"
#include "rw_lock.h"
#include "circ/elem_def.h"
#include "platform/detail.h"
namespace ipc {
namespace circ {
////////////////////////////////////////////////////////////////
/// element-array implementation
////////////////////////////////////////////////////////////////
namespace detail {
template <typename Policy, std::size_t DataSize>
class elem_array : public ipc::circ::conn_head {
class elem_array {
public:
using base_t = ipc::circ::conn_head;
using policy_t = Policy;
using cursor_t = decltype(std::declval<policy_t>().cursor());
#if __cplusplus >= 201703L
using elem_t = ipc::circ::elem_t<policy_t::template elem_param<DataSize>>;
using elem_t = ipc::circ::elem_t<policy_t::template elem_param<DataSize>>;
#else /*__cplusplus < 201703L*/
using elem_t = ipc::circ::elem_t<policy_t::template elem_param<DataSize>::value>;
using elem_t = ipc::circ::elem_t<policy_t::template elem_param<DataSize>::value>;
#endif/*__cplusplus < 201703L*/
enum : std::size_t {
head_size = sizeof(base_t) + sizeof(policy_t),
data_size = DataSize,
elem_max = (std::numeric_limits<uint_t<8>>::max)() + 1, // default is 255 + 1
elem_size = sizeof(elem_t),
@ -43,17 +37,57 @@ private:
elem_t block_[elem_max];
public:
auto cursor() const noexcept { return head_.cursor(); }
cursor_t cursor() const noexcept {
return head_.cursor();
}
template <typename F>
bool push(F&& f) {
return head_.push(this, std::forward<F>(f), block_);
template <typename E, typename F>
bool push(E* elems, F&& f) {
return head_.push(elems, std::forward<F>(f), block_);
}
template <typename E, typename F>
bool pop(E* elems, cursor_t* cur, F&& f) {
if (cur == nullptr) return false;
return head_.pop(elems, *cur, std::forward<F>(f), block_);
}
};
} // namespace detail
template <typename Policy, std::size_t DataSize>
class elem_array : public ipc::circ::conn_head {
public:
using base_t = ipc::circ::conn_head;
using array_t = detail::elem_array<Policy, DataSize>;
using policy_t = typename array_t::policy_t;
using cursor_t = typename array_t::cursor_t;
using elem_t = typename array_t::elem_t;
enum : std::size_t {
head_size = sizeof(base_t) + sizeof(policy_t),
data_size = array_t::data_size,
elem_max = array_t::elem_max,
elem_size = array_t::elem_size,
block_size = array_t::block_size
};
private:
array_t array_;
public:
cursor_t cursor() const noexcept {
return array_.cursor();
}
template <typename F>
bool pop(decltype(std::declval<policy_t>().cursor())* cur, F&& f) {
if (cur == nullptr) return false;
return head_.pop(this, *cur, std::forward<F>(f), block_);
bool push(F&& f) {
return array_.push(this, std::forward<F>(f));
}
template <typename F>
bool pop(cursor_t* cur, F&& f) {
return array_.pop(this, cur, std::forward<F>(f));
}
};

View File

@ -1,11 +0,0 @@
#pragma once
#include "circ/elem_array.h"
namespace ipc {
namespace circ {
} // namespace circ
} // namespace ipc

View File

@ -38,7 +38,7 @@ constexpr u1_t index_of(u2_t c) noexcept {
class conn_head {
ipc::detail::waiter cc_waiter_, waiter_;
std::atomic<std::size_t> cc_ { 0 }; // connection counter
std::atomic<std::size_t> cc_; // connection counter
public:
conn_head() = default;

View File

@ -9,9 +9,10 @@
#include "def.h"
#include "shm.h"
#include "tls_pointer.h"
#include "pool_alloc.h"
#include "queue.h"
#include "policy.h"
#include "memory/resource.h"
namespace {
@ -25,7 +26,7 @@ inline auto acc_of_msg() {
return static_cast<std::atomic<msg_id_t>*>(g_shm.get());
}
template <typename Policy, typename IsFixed = typename Policy::is_fixed>
template <typename Policy>
struct detail_impl {
#pragma pack(1)
@ -39,10 +40,6 @@ struct msg_t {
using queue_t = ipc::queue<msg_t, Policy>;
struct shm_info_t {
typename queue_t::elems_t elems_; // the elements in shm
};
constexpr static void* head_of(queue_t* que) {
return static_cast<void*>(que->elems());
}
@ -52,9 +49,9 @@ constexpr static queue_t* queue_of(ipc::handle_t h) {
}
static buff_t make_cache(void const * data, std::size_t size) {
auto ptr = mem::sync_pool_alloc::alloc(size);
auto ptr = mem::alloc(size);
std::memcpy(ptr, data, size);
return { ptr, size, mem::sync_pool_alloc::free };
return { ptr, size, mem::free };
}
struct cache_t {
@ -93,11 +90,7 @@ static auto& queues_cache() {
/* API implementations */
static ipc::handle_t connect(char const * name) {
auto mem = shm::acquire(name, sizeof(shm_info_t));
if (mem == nullptr) {
return nullptr;
}
return new queue_t { &(static_cast<shm_info_t*>(mem)->elems_), name };
return mem::alloc<queue_t>(name);
}
static void disconnect(ipc::handle_t h) {
@ -106,8 +99,7 @@ static void disconnect(ipc::handle_t h) {
return;
}
que->disconnect(); // needn't to detach, cause it will be deleted soon.
shm::release(head_of(que), sizeof(shm_info_t));
delete que;
mem::free(que);
}
static std::size_t recv_count(ipc::handle_t h) {
@ -131,7 +123,7 @@ static void clear_recv(ipc::handle_t h) {
if (head == nullptr) {
return;
}
std::memset(head, 0, sizeof(shm_info_t));
std::memset(head, 0, sizeof(queue_t::elems_t));
}
static void clear_recv(char const * name) {

View File

@ -2,7 +2,6 @@
#include <limits>
#include <new>
#include <mutex>
#include <tuple>
#include <map>
#include <vector>

View File

@ -2,6 +2,7 @@
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <type_traits>
#include "def.h"
@ -21,6 +22,7 @@ namespace detail {
using std::unique_ptr;
using std::unique_lock;
using std::shared_lock;
#else /*__cplusplus < 201703L*/
@ -39,6 +41,12 @@ constexpr auto unique_lock(T&& lc) {
return std::unique_lock<std::decay_t<T>> { std::forward<T>(lc) };
}
// deduction guides for std::shared_lock
template <typename T>
constexpr auto shared_lock(T&& lc) {
return std::shared_lock<std::decay_t<T>> { std::forward<T>(lc) };
}
#endif/*__cplusplus < 201703L*/
template <typename F, typename D>

View File

@ -16,8 +16,6 @@ struct choose;
template <typename Flag>
struct choose<circ::elem_array, Flag> {
using is_fixed = std::true_type;
template <std::size_t DataSize>
using elems_t = circ::elem_array<ipc::prod_cons_impl<Flag>, DataSize>;
};

View File

@ -20,8 +20,8 @@ struct prod_cons_impl;
template <>
struct prod_cons_impl<prod_cons<relat::single, relat::single, trans::unicast>> {
std::atomic<circ::u2_t> rd_ { 0 }; // read index
std::atomic<circ::u2_t> wt_ { 0 }; // write index
std::atomic<circ::u2_t> rd_; // read index
std::atomic<circ::u2_t> wt_; // write index
#if __cplusplus >= 201703L
template <std::size_t DataSize>
@ -89,7 +89,7 @@ template <>
struct prod_cons_impl<prod_cons<relat::multi , relat::multi, trans::unicast>>
: prod_cons_impl<prod_cons<relat::single, relat::multi, trans::unicast>> {
std::atomic<circ::u2_t> ct_ { 0 }; // commit index
std::atomic<circ::u2_t> ct_; // commit index
template <typename E, typename F, typename EB>
bool push(E* /*elems*/, F&& f, EB* elem_start) {
@ -119,7 +119,7 @@ struct prod_cons_impl<prod_cons<relat::multi , relat::multi, trans::unicast>>
template <>
struct prod_cons_impl<prod_cons<relat::single, relat::multi, trans::broadcast>> {
std::atomic<circ::u2_t> wt_ { 0 }; // write index
std::atomic<circ::u2_t> wt_; // write index
#if __cplusplus >= 201703L
template <std::size_t DataSize>
@ -183,7 +183,7 @@ template <>
struct prod_cons_impl<prod_cons<relat::multi , relat::multi, trans::broadcast>>
: prod_cons_impl<prod_cons<relat::single, relat::multi, trans::broadcast>> {
std::atomic<circ::u2_t> ct_ { 0 }; // commit index
std::atomic<circ::u2_t> ct_; // commit index
template <typename E, typename F, typename EB>
bool push(E* elems, F&& f, EB* elem_start) {

View File

@ -9,8 +9,10 @@
#include <thread>
#include <chrono>
#include <string>
#include <cassert>
#include "def.h"
#include "shm.h"
#include "rw_lock.h"
#include "platform/waiter.h"
@ -20,11 +22,29 @@ namespace detail {
class queue_waiter {
protected:
ipc::detail::waiter_impl waiter_, cc_waiter_;
std::atomic_bool connected_ { false };
ipc::detail::waiter_impl waiter_;
ipc::detail::waiter_impl cc_waiter_;
bool connected_ = false;
bool dismiss_ = true;
template <typename Elems>
void open(Elems* elems, char const * name) {
Elems* open(char const * name) {
auto elems = static_cast<Elems*>(shm::acquire(name, sizeof(Elems)));
if (elems == nullptr) {
return nullptr;
}
dismiss_ = false;
return elems;
}
template <typename Elems>
void open(Elems*(& elems), char const * name) {
assert(name != nullptr && name[0] != '\0');
if (elems == nullptr) {
elems = open<Elems>(name);
}
assert(elems != nullptr);
waiter_.attach(&(elems->waiter()));
waiter_.open((std::string{ "__IPC_WAITER__" } + name).c_str());
cc_waiter_.attach(&(elems->conn_waiter()));
@ -38,22 +58,32 @@ protected:
cc_waiter_.attach(nullptr);
}
template <typename Elems>
void close(Elems* elems) {
if (!dismiss_ && (elems != nullptr)) {
shm::release(elems, sizeof(Elems));
}
dismiss_ = true;
close();
}
public:
queue_waiter() = default;
queue_waiter(const queue_waiter&) = delete;
queue_waiter& operator=(const queue_waiter&) = delete;
bool connected() const noexcept {
return connected_.load(std::memory_order_acquire);
return connected_;
}
template <typename Elems>
std::size_t connect(Elems* elems) {
if (elems == nullptr) return invalid_value;
if (connected_.exchange(true, std::memory_order_acq_rel)) {
if (connected_) {
// if it's already connected, just return an error count
return invalid_value;
}
connected_ = true;
auto ret = elems->connect();
cc_waiter_.broadcast();
return ret;
@ -62,10 +92,11 @@ public:
template <typename Elems>
std::size_t disconnect(Elems* elems) {
if (elems == nullptr) return invalid_value;
if (!connected_.exchange(false, std::memory_order_acq_rel)) {
if (!connected_) {
// if it's already disconnected, just return an error count
return invalid_value;
}
connected_ = false;
auto ret = elems->disconnect();
cc_waiter_.broadcast();
return ret;
@ -98,11 +129,20 @@ public:
queue_base() = default;
explicit queue_base(char const * name)
: queue_base() {
attach(nullptr, name);
}
explicit queue_base(elems_t* els, char const * name = nullptr)
: queue_base() {
attach(els, name);
}
/* not virtual */ ~queue_base(void) {
base_t::close(elems_);
}
constexpr elems_t * elems() const noexcept {
return elems_;
}
@ -123,33 +163,39 @@ public:
return base_t::wait_for_connect(elems_, count);
}
bool valid() const noexcept {
return elems_ != nullptr;
}
bool empty() const noexcept {
return (elems_ == nullptr) ? true : (cursor_ == elems_->cursor());
}
elems_t* attach(elems_t* els, char const * name = nullptr) noexcept {
if (els == nullptr) return nullptr;
auto old = elems_;
elems_ = els;
if (name == nullptr) {
base_t::close();
if (name == nullptr || name[0] == '\0') {
base_t::close(old);
}
else base_t::open(elems_, name);
cursor_ = elems_->cursor();
if (elems_ != nullptr) {
cursor_ = elems_->cursor();
}
return old;
}
elems_t* detach() noexcept {
if (elems_ == nullptr) return nullptr;
base_t::close<elems_t>(nullptr); // not release shm
auto old = elems_;
elems_ = nullptr;
return old;
}
template <typename T, typename F, typename... P>
auto push(F&& f, P&&... params) {
template <typename T, typename... P>
auto push(P&&... params) {
if (elems_ == nullptr) return false;
if (std::forward<F>(f)([&](void* p) {
if (elems_->push([&](void* p) {
::new (p) T(std::forward<P>(params)...);
})) {
this->waiter_.broadcast();
@ -175,45 +221,11 @@ public:
}
};
template <typename Elems, typename IsFixed>
class queue : public queue_base<Elems> {
using base_t = queue_base<Elems>;
public:
using is_fixed = IsFixed;
using base_t::base_t;
template <typename T, typename... P>
auto push(P&&... params) {
return base_t::template push<T>([this](auto&& f) {
return this->elems_->push(std::forward<decltype(f)>(f));
}, std::forward<P>(params)...);
}
};
template <typename Elems>
class queue<Elems, std::false_type> : public queue_base<Elems> {
using base_t = queue_base<Elems>;
public:
using is_fixed = std::false_type;
using base_t::base_t;
template <typename T, typename... P>
auto push(P&&... params) {
return base_t::template push<T>([this](auto&& f) {
return this->elems_->template push<sizeof(T), alignof(T)>(std::forward<decltype(f)>(f));
}, std::forward<P>(params)...);
}
};
} // namespace detail
template <typename T, typename Policy>
class queue : public detail::queue<typename Policy::template elems_t<sizeof(T)>, typename Policy::is_fixed> {
using base_t = detail::queue<typename Policy::template elems_t<sizeof(T)>, typename Policy::is_fixed>;
class queue : public detail::queue_base<typename Policy::template elems_t<sizeof(T)>> {
using base_t = detail::queue_base<typename Policy::template elems_t<sizeof(T)>>;
public:
using base_t::base_t;

View File

@ -176,7 +176,7 @@ struct test_cq<ipc::queue<T...>> {
ca_t* ca_;
test_cq(void*) : ca_(reinterpret_cast<ca_t*>(cq__)) {
::new (ca_) ca_t;
std::memset(ca_, 0, sizeof(ca_t));
}
cn_t* connect() {
@ -243,6 +243,7 @@ constexpr int LoopCount = 1000000;
void Unit::initTestCase() {
TestSuite::initTestCase();
cq__ = new cq_t;
std::memset(cq__, 0, sizeof(cq_t));
}
void Unit::cleanupTestCase() {
@ -378,7 +379,8 @@ void Unit::test_queue() {
QCOMPARE(queue.pop(), msg_t{});
QVERIFY(sizeof(decltype(queue)::elems_t) <= sizeof(*cq__));
auto cq = ::new (cq__) decltype(queue)::elems_t;
std::memset(cq__, 0, sizeof(decltype(queue)::elems_t));
auto cq = reinterpret_cast<decltype(queue)::elems_t*>(cq__);
queue.attach(cq);
QVERIFY(queue.detach() != nullptr);