re-add ipc::channel (TBD); implement channel/route in .hpp files instead of .cpp, because they can then access the internal entities in ipc.cpp

This commit is contained in:
mutouyun 2018-12-19 16:37:26 +08:00
parent e860eb5a4f
commit d0b4999af4
6 changed files with 237 additions and 79 deletions

View File

@ -23,12 +23,13 @@ HEADERS += \
../include/ipc.h \
../include/def.h \
../include/rw_lock.h \
../include/tls_pointer.h
../include/tls_pointer.h \
../src/route.hpp \
../src/channel.hpp
SOURCES += \
../src/shm.cpp \
../src/ipc.cpp \
../src/route.cpp
../src/ipc.cpp
unix {

View File

@ -6,6 +6,7 @@
#include <utility>
#include <algorithm>
#include <atomic>
#include <tuple>
#include "def.h"
#include "circ_elem_array.h"
@ -108,40 +109,42 @@ public:
return true;
}
template <typename QArr, typename At>
static T multi_pop(QArr& ques, std::size_t size, At&& at) {
if (size == 0) throw std::invalid_argument { "Invalid size." };
template <typename F>
static queue* multi_wait_for(F&& upd) {
while (1) {
auto [ques, size] = upd();
if (size == 0) throw std::invalid_argument { "Invalid size." };
for (std::size_t i = 0; i < size; ++i) {
queue* cq = at(ques, i);
if (cq->elems_ == nullptr) throw std::logic_error {
queue* que = ques[i];
if (que->elems_ == nullptr) throw std::logic_error {
"This queue hasn't attached any elem_array."
};
if (cq->cursor_ != cq->elems_->cursor()) {
auto item_ptr = static_cast<T*>(cq->elems_->take(cq->cursor_++));
T item = std::move(*item_ptr);
cq->elems_->put(item_ptr);
return item;
if (que->cursor_ != que->elems_->cursor()) {
return que;
}
}
std::this_thread::yield();
}
}
static T multi_pop(queue* ques, std::size_t size) {
if (ques == nullptr) throw std::invalid_argument { "Invalid ques pointer." };
return multi_pop(ques, size, [](queue* ques, std::size_t i) {
return ques + i;
});
static T pop(queue* que) {
if (que == nullptr) throw std::invalid_argument {
"Invalid ques pointer."
};
if (que->elems_ == nullptr) throw std::logic_error {
"This queue hasn't attached any elem_array."
};
auto item_ptr = static_cast<T*>(que->elems_->take(que->cursor_++));
T item = std::move(*item_ptr);
que->elems_->put(item_ptr);
return item;
}
static T multi_pop(std::vector<queue*>& ques) {
return multi_pop(ques, ques.size(), [](auto& ques, std::size_t i) {
return ques[i];
});
T pop() {
return pop(multi_wait_for([this] {
return std::make_tuple(&this, 1);
}));
}
T pop() { return multi_pop(this, 1); }
};
} // namespace circ

View File

@ -78,28 +78,43 @@ private:
route_* p_;
};
///*
// * class channel
//*/
//class IPC_EXPORT channel {
//public:
// channel();
// channel(char const * name);
// channel(channel&& rhs);
/*
* class channel
*
* You can use multiple producers/servers/senders to send messages to a channel;
* all of the consumers/clients/receivers currently receiving on this channel
* will then receive the messages you send.
*/
class IPC_EXPORT channel {
public:
channel();
channel(char const * name);
channel(channel&& rhs);
// ~channel();
~channel();
// void swap(channel& rhs);
// channel& operator=(channel rhs);
void swap(channel& rhs);
channel& operator=(channel rhs);
// bool valid() const;
// char const * name () const;
bool valid() const;
char const * name () const;
// channel clone() const;
channel clone() const;
//private:
// class channel_;
// channel_* p_;
//};
bool connect(char const * name);
void disconnect();
std::size_t recv_count() const;
bool send(void const * data, std::size_t size);
bool send(buff_t const & buff);
bool send(std::string const & str);
buff_t recv();
private:
class channel_;
channel_* p_;
};
} // namespace ipc

120
src/channel.hpp Normal file
View File

@ -0,0 +1,120 @@
#include "ipc.h"
#include <atomic>
#include <string>
#include "def.h"
#include "shm.h"
namespace {

using namespace ipc;

/*
 * Shared-memory control block for a named channel.
 * One instance lives in the shm segment named "<channel-name>_"
 * (see channel::connect) and is shared by every connection to that name.
 */
struct ch_info_t {
    // Monotonic connection counter used to hand out per-receiver route ids.
    // 8-bit on purpose: only support 256 channels with one name.
    std::atomic<uint_t<8>> ch_acc_;
};

} // internal-linkage
////////////////////////////////////////////////////////////////
/// class channel implementation
////////////////////////////////////////////////////////////////
namespace ipc {
// Private implementation (pimpl) for ipc::channel.
class channel::channel_ : public pimpl<channel_> {
public:
    shm::handle h_; // shm segment holding the shared ch_info_t for this name
    route       r_; // per-receiver route carrying the actual messages

    // View the raw shm memory as the channel control block.
    ch_info_t* info() {
        return static_cast<ch_info_t*>(h_.get());
    }

    // Shared connection counter (lives in shm; see ch_info_t::ch_acc_).
    auto& acc() {
        return info()->ch_acc_;
    }
};
// Default ctor: allocate the pimpl object.
// NOTE(review): p_->make() is invoked through the not-yet-initialized p_;
// presumably pimpl<>::make() is static so this is well-defined — TODO confirm.
channel::channel()
    : p_(p_->make()) {
}

// Construct and immediately connect to the given channel name.
channel::channel(char const * name)
    : channel() {
    this->connect(name);
}

// Move ctor: default-construct, then steal rhs's state via swap.
channel::channel(channel&& rhs)
    : channel() {
    swap(rhs);
}

// Dtor: drop the connection, then destroy the pimpl object.
channel::~channel() {
    disconnect();
    p_->clear();
}

// Exchange pimpl pointers; O(1), never throws.
void channel::swap(channel& rhs) {
    std::swap(p_, rhs.p_);
}

// Unified copy/move assignment (copy-and-swap on the by-value parameter).
channel& channel::operator=(channel rhs) {
    swap(rhs);
    return *this;
}
// A channel is usable only when both its shm block and its route are alive.
bool channel::valid() const {
    auto* self = impl(p_);
    if (!self->h_.valid()) {
        return false;
    }
    return self->r_.valid();
}
// Returns the user-visible channel name.
// h_.name() is the name that connect() registered, i.e. the user name plus a
// trailing '_' suffix; that suffix is stripped here.
// BUGFIX: the previous version returned c_str() of a function-local
// std::string — a dangling pointer the moment the function returned — and
// called pop_back() on a potentially empty string (UB). The trimmed name is
// now cached in thread-local storage, so the returned pointer stays valid
// until the next name() call on the same thread.
char const * channel::name() const {
    thread_local std::string n;
    n = impl(p_)->h_.name();
    if (!n.empty()) {
        n.pop_back(); // strip the '_' appended by connect()
    }
    return n.c_str();
}
// Create a fresh, independently connected channel over the same name.
channel channel::clone() const {
    return channel { this->name() };
}
// Connect this channel to the given name.
// Returns true when both the shared control block and the route are valid.
bool channel::connect(char const * name) {
    // Reject null/empty names up front.
    if (name == nullptr || name[0] == '\0') {
        return false;
    }
    // Drop any previous connection before re-connecting.
    this->disconnect();
    using namespace std::literals::string_literals;
    // The shared ch_info_t lives in an shm segment named "<name>_"; the
    // trailing '_' keeps it distinct from the per-receiver route names below.
    // (channel::name() relies on this suffix to recover the user name.)
    if (!impl(p_)->h_.acquire((name + "_"s).c_str(), sizeof(ch_info_t))) {
        return false;
    }
    // Claim a fresh receiver id from the shared 8-bit counter; it wraps after
    // 256 connections with the same name (see ch_info_t).
    auto cur_id = impl(p_)->acc().fetch_add(1, std::memory_order_relaxed);
    // Attach the underlying route as "<name><id>"; success is reported via
    // valid(), which checks both the shm handle and the route.
    impl(p_)->r_.connect((name + std::to_string(cur_id)).c_str());
    return valid();
}
// Tear down in reverse order of connect(): route first, then the shm block.
// NOTE(review): both calls appear intended to be safe no-ops when not
// connected (the dtor and connect() invoke this unconditionally) — confirm
// against route::disconnect / shm::handle::release.
void channel::disconnect() {
    impl(p_)->r_.disconnect();
    impl(p_)->h_.release();
}
// ---------------------------------------------------------------------------
// TODO: the operations below are placeholders (the commit marks ipc::channel
// as "TBD"); they compile but do not implement channel semantics yet.
// ---------------------------------------------------------------------------

// Stub: number of receivers is not tracked yet.
std::size_t channel::recv_count() const {
    return 0;
}

// Stub: sending raw bytes always reports failure.
bool channel::send(void const *data, std::size_t size) {
    return false;
}

// Stub: sending a buffer always reports failure.
bool channel::send(buff_t const & buff) {
    return false;
}

// Stub: sending a string always reports failure.
bool channel::send(std::string const & str) {
    return false;
}

// Stub: receiving always yields an empty buffer.
buff_t channel::recv() {
    return {};
}
} // namespace ipc

View File

@ -42,6 +42,15 @@ inline std::atomic_size_t* acc_of(queue_t* que) {
return reinterpret_cast<std::atomic_size_t*>(que->elems()) - 1;
}
inline auto& recv_cache() {
/*
* the performance of tls::pointer is not good enough
* so regardless of the mingw-crash-problem for the moment
*/
thread_local std::unordered_map<decltype(msg_t::id_), buff_t> rc;
return rc;
}
} // internal-linkage
namespace ipc {
@ -130,8 +139,39 @@ bool send(handle_t h, void const * data, std::size_t size) {
return true;
}
buff_t recv(handle_t h) {
return recv(&h, 1);
template <typename F>
buff_t updating_recv(F&& upd) {
auto& rc = recv_cache();
while(1) {
// pop a new message
auto msg = queue_t::pop(queue_t::multi_wait_for(upd));
// msg.remain_ may minus & abs(msg.remain_) < data_length
std::size_t remain = static_cast<std::size_t>(
static_cast<int>(data_length) + msg.remain_);
// find cache with msg.id_
auto cache_it = rc.find(msg.id_);
if (cache_it == rc.end()) {
if (remain <= data_length) {
return make_buff(msg.data_, remain);
}
// cache the first message fragment
else rc.emplace(msg.id_, make_buff(msg.data_));
}
// has cached before this message
else {
auto& cache = cache_it->second;
// this is the last message fragment
if (msg.remain_ <= 0) {
cache.insert(cache.end(), msg.data_, msg.data_ + remain);
// finish this message, erase it from cache
auto buf = std::move(cache);
rc.erase(cache_it);
return buf;
}
// there are remain datas after this message
cache.insert(cache.end(), msg.data_, msg.data_ + data_length);
}
}
}
buff_t recv(handle_t const * hs, std::size_t size) {
@ -146,41 +186,16 @@ buff_t recv(handle_t const * hs, std::size_t size) {
if (q_arr.empty()) {
return {};
}
/*
* the performance of tls::pointer is not good enough
* so regardless of the mingw-crash-problem for the moment
*/
thread_local std::unordered_map<decltype(msg_t::id_), buff_t> rcs;
while(1) {
// pop a new message
auto msg = queue_t::multi_pop(q_arr);
// msg.remain_ may minus & abs(msg.remain_) < data_length
std::size_t remain = static_cast<std::size_t>(
static_cast<int>(data_length) + msg.remain_);
// find cache with msg.id_
auto cache_it = rcs.find(msg.id_);
if (cache_it == rcs.end()) {
if (remain <= data_length) {
return make_buff(msg.data_, remain);
}
// cache the first message fragment
else rcs.emplace(msg.id_, make_buff(msg.data_));
}
// has cached before this message
else {
auto& cache = cache_it->second;
// this is the last message fragment
if (msg.remain_ <= 0) {
cache.insert(cache.end(), msg.data_, msg.data_ + remain);
// finish this message, erase it from cache
auto buf = std::move(cache);
rcs.erase(cache_it);
return buf;
}
// there are remain datas after this message
cache.insert(cache.end(), msg.data_, msg.data_ + data_length);
}
}
return updating_recv([&] {
return std::forward_as_tuple(q_arr, q_arr.size());
});
}
buff_t recv(handle_t h) {
return recv(&h, 1);
}
} // namespace ipc
#include "route.hpp"
#include "channel.hpp"

View File

@ -1,6 +1,10 @@
#include "ipc.h"
#include "def.h"
////////////////////////////////////////////////////////////////
/// class route implementation
////////////////////////////////////////////////////////////////
namespace ipc {
class route::route_ : public pimpl<route_> {