complete all functions of ipc::channel (TODO: ut, benchmark)

This commit is contained in:
mutouyun 2018-12-20 16:31:38 +08:00
parent 70e1ac6865
commit e54e9898fc
10 changed files with 186 additions and 37 deletions

View File

@ -25,7 +25,8 @@ HEADERS += \
../include/rw_lock.h \ ../include/rw_lock.h \
../include/tls_pointer.h \ ../include/tls_pointer.h \
../src/route.hpp \ ../src/route.hpp \
../src/channel.hpp ../src/channel.hpp \
../src/id_pool.hpp
SOURCES += \ SOURCES += \
../src/shm.cpp \ ../src/shm.cpp \

View File

@ -41,25 +41,25 @@ public:
} }
std::size_t connect() { std::size_t connect() {
if (elems_ == nullptr) return error_count; if (elems_ == nullptr) return invalid_value;
if (connected_.exchange(true, std::memory_order_relaxed)) { if (connected_.exchange(true, std::memory_order_relaxed)) {
// if it's already connected, just return an error count // if it's already connected, just return an error count
return error_count; return invalid_value;
} }
return elems_->connect(); return elems_->connect();
} }
std::size_t disconnect() { std::size_t disconnect() {
if (elems_ == nullptr) return error_count; if (elems_ == nullptr) return invalid_value;
if (!connected_.exchange(false, std::memory_order_relaxed)) { if (!connected_.exchange(false, std::memory_order_relaxed)) {
// if it's already disconnected, just return an error count // if it's already disconnected, just return an error count
return error_count; return invalid_value;
} }
return elems_->disconnect(); return elems_->disconnect();
} }
std::size_t conn_count() const { std::size_t conn_count() const {
return (elems_ == nullptr) ? error_count : elems_->conn_count(); return (elems_ == nullptr) ? invalid_value : elems_->conn_count();
} }
bool connected() const { bool connected() const {
@ -116,6 +116,7 @@ public:
if (size == 0) throw std::invalid_argument { "Invalid size." }; if (size == 0) throw std::invalid_argument { "Invalid size." };
for (std::size_t i = 0; i < static_cast<std::size_t>(size); ++i) { for (std::size_t i = 0; i < static_cast<std::size_t>(size); ++i) {
queue* que = ques[i]; queue* que = ques[i];
if (que == nullptr) continue;
if (que->elems_ == nullptr) throw std::logic_error { if (que->elems_ == nullptr) throw std::logic_error {
"This queue hasn't attached any elem_array." "This queue hasn't attached any elem_array."
}; };

View File

@ -27,8 +27,8 @@ using uint_t = typename uint<N>::type;
// constants // constants
enum : std::size_t { enum : std::size_t {
error_count = (std::numeric_limits<std::size_t>::max)(), invalid_value = (std::numeric_limits<std::size_t>::max)(),
data_length = 16 data_length = 16
}; };
// concept helpers // concept helpers

View File

@ -57,8 +57,9 @@ public:
void swap(route& rhs); void swap(route& rhs);
route& operator=(route rhs); route& operator=(route rhs);
bool valid() const; bool valid () const;
char const * name () const; char const * name () const;
handle_t handle() const;
route clone() const; route clone() const;

View File

@ -2,17 +2,28 @@
#include <atomic> #include <atomic>
#include <string> #include <string>
#include <array>
#include <limits>
#include <shared_mutex>
#include <mutex>
#include <unordered_map>
#include "def.h" #include "def.h"
#include "shm.h" #include "shm.h"
#include "rw_lock.h"
#include "id_pool.hpp"
namespace { namespace {
using namespace ipc; using namespace ipc;
#pragma pack(1)
struct ch_info_t { struct ch_info_t {
std::atomic<uint_t<8>> ch_acc_; // only support 256 channels with one name rw_lock lc_;
id_pool ch_acc_; // only support 255 channels with one name
}; };
#pragma pack()
} // internal-linkage } // internal-linkage
@ -27,12 +38,19 @@ public:
shm::handle h_; shm::handle h_;
route r_; route r_;
ch_info_t* info() { std::string n_;
return static_cast<ch_info_t*>(h_.get()); std::size_t id_;
std::unordered_map<std::size_t, route> rts_;
~channel_(void) { rts_.clear(); }
ch_info_t& info() {
return *static_cast<ch_info_t*>(h_.get());
} }
auto& acc() { auto& acc() {
return info()->ch_acc_; return info().ch_acc_;
} }
}; };
@ -69,9 +87,7 @@ bool channel::valid() const {
} }
char const * channel::name() const { char const * channel::name() const {
std::string n { impl(p_)->h_.name() }; return impl(p_)->n_.c_str();
n.pop_back();
return n.c_str();
} }
channel channel::clone() const { channel channel::clone() const {
@ -83,38 +99,85 @@ bool channel::connect(char const * name) {
return false; return false;
} }
this->disconnect(); this->disconnect();
using namespace std::literals::string_literals; if (!impl(p_)->h_.acquire(((impl(p_)->n_ = name) + "_").c_str(), sizeof(ch_info_t))) {
if (!impl(p_)->h_.acquire((name + "_"s).c_str(), sizeof(ch_info_t))) {
return false; return false;
} }
auto cur_id = impl(p_)->acc().fetch_add(1, std::memory_order_relaxed); {
impl(p_)->r_.connect((name + std::to_string(cur_id)).c_str()); std::unique_lock<rw_lock> guard { impl(p_)->info().lc_ };
if (impl(p_)->acc().invalid()) {
impl(p_)->acc().init();
}
impl(p_)->id_ = impl(p_)->acc().acquire();
}
if (impl(p_)->id_ == invalid_value) {
return false;
}
impl(p_)->r_.connect((name + std::to_string(impl(p_)->id_)).c_str());
return valid(); return valid();
} }
void channel::disconnect() { void channel::disconnect() {
if (!valid()) return;
{
std::unique_lock<rw_lock> guard { impl(p_)->info().lc_ };
impl(p_)->acc().release(impl(p_)->id_);
}
impl(p_)->rts_.clear();
impl(p_)->r_.disconnect(); impl(p_)->r_.disconnect();
impl(p_)->h_.release(); impl(p_)->h_.release();
} }
std::size_t channel::recv_count() const { std::size_t channel::recv_count() const {
return 0; return impl(p_)->r_.recv_count();
} }
bool channel::send(void const * /*data*/, std::size_t /*size*/) { bool channel::send(void const * data, std::size_t size) {
return false; return impl(p_)->r_.send(data, size);
} }
bool channel::send(buff_t const & /*buff*/) { bool channel::send(buff_t const & buff) {
return false; return impl(p_)->r_.send(buff);
} }
bool channel::send(std::string const & /*str*/) { bool channel::send(std::string const & str) {
return false; return impl(p_)->r_.send(str);
} }
buff_t channel::recv() { buff_t channel::recv() {
return {}; if (!valid()) return {};
std::array<queue_t*, id_pool::max_count> ques;
return ipc::multi_recv([&] {
std::array<std::size_t, id_pool::max_count> acqeds;
std::size_t counter = 0;
std::unordered_map<std::size_t, route> cache;
// get all acquired ids
{
std::shared_lock<rw_lock> guard { impl(p_)->info().lc_ };
impl(p_)->acc().for_each([&](std::size_t id, bool acquired) {
if (id == impl(p_)->id_) return;
if (acquired) {
acqeds[counter++] = id;
}
});
}
// populate route cache & ques
for (std::size_t i = 0; i < counter; ++i) {
auto id = acqeds[i];
auto it = impl(p_)->rts_.find(id);
// it's a new id
if (it == impl(p_)->rts_.end()) {
it = cache.emplace(id, (impl(p_)->n_ + std::to_string(id)).c_str()).first;
queue_of(it->second.handle())->connect();
}
// it's an existing id
else it = cache.insert(impl(p_)->rts_.extract(it)).position;
// get queue of this route
ques[i] = queue_of(it->second.handle());
}
// update rts mapping
impl(p_)->rts_.swap(cache);
return std::make_tuple(ques.data(), counter);
});
} }
} // namespace ipc } // namespace ipc

