diff --git a/build/ipc.pro b/build/ipc.pro index a11bccc..3eb9ceb 100644 --- a/build/ipc.pro +++ b/build/ipc.pro @@ -17,9 +17,6 @@ HEADERS += \ ../include/export.h \ ../include/def.h \ ../include/shm.h \ - ../include/elem_def.h \ - ../include/elem_circ.h \ - ../include/elem_link.h \ ../include/waiter.h \ ../include/queue.h \ ../include/ipc.h \ @@ -28,11 +25,15 @@ HEADERS += \ ../include/pool_alloc.h \ ../include/buffer.h \ ../src/memory/detail.h \ - ../src/memory/alloc.hpp \ - ../src/memory/wrapper.hpp \ - ../src/memory/resource.hpp \ + ../src/memory/alloc.h \ + ../src/memory/wrapper.h \ + ../src/memory/resource.h \ ../src/platform/detail.h \ - ../src/platform/waiter.h + ../src/platform/waiter.h \ + ../src/circ/elem_def.h \ + ../src/circ/elem_array.h \ + ../src/prod_cons.h \ + ../src/policy.h SOURCES += \ ../src/shm.cpp \ diff --git a/include/def.h b/include/def.h index e8fae87..5466cf9 100644 --- a/include/def.h +++ b/include/def.h @@ -49,11 +49,6 @@ enum : std::size_t { data_length = 16 }; -enum class orgnz { // data structure organization - linked, - cyclic -}; - enum class relat { // multiplicity of the relationship single, multi @@ -64,10 +59,10 @@ enum class trans { // transmission broadcast }; -// producer-consumer policy declaration +// producer-consumer policy flag -template -struct prod_cons; +template +struct prod_cons {}; // concept helpers diff --git a/include/elem_circ.h b/include/elem_circ.h deleted file mode 100644 index cfca3ab..0000000 --- a/include/elem_circ.h +++ /dev/null @@ -1,323 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include - -#include "def.h" -#include "rw_lock.h" -#include "elem_def.h" - -#include "platform/waiter.h" - -namespace ipc { - -namespace circ { -namespace detail { - -using u1_t = uint_t<8>; -using u2_t = uint_t<16>; - -constexpr u1_t index_of(u2_t c) noexcept { - return static_cast(c); -} - -struct elem_head { - std::atomic rc_ { 0 }; // read counter -}; - -template -struct elem_t { - elem_head head_; - byte_t data_[DataSize] {}; -}; - -template <> -struct elem_t<0> { - elem_head head_; -}; - -template -elem_t* elem_of(void* ptr) noexcept { - return reinterpret_cast*>(static_cast(ptr) - sizeof(elem_head)); -} - -} // namespace detail -} // namespace circ - -//////////////////////////////////////////////////////////////// -/// producer-consumer policies -//////////////////////////////////////////////////////////////// - -template <> -struct prod_cons { - std::atomic rd_ { 0 }; // read index - std::atomic wt_ { 0 }; // write index - -#if __cplusplus >= 201703L - template - constexpr static std::size_t elem_param = DataSize - sizeof(circ::detail::elem_head); -#else /*__cplusplus < 201703L*/ - template - struct elem_param { - enum : std::size_t { - value = DataSize - sizeof(circ::detail::elem_head) - }; - }; -#endif/*__cplusplus < 201703L*/ - - constexpr circ::detail::u2_t cursor() const noexcept { - return 0; - } - - template - bool push(E* /*elems*/, F&& f, circ::detail::elem_t* elem_start) { - auto cur_wt = circ::detail::index_of(wt_.load(std::memory_order_acquire)); - if (cur_wt == circ::detail::index_of(rd_.load(std::memory_order_relaxed) - 1)) { - return false; // full - } - std::forward(f)(elem_start + cur_wt); - wt_.fetch_add(1, std::memory_order_release); - return true; - } - - template - bool pop(E* /*elems*/, circ::detail::u2_t& /*cur*/, F&& f, circ::detail::elem_t* elem_start) noexcept { - auto cur_rd = circ::detail::index_of(rd_.load(std::memory_order_acquire)); - if (cur_rd == circ::detail::index_of(wt_.load(std::memory_order_relaxed))) { - return false; // empty - } - std::forward(f)(elem_start + cur_rd); - rd_.fetch_add(1, std::memory_order_release); - return true; - } -}; - -template <> -struct prod_cons - : prod_cons { - - template - bool pop(E* /*elems*/, circ::detail::u2_t& /*cur*/, F&& f, circ::detail::elem_t* elem_start) noexcept { - byte_t buff[sizeof(circ::detail::elem_t)]; - for (unsigned k = 0;;) { - auto cur_rd = rd_.load(std::memory_order_acquire); - if (circ::detail::index_of(cur_rd) == - circ::detail::index_of(wt_.load(std::memory_order_relaxed))) { - return false; // empty - } - std::memcpy(buff, elem_start + circ::detail::index_of(cur_rd), sizeof(buff)); - if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) { - std::forward(f)(buff); - return true; - } - ipc::yield(k); - } - } -}; - -template <> -struct prod_cons - : prod_cons { - - std::atomic ct_ { 0 }; // commit index - - template - bool push(E* /*elems*/, F&& f, circ::detail::elem_t* elem_start) { - circ::detail::u2_t cur_ct, nxt_ct; - while(1) { - cur_ct = ct_.load(std::memory_order_acquire); - if (circ::detail::index_of(nxt_ct = cur_ct + 1) == - circ::detail::index_of(rd_.load(std::memory_order_relaxed))) { - return false; // full - } - if (ct_.compare_exchange_weak(cur_ct, nxt_ct, std::memory_order_relaxed)) { - break; - } - std::this_thread::yield(); - } - std::forward(f)(elem_start + circ::detail::index_of(cur_ct)); - while(1) { - auto exp_wt = cur_ct; - if (wt_.compare_exchange_weak(exp_wt, nxt_ct, std::memory_order_release)) { - break; - } - std::this_thread::yield(); - } - return true; - } -}; - -template <> -struct prod_cons { - std::atomic wt_ { 0 }; // write index - -#if __cplusplus >= 201703L - template - constexpr static std::size_t elem_param = DataSize; -#else /*__cplusplus < 201703L*/ - template - struct elem_param { enum : std::size_t { value = DataSize }; }; -#endif/*__cplusplus < 201703L*/ - - /* - std::atomic may not have value_type. - See: https://stackoverflow.com/questions/53648614/what-happened-to-stdatomicxvalue-type - */ - using rc_t = decltype(circ::detail::elem_head::rc_.load()); - - circ::detail::u2_t cursor() const noexcept { - return wt_.load(std::memory_order_acquire); - } - - template - bool push(E* elems, F&& f, circ::detail::elem_t* elem_start) { - auto conn_cnt = elems->conn_count(); // acquire - if (conn_cnt == 0) return false; - auto el = elem_start + circ::detail::index_of(wt_.load(std::memory_order_relaxed)); - // check all consumers have finished reading this element - while(1) { - rc_t expected = 0; - if (el->head_.rc_.compare_exchange_weak( - expected, static_cast(conn_cnt), std::memory_order_relaxed)) { - break; - } - std::this_thread::yield(); - conn_cnt = elems->conn_count(); // acquire - if (conn_cnt == 0) return false; - } - std::forward(f)(el->data_); - wt_.fetch_add(1, std::memory_order_release); - return true; - } - - template - bool pop(E* /*elems*/, circ::detail::u2_t& cur, F&& f, circ::detail::elem_t* elem_start) noexcept { - if (cur == cursor()) return false; // acquire - auto el = elem_start + circ::detail::index_of(cur++); - std::forward(f)(el->data_); - for (unsigned k = 0;;) { - rc_t cur_rc = el->head_.rc_.load(std::memory_order_acquire); - if (cur_rc == 0) { - return true; - } - if (el->head_.rc_.compare_exchange_weak( - cur_rc, cur_rc - 1, std::memory_order_release)) { - return true; - } - ipc::yield(k); - } - } -}; - -template <> -struct prod_cons - : prod_cons { - - std::atomic ct_ { 0 }; // commit index - - template - bool push(E* elems, F&& f, circ::detail::elem_t* elem_start) { - auto conn_cnt = elems->conn_count(); // acquire - if (conn_cnt == 0) return false; - circ::detail::u2_t cur_ct = ct_.fetch_add(1, std::memory_order_relaxed), - nxt_ct = cur_ct + 1; - auto el = elem_start + circ::detail::index_of(cur_ct); - // check all consumers have finished reading this element - while(1) { - rc_t expected = 0; - if (el->head_.rc_.compare_exchange_weak( - expected, static_cast(conn_cnt), std::memory_order_relaxed)) { - break; - } - std::this_thread::yield(); - conn_cnt = elems->conn_count(); // acquire - if (conn_cnt == 0) return false; - } - std::forward(f)(el->data_); - while(1) { - auto exp_wt = cur_ct; - if (wt_.compare_exchange_weak(exp_wt, nxt_ct, std::memory_order_release)) { - break; - } - std::this_thread::yield(); - } - return true; - } -}; - -template -using prod_cons_circ = prod_cons; - -namespace circ { - -//////////////////////////////////////////////////////////////// -/// element-array implementation -//////////////////////////////////////////////////////////////// - -template -class elem_array : private Policy { -public: - using policy_t = Policy; - using base_t = Policy; - using head_t = ipc::conn_head; -#if __cplusplus >= 201703L - using elem_t = detail::elem_t>; -#else /*__cplusplus < 201703L*/ - using elem_t = detail::elem_t::value>; -#endif/*__cplusplus < 201703L*/ - - enum : std::size_t { - head_size = sizeof(policy_t) + sizeof(head_t), - data_size = DataSize, - elem_max = (std::numeric_limits>::max)() + 1, // default is 255 + 1 - elem_size = sizeof(elem_t), - block_size = elem_size * elem_max - }; - -private: - head_t head_; - ipc::detail::waiter waiter_; - elem_t block_[elem_max]; - -public: - elem_array() = default; - elem_array(const elem_array&) = delete; - elem_array& operator=(const elem_array&) = delete; - - auto & waiter() { return this->waiter_; } - auto const & waiter() const { return this->waiter_; } - - auto & conn_waiter() { return head_.conn_waiter(); } - auto const & conn_waiter() const { return head_.conn_waiter(); } - - std::size_t connect () noexcept { return head_.connect (); } - std::size_t disconnect() noexcept { return head_.disconnect(); } - std::size_t conn_count() const noexcept { return head_.conn_count(); } - - using base_t::cursor; - - template - bool push(F&& f) noexcept { - return base_t::push(this, std::forward(f), block_); - } - - template - bool pop(detail::u2_t* cur, F&& f) noexcept { - if (cur == nullptr) return false; - return base_t::pop(this, *cur, std::forward(f), block_); - } -}; - -template -struct prod_cons { - using is_fixed = std::true_type; - - template - using elems_t = elem_array>; -}; - -} // namespace circ -} // namespace ipc diff --git a/include/elem_def.h b/include/elem_def.h deleted file mode 100644 index fc51f5a..0000000 --- a/include/elem_def.h +++ /dev/null @@ -1,32 +0,0 @@ -#pragma once - -#include -#include -#include - -#include "platform/waiter.h" - -namespace ipc { - -template -struct conn_head { - ipc::detail::waiter cc_waiter_; - std::atomic cc_ { 0 }; // connection counter - - auto & conn_waiter() { return this->cc_waiter_; } - auto const & conn_waiter() const { return this->cc_waiter_; } - - std::size_t connect() noexcept { - return cc_.fetch_add(1, std::memory_order_release); - } - - std::size_t disconnect() noexcept { - return cc_.fetch_sub(1, std::memory_order_release); - } - - std::size_t conn_count(std::memory_order order = std::memory_order_acquire) const noexcept { - return cc_.load(order); - } -}; - -} // namespace ipc diff --git a/include/elem_link.h b/include/elem_link.h deleted file mode 100644 index 9148d29..0000000 --- a/include/elem_link.h +++ /dev/null @@ -1,18 +0,0 @@ -#pragma once - -#include -#include -#include - -#include "def.h" -#include "rw_lock.h" -#include "elem_def.h" - -namespace ipc { - -namespace link { - - - -} // namespace link -} // namespace ipc diff --git a/include/ipc.h b/include/ipc.h index e5f1f30..af0dfae 100644 --- a/include/ipc.h +++ b/include/ipc.h @@ -7,14 +7,13 @@ #include "def.h" #include "buffer.h" #include "shm.h" -#include "elem_circ.h" namespace ipc { using handle_t = void*; using buff_t = buffer; -template +template struct IPC_EXPORT channel_detail { static handle_t connect (char const * name); static void disconnect(handle_t h); @@ -138,7 +137,7 @@ public: * (one producer/server/sender to multi consumers/clients/receivers) */ using route = channel_impl + ipc::prod_cons >>; /* @@ -150,7 +149,7 @@ using route = channel_impl + ipc::prod_cons >>; } // namespace ipc diff --git a/src/circ/elem_array.h b/src/circ/elem_array.h new file mode 100644 index 0000000..e8897cd --- /dev/null +++ b/src/circ/elem_array.h @@ -0,0 +1,61 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include "def.h" +#include "rw_lock.h" + +#include "circ/elem_def.h" +#include "platform/detail.h" + +namespace ipc { +namespace circ { + +//////////////////////////////////////////////////////////////// +/// element-array implementation +//////////////////////////////////////////////////////////////// + +template +class elem_array : public ipc::circ::conn_head { +public: + using base_t = ipc::circ::conn_head; + using policy_t = Policy; +#if __cplusplus >= 201703L + using elem_t = ipc::circ::elem_t>; +#else /*__cplusplus < 201703L*/ + using elem_t = ipc::circ::elem_t::value>; +#endif/*__cplusplus < 201703L*/ + + enum : std::size_t { + head_size = sizeof(base_t) + sizeof(policy_t), + data_size = DataSize, + elem_max = (std::numeric_limits>::max)() + 1, // default is 255 + 1 + elem_size = sizeof(elem_t), + block_size = elem_size * elem_max + }; + +private: + policy_t head_; + elem_t block_[elem_max]; + +public: + auto cursor() const noexcept { return head_.cursor(); } + + template + bool push(F&& f) { + return head_.push(this, std::forward(f), block_); + } + + template + bool pop(decltype(std::declval().cursor())* cur, F&& f) { + if (cur == nullptr) return false; + return head_.pop(this, *cur, std::forward(f), block_); + } +}; + +} // namespace circ +} // namespace ipc diff --git a/src/circ/elem_def.h b/src/circ/elem_def.h new file mode 100644 index 0000000..b9ee247 --- /dev/null +++ b/src/circ/elem_def.h @@ -0,0 +1,68 @@ +#pragma once + +#include +#include +#include + +#include "platform/waiter.h" + +namespace ipc { +namespace circ { + +struct elem_head { + std::atomic rc_ { 0 }; // read-counter +}; + +template +struct elem_t { + elem_head head_; + byte_t data_[DataSize] {}; +}; + +template <> +struct elem_t<0> { + elem_head head_; +}; + +template +elem_t* elem_of(void* ptr) noexcept { + return reinterpret_cast*>(static_cast(ptr) - sizeof(elem_head)); +} + +using u1_t = ipc::uint_t<8>; +using u2_t = ipc::uint_t<16>; + +constexpr u1_t index_of(u2_t c) noexcept { + return static_cast(c); +} + +class conn_head { + ipc::detail::waiter cc_waiter_, waiter_; + std::atomic cc_ { 0 }; // connection counter + +public: + conn_head() = default; + conn_head(const conn_head&) = delete; + conn_head& operator=(const conn_head&) = delete; + + auto & waiter() noexcept { return this->waiter_; } + auto const & waiter() const noexcept { return this->waiter_; } + + auto & conn_waiter() noexcept { return this->cc_waiter_; } + auto const & conn_waiter() const noexcept { return this->cc_waiter_; } + + std::size_t connect() noexcept { + return cc_.fetch_add(1, std::memory_order_release); + } + + std::size_t disconnect() noexcept { + return cc_.fetch_sub(1, std::memory_order_release); + } + + std::size_t conn_count(std::memory_order order = std::memory_order_acquire) const noexcept { + return cc_.load(order); + } +}; + +} // namespace circ +} // namespace ipc diff --git a/src/ipc.cpp b/src/ipc.cpp index 02db9c0..f3053fd 100644 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -11,7 +11,8 @@ #include "tls_pointer.h" #include "queue.h" -#include "memory/resource.hpp" +#include "policy.h" +#include "memory/resource.h" namespace { @@ -217,54 +218,57 @@ static buff_t recv(ipc::handle_t h) { }; // detail_impl +template +using policy_t = policy::choose; + } // internal-linkage namespace ipc { -template -ipc::handle_t channel_detail::connect(char const * name) { - return detail_impl::connect(name); +template +ipc::handle_t channel_detail::connect(char const * name) { + return detail_impl>::connect(name); } -template -void channel_detail::disconnect(ipc::handle_t h) { - detail_impl::disconnect(h); +template +void channel_detail::disconnect(ipc::handle_t h) { + detail_impl>::disconnect(h); } -template -std::size_t channel_detail::recv_count(ipc::handle_t h) { - return detail_impl::recv_count(h); +template +std::size_t channel_detail::recv_count(ipc::handle_t h) { + return detail_impl>::recv_count(h); } -template -bool channel_detail::wait_for_recv(ipc::handle_t h, std::size_t r_count) { - return detail_impl::wait_for_recv(h, r_count); +template +bool channel_detail::wait_for_recv(ipc::handle_t h, std::size_t r_count) { + return detail_impl>::wait_for_recv(h, r_count); } -template -void channel_detail::clear_recv(ipc::handle_t h) { - detail_impl::clear_recv(h); +template +void channel_detail::clear_recv(ipc::handle_t h) { + detail_impl>::clear_recv(h); } -template -void channel_detail::clear_recv(char const * name) { - detail_impl::clear_recv(name); +template +void channel_detail::clear_recv(char const * name) { + detail_impl>::clear_recv(name); } -template -bool channel_detail::send(ipc::handle_t h, void const * data, std::size_t size) { - return detail_impl::send(h, data, size); +template +bool channel_detail::send(ipc::handle_t h, void const * data, std::size_t size) { + return detail_impl>::send(h, data, size); } -template -buff_t channel_detail::recv(ipc::handle_t h) { - return detail_impl::recv(h); +template +buff_t channel_detail::recv(ipc::handle_t h) { + return detail_impl>::recv(h); } -template struct channel_detail>; -template struct channel_detail>; -template struct channel_detail>; -template struct channel_detail>; -template struct channel_detail>; +template struct channel_detail>; +template struct channel_detail>; +template struct channel_detail>; +template struct channel_detail>; +template struct channel_detail>; } // namespace ipc diff --git a/src/memory/alloc.hpp b/src/memory/alloc.h similarity index 100% rename from src/memory/alloc.hpp rename to src/memory/alloc.h diff --git a/src/memory/detail.h b/src/memory/detail.h index c756b11..327b691 100644 --- a/src/memory/detail.h +++ b/src/memory/detail.h @@ -2,7 +2,7 @@ #include -#include "memory/alloc.hpp" +#include "memory/alloc.h" #include "platform/detail.h" namespace ipc { diff --git a/src/memory/resource.hpp b/src/memory/resource.h similarity index 93% rename from src/memory/resource.hpp rename to src/memory/resource.h index 554a8d0..f69d174 100644 --- a/src/memory/resource.hpp +++ b/src/memory/resource.h @@ -9,8 +9,8 @@ #include "def.h" -#include "memory/alloc.hpp" -#include "memory/wrapper.hpp" +#include "memory/alloc.h" +#include "memory/wrapper.h" #include "memory/detail.h" #include "platform/detail.h" diff --git a/src/memory/wrapper.hpp b/src/memory/wrapper.h similarity index 99% rename from src/memory/wrapper.hpp rename to src/memory/wrapper.h index adb1cad..c632da5 100644 --- a/src/memory/wrapper.hpp +++ b/src/memory/wrapper.h @@ -15,7 +15,7 @@ #include "rw_lock.h" #include "tls_pointer.h" -#include "memory/alloc.hpp" +#include "memory/alloc.h" #include "memory/detail.h" #include "platform/detail.h" diff --git a/src/platform/detail.h b/src/platform/detail.h index 22296ea..760544a 100644 --- a/src/platform/detail.h +++ b/src/platform/detail.h @@ -4,6 +4,8 @@ #include #include +#include "def.h" + #if __cplusplus >= 201703L namespace std { diff --git a/src/platform/shm_linux.cpp b/src/platform/shm_linux.cpp index c41d57c..d7c435d 100644 --- a/src/platform/shm_linux.cpp +++ b/src/platform/shm_linux.cpp @@ -12,7 +12,7 @@ #include #include "def.h" -#include "memory/resource.hpp" +#include "memory/resource.h" namespace { diff --git a/src/platform/shm_win.cpp b/src/platform/shm_win.cpp index 3e6a671..6a7cf7c 100644 --- a/src/platform/shm_win.cpp +++ b/src/platform/shm_win.cpp @@ -8,7 +8,7 @@ #include "def.h" -#include "memory/resource.hpp" +#include "memory/resource.h" #include "platform/to_tchar.h" namespace { diff --git a/src/policy.h b/src/policy.h new file mode 100644 index 0000000..d97bfa8 --- /dev/null +++ b/src/policy.h @@ -0,0 +1,25 @@ +#pragma once + +#include + +#include "def.h" +#include "prod_cons.h" + +#include "circ/elem_array.h" + +namespace ipc { +namespace policy { + +template