diff --git a/build/src.pro b/build/src.pro index 0fb29af..105b35a 100644 --- a/build/src.pro +++ b/build/src.pro @@ -23,12 +23,13 @@ HEADERS += \ ../include/ipc.h \ ../include/def.h \ ../include/rw_lock.h \ - ../include/tls_pointer.h + ../include/tls_pointer.h \ + ../src/route.hpp \ + ../src/channel.hpp SOURCES += \ ../src/shm.cpp \ - ../src/ipc.cpp \ - ../src/route.cpp + ../src/ipc.cpp unix { diff --git a/include/circ_queue.h b/include/circ_queue.h index f5c191e..d97c03a 100644 --- a/include/circ_queue.h +++ b/include/circ_queue.h @@ -6,6 +6,7 @@ #include #include #include +#include #include "def.h" #include "circ_elem_array.h" @@ -108,40 +109,42 @@ public: return true; } - template - static T multi_pop(QArr& ques, std::size_t size, At&& at) { - if (size == 0) throw std::invalid_argument { "Invalid size." }; + template + static queue* multi_wait_for(F&& upd) { while (1) { + auto [ques, size] = upd(); + if (size == 0) throw std::invalid_argument { "Invalid size." }; for (std::size_t i = 0; i < size; ++i) { - queue* cq = at(ques, i); - if (cq->elems_ == nullptr) throw std::logic_error { + queue* que = ques[i]; + if (que->elems_ == nullptr) throw std::logic_error { "This queue hasn't attached any elem_array." }; - if (cq->cursor_ != cq->elems_->cursor()) { - auto item_ptr = static_cast(cq->elems_->take(cq->cursor_++)); - T item = std::move(*item_ptr); - cq->elems_->put(item_ptr); - return item; + if (que->cursor_ != que->elems_->cursor()) { + return que; } } std::this_thread::yield(); } } - static T multi_pop(queue* ques, std::size_t size) { - if (ques == nullptr) throw std::invalid_argument { "Invalid ques pointer." }; - return multi_pop(ques, size, [](queue* ques, std::size_t i) { - return ques + i; - }); + static T pop(queue* que) { + if (que == nullptr) throw std::invalid_argument { + "Invalid ques pointer." + }; + if (que->elems_ == nullptr) throw std::logic_error { + "This queue hasn't attached any elem_array." + }; + auto item_ptr = static_cast(que->elems_->take(que->cursor_++)); + T item = std::move(*item_ptr); + que->elems_->put(item_ptr); + return item; } - static T multi_pop(std::vector& ques) { - return multi_pop(ques, ques.size(), [](auto& ques, std::size_t i) { - return ques[i]; - }); + T pop() { + return pop(multi_wait_for([this] { + return std::make_tuple(&this, 1); + })); } - - T pop() { return multi_pop(this, 1); } }; } // namespace circ diff --git a/include/ipc.h b/include/ipc.h index 38690ab..fd30548 100644 --- a/include/ipc.h +++ b/include/ipc.h @@ -78,28 +78,43 @@ private: route_* p_; }; -///* -// * class channel -//*/ -//class IPC_EXPORT channel { -//public: -// channel(); -// channel(char const * name); -// channel(channel&& rhs); +/* + * class channel + * + * You could use multi producers/servers/senders for sending messages to a channel, + * then all the consumers/clients/receivers which are receiving with this channel, + * would receive your sent messages. +*/ +class IPC_EXPORT channel { +public: + channel(); + channel(char const * name); + channel(channel&& rhs); -// ~channel(); + ~channel(); -// void swap(channel& rhs); -// channel& operator=(channel rhs); + void swap(channel& rhs); + channel& operator=(channel rhs); -// bool valid() const; -// char const * name () const; + bool valid() const; + char const * name () const; -// channel clone() const; + channel clone() const; -//private: -// class channel_; -// channel_* p_; -//}; + bool connect(char const * name); + void disconnect(); + + std::size_t recv_count() const; + + bool send(void const * data, std::size_t size); + bool send(buff_t const & buff); + bool send(std::string const & str); + + buff_t recv(); + +private: + class channel_; + channel_* p_; +}; } // namespace ipc diff --git a/src/channel.hpp b/src/channel.hpp new file mode 100644 index 0000000..004452c --- /dev/null +++ b/src/channel.hpp @@ -0,0 +1,120 @@ +#include "ipc.h" + +#include +#include + +#include "def.h" +#include "shm.h" + +namespace { + +using namespace ipc; + +struct ch_info_t { + std::atomic> ch_acc_; // only support 256 channels with one name +}; + +} // internal-linkage + +//////////////////////////////////////////////////////////////// +/// class channel implementation +//////////////////////////////////////////////////////////////// + +namespace ipc { + +class channel::channel_ : public pimpl { +public: + shm::handle h_; + route r_; + + ch_info_t* info() { + return static_cast(h_.get()); + } + + auto& acc() { + return info()->ch_acc_; + } +}; + +channel::channel() + : p_(p_->make()) { +} + +channel::channel(char const * name) + : channel() { + this->connect(name); +} + +channel::channel(channel&& rhs) + : channel() { + swap(rhs); +} + +channel::~channel() { + disconnect(); + p_->clear(); +} + +void channel::swap(channel& rhs) { + std::swap(p_, rhs.p_); +} + +channel& channel::operator=(channel rhs) { + swap(rhs); + return *this; +} + +bool channel::valid() const { + return impl(p_)->h_.valid() && impl(p_)->r_.valid(); +} + +char const * channel::name() const { + std::string n { impl(p_)->h_.name() }; + n.pop_back(); + return n.c_str(); +} + +channel channel::clone() const { + return { name() }; +} + +bool channel::connect(char const * name) { + if (name == nullptr || name[0] == '\0') { + return false; + } + this->disconnect(); + using namespace std::literals::string_literals; + if (!impl(p_)->h_.acquire((name + "_"s).c_str(), sizeof(ch_info_t))) { + return false; + } + auto cur_id = impl(p_)->acc().fetch_add(1, std::memory_order_relaxed); + impl(p_)->r_.connect((name + std::to_string(cur_id)).c_str()); + return valid(); +} + +void channel::disconnect() { + impl(p_)->r_.disconnect(); + impl(p_)->h_.release(); +} + +std::size_t channel::recv_count() const { + return 0; +} + +bool channel::send(void const *data, std::size_t size) { + return false; +} + +bool channel::send(buff_t const & buff) { + return false; +} + +bool channel::send(std::string const & str) { + return false; +} + +buff_t channel::recv() { + return {}; +} + +} // namespace ipc diff --git a/src/ipc.cpp b/src/ipc.cpp index 6901928..45a9209 100644 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -42,6 +42,15 @@ inline std::atomic_size_t* acc_of(queue_t* que) { return reinterpret_cast(que->elems()) - 1; } +inline auto& recv_cache() { + /* + * the performance of tls::pointer is not good enough + * so regardless of the mingw-crash-problem for the moment + */ + thread_local std::unordered_map rc; + return rc; +} + } // internal-linkage namespace ipc { @@ -130,8 +139,39 @@ bool send(handle_t h, void const * data, std::size_t size) { return true; } -buff_t recv(handle_t h) { - return recv(&h, 1); +template +buff_t updating_recv(F&& upd) { + auto& rc = recv_cache(); + while(1) { + // pop a new message + auto msg = queue_t::pop(queue_t::multi_wait_for(upd)); + // msg.remain_ may minus & abs(msg.remain_) < data_length + std::size_t remain = static_cast( + static_cast(data_length) + msg.remain_); + // find cache with msg.id_ + auto cache_it = rc.find(msg.id_); + if (cache_it == rc.end()) { + if (remain <= data_length) { + return make_buff(msg.data_, remain); + } + // cache the first message fragment + else rc.emplace(msg.id_, make_buff(msg.data_)); + } + // has cached before this message + else { + auto& cache = cache_it->second; + // this is the last message fragment + if (msg.remain_ <= 0) { + cache.insert(cache.end(), msg.data_, msg.data_ + remain); + // finish this message, erase it from cache + auto buf = std::move(cache); + rc.erase(cache_it); + return buf; + } + // there are remain datas after this message + cache.insert(cache.end(), msg.data_, msg.data_ + data_length); + } + } } buff_t recv(handle_t const * hs, std::size_t size) { @@ -146,41 +186,16 @@ buff_t recv(handle_t const * hs, std::size_t size) { if (q_arr.empty()) { return {}; } - /* - * the performance of tls::pointer is not good enough - * so regardless of the mingw-crash-problem for the moment - */ - thread_local std::unordered_map rcs; - while(1) { - // pop a new message - auto msg = queue_t::multi_pop(q_arr); - // msg.remain_ may minus & abs(msg.remain_) < data_length - std::size_t remain = static_cast( - static_cast(data_length) + msg.remain_); - // find cache with msg.id_ - auto cache_it = rcs.find(msg.id_); - if (cache_it == rcs.end()) { - if (remain <= data_length) { - return make_buff(msg.data_, remain); - } - // cache the first message fragment - else rcs.emplace(msg.id_, make_buff(msg.data_)); - } - // has cached before this message - else { - auto& cache = cache_it->second; - // this is the last message fragment - if (msg.remain_ <= 0) { - cache.insert(cache.end(), msg.data_, msg.data_ + remain); - // finish this message, erase it from cache - auto buf = std::move(cache); - rcs.erase(cache_it); - return buf; - } - // there are remain datas after this message - cache.insert(cache.end(), msg.data_, msg.data_ + data_length); - } - } + return updating_recv([&] { + return std::forward_as_tuple(q_arr, q_arr.size()); + }); +} + +buff_t recv(handle_t h) { + return recv(&h, 1); } } // namespace ipc + +#include "route.hpp" +#include "channel.hpp" diff --git a/src/route.cpp b/src/route.hpp similarity index 90% rename from src/route.cpp rename to src/route.hpp index 4748b27..2b684ff 100644 --- a/src/route.cpp +++ b/src/route.hpp @@ -1,6 +1,10 @@ #include "ipc.h" #include "def.h" +//////////////////////////////////////////////////////////////// +/// class route implementation +//////////////////////////////////////////////////////////////// + namespace ipc { class route::route_ : public pimpl {