56
src/id_pool.hpp Normal file
View File

@ -0,0 +1,56 @@
#include <cstring>
#include "def.h"
namespace ipc {
class id_pool {
public:
enum : std::size_t {
max_count = (std::numeric_limits<uint_t<8>>::max)() // 255
};
private:
uint_t<8> cursor_ = 0;
uint_t<8> block_[max_count] {};
public:
void init() {
for (std::size_t i = 0; i < max_count;) {
i = block_[i] = static_cast<uint_t<8>>(i + 1);
}
}
bool invalid() const {
static id_pool inv;
return std::memcmp(this, &inv, sizeof(id_pool)) == 0;
}
bool empty() const {
return cursor_ == max_count;
}
std::size_t acquire() {
if (empty()) {
return invalid_value;
}
std::size_t id = cursor_;
cursor_ = block_[id]; // point to next
block_[id] = 0; // clear flag
return id;
}
void release(std::size_t id) {
block_[id] = cursor_;
cursor_ = static_cast<uint_t<8>>(id); // put it back
}
template <typename F>
void for_each(F&& fr) {
for (std::size_t i = 0; i < max_count; ++i) {
fr(i, block_[i] == 0);
}
}
};
} // namespace ipc

View File

@ -5,14 +5,10 @@
#include <cstring> #include <cstring>
#include <algorithm> #include <algorithm>
#include <utility> #include <utility>
//#include <shared_mutex>
//#include <mutex>
#include <atomic> #include <atomic>
#include "def.h" #include "def.h"
#include "circ_queue.h" #include "circ_queue.h"
//#include "rw_lock.h"
//#include "tls_pointer.h"
namespace { namespace {
@ -83,7 +79,7 @@ void disconnect(handle_t h) {
std::size_t recv_count(handle_t h) { std::size_t recv_count(handle_t h) {
auto que = queue_of(h); auto que = queue_of(h);
if (que == nullptr) { if (que == nullptr) {
return error_count; return invalid_value;
} }
return que->conn_count(); return que->conn_count();
} }

View File

@ -42,13 +42,17 @@ route& route::operator=(route rhs) {
} }
bool route::valid() const { bool route::valid() const {
return (impl(p_)->h_ != nullptr); return (handle() != nullptr);
} }
char const * route::name() const { char const * route::name() const {
return impl(p_)->n_.c_str(); return impl(p_)->n_.c_str();
} }
handle_t route::handle() const {
return impl(p_)->h_;
}
route route::clone() const { route route::clone() const {
return { name() }; return { name() };
} }

View File

@ -124,12 +124,12 @@ struct test_cq<ipc::circ::queue<T>> {
cn_t* connect() { cn_t* connect() {
cn_t* queue = new cn_t { ca_ }; cn_t* queue = new cn_t { ca_ };
[&] { QVERIFY(queue->connect() != ipc::error_count); } (); [&] { QVERIFY(queue->connect() != ipc::invalid_value); } ();
return queue; return queue;
} }
void disconnect(cn_t* queue) { void disconnect(cn_t* queue) {
QVERIFY(queue->disconnect() != ipc::error_count); QVERIFY(queue->disconnect() != ipc::invalid_value);
QVERIFY(queue->detach() != nullptr); QVERIFY(queue->detach() != nullptr);
delete queue; delete queue;
} }

View File

@ -122,6 +122,7 @@ private slots:
void test_send_recv(); void test_send_recv();
void test_route(); void test_route();
void test_route_performance(); void test_route_performance();
void test_channel();
} unit__; } unit__;
#include "test_ipc.moc" #include "test_ipc.moc"
@ -388,4 +389,30 @@ void Unit::test_route_performance() {
test_performance<1, 10>::start(); test_performance<1, 10>::start();
} }
void Unit::test_channel() {
std::thread t1 {[&] {
ipc::channel cc { "my-ipc-channel" };
for (std::size_t i = 0;; ++i) {
ipc::buff_t dd = cc.recv();
if (dd.size() < 2) return;
QCOMPARE(dd, datas__[i]);
}
}};
std::thread t2 {[&] {
ipc::channel cc { "my-ipc-channel" };
while (cc.recv_count() == 0) {
std::this_thread::yield();
}
for (std::size_t i = 0; i < (std::min)(100, LoopCount); ++i) {
std::cout << "sending: " << i << "-[" << datas__[i].size() << "]" << std::endl;
cc.send(datas__[i]);
}
cc.send(ipc::buff_t { '\0' });
t1.join();
}};
t2.join();
}
} // internal-linkage } // internal-linkage