diff --git a/build/src.pro b/build/src.pro index 105b35a..5777716 100644 --- a/build/src.pro +++ b/build/src.pro @@ -25,7 +25,8 @@ HEADERS += \ ../include/rw_lock.h \ ../include/tls_pointer.h \ ../src/route.hpp \ - ../src/channel.hpp + ../src/channel.hpp \ + ../src/id_pool.hpp SOURCES += \ ../src/shm.cpp \ diff --git a/include/circ_queue.h b/include/circ_queue.h index 84b4ffc..85c2b66 100644 --- a/include/circ_queue.h +++ b/include/circ_queue.h @@ -41,25 +41,25 @@ public: } std::size_t connect() { - if (elems_ == nullptr) return error_count; + if (elems_ == nullptr) return invalid_value; if (connected_.exchange(true, std::memory_order_relaxed)) { // if it's already connected, just return an error count - return error_count; + return invalid_value; } return elems_->connect(); } std::size_t disconnect() { - if (elems_ == nullptr) return error_count; + if (elems_ == nullptr) return invalid_value; if (!connected_.exchange(false, std::memory_order_relaxed)) { // if it's already disconnected, just return an error count - return error_count; + return invalid_value; } return elems_->disconnect(); } std::size_t conn_count() const { - return (elems_ == nullptr) ? error_count : elems_->conn_count(); + return (elems_ == nullptr) ? invalid_value : elems_->conn_count(); } bool connected() const { @@ -116,6 +116,7 @@ public: if (size == 0) throw std::invalid_argument { "Invalid size." }; for (std::size_t i = 0; i < static_cast(size); ++i) { queue* que = ques[i]; + if (que == nullptr) continue; if (que->elems_ == nullptr) throw std::logic_error { "This queue hasn't attached any elem_array." }; diff --git a/include/def.h b/include/def.h index 4edbaa7..ad90dd1 100644 --- a/include/def.h +++ b/include/def.h @@ -27,8 +27,8 @@ using uint_t = typename uint::type; // constants enum : std::size_t { - error_count = (std::numeric_limits::max)(), - data_length = 16 + invalid_value = (std::numeric_limits::max)(), + data_length = 16 }; // concept helpers diff --git a/include/ipc.h b/include/ipc.h index fd30548..c5e37af 100644 --- a/include/ipc.h +++ b/include/ipc.h @@ -57,8 +57,9 @@ public: void swap(route& rhs); route& operator=(route rhs); - bool valid() const; - char const * name () const; + bool valid () const; + char const * name () const; + handle_t handle() const; route clone() const; diff --git a/src/channel.hpp b/src/channel.hpp index ec47dba..0468831 100644 --- a/src/channel.hpp +++ b/src/channel.hpp @@ -2,17 +2,28 @@ #include #include +#include +#include +#include +#include +#include #include "def.h" #include "shm.h" +#include "rw_lock.h" + +#include "id_pool.hpp" namespace { using namespace ipc; +#pragma pack(1) struct ch_info_t { - std::atomic> ch_acc_; // only support 256 channels with one name + rw_lock lc_; + id_pool ch_acc_; // only support 255 channels with one name }; +#pragma pack() } // internal-linkage @@ -27,12 +38,19 @@ public: shm::handle h_; route r_; - ch_info_t* info() { - return static_cast(h_.get()); + std::string n_; + std::size_t id_; + + std::unordered_map rts_; + + ~channel_(void) { rts_.clear(); } + + ch_info_t& info() { + return *static_cast(h_.get()); } auto& acc() { - return info()->ch_acc_; + return info().ch_acc_; } }; @@ -69,9 +87,7 @@ bool channel::valid() const { } char const * channel::name() const { - std::string n { impl(p_)->h_.name() }; - n.pop_back(); - return n.c_str(); + return impl(p_)->n_.c_str(); } channel channel::clone() const { @@ -83,38 +99,85 @@ bool channel::connect(char const * name) { return false; } this->disconnect(); - using namespace std::literals::string_literals; - if (!impl(p_)->h_.acquire((name + "_"s).c_str(), sizeof(ch_info_t))) { + if (!impl(p_)->h_.acquire(((impl(p_)->n_ = name) + "_").c_str(), sizeof(ch_info_t))) { return false; } - auto cur_id = impl(p_)->acc().fetch_add(1, std::memory_order_relaxed); - impl(p_)->r_.connect((name + std::to_string(cur_id)).c_str()); + { + std::unique_lock guard { impl(p_)->info().lc_ }; + if (impl(p_)->acc().invalid()) { + impl(p_)->acc().init(); + } + impl(p_)->id_ = impl(p_)->acc().acquire(); + } + if (impl(p_)->id_ == invalid_value) { + return false; + } + impl(p_)->r_.connect((name + std::to_string(impl(p_)->id_)).c_str()); return valid(); } void channel::disconnect() { + if (!valid()) return; + { + std::unique_lock guard { impl(p_)->info().lc_ }; + impl(p_)->acc().release(impl(p_)->id_); + } + impl(p_)->rts_.clear(); impl(p_)->r_.disconnect(); impl(p_)->h_.release(); } std::size_t channel::recv_count() const { - return 0; + return impl(p_)->r_.recv_count(); } -bool channel::send(void const * /*data*/, std::size_t /*size*/) { - return false; +bool channel::send(void const * data, std::size_t size) { + return impl(p_)->r_.send(data, size); } -bool channel::send(buff_t const & /*buff*/) { - return false; +bool channel::send(buff_t const & buff) { + return impl(p_)->r_.send(buff); } -bool channel::send(std::string const & /*str*/) { - return false; +bool channel::send(std::string const & str) { + return impl(p_)->r_.send(str); } buff_t channel::recv() { - return {}; + if (!valid()) return {}; + std::array ques; + return ipc::multi_recv([&] { + std::array acqeds; + std::size_t counter = 0; + std::unordered_map cache; + // get all acquired ids + { + std::shared_lock guard { impl(p_)->info().lc_ }; + impl(p_)->acc().for_each([&](std::size_t id, bool acquired) { + if (id == impl(p_)->id_) return; + if (acquired) { + acqeds[counter++] = id; + } + }); + } + // populate route cache & ques + for (std::size_t i = 0; i < counter; ++i) { + auto id = acqeds[i]; + auto it = impl(p_)->rts_.find(id); + // it's a new id + if (it == impl(p_)->rts_.end()) { + it = cache.emplace(id, (impl(p_)->n_ + std::to_string(id)).c_str()).first; + queue_of(it->second.handle())->connect(); + } + // it's an existing id + else it = cache.insert(impl(p_)->rts_.extract(it)).position; + // get queue of this route + ques[i] = queue_of(it->second.handle()); + } + // update rts mapping + impl(p_)->rts_.swap(cache); + return std::make_tuple(ques.data(), counter); + }); } } // namespace ipc diff --git a/src/id_pool.hpp b/src/id_pool.hpp new file mode 100644 index 0000000..a3d3400 --- /dev/null +++ b/src/id_pool.hpp @@ -0,0 +1,56 @@ +#include + +#include "def.h" + +namespace ipc { + +class id_pool { +public: + enum : std::size_t { + max_count = (std::numeric_limits>::max)() // 255 + }; + +private: + uint_t<8> cursor_ = 0; + uint_t<8> block_[max_count] {}; + +public: + void init() { + for (std::size_t i = 0; i < max_count;) { + i = block_[i] = static_cast>(i + 1); + } + } + + bool invalid() const { + static id_pool inv; + return std::memcmp(this, &inv, sizeof(id_pool)) == 0; + } + + bool empty() const { + return cursor_ == max_count; + } + + std::size_t acquire() { + if (empty()) { + return invalid_value; + } + std::size_t id = cursor_; + cursor_ = block_[id]; // point to next + block_[id] = 0; // clear flag + return id; + } + + void release(std::size_t id) { + block_[id] = cursor_; + cursor_ = static_cast>(id); // put it back + } + + template + void for_each(F&& fr) { + for (std::size_t i = 0; i < max_count; ++i) { + fr(i, block_[i] == 0); + } + } +}; + +} // namespace ipc diff --git a/src/ipc.cpp b/src/ipc.cpp index 644bab4..1094153 100644 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -5,14 +5,10 @@ #include #include #include -//#include -//#include #include #include "def.h" #include "circ_queue.h" -//#include "rw_lock.h" -//#include "tls_pointer.h" namespace { @@ -83,7 +79,7 @@ void disconnect(handle_t h) { std::size_t recv_count(handle_t h) { auto que = queue_of(h); if (que == nullptr) { - return error_count; + return invalid_value; } return que->conn_count(); } diff --git a/src/route.hpp b/src/route.hpp index 4558cf5..9bdfc07 100644 --- a/src/route.hpp +++ b/src/route.hpp @@ -42,13 +42,17 @@ route& route::operator=(route rhs) { } bool route::valid() const { - return (impl(p_)->h_ != nullptr); + return (handle() != nullptr); } char const * route::name() const { return impl(p_)->n_.c_str(); } +handle_t route::handle() const { + return impl(p_)->h_; +} + route route::clone() const { return { name() }; } diff --git a/test/test_circ.cpp b/test/test_circ.cpp index 2ba10b6..1c72b49 100644 --- a/test/test_circ.cpp +++ b/test/test_circ.cpp @@ -124,12 +124,12 @@ struct test_cq> { cn_t* connect() { cn_t* queue = new cn_t { ca_ }; - [&] { QVERIFY(queue->connect() != ipc::error_count); } (); + [&] { QVERIFY(queue->connect() != ipc::invalid_value); } (); return queue; } void disconnect(cn_t* queue) { - QVERIFY(queue->disconnect() != ipc::error_count); + QVERIFY(queue->disconnect() != ipc::invalid_value); QVERIFY(queue->detach() != nullptr); delete queue; } diff --git a/test/test_ipc.cpp b/test/test_ipc.cpp index dbff110..089e83b 100644 --- a/test/test_ipc.cpp +++ b/test/test_ipc.cpp @@ -122,6 +122,7 @@ private slots: void test_send_recv(); void test_route(); void test_route_performance(); + void test_channel(); } unit__; #include "test_ipc.moc" @@ -388,4 +389,30 @@ void Unit::test_route_performance() { test_performance<1, 10>::start(); } +void Unit::test_channel() { + std::thread t1 {[&] { + ipc::channel cc { "my-ipc-channel" }; + for (std::size_t i = 0;; ++i) { + ipc::buff_t dd = cc.recv(); + if (dd.size() < 2) return; + QCOMPARE(dd, datas__[i]); + } + }}; + + std::thread t2 {[&] { + ipc::channel cc { "my-ipc-channel" }; + while (cc.recv_count() == 0) { + std::this_thread::yield(); + } + for (std::size_t i = 0; i < (std::min)(100, LoopCount); ++i) { + std::cout << "sending: " << i << "-[" << datas__[i].size() << "]" << std::endl; + cc.send(datas__[i]); + } + cc.send(ipc::buff_t { '\0' }); + t1.join(); + }}; + + t2.join(); +} + } // internal-linkage