From c3f66d47bbf008b06cde54bdd447e1bb3462d354 Mon Sep 17 00:00:00 2001 From: mutouyun Date: Fri, 25 Jan 2019 18:28:39 +0800 Subject: [PATCH] prepare multi routes implementation --- build/ipc.pro | 6 ++- include/def.h | 21 ++------ include/ipc.h | 37 +++++++++----- src/channel.cpp | 114 ++++++++++++++++++++++++++++++++++++++++++ src/id_pool.h | 85 +++++++++++++++++++++++++++++++ src/platform/detail.h | 32 ++++++++++++ src/queue.h | 1 + 7 files changed, 263 insertions(+), 33 deletions(-) create mode 100644 src/channel.cpp create mode 100644 src/id_pool.h diff --git a/build/ipc.pro b/build/ipc.pro index 43e68c4..f9adb70 100644 --- a/build/ipc.pro +++ b/build/ipc.pro @@ -33,14 +33,16 @@ HEADERS += \ ../src/prod_cons.h \ ../src/policy.h \ ../src/queue.h \ - ../src/log.h + ../src/log.h \ + ../src/id_pool.h SOURCES += \ ../src/shm.cpp \ ../src/ipc.cpp \ ../src/pool_alloc.cpp \ ../src/buffer.cpp \ - ../src/waiter.cpp + ../src/waiter.cpp \ + ../src/channel.cpp unix { diff --git a/include/def.h b/include/def.h index 9571382..b9eb348 100644 --- a/include/def.h +++ b/include/def.h @@ -9,24 +9,6 @@ namespace ipc { -// pre-defined - -#ifdef IPC_UNUSED_ -# error "IPC_UNUSED_ has been defined." -#endif - -#if __cplusplus >= 201703L -# define IPC_UNUSED_ [[maybe_unused]] -#else /*__cplusplus < 201703L*/ -#if defined(_MSC_VER) -# define IPC_UNUSED_ __pragma(warning(suppress: 4100 4101 4189)) -#elif defined(__GNUC__) -# define IPC_UNUSED_ __attribute__((__unused__)) -#else -# define IPC_UNUSED_ -#endif -#endif/*__cplusplus < 201703L*/ - // types using byte_t = std::uint8_t; @@ -64,6 +46,9 @@ enum class trans { // transmission template struct prod_cons {}; +// implement with multi routes +struct prod_cons_routes {}; + // concept helpers template diff --git a/include/ipc.h b/include/ipc.h index 8cf0418..501b225 100644 --- a/include/ipc.h +++ b/include/ipc.h @@ -25,9 +25,23 @@ struct IPC_EXPORT channel_detail { static buff_t recv(handle_t h); }; -template +template <> +struct IPC_EXPORT channel_detail { + static handle_t connect (char const * name); + static void disconnect(handle_t h); + + static std::size_t recv_count(handle_t h); + static bool wait_for_recv(handle_t h, std::size_t r_count); + + static bool send(handle_t h, void const * data, std::size_t size); + static buff_t recv(handle_t h); +}; + +template class channel_impl { private: + using detail_t = channel_detail; + handle_t h_ = nullptr; std::string n_; @@ -75,23 +89,23 @@ public: bool connect(char const * name) { if (name == nullptr || name[0] == '\0') return false; this->disconnect(); - h_ = Detail::connect((n_ = name).c_str()); + h_ = detail_t::connect((n_ = name).c_str()); return valid(); } void disconnect() { if (!valid()) return; - Detail::disconnect(h_); + detail_t::disconnect(h_); h_ = nullptr; n_.clear(); } std::size_t recv_count() const { - return Detail::recv_count(h_); + return detail_t::recv_count(h_); } bool wait_for_recv(std::size_t r_count) const { - return Detail::wait_for_recv(h_, r_count); + return detail_t::wait_for_recv(h_, r_count); } static bool wait_for_recv(char const * name, std::size_t r_count) { @@ -99,7 +113,7 @@ public: } bool send(void const * data, std::size_t size) { - return Detail::send(h_, data, size); + return detail_t::send(h_, data, size); } bool send(buff_t const & buff) { @@ -111,7 +125,7 @@ public: } buff_t recv() { - return Detail::recv(h_); + return detail_t::recv(h_); } }; @@ -125,9 +139,8 @@ public: * A route could only be used in 1 to N * (one producer/server/sender to multi consumers/clients/receivers) */ -using route = channel_impl ->>; + +using route = channel_impl>; /* * class channel @@ -137,8 +150,6 @@ using route = channel_impl ->>; +using channel = channel_impl>; } // namespace ipc diff --git a/src/channel.cpp b/src/channel.cpp new file mode 100644 index 0000000..a3a1e58 --- /dev/null +++ b/src/channel.cpp @@ -0,0 +1,114 @@ +#include +#include + +#include "ipc.h" +#include "shm.h" +#include "rw_lock.h" +#include "id_pool.h" + +#include "platform/detail.h" + +namespace { + +using namespace ipc; + +struct ch_info_t { + rw_lock lc_; + id_pool ch_acc_; // only support 255 channels with one name +}; + +struct ch_multi_routes { + shm::handle h_; + route r_; + + std::size_t id_; + bool marked_ = false; + + std::array rts_; + + ch_info_t& info() { + return *static_cast(h_.get()); + } + + auto& acc() { + return info().ch_acc_; + } + + void mark_id() { + if (marked_) return; + marked_ = true; + IPC_UNUSED_ auto guard = ipc::detail::unique_lock(info().lc_); + acc().mark_acquired(id_); + } + + auto& sender() { + mark_id(); + return r_; + } + + bool valid() const { + return h_.valid() && r_.valid(); + } + + bool connect(char const * name) { + if (name == nullptr || name[0] == '\0') { + return false; + } + disconnect(); + if (!h_.acquire((std::string { name } + "_").c_str(), sizeof(ch_info_t))) { + return false; + } + { + IPC_UNUSED_ auto guard = ipc::detail::unique_lock(info().lc_); + if (acc().invalid()) acc().init(); + id_ = acc().acquire(); + } + if (id_ == invalid_value) { + return false; + } + r_.connect((name + std::to_string(id_)).c_str()); + return valid(); + } + + void disconnect() { + if (!valid()) return; + { + IPC_UNUSED_ auto guard = ipc::detail::unique_lock(info().lc_); + acc().release(id_); + } + for (auto& rt : rts_) { + rt.disconnect(); + } + r_.disconnect(); + h_.release(); + } +}; + +} // internal-linkage + +namespace ipc { + +ipc::handle_t channel_detail::connect(char const * /*name*/) { + return nullptr; +} + +void channel_detail::disconnect(ipc::handle_t /*h*/) { +} + +std::size_t channel_detail::recv_count(ipc::handle_t /*h*/) { + return 0; +} + +bool channel_detail::wait_for_recv(ipc::handle_t /*h*/, std::size_t /*r_count*/) { + return false; +} + +bool channel_detail::send(ipc::handle_t /*h*/, void const * /*data*/, std::size_t /*size*/) { + return false; +} + +buff_t channel_detail::recv(ipc::handle_t /*h*/) { + return {}; +} + +} // namespace ipc diff --git a/src/id_pool.h b/src/id_pool.h new file mode 100644 index 0000000..39fb94b --- /dev/null +++ b/src/id_pool.h @@ -0,0 +1,85 @@ +#pragma once + +#include + +#include "def.h" + +namespace ipc { + +class id_pool { +public: + enum : std::size_t { + max_count = (std::numeric_limits>::max)() // 255 + }; + +private: + uint_t<8> acquir_ = 0; + uint_t<8> cursor_ = 0; + uint_t<8> next_[max_count] {}; + +public: + void init() { + acquir_ = max_count; + for (std::size_t i = 0; i < max_count;) { + i = next_[i] = static_cast>(i + 1); + } + } + + bool invalid() const { + static id_pool inv; + return std::memcmp(this, &inv, sizeof(id_pool)) == 0; + } + + bool empty() const { + return cursor_ == max_count; + } + + std::size_t acquire() { + if (empty()) { + return invalid_value; + } + std::size_t id = cursor_; + cursor_ = next_[id]; // point to next + return id; + } + + void mark_acquired(std::size_t id) { + next_[id] = acquir_; + acquir_ = static_cast>(id); // put it in acquired list + } + + bool release(std::size_t id) { + if (acquir_ == max_count) return false; + if (acquir_ == id) { + acquir_ = next_[id]; // point to next + } + else { + auto a = next_[acquir_], l = acquir_; + while (1) { + if (a == max_count) { + return false; // found nothing + } + if (a == id) { + next_[l] = next_[a]; + break; + } + l = a; + a = next_[a]; + } + } + next_[id] = cursor_; + cursor_ = static_cast>(id); // put it back + return true; + } + + template + void for_acquired(F&& fr) { + auto a = acquir_; + while (a != max_count) { + fr(a); + a = next_[a]; + } + } +}; + +} // namespace ipc diff --git a/src/platform/detail.h b/src/platform/detail.h index 3f82d61..8ad3e00 100644 --- a/src/platform/detail.h +++ b/src/platform/detail.h @@ -4,9 +4,41 @@ #include #include #include +#include #include "def.h" +// pre-defined + +#ifdef IPC_UNUSED_ +# error "IPC_UNUSED_ has been defined." +#endif + +#if __cplusplus >= 201703L +# define IPC_UNUSED_ [[maybe_unused]] +#else /*__cplusplus < 201703L*/ +#if defined(_MSC_VER) +# define IPC_UNUSED_ __pragma(warning(suppress: 4100 4101 4189)) +#elif defined(__GNUC__) +# define IPC_UNUSED_ __attribute__((__unused__)) +#else +# define IPC_UNUSED_ +#endif +#endif/*__cplusplus < 201703L*/ + +#ifdef IPC_STBIND_ +# error "IPC_STBIND_ has been defined." +#endif + +#if __cplusplus >= 201703L +# define IPC_STBIND_(A, B, ...) auto [A, B] = __VA_ARGS__ +#else /*__cplusplus < 201703L*/ +# define IPC_STBIND_(A, B, ...) \ + auto tp = __VA_ARGS__ \ + auto A = std::get<0>(tp); \ + auto B = std::get<1>(tp) +#endif/*__cplusplus < 201703L*/ + #if __cplusplus >= 201703L namespace std { diff --git a/src/queue.h b/src/queue.h index 7042769..2ad17a8 100644 --- a/src/queue.h +++ b/src/queue.h @@ -16,6 +16,7 @@ #include "rw_lock.h" #include "platform/waiter_wrapper.h" +#include "platform/detail.h" namespace ipc { namespace detail {