This commit is contained in:
mutouyun 2019-01-23 11:44:26 +08:00
parent 61cc1fd863
commit 881b060d1f
6 changed files with 171 additions and 103 deletions

View File

@ -4,6 +4,7 @@
#include <thread> #include <thread>
#include <cstring> #include <cstring>
#include <utility> #include <utility>
#include <type_traits>
#include "def.h" #include "def.h"
#include "rw_lock.h" #include "rw_lock.h"
@ -299,7 +300,7 @@ public:
using base_t::cursor; using base_t::cursor;
template <typename F, typename... P> template <typename F, typename... P>
bool push(F&& f, P&&...) noexcept { bool push(F&& f) noexcept {
return base_t::push(this, std::forward<F>(f), block_); return base_t::push(this, std::forward<F>(f), block_);
} }
@ -312,6 +313,8 @@ public:
template <relat Rp, relat Rc, trans Ts> template <relat Rp, relat Rc, trans Ts>
struct prod_cons { struct prod_cons {
using is_fixed = std::true_type;
template <std::size_t DataSize> template <std::size_t DataSize>
using elems_t = elem_array<DataSize, prod_cons_circ<Rp, Rc, Ts>>; using elems_t = elem_array<DataSize, prod_cons_circ<Rp, Rc, Ts>>;
}; };

View File

@ -10,4 +10,9 @@
namespace ipc { namespace ipc {
namespace link {
} // namespace link
} // namespace ipc } // namespace ipc

View File

@ -7,14 +7,14 @@
#include "def.h" #include "def.h"
#include "buffer.h" #include "buffer.h"
#include "shm.h" #include "shm.h"
#include "queue.h" #include "elem_circ.h"
namespace ipc { namespace ipc {
using handle_t = void*; using handle_t = void*;
using buff_t = buffer; using buff_t = buffer;
template <template <typename...> class Queue, typename Policy> template <typename Policy>
struct IPC_EXPORT channel_detail { struct IPC_EXPORT channel_detail {
static handle_t connect (char const * name); static handle_t connect (char const * name);
static void disconnect(handle_t h); static void disconnect(handle_t h);
@ -138,7 +138,7 @@ public:
* (one producer/server/sender to multi consumers/clients/receivers) * (one producer/server/sender to multi consumers/clients/receivers)
*/ */
using route = channel_impl<channel_detail< using route = channel_impl<channel_detail<
ipc::queue, ipc::circ::prod_cons<relat::single, relat::multi, trans::broadcast> ipc::circ::prod_cons<relat::single, relat::multi, trans::broadcast>
>>; >>;
/* /*
@ -150,7 +150,7 @@ using route = channel_impl<channel_detail<
*/ */
using channel = channel_impl<channel_detail< using channel = channel_impl<channel_detail<
ipc::queue, ipc::circ::prod_cons<relat::multi, relat::multi, trans::broadcast> ipc::circ::prod_cons<relat::multi, relat::multi, trans::broadcast>
>>; >>;
} // namespace ipc } // namespace ipc

View File

@ -17,54 +17,101 @@
#include "platform/waiter.h" #include "platform/waiter.h"
namespace ipc { namespace ipc {
namespace detail {
template <typename T, class queue_waiter {
typename Policy = ipc::circ::prod_cons<relat::single, relat::multi, trans::broadcast>> protected:
class queue {
public:
using elems_t = typename Policy::template elems_t<sizeof(T)>;
using policy_t = typename elems_t::policy_t;
private:
elems_t* elems_ = nullptr;
ipc::detail::waiter_impl waiter_, cc_waiter_; ipc::detail::waiter_impl waiter_, cc_waiter_;
decltype(std::declval<elems_t>().cursor()) cursor_ = 0;
std::atomic_bool connected_ { false }; std::atomic_bool connected_ { false };
public: template <typename Elems>
queue() = default; void open(Elems* elems, char const * name) {
waiter_.attach(&(elems->waiter()));
explicit queue(elems_t* els, char const * name = nullptr) : queue() { waiter_.open((std::string{ "__IPC_WAITER__" } + name).c_str());
attach(els, name); cc_waiter_.attach(&(elems->conn_waiter()));
cc_waiter_.open((std::string{ "__IPC_CC_WAITER__" } + name).c_str());
} }
queue(const queue&) = delete; void close() {
queue& operator=(const queue&) = delete; waiter_.close();
waiter_.attach(nullptr);
cc_waiter_.close();
cc_waiter_.attach(nullptr);
}
public:
queue_waiter() = default;
queue_waiter(const queue_waiter&) = delete;
queue_waiter& operator=(const queue_waiter&) = delete;
bool connected() const noexcept {
return connected_.load(std::memory_order_acquire);
}
template <typename Elems>
std::size_t connect(Elems* elems) {
if (elems == nullptr) return invalid_value;
if (connected_.exchange(true, std::memory_order_acq_rel)) {
// if it's already connected, just return an error count
return invalid_value;
}
auto ret = elems->connect();
cc_waiter_.broadcast();
return ret;
}
template <typename Elems>
std::size_t disconnect(Elems* elems) {
if (elems == nullptr) return invalid_value;
if (!connected_.exchange(false, std::memory_order_acq_rel)) {
// if it's already disconnected, just return an error count
return invalid_value;
}
auto ret = elems->disconnect();
cc_waiter_.broadcast();
return ret;
}
template <typename Elems>
bool wait_for_connect(Elems* elems, std::size_t count) {
if (elems == nullptr) return false;
for (unsigned k = 0; elems->conn_count() < count;) {
ipc::sleep(k, [this] { return cc_waiter_.wait(); });
}
return true;
}
};
template <typename Elems>
class queue_base : public queue_waiter {
using base_t = queue_waiter;
public:
using elems_t = Elems;
using policy_t = typename elems_t::policy_t;
protected:
elems_t* elems_ = nullptr;
decltype(std::declval<elems_t>().cursor()) cursor_ = 0;
public:
using base_t::base_t;
explicit queue_base(elems_t* els, char const * name = nullptr)
: queue_base() {
attach(els, name);
}
constexpr elems_t * elems() const noexcept { constexpr elems_t * elems() const noexcept {
return elems_; return elems_;
} }
std::size_t connect() { std::size_t connect() {
if (elems_ == nullptr) return invalid_value; return base_t::connect(elems_);
if (connected_.exchange(true, std::memory_order_acq_rel)) {
// if it's already connected, just return an error count
return invalid_value;
}
auto ret = elems_->connect();
cc_waiter_.broadcast();
return ret;
} }
std::size_t disconnect() { std::size_t disconnect() {
if (elems_ == nullptr) return invalid_value; return base_t::disconnect(elems_);
if (!connected_.exchange(false, std::memory_order_acq_rel)) {
// if it's already disconnected, just return an error count
return invalid_value;
}
auto ret = elems_->disconnect();
cc_waiter_.broadcast();
return ret;
} }
std::size_t conn_count() const noexcept { std::size_t conn_count() const noexcept {
@ -72,37 +119,21 @@ public:
} }
bool wait_for_connect(std::size_t count) { bool wait_for_connect(std::size_t count) {
if (elems_ == nullptr) return false; return base_t::wait_for_connect(elems_, count);
for (unsigned k = 0; elems_->conn_count() < count;) {
ipc::sleep(k, [this] { return cc_waiter_.wait(); });
}
return true;
} }
bool empty() const noexcept { bool empty() const noexcept {
return (elems_ == nullptr) ? true : (cursor_ == elems_->cursor()); return (elems_ == nullptr) ? true : (cursor_ == elems_->cursor());
} }
bool connected() const noexcept {
return connected_.load(std::memory_order_acquire);
}
elems_t* attach(elems_t* els, char const * name = nullptr) noexcept { elems_t* attach(elems_t* els, char const * name = nullptr) noexcept {
if (els == nullptr) return nullptr; if (els == nullptr) return nullptr;
auto old = elems_; auto old = elems_;
elems_ = els; elems_ = els;
if (name == nullptr) { if (name == nullptr) {
waiter_.close(); base_t::close();
waiter_.attach(nullptr);
cc_waiter_.close();
cc_waiter_.attach(nullptr);
}
else {
waiter_.attach(&(elems_->waiter()));
waiter_.open((std::string{ "__IPC_WAITER__" } + name).c_str());
cc_waiter_.attach(&(elems_->conn_waiter()));
cc_waiter_.open((std::string{ "__IPC_CC_WAITER__" } + name).c_str());
} }
else base_t::open(elems_, name);
cursor_ = elems_->cursor(); cursor_ = elems_->cursor();
return old; return old;
} }
@ -113,33 +144,64 @@ public:
elems_ = nullptr; elems_ = nullptr;
return old; return old;
} }
};
template <typename... P> template <typename Elems, typename IsFixed>
class queue : public queue_base<Elems> {
using base_t = queue_base<Elems>;
public:
using is_fixed = IsFixed;
using base_t::base_t;
template <typename T, typename... P>
auto push(P&&... params) { auto push(P&&... params) {
if (elems_ == nullptr) return false; if (this->elems_ == nullptr) return false;
if (elems_->push([&](void* p) { if (this->elems_->push([&](void* p) {
::new (p) T(std::forward<P>(params)...); ::new (p) T(std::forward<P>(params)...);
}, std::forward<P>(params)...)) { })) {
waiter_.broadcast(); this->waiter_.broadcast();
return true; return true;
} }
return false; return false;
} }
template <typename T>
T pop() { T pop() {
if (elems_ == nullptr) { if (this->elems_ == nullptr) {
return {}; return {};
} }
T item; T item;
for (unsigned k = 0;;) { for (unsigned k = 0;;) {
if (elems_->pop(&cursor_, [&item](void* p) { if (this->elems_->pop(&this->cursor_, [&item](void* p) {
::new (&item) T(std::move(*static_cast<T*>(p))); ::new (&item) T(std::move(*static_cast<T*>(p)));
})) { })) {
return item; return item;
} }
ipc::sleep(k, [this] { return waiter_.wait(); }); ipc::sleep(k, [this] { return this->waiter_.wait(); });
} }
} }
}; };
} // namespace detail
template <typename T,
typename Policy = ipc::circ::prod_cons<relat::single, relat::multi, trans::broadcast>>
class queue : public detail::queue<typename Policy::template elems_t<sizeof(T)>, typename Policy::is_fixed> {
using base_t = detail::queue<typename Policy::template elems_t<sizeof(T)>, typename Policy::is_fixed>;
public:
using base_t::base_t;
template <typename... P>
auto push(P&&... params) {
return base_t::push<T>(std::forward<P>(params)...);
}
T pop() {
return base_t::pop<T>();
}
};
} // namespace ipc } // namespace ipc

View File

@ -97,7 +97,7 @@ inline void sleep(K& k) noexcept {
namespace ipc { namespace ipc {
class spin_lock { class spin_lock {
std::atomic<std::size_t> lc_ { 0 }; std::atomic<unsigned> lc_ { 0 };
public: public:
void lock(void) noexcept { void lock(void) noexcept {
@ -112,7 +112,7 @@ public:
}; };
class rw_lock { class rw_lock {
using lc_ui_t = std::size_t; using lc_ui_t = unsigned;
std::atomic<lc_ui_t> lc_ { 0 }; std::atomic<lc_ui_t> lc_ { 0 };
enum : lc_ui_t { enum : lc_ui_t {

View File

@ -9,6 +9,7 @@
#include "def.h" #include "def.h"
#include "shm.h" #include "shm.h"
#include "tls_pointer.h" #include "tls_pointer.h"
#include "queue.h"
#include "memory/resource.hpp" #include "memory/resource.hpp"
@ -23,11 +24,8 @@ inline auto acc_of_msg() {
return static_cast<std::atomic<msg_id_t>*>(g_shm.get()); return static_cast<std::atomic<msg_id_t>*>(g_shm.get());
} }
template <template <typename...> class Queue, typename Policy> template <typename Policy, typename IsFixed = typename Policy::is_fixed>
struct detail_impl; struct detail_impl {
template <typename Policy>
struct detail_impl<ipc::queue, Policy> {
#pragma pack(1) #pragma pack(1)
struct msg_t { struct msg_t {
@ -217,56 +215,56 @@ static buff_t recv(ipc::handle_t h) {
} }
} }
}; // detail_impl<ipc::queue> }; // detail_impl<Policy>
} // internal-linkage } // internal-linkage
namespace ipc { namespace ipc {
template <template <typename...> class Queue, typename Policy> template <typename Policy>
ipc::handle_t channel_detail<Queue, Policy>::connect(char const * name) { ipc::handle_t channel_detail<Policy>::connect(char const * name) {
return detail_impl<Queue, Policy>::connect(name); return detail_impl<Policy>::connect(name);
} }
template <template <typename...> class Queue, typename Policy> template <typename Policy>
void channel_detail<Queue, Policy>::disconnect(ipc::handle_t h) { void channel_detail<Policy>::disconnect(ipc::handle_t h) {
detail_impl<Queue, Policy>::disconnect(h); detail_impl<Policy>::disconnect(h);
} }
template <template <typename...> class Queue, typename Policy> template <typename Policy>
std::size_t channel_detail<Queue, Policy>::recv_count(ipc::handle_t h) { std::size_t channel_detail<Policy>::recv_count(ipc::handle_t h) {
return detail_impl<Queue, Policy>::recv_count(h); return detail_impl<Policy>::recv_count(h);
} }
template <template <typename...> class Queue, typename Policy> template <typename Policy>
bool channel_detail<Queue, Policy>::wait_for_recv(ipc::handle_t h, std::size_t r_count) { bool channel_detail<Policy>::wait_for_recv(ipc::handle_t h, std::size_t r_count) {
return detail_impl<Queue, Policy>::wait_for_recv(h, r_count); return detail_impl<Policy>::wait_for_recv(h, r_count);
} }
template <template <typename...> class Queue, typename Policy> template <typename Policy>
void channel_detail<Queue, Policy>::clear_recv(ipc::handle_t h) { void channel_detail<Policy>::clear_recv(ipc::handle_t h) {
detail_impl<Queue, Policy>::clear_recv(h); detail_impl<Policy>::clear_recv(h);
} }
template <template <typename...> class Queue, typename Policy> template <typename Policy>
void channel_detail<Queue, Policy>::clear_recv(char const * name) { void channel_detail<Policy>::clear_recv(char const * name) {
detail_impl<Queue, Policy>::clear_recv(name); detail_impl<Policy>::clear_recv(name);
} }
template <template <typename...> class Queue, typename Policy> template <typename Policy>
bool channel_detail<Queue, Policy>::send(ipc::handle_t h, void const * data, std::size_t size) { bool channel_detail<Policy>::send(ipc::handle_t h, void const * data, std::size_t size) {
return detail_impl<Queue, Policy>::send(h, data, size); return detail_impl<Policy>::send(h, data, size);
} }
template <template <typename...> class Queue, typename Policy> template <typename Policy>
buff_t channel_detail<Queue, Policy>::recv(ipc::handle_t h) { buff_t channel_detail<Policy>::recv(ipc::handle_t h) {
return detail_impl<Queue, Policy>::recv(h); return detail_impl<Policy>::recv(h);
} }
template struct channel_detail<ipc::queue, ipc::circ::prod_cons<relat::single, relat::single, trans::unicast >>; template struct channel_detail<ipc::circ::prod_cons<relat::single, relat::single, trans::unicast >>;
template struct channel_detail<ipc::queue, ipc::circ::prod_cons<relat::single, relat::multi , trans::unicast >>; template struct channel_detail<ipc::circ::prod_cons<relat::single, relat::multi , trans::unicast >>;
template struct channel_detail<ipc::queue, ipc::circ::prod_cons<relat::multi , relat::multi , trans::unicast >>; template struct channel_detail<ipc::circ::prod_cons<relat::multi , relat::multi , trans::unicast >>;
template struct channel_detail<ipc::queue, ipc::circ::prod_cons<relat::single, relat::multi , trans::broadcast>>; template struct channel_detail<ipc::circ::prod_cons<relat::single, relat::multi , trans::broadcast>>;
template struct channel_detail<ipc::queue, ipc::circ::prod_cons<relat::multi , relat::multi , trans::broadcast>>; template struct channel_detail<ipc::circ::prod_cons<relat::multi , relat::multi , trans::broadcast>>;
} // namespace ipc } // namespace ipc