prepare multi routes implementation

This commit is contained in:
mutouyun 2019-01-25 18:28:39 +08:00
parent 7461dc88ed
commit c3f66d47bb
7 changed files with 263 additions and 33 deletions

View File

@ -33,14 +33,16 @@ HEADERS += \
../src/prod_cons.h \ ../src/prod_cons.h \
../src/policy.h \ ../src/policy.h \
../src/queue.h \ ../src/queue.h \
../src/log.h ../src/log.h \
../src/id_pool.h
SOURCES += \ SOURCES += \
../src/shm.cpp \ ../src/shm.cpp \
../src/ipc.cpp \ ../src/ipc.cpp \
../src/pool_alloc.cpp \ ../src/pool_alloc.cpp \
../src/buffer.cpp \ ../src/buffer.cpp \
../src/waiter.cpp ../src/waiter.cpp \
../src/channel.cpp
unix { unix {

View File

@ -9,24 +9,6 @@
namespace ipc { namespace ipc {
// pre-defined
#ifdef IPC_UNUSED_
# error "IPC_UNUSED_ has been defined."
#endif
#if __cplusplus >= 201703L
# define IPC_UNUSED_ [[maybe_unused]]
#else /*__cplusplus < 201703L*/
#if defined(_MSC_VER)
# define IPC_UNUSED_ __pragma(warning(suppress: 4100 4101 4189))
#elif defined(__GNUC__)
# define IPC_UNUSED_ __attribute__((__unused__))
#else
# define IPC_UNUSED_
#endif
#endif/*__cplusplus < 201703L*/
// types // types
using byte_t = std::uint8_t; using byte_t = std::uint8_t;
@ -64,6 +46,9 @@ enum class trans { // transmission
template <relat Rp, relat Rc, trans Ts> template <relat Rp, relat Rc, trans Ts>
struct prod_cons {}; struct prod_cons {};
// implement with multi routes
struct prod_cons_routes {};
// concept helpers // concept helpers
template <bool Cond, typename R> template <bool Cond, typename R>

View File

@ -25,9 +25,23 @@ struct IPC_EXPORT channel_detail {
static buff_t recv(handle_t h); static buff_t recv(handle_t h);
}; };
template <typename Detail> template <>
struct IPC_EXPORT channel_detail<prod_cons_routes> {
static handle_t connect (char const * name);
static void disconnect(handle_t h);
static std::size_t recv_count(handle_t h);
static bool wait_for_recv(handle_t h, std::size_t r_count);
static bool send(handle_t h, void const * data, std::size_t size);
static buff_t recv(handle_t h);
};
template <typename Flag>
class channel_impl { class channel_impl {
private: private:
using detail_t = channel_detail<Flag>;
handle_t h_ = nullptr; handle_t h_ = nullptr;
std::string n_; std::string n_;
@ -75,23 +89,23 @@ public:
bool connect(char const * name) { bool connect(char const * name) {
if (name == nullptr || name[0] == '\0') return false; if (name == nullptr || name[0] == '\0') return false;
this->disconnect(); this->disconnect();
h_ = Detail::connect((n_ = name).c_str()); h_ = detail_t::connect((n_ = name).c_str());
return valid(); return valid();
} }
void disconnect() { void disconnect() {
if (!valid()) return; if (!valid()) return;
Detail::disconnect(h_); detail_t::disconnect(h_);
h_ = nullptr; h_ = nullptr;
n_.clear(); n_.clear();
} }
std::size_t recv_count() const { std::size_t recv_count() const {
return Detail::recv_count(h_); return detail_t::recv_count(h_);
} }
bool wait_for_recv(std::size_t r_count) const { bool wait_for_recv(std::size_t r_count) const {
return Detail::wait_for_recv(h_, r_count); return detail_t::wait_for_recv(h_, r_count);
} }
static bool wait_for_recv(char const * name, std::size_t r_count) { static bool wait_for_recv(char const * name, std::size_t r_count) {
@ -99,7 +113,7 @@ public:
} }
bool send(void const * data, std::size_t size) { bool send(void const * data, std::size_t size) {
return Detail::send(h_, data, size); return detail_t::send(h_, data, size);
} }
bool send(buff_t const & buff) { bool send(buff_t const & buff) {
@ -111,7 +125,7 @@ public:
} }
buff_t recv() { buff_t recv() {
return Detail::recv(h_); return detail_t::recv(h_);
} }
}; };
@ -125,9 +139,8 @@ public:
* A route could only be used in 1 to N * A route could only be used in 1 to N
* (one producer/server/sender to multi consumers/clients/receivers) * (one producer/server/sender to multi consumers/clients/receivers)
*/ */
using route = channel_impl<channel_detail<
ipc::prod_cons<relat::single, relat::multi, trans::broadcast> using route = channel_impl<ipc::prod_cons<relat::single, relat::multi, trans::broadcast>>;
>>;
/* /*
* class channel * class channel
@ -137,8 +150,6 @@ using route = channel_impl<channel_detail<
* would receive your sent messages. * would receive your sent messages.
*/ */
using channel = channel_impl<channel_detail< using channel = channel_impl<ipc::prod_cons<relat::multi, relat::multi, trans::broadcast>>;
ipc::prod_cons<relat::multi, relat::multi, trans::broadcast>
>>;
} // namespace ipc } // namespace ipc

114
src/channel.cpp Normal file
View File

@ -0,0 +1,114 @@
#include <array>
#include <string>
#include "ipc.h"
#include "shm.h"
#include "rw_lock.h"
#include "id_pool.h"
#include "platform/detail.h"
namespace {
using namespace ipc;
struct ch_info_t {
rw_lock lc_;
id_pool ch_acc_; // only support 255 channels with one name
};
struct ch_multi_routes {
shm::handle h_;
route r_;
std::size_t id_;
bool marked_ = false;
std::array<route, id_pool::max_count> rts_;
ch_info_t& info() {
return *static_cast<ch_info_t*>(h_.get());
}
auto& acc() {
return info().ch_acc_;
}
void mark_id() {
if (marked_) return;
marked_ = true;
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(info().lc_);
acc().mark_acquired(id_);
}
auto& sender() {
mark_id();
return r_;
}
bool valid() const {
return h_.valid() && r_.valid();
}
bool connect(char const * name) {
if (name == nullptr || name[0] == '\0') {
return false;
}
disconnect();
if (!h_.acquire((std::string { name } + "_").c_str(), sizeof(ch_info_t))) {
return false;
}
{
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(info().lc_);
if (acc().invalid()) acc().init();
id_ = acc().acquire();
}
if (id_ == invalid_value) {
return false;
}
r_.connect((name + std::to_string(id_)).c_str());
return valid();
}
void disconnect() {
if (!valid()) return;
{
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(info().lc_);
acc().release(id_);
}
for (auto& rt : rts_) {
rt.disconnect();
}
r_.disconnect();
h_.release();
}
};
} // internal-linkage
namespace ipc {
ipc::handle_t channel_detail<prod_cons_routes>::connect(char const * /*name*/) {
return nullptr;
}
void channel_detail<prod_cons_routes>::disconnect(ipc::handle_t /*h*/) {
}
std::size_t channel_detail<prod_cons_routes>::recv_count(ipc::handle_t /*h*/) {
return 0;
}
bool channel_detail<prod_cons_routes>::wait_for_recv(ipc::handle_t /*h*/, std::size_t /*r_count*/) {
return false;
}
bool channel_detail<prod_cons_routes>::send(ipc::handle_t /*h*/, void const * /*data*/, std::size_t /*size*/) {
return false;
}
buff_t channel_detail<prod_cons_routes>::recv(ipc::handle_t /*h*/) {
return {};
}
} // namespace ipc

85
src/id_pool.h Normal file
View File

@ -0,0 +1,85 @@
#pragma once
#include <cstring>
#include "def.h"
namespace ipc {
class id_pool {
public:
enum : std::size_t {
max_count = (std::numeric_limits<uint_t<8>>::max)() // 255
};
private:
uint_t<8> acquir_ = 0;
uint_t<8> cursor_ = 0;
uint_t<8> next_[max_count] {};
public:
void init() {
acquir_ = max_count;
for (std::size_t i = 0; i < max_count;) {
i = next_[i] = static_cast<uint_t<8>>(i + 1);
}
}
bool invalid() const {
static id_pool inv;
return std::memcmp(this, &inv, sizeof(id_pool)) == 0;
}
bool empty() const {
return cursor_ == max_count;
}
std::size_t acquire() {
if (empty()) {
return invalid_value;
}
std::size_t id = cursor_;
cursor_ = next_[id]; // point to next
return id;
}
void mark_acquired(std::size_t id) {
next_[id] = acquir_;
acquir_ = static_cast<uint_t<8>>(id); // put it in acquired list
}
bool release(std::size_t id) {
if (acquir_ == max_count) return false;
if (acquir_ == id) {
acquir_ = next_[id]; // point to next
}
else {
auto a = next_[acquir_], l = acquir_;
while (1) {
if (a == max_count) {
return false; // found nothing
}
if (a == id) {
next_[l] = next_[a];
break;
}
l = a;
a = next_[a];
}
}
next_[id] = cursor_;
cursor_ = static_cast<uint_t<8>>(id); // put it back
return true;
}
template <typename F>
void for_acquired(F&& fr) {
auto a = acquir_;
while (a != max_count) {
fr(a);
a = next_[a];
}
}
};
} // namespace ipc

View File

@ -4,9 +4,41 @@
#include <mutex> #include <mutex>
#include <shared_mutex> #include <shared_mutex>
#include <type_traits> #include <type_traits>
#include <tuple>
#include "def.h" #include "def.h"
// pre-defined
#ifdef IPC_UNUSED_
# error "IPC_UNUSED_ has been defined."
#endif
#if __cplusplus >= 201703L
# define IPC_UNUSED_ [[maybe_unused]]
#else /*__cplusplus < 201703L*/
#if defined(_MSC_VER)
# define IPC_UNUSED_ __pragma(warning(suppress: 4100 4101 4189))
#elif defined(__GNUC__)
# define IPC_UNUSED_ __attribute__((__unused__))
#else
# define IPC_UNUSED_
#endif
#endif/*__cplusplus < 201703L*/
#ifdef IPC_STBIND_
# error "IPC_STBIND_ has been defined."
#endif
#if __cplusplus >= 201703L
# define IPC_STBIND_(A, B, ...) auto [A, B] = __VA_ARGS__
#else /*__cplusplus < 201703L*/
# define IPC_STBIND_(A, B, ...) \
auto tp = __VA_ARGS__ \
auto A = std::get<0>(tp); \
auto B = std::get<1>(tp)
#endif/*__cplusplus < 201703L*/
#if __cplusplus >= 201703L #if __cplusplus >= 201703L
namespace std { namespace std {

View File

@ -16,6 +16,7 @@
#include "rw_lock.h" #include "rw_lock.h"
#include "platform/waiter_wrapper.h" #include "platform/waiter_wrapper.h"
#include "platform/detail.h"
namespace ipc { namespace ipc {
namespace detail { namespace detail {