diff --git a/build/ipc.pro b/build/ipc.pro index c894a13..4d14f8e 100644 --- a/build/ipc.pro +++ b/build/ipc.pro @@ -27,9 +27,6 @@ HEADERS += \ ../include/tls_pointer.h \ ../include/pool_alloc.h \ ../include/buffer.h \ - ../src/channel.inc \ - ../src/route.inc \ - ../src/id_pool.inc \ ../src/memory/alloc.hpp \ ../src/memory/wrapper.hpp \ ../src/memory/resource.hpp diff --git a/include/circ_queue.h b/include/circ_queue.h index 6913067..03d4251 100644 Binary files a/include/circ_queue.h and b/include/circ_queue.h differ diff --git a/include/ipc.h b/include/ipc.h index e59196a..583cc90 100644 Binary files a/include/ipc.h and b/include/ipc.h differ diff --git a/src/channel.inc b/src/channel.inc deleted file mode 100644 index a77920c..0000000 --- a/src/channel.inc +++ /dev/null @@ -1,197 +0,0 @@ -#include "ipc.h" - -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "def.h" -#include "shm.h" -#include "rw_lock.h" - -#include "id_pool.inc" - -namespace { - -using namespace ipc; - -#pragma pack(1) -struct ch_info_t { - rw_lock lc_; - id_pool ch_acc_; // only support 255 channels with one name -}; -#pragma pack() - -} // internal-linkage - -//////////////////////////////////////////////////////////////// -/// class channel implementation -//////////////////////////////////////////////////////////////// - -namespace ipc { - -class channel::channel_ : public pimpl { -public: - shm::handle h_; - route r_; - - std::string n_; - std::size_t id_; - bool marked_ = false; - - std::array rts_; - - ch_info_t& info() { - return *static_cast(h_.get()); - } - - auto& acc() { - return info().ch_acc_; - } - - void mark_id() { - if (marked_) return; - marked_ = true; - [[maybe_unused]] auto guard = std::unique_lock { info().lc_ }; - acc().mark_acquired(id_); - } - - auto& sender() { - mark_id(); - return r_; - } -}; - -channel::channel() - : p_(p_->make()) { -} - -channel::channel(char const * name) - : channel() { - this->connect(name); -} - -channel::channel(channel&& rhs) - : channel() { - swap(rhs); -} - -channel::~channel() { - disconnect(); - p_->clear(); -} - -void channel::swap(channel& rhs) { - std::swap(p_, rhs.p_); -} - -channel& channel::operator=(channel rhs) { - swap(rhs); - return *this; -} - -bool channel::valid() const { - return impl(p_)->h_.valid() && impl(p_)->r_.valid(); -} - -char const * channel::name() const { - return impl(p_)->n_.c_str(); -} - -channel channel::clone() const { - return channel { name() }; -} - -bool channel::connect(char const * name) { - if (name == nullptr || name[0] == '\0') { - return false; - } - this->disconnect(); - if (!impl(p_)->h_.acquire(((impl(p_)->n_ = name) + "_").c_str(), sizeof(ch_info_t))) { - return false; - } - { - [[maybe_unused]] auto guard = std::unique_lock { impl(p_)->info().lc_ }; - if (impl(p_)->acc().invalid()) { - impl(p_)->acc().init(); - } - impl(p_)->id_ = impl(p_)->acc().acquire(); - } - if (impl(p_)->id_ == invalid_value) { - return false; - } - impl(p_)->r_.connect((name + std::to_string(impl(p_)->id_)).c_str()); - return valid(); -} - -void channel::disconnect() { - if (!valid()) return; - { - [[maybe_unused]] auto guard = std::unique_lock { impl(p_)->info().lc_ }; - impl(p_)->acc().release(impl(p_)->id_); - } - for (auto& rt : impl(p_)->rts_) { - rt.disconnect(); - } - impl(p_)->r_.disconnect(); - impl(p_)->h_.release(); -} - -std::size_t channel::recv_count() const { - return impl(p_)->r_.recv_count(); -} - -void channel::wait_for_recv(std::size_t r_count) const { - for (unsigned k = 0; impl(p_)->sender().recv_count() < r_count;) { - ipc::sleep(k); - } -} - -bool channel::send(void const * data, std::size_t size) { - return impl(p_)->sender().send(data, size); -} - -bool channel::send(buff_t const & buff) { - return impl(p_)->sender().send(buff); -} - -bool channel::send(std::string const & str) { - return impl(p_)->sender().send(str); -} - -buff_t channel::recv() { - if (!valid()) return {}; - std::array ques; - return ipc::multi_recv([&] { - std::array acqs; - std::size_t counter = 0; - // get all acquired ids - { - [[maybe_unused]] auto guard = std::shared_lock { impl(p_)->info().lc_ }; - impl(p_)->acc().for_acquired([this, &acqs, &counter](std::size_t id) { - if (id == impl(p_)->id_) return; - acqs[counter++] = id; - }); - } - // populate route cache & ques - for (std::size_t k = 0; k < counter; ++k) { - std::size_t id = acqs[k]; - auto& it = impl(p_)->rts_[id]; - // it's a new id - if (!it.valid()) { - it.connect((impl(p_)->n_ + std::to_string(id)).c_str()); - queue_of(it.handle())->connect(); - } - // get queue of this route - ques[k] = queue_of(it.handle()); - } - return std::make_tuple(ques.data(), counter); - }); -} - -} // namespace ipc diff --git a/src/id_pool.inc b/src/id_pool.inc deleted file mode 100644 index 1fed0c2..0000000 --- a/src/id_pool.inc +++ /dev/null @@ -1,83 +0,0 @@ -#include - -#include "def.h" - -namespace ipc { - -class id_pool { -public: - enum : std::size_t { - max_count = (std::numeric_limits>::max)() // 255 - }; - -private: - uint_t<8> acquir_ = 0; - uint_t<8> cursor_ = 0; - uint_t<8> next_[max_count] {}; - -public: - void init() { - acquir_ = max_count; - for (std::size_t i = 0; i < max_count;) { - i = next_[i] = static_cast>(i + 1); - } - } - - bool invalid() const { - static id_pool inv; - return std::memcmp(this, &inv, sizeof(id_pool)) == 0; - } - - bool empty() const { - return cursor_ == max_count; - } - - std::size_t acquire() { - if (empty()) { - return invalid_value; - } - std::size_t id = cursor_; - cursor_ = next_[id]; // point to next - return id; - } - - void mark_acquired(std::size_t id) { - next_[id] = acquir_; - acquir_ = static_cast>(id); // put it in acquired list - } - - bool release(std::size_t id) { - if (acquir_ == max_count) return false; - if (acquir_ == id) { - acquir_ = next_[id]; // point to next - } - else { - auto a = next_[acquir_], l = acquir_; - while (1) { - if (a == max_count) { - return false; // found nothing - } - if (a == id) { - next_[l] = next_[a]; - break; - } - l = a; - a = next_[a]; - } - } - next_[id] = cursor_; - cursor_ = static_cast>(id); // put it back - return true; - } - - template - void for_acquired(F&& fr) { - auto a = acquir_; - while (a != max_count) { - fr(a); - a = next_[a]; - } - } -}; - -} // namespace ipc diff --git a/src/ipc.cpp b/src/ipc.cpp index 3490b55..a7a1878 100644 Binary files a/src/ipc.cpp and b/src/ipc.cpp differ diff --git a/src/platform/shm_win.cpp b/src/platform/shm_win.cpp index 2b4f612..39fa203 100644 Binary files a/src/platform/shm_win.cpp and b/src/platform/shm_win.cpp differ diff --git a/src/route.inc b/src/route.inc deleted file mode 100644 index b098e93..0000000 --- a/src/route.inc +++ /dev/null @@ -1,94 +0,0 @@ -#include "ipc.h" -#include "def.h" - -//////////////////////////////////////////////////////////////// -/// class route implementation -//////////////////////////////////////////////////////////////// - -namespace ipc { - -class route::route_ : public pimpl { -public: - handle_t h_ = nullptr; - std::string n_; -}; - -route::route() - : p_(p_->make()) { -} - -route::route(char const * name) - : route() { - this->connect(name); -} - -route::route(route&& rhs) - : route() { - swap(rhs); -} - -route::~route() { - disconnect(); - p_->clear(); -} - -void route::swap(route& rhs) { - std::swap(p_, rhs.p_); -} - -route& route::operator=(route rhs) { - swap(rhs); - return *this; -} - -bool route::valid() const { - return (handle() != nullptr); -} - -char const * route::name() const { - return impl(p_)->n_.c_str(); -} - -handle_t route::handle() const { - return impl(p_)->h_; -} - -route route::clone() const { - return route { name() }; -} - -bool route::connect(char const * name) { - if (name == nullptr || name[0] == '\0') return false; - this->disconnect(); - impl(p_)->h_ = ipc::connect((impl(p_)->n_ = name).c_str()); - return valid(); -} - -void route::disconnect() { - if (!valid()) return; - ipc::disconnect(impl(p_)->h_); - impl(p_)->h_ = nullptr; - impl(p_)->n_.clear(); -} - -std::size_t route::recv_count() const { - return ipc::recv_count(impl(p_)->h_); -} - -bool route::send(void const * data, std::size_t size) { - return ipc::send(impl(p_)->h_, data, size); -} - -bool route::send(buff_t const & buff) { - return route::send(buff.data(), buff.size()); -} - -bool route::send(std::string const & str) { - return route::send(str.c_str(), str.size() + 1); -} - -buff_t route::recv() { - return ipc::recv(impl(p_)->h_); -} - -} // namespace ipc diff --git a/test/test_circ.cpp b/test/test_circ.cpp index 0fe66b6..4ca7c68 100644 Binary files a/test/test_circ.cpp and b/test/test_circ.cpp differ diff --git a/test/test_ipc.cpp b/test/test_ipc.cpp index 3bb481c..1d941f8 100644 Binary files a/test/test_ipc.cpp and b/test/test_ipc.cpp differ