redesigning & refactoring (TBD)

This commit is contained in:
mutouyun 2018-12-18 20:09:31 +08:00
parent 2e05a0260d
commit 69bc556a52
11 changed files with 192 additions and 247 deletions

View File

@ -17,36 +17,31 @@ struct alignas(std::max_align_t) elem_array_head {
std::atomic<u2_t> cc_ { 0 }; // connection counter, using for broadcast std::atomic<u2_t> cc_ { 0 }; // connection counter, using for broadcast
std::atomic<u2_t> wt_ { 0 }; // write index std::atomic<u2_t> wt_ { 0 }; // write index
std::atomic<u2_t> lc_ { 0 }; // write spin lock flag
static u1_t index_of(u2_t c) { return static_cast<u1_t>(c); } static u1_t index_of(u2_t c) { return static_cast<u1_t>(c); }
std::size_t connect() { std::size_t connect() {
return cc_.fetch_add(1, std::memory_order_release); return cc_.fetch_add(1, std::memory_order_relaxed);
} }
std::size_t disconnect() { std::size_t disconnect() {
return cc_.fetch_sub(1, std::memory_order_release); return cc_.fetch_sub(1, std::memory_order_relaxed);
} }
std::size_t conn_count() const { std::size_t conn_count() const {
return cc_.load(std::memory_order_acquire); return cc_.load(std::memory_order_relaxed);
} }
u2_t cursor() const { u2_t cursor() const {
return wt_.load(std::memory_order_acquire); return wt_.load(std::memory_order_relaxed);
} }
auto acquire() { auto acquire() {
while (lc_.exchange(1, std::memory_order_acquire)) { return index_of(wt_.load(std::memory_order_acquire));
std::this_thread::yield();
}
return index_of(wt_.load(std::memory_order_relaxed));
} }
void commit() { void commit() {
wt_.fetch_add(1, std::memory_order_relaxed); wt_.fetch_add(1, std::memory_order_release);
lc_.store(0, std::memory_order_release);
} }
}; };
@ -112,10 +107,11 @@ public:
if (el->head_.rc_.compare_exchange_weak( if (el->head_.rc_.compare_exchange_weak(
expected, expected,
static_cast<uint_t<32>>(conn_count()), static_cast<uint_t<32>>(conn_count()),
std::memory_order_release)) { std::memory_order_relaxed)) {
break; break;
} }
std::this_thread::yield(); std::this_thread::yield();
std::atomic_thread_fence(std::memory_order_acquire);
} }
return el->data_; return el->data_;
} }

View File

@ -5,6 +5,7 @@
#include <exception> #include <exception>
#include <utility> #include <utility>
#include <algorithm> #include <algorithm>
#include <atomic>
#include "def.h" #include "def.h"
#include "circ_elem_array.h" #include "circ_elem_array.h"
@ -20,7 +21,7 @@ public:
private: private:
array_t* elems_ = nullptr; array_t* elems_ = nullptr;
typename array_t::u2_t cursor_ = 0; typename array_t::u2_t cursor_ = 0;
bool connected_ = false; std::atomic_bool connected_ { false };
public: public:
queue() = default; queue() = default;
@ -29,36 +30,30 @@ public:
attach(arr); attach(arr);
} }
queue(queue&& rhs) : queue() { queue(const queue&) = delete;
swap(rhs); queue& operator=(const queue&) = delete;
} queue(queue&&) = delete;
queue& operator=(queue&&) = delete;
void swap(queue& rhs) { constexpr array_t * elems() const {
std::swap(elems_ , rhs.elems_ );
std::swap(cursor_ , rhs.cursor_ );
std::swap(connected_, rhs.connected_);
}
queue& operator=(queue rhs) {
swap(rhs);
return *this;
}
array_t * elems() {
return elems_; return elems_;
} }
std::size_t connect() { std::size_t connect() {
if (elems_ == nullptr) return error_count; if (elems_ == nullptr) return error_count;
if (connected_) return error_count; if (connected_.exchange(true, std::memory_order_relaxed)) {
connected_ = true; // if it's already connected, just return an error count
return error_count;
}
return elems_->connect(); return elems_->connect();
} }
std::size_t disconnect() { std::size_t disconnect() {
if (elems_ == nullptr) return error_count; if (elems_ == nullptr) return error_count;
if (!connected_) return error_count; if (!connected_.exchange(false, std::memory_order_relaxed)) {
connected_ = false; // if it's already disconnected, just return an error count
return error_count;
}
return elems_->disconnect(); return elems_->disconnect();
} }
@ -67,7 +62,7 @@ public:
} }
bool connected() const { bool connected() const {
return connected_; return connected_.load(std::memory_order_relaxed);
} }
array_t* attach(array_t* arr) { array_t* attach(array_t* arr) {
@ -95,7 +90,7 @@ public:
template <typename P> template <typename P>
auto push(P&& param) // disable this if P is the same as T auto push(P&& param) // disable this if P is the same as T
-> std::enable_if_t<!std::is_same<std::remove_reference_t<P>, T>::value, bool> { -> Requires<!std::is_same<std::remove_reference_t<P>, T>::value, bool> {
if (elems_ == nullptr) return false; if (elems_ == nullptr) return false;
auto ptr = elems_->acquire(); auto ptr = elems_->acquire();
::new (ptr) T { std::forward<P>(param) }; ::new (ptr) T { std::forward<P>(param) };
@ -105,7 +100,7 @@ public:
template <typename... P> template <typename... P>
auto push(P&&... params) // some old compilers are not support this well auto push(P&&... params) // some old compilers are not support this well
-> std::enable_if_t<(sizeof...(P) != 1), bool> { -> Requires<(sizeof...(P) != 1), bool> {
if (elems_ == nullptr) return false; if (elems_ == nullptr) return false;
auto ptr = elems_->acquire(); auto ptr = elems_->acquire();
::new (ptr) T { std::forward<P>(params)... }; ::new (ptr) T { std::forward<P>(params)... };
@ -113,18 +108,40 @@ public:
return true; return true;
} }
T pop() { template <typename QArr, typename At>
if (elems_ == nullptr) throw std::invalid_argument { static T multi_pop(QArr& ques, std::size_t size, At&& at) {
"This queue hasn't attached any elem_array." if (size == 0) throw std::invalid_argument { "Invalid size." };
}; while (1) {
while (cursor_ == elems_->cursor()) { for (std::size_t i = 0; i < size; ++i) {
queue* cq = at(ques, i);
if (cq->elems_ == nullptr) throw std::logic_error {
"This queue hasn't attached any elem_array."
};
if (cq->cursor_ != cq->elems_->cursor()) {
auto item_ptr = static_cast<T*>(cq->elems_->take(cq->cursor_++));
T item = std::move(*item_ptr);
cq->elems_->put(item_ptr);
return item;
}
}
std::this_thread::yield(); std::this_thread::yield();
} }
auto item_ptr = static_cast<T*>(elems_->take(cursor_++));
T item = *item_ptr;
elems_->put(item_ptr);
return item;
} }
static T multi_pop(queue* ques, std::size_t size) {
if (ques == nullptr) throw std::invalid_argument { "Invalid ques pointer." };
return multi_pop(ques, size, [](queue* ques, std::size_t i) {
return ques + i;
});
}
static T multi_pop(std::vector<queue*>& ques) {
return multi_pop(ques, ques.size(), [](auto& ques, std::size_t i) {
return ques[i];
});
}
T pop() { return multi_pop(this, 1); }
}; };
} // namespace circ } // namespace circ

View File

@ -9,8 +9,8 @@
namespace ipc { namespace ipc {
using shm::handle_t; using handle_t = void*;
using buff_t = std::vector<byte_t>; using buff_t = std::vector<byte_t>;
IPC_EXPORT buff_t make_buff(void const * data, std::size_t size); IPC_EXPORT buff_t make_buff(void const * data, std::size_t size);
@ -25,21 +25,30 @@ IPC_EXPORT std::size_t recv_count(handle_t h);
IPC_EXPORT bool send(handle_t h, void const * data, std::size_t size); IPC_EXPORT bool send(handle_t h, void const * data, std::size_t size);
IPC_EXPORT buff_t recv(handle_t h); IPC_EXPORT buff_t recv(handle_t h);
class IPC_EXPORT channel { /*
* This function could wait & recv messages from multi handles
* which have the *SAME* connected name.
*/
IPC_EXPORT buff_t recv(handle_t const * hs, std::size_t size);
template <std::size_t N>
buff_t recv(handle_t const (& hs)[N]) { return recv(hs, N); }
class IPC_EXPORT route {
public: public:
channel(); route();
channel(char const * name); route(char const * name);
channel(channel&& rhs); route(route&& rhs);
~channel(); ~route();
void swap(channel& rhs); void swap(route& rhs);
channel& operator=(channel rhs); route& operator=(route rhs);
bool valid() const; bool valid() const;
char const * name () const; char const * name () const;
channel clone() const; route clone() const;
bool connect(char const * name); bool connect(char const * name);
void disconnect(); void disconnect();
@ -53,8 +62,8 @@ public:
buff_t recv(); buff_t recv();
private: private:
class channel_; class route_;
channel_* p_; route_* p_;
}; };
} // namespace ipc } // namespace ipc

View File

@ -7,12 +7,8 @@
namespace ipc { namespace ipc {
namespace shm { namespace shm {
using handle_t = void*; IPC_EXPORT void* acquire(char const * name, std::size_t size);
IPC_EXPORT void release(void* mem, std::size_t size);
IPC_EXPORT handle_t acquire(char const * name, std::size_t size);
IPC_EXPORT void release(handle_t h, std::size_t size);
IPC_EXPORT void* open (handle_t h);
IPC_EXPORT void close (void* mem);
class IPC_EXPORT handle { class IPC_EXPORT handle {
public: public:
@ -32,8 +28,7 @@ public:
bool acquire(char const * name, std::size_t size); bool acquire(char const * name, std::size_t size);
void release(); void release();
void* get (); void* get() const;
void close();
private: private:
class handle_; class handle_;

View File

@ -1,18 +1,17 @@
#include "ipc.h" #include "ipc.h"
#include <unordered_map> #include <unordered_map>
#include <memory>
#include <type_traits> #include <type_traits>
#include <cstring> #include <cstring>
#include <algorithm> #include <algorithm>
#include <utility> #include <utility>
#include <shared_mutex> //#include <shared_mutex>
#include <mutex> //#include <mutex>
#include <atomic> #include <atomic>
#include "def.h" #include "def.h"
#include "circ_queue.h" #include "circ_queue.h"
#include "rw_lock.h" //#include "rw_lock.h"
//#include "tls_pointer.h" //#include "tls_pointer.h"
namespace { namespace {
@ -29,42 +28,18 @@ struct msg_t {
#pragma pack() #pragma pack()
using queue_t = circ::queue<msg_t>; using queue_t = circ::queue<msg_t>;
using guard_t = std::unique_ptr<std::remove_pointer_t<handle_t>, void(*)(handle_t)>;
struct shm_info_t { struct shm_info_t {
std::atomic_size_t id_acc_; // message id accumulator std::atomic_size_t id_acc_; // message id accumulator
queue_t::array_t elems_; // the circ_elem_array in shm queue_t::array_t elems_; // the circ_elem_array in shm
}; };
///* constexpr queue_t* queue_of(handle_t h) {
// * thread_local stl object's destructor causing crash return static_cast<queue_t*>(h);
// * See: https://sourceforge.net/p/mingw-w64/bugs/527/
// * https://sourceforge.net/p/mingw-w64/bugs/727/
//*/
///*thread_local*/
//tls::pointer<std::unordered_map<decltype(msg_t::id_), buff_t>> recv_caches__;
std::unordered_map<handle_t, queue_t> h2q__;
rw_lock h2q_lc__;
inline queue_t* queue_of(handle_t h) {
if (h == nullptr) {
return nullptr;
}
std::shared_lock<rw_lock> guard { h2q_lc__ };
auto it = h2q__.find(h);
if (it == h2q__.end()) {
return nullptr;
}
if (it->second.elems() == nullptr) {
return nullptr;
}
return &(it->second);
} }
inline std::atomic_size_t* acc_of(queue_t* queue) { constexpr std::atomic_size_t* acc_of(queue_t* queue) {
auto elems = queue->elems(); return reinterpret_cast<std::atomic_size_t*>(queue->elems()) - 1;
return reinterpret_cast<std::atomic_size_t*>(elems) - 1;
} }
} // internal-linkage } // internal-linkage
@ -79,40 +54,21 @@ buff_t make_buff(void const * data, std::size_t size) {
} }
handle_t connect(char const * name) { handle_t connect(char const * name) {
auto h = shm::acquire(name, sizeof(shm_info_t)); auto mem = shm::acquire(name, sizeof(shm_info_t));
if (h == nullptr) {
return nullptr;
}
guard_t h_guard {
h, [](handle_t h) { shm::release(h, sizeof(shm_info_t)); }
};
auto mem = shm::open(h);
if (mem == nullptr) { if (mem == nullptr) {
return nullptr; return nullptr;
} }
{ return new queue_t { &(static_cast<shm_info_t*>(mem)->elems_) };
std::unique_lock<rw_lock> guard { h2q_lc__ };
h2q__[h].attach(&(static_cast<shm_info_t*>(mem)->elems_));
}
h_guard.release();
return h;
} }
void disconnect(handle_t h) { void disconnect(handle_t h) {
if (h == nullptr) { queue_t* que = queue_of(h);
if (que == nullptr) {
return; return;
} }
void* mem = nullptr; que->disconnect(); // needn't to detach, cause it will be deleted soon.
{ shm::release(acc_of(que), sizeof(shm_info_t));
std::unique_lock<rw_lock> guard { h2q_lc__ }; delete que;
auto it = h2q__.find(h);
if (it == h2q__.end()) return;
it->second.disconnect();
mem = it->second.elems(); // needn't to detach
h2q__.erase(it);
}
shm::close(mem);
shm::release(h, sizeof(queue_t));
} }
std::size_t recv_count(handle_t h) { std::size_t recv_count(handle_t h) {
@ -161,17 +117,29 @@ bool send(handle_t h, void const * data, std::size_t size) {
} }
buff_t recv(handle_t h) { buff_t recv(handle_t h) {
auto queue = queue_of(h); return recv(&h, 1);
if (queue == nullptr) { }
buff_t recv(handle_t const * hs, std::size_t size) {
thread_local std::vector<queue_t*> q_arr(size);
q_arr.clear(); // make the size to 0
for (size_t i = 0; i < size; ++i) {
auto queue = queue_of(hs[i]);
if (queue == nullptr) continue;
queue->connect(); // wouldn't connect twice
q_arr.push_back(queue);
}
if (q_arr.empty()) {
return {}; return {};
} }
if (!queue->connected()) { /*
queue->connect(); * the performance of tls::pointer is not good enough
} * so regardless of the mingw-crash-problem for the moment
*/
thread_local std::unordered_map<decltype(msg_t::id_), buff_t> rcs; thread_local std::unordered_map<decltype(msg_t::id_), buff_t> rcs;
while(1) { while(1) {
// pop a new message // pop a new message
auto msg = queue->pop(); auto msg = queue_t::multi_pop(q_arr);
// msg.remain_ may minus & abs(msg.remain_) < data_length // msg.remain_ may minus & abs(msg.remain_) < data_length
std::size_t remain = static_cast<std::size_t>( std::size_t remain = static_cast<std::size_t>(
static_cast<int>(data_length) + msg.remain_); static_cast<int>(data_length) + msg.remain_);
@ -201,79 +169,79 @@ buff_t recv(handle_t h) {
} }
} }
class channel::channel_ : public pimpl<channel_> { class route::route_ : public pimpl<route_> {
public: public:
handle_t h_ = nullptr; handle_t h_ = nullptr;
std::string n_; std::string n_;
}; };
channel::channel() route::route()
: p_(p_->make()) { : p_(p_->make()) {
} }
channel::channel(char const * name) route::route(char const * name)
: channel() { : route() {
this->connect(name); this->connect(name);
} }
channel::channel(channel&& rhs) route::route(route&& rhs)
: channel() { : route() {
swap(rhs); swap(rhs);
} }
channel::~channel() { route::~route() {
p_->clear(); p_->clear();
} }
void channel::swap(channel& rhs) { void route::swap(route& rhs) {
std::swap(p_, rhs.p_); std::swap(p_, rhs.p_);
} }
channel& channel::operator=(channel rhs) { route& route::operator=(route rhs) {
swap(rhs); swap(rhs);
return *this; return *this;
} }
bool channel::valid() const { bool route::valid() const {
return (impl(p_)->h_ != nullptr); return (impl(p_)->h_ != nullptr);
} }
char const * channel::name() const { char const * route::name() const {
return impl(p_)->n_.c_str(); return impl(p_)->n_.c_str();
} }
channel channel::clone() const { route route::clone() const {
return { name() }; return { name() };
} }
bool channel::connect(char const * name) { bool route::connect(char const * name) {
if (name == nullptr || name[0] == '\0') return false; if (name == nullptr || name[0] == '\0') return false;
this->disconnect(); this->disconnect();
impl(p_)->h_ = ipc::connect((impl(p_)->n_ = name).c_str()); impl(p_)->h_ = ipc::connect((impl(p_)->n_ = name).c_str());
return valid(); return valid();
} }
void channel::disconnect() { void route::disconnect() {
ipc::disconnect(impl(p_)->h_); ipc::disconnect(impl(p_)->h_);
} }
std::size_t channel::recv_count() const { std::size_t route::recv_count() const {
return ipc::recv_count(impl(p_)->h_); return ipc::recv_count(impl(p_)->h_);
} }
bool channel::send(void const *data, std::size_t size) { bool route::send(void const *data, std::size_t size) {
return ipc::send(impl(p_)->h_, data, size); return ipc::send(impl(p_)->h_, data, size);
} }
bool channel::send(buff_t const & buff) { bool route::send(buff_t const & buff) {
return channel::send(buff.data(), buff.size()); return route::send(buff.data(), buff.size());
} }
bool channel::send(std::string const & str) { bool route::send(std::string const & str) {
return channel::send(str.c_str(), str.size() + 1); return route::send(str.c_str(), str.size() + 1);
} }
buff_t channel::recv() { buff_t route::recv() {
return ipc::recv(impl(p_)->h_); return ipc::recv(impl(p_)->h_);
} }

View File

@ -9,7 +9,7 @@
namespace ipc { namespace ipc {
namespace shm { namespace shm {
handle_t acquire(char const * name, std::size_t size) { void* acquire(char const * name, std::size_t size) {
if (name == nullptr || name[0] == '\0' || size == 0) { if (name == nullptr || name[0] == '\0' || size == 0) {
return nullptr; return nullptr;
} }
@ -32,18 +32,11 @@ handle_t acquire(char const * name, std::size_t size) {
return mem; return mem;
} }
void release(handle_t h, std::size_t size) { void release(void* mem, std::size_t size) {
if (h == nullptr) { if (mem == nullptr) {
return; return;
} }
::munmap(h, size); ::munmap(mem, size);
}
void* open(handle_t h) {
return h;
}
void close(void* /*mem*/) {
} }
} // namespace shm } // namespace shm

View File

@ -7,6 +7,7 @@
#include <locale> #include <locale>
#include <codecvt> #include <codecvt>
#include <utility> #include <utility>
#include <unordered_map>
#include "def.h" #include "def.h"
@ -25,50 +26,44 @@ constexpr auto to_tchar(std::string && str) -> IsSame<T, std::wstring> {
return std::wstring_convert<std::codecvt_utf8_utf16<wchar_t>>{}.from_bytes(std::move(str)); return std::wstring_convert<std::codecvt_utf8_utf16<wchar_t>>{}.from_bytes(std::move(str));
} }
std::unordered_map<void*, HANDLE> m2h__;
} // internal-linkage } // internal-linkage
namespace ipc { namespace ipc {
namespace shm { namespace shm {
handle_t acquire(char const * name, std::size_t size) { void* acquire(char const * name, std::size_t size) {
if (name == nullptr || name[0] == '\0' || size == 0) { if (name == nullptr || name[0] == '\0' || size == 0) {
return nullptr; return nullptr;
} }
HANDLE h = ::CreateFileMapping( HANDLE h = ::CreateFileMapping(INVALID_HANDLE_VALUE, NULL,
INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE | SEC_COMMIT,
PAGE_READWRITE | SEC_COMMIT, 0, static_cast<DWORD>(size),
0, static_cast<DWORD>(size), to_tchar(std::string{"__SHM__"} + name).c_str());
to_tchar(std::string{"__SHM__"} + name).c_str()
);
if (h == NULL) { if (h == NULL) {
return nullptr; return nullptr;
} }
return h;
}
void release(handle_t h, std::size_t /*size*/) {
if (h == nullptr) {
return;
}
::CloseHandle(h);
}
void* open(handle_t h) {
if (h == nullptr) {
return nullptr;
}
LPVOID mem = ::MapViewOfFile(h, FILE_MAP_ALL_ACCESS, 0, 0, 0); LPVOID mem = ::MapViewOfFile(h, FILE_MAP_ALL_ACCESS, 0, 0, 0);
if (mem == NULL) { if (mem == NULL) {
::CloseHandle(h);
return nullptr; return nullptr;
} }
m2h__.emplace(mem, h);
return mem; return mem;
} }
void close(void* mem) { void release(void* mem, std::size_t /*size*/) {
if (mem == nullptr) { if (mem == nullptr) {
return; return;
} }
auto it = m2h__.find(mem);
if (it == m2h__.end()) {
return;
}
::UnmapViewOfFile(mem); ::UnmapViewOfFile(mem);
::CloseHandle(it->second);
m2h__.erase(it);
} }
} // namespace shm } // namespace shm

View File

@ -10,9 +10,8 @@ namespace shm {
class handle::handle_ : public pimpl<handle_> { class handle::handle_ : public pimpl<handle_> {
public: public:
handle* t_ = nullptr; handle* t_ = nullptr;
handle_t h_ = nullptr; void* m_ = nullptr;
void* m_ = nullptr;
std::string n_ {}; std::string n_ {};
std::size_t s_ = 0; std::size_t s_ = 0;
@ -20,10 +19,7 @@ public:
handle_() = default; handle_() = default;
handle_(handle* t) : t_{t} {} handle_(handle* t) : t_{t} {}
~handle_() { ~handle_() { t_->release(); }
t_->close();
t_->release();
}
}; };
handle::handle() handle::handle()
@ -54,7 +50,7 @@ handle& handle::operator=(handle rhs) {
} }
bool handle::valid() const { bool handle::valid() const {
return impl(p_)->h_ != nullptr; return impl(p_)->m_ != nullptr;
} }
std::size_t handle::size() const { std::size_t handle::size() const {
@ -66,33 +62,23 @@ char const * handle::name() const {
} }
bool handle::acquire(char const * name, std::size_t size) { bool handle::acquire(char const * name, std::size_t size) {
close();
release(); release();
impl(p_)->h_ = shm::acquire((impl(p_)->n_ = name).c_str(), impl(p_)->m_ = shm::acquire((impl(p_)->n_ = name).c_str(),
impl(p_)->s_ = size); impl(p_)->s_ = size);
return valid(); return valid();
} }
void handle::release() { void handle::release() {
if (!valid()) return; if (!valid()) return;
shm::release(impl(p_)->h_, impl(p_)->s_); shm::release(impl(p_)->m_, impl(p_)->s_);
impl(p_)->h_ = nullptr; impl(p_)->m_ = nullptr;
impl(p_)->s_ = 0; impl(p_)->s_ = 0;
impl(p_)->n_.clear(); impl(p_)->n_.clear();
} }
void* handle::get() { void* handle::get() const {
if (!valid()) return nullptr; if (!valid()) return nullptr;
if (impl(p_)->m_ == nullptr) { return impl(p_)->m_;
return impl(p_)->m_ = shm::open(impl(p_)->h_);
}
else return impl(p_)->m_;
}
void handle::close() {
if (!valid()) return;
shm::close(impl(p_)->m_);
impl(p_)->m_ = nullptr;
} }
} // namespace shm } // namespace shm

View File

@ -123,18 +123,16 @@ struct test_cq<ipc::circ::queue<T>> {
::new (ca_) ca_t; ::new (ca_) ca_t;
} }
cn_t connect() { cn_t* connect() {
cn_t queue; cn_t* queue = new cn_t { ca_ };
[&] { [&] { QVERIFY(queue->connect() != ipc::error_count); } ();
queue.attach(ca_);
QVERIFY(queue.connect() != ipc::error_count);
} ();
return queue; return queue;
} }
void disconnect(cn_t& queue) { void disconnect(cn_t* queue) {
QVERIFY(queue.disconnect() != ipc::error_count); QVERIFY(queue->disconnect() != ipc::error_count);
QVERIFY(queue.detach() != nullptr); QVERIFY(queue->detach() != nullptr);
delete queue;
} }
void wait_start(int M) { void wait_start(int M) {
@ -144,9 +142,9 @@ struct test_cq<ipc::circ::queue<T>> {
} }
template <typename F> template <typename F>
void recv(cn_t& queue, F&& proc) { void recv(cn_t* queue, F&& proc) {
do { do {
auto msg = queue.pop(); auto msg = queue->pop();
if (msg.pid_ < 0) return; if (msg.pid_ < 0) return;
proc(msg); proc(msg);
} while(1); } while(1);
@ -173,9 +171,7 @@ private slots:
void test_inst(); void test_inst();
void test_prod_cons_1v1(); void test_prod_cons_1v1();
void test_prod_cons_1v3(); void test_prod_cons_1v3();
void test_prod_cons_3v1();
void test_prod_cons_performance(); void test_prod_cons_performance();
void test_queue(); void test_queue();
} unit__; } unit__;
@ -223,10 +219,6 @@ void Unit::test_prod_cons_1v3() {
test_prod_cons<1, 3>(); test_prod_cons<1, 3>();
} }
void Unit::test_prod_cons_3v1() {
test_prod_cons<3, 1>();
}
template <int P, int C> template <int P, int C>
struct test_performance { struct test_performance {
static void start() { static void start() {
@ -259,10 +251,8 @@ struct test_performance<1, 1> {
}; };
void Unit::test_prod_cons_performance() { void Unit::test_prod_cons_performance() {
test_performance<1 , 10>::start(); test_performance<1, 10>::start();
test_performance<10, 1 >::start(); test_prod_cons <1, 10>(); // test & verify
test_performance<10, 10>::start();
test_prod_cons <3 , 3 >(); // test & verify
} }
#ifndef QVERIFY_EXCEPTION_THROWN #ifndef QVERIFY_EXCEPTION_THROWN
@ -299,9 +289,10 @@ void Unit::test_queue() {
queue.attach(cq); queue.attach(cq);
QVERIFY(queue.detach() != nullptr); QVERIFY(queue.detach() != nullptr);
benchmark_prod_cons<1, 3, LoopCount>((ipc::circ::queue<msg_t>*)nullptr); benchmark_prod_cons<1, 1, LoopCount>((ipc::circ::queue<msg_t>*)nullptr);
benchmark_prod_cons<3, 1, LoopCount>((ipc::circ::queue<msg_t>*)nullptr); benchmark_prod_cons<1, 2, LoopCount>((ipc::circ::queue<msg_t>*)nullptr);
benchmark_prod_cons<3, 3, LoopCount>((ipc::circ::queue<msg_t>*)nullptr); benchmark_prod_cons<1, 4, LoopCount>((ipc::circ::queue<msg_t>*)nullptr);
benchmark_prod_cons<1, 8, LoopCount>((ipc::circ::queue<msg_t>*)nullptr);
} }
} // internal-linkage } // internal-linkage

View File

@ -38,13 +38,13 @@ constexpr int LoopCount = 100000;
} // internal-linkage } // internal-linkage
template <> template <>
struct test_cq<ipc::channel> { struct test_cq<ipc::route> {
using cn_t = ipc::channel; using cn_t = ipc::route;
std::string conn_name_; std::string conn_name_;
test_cq(void*) test_cq(void*)
: conn_name_("test-ipc-channel") { : conn_name_("test-ipc-route") {
auto watcher = connect(); auto watcher = connect();
QCOMPARE(watcher.recv_count(), static_cast<std::size_t>(0)); QCOMPARE(watcher.recv_count(), static_cast<std::size_t>(0));
} }
@ -84,7 +84,7 @@ struct test_cq<ipc::channel> {
}; };
template <> template <>
struct test_verify<ipc::channel> { struct test_verify<ipc::route> {
std::unordered_map<int, std::vector<ipc::buff_t>> list_; std::unordered_map<int, std::vector<ipc::buff_t>> list_;
int lcount_; int lcount_;
@ -121,8 +121,8 @@ private slots:
void test_rw_lock(); void test_rw_lock();
void test_send_recv(); void test_send_recv();
void test_channel(); void test_route();
void test_channel_performance(); void test_route_performance();
} unit__; } unit__;
#include "test_ipc.moc" #include "test_ipc.moc"
@ -264,9 +264,9 @@ void Unit::test_send_recv() {
ipc::disconnect(h); ipc::disconnect(h);
} }
void Unit::test_channel() { void Unit::test_route() {
auto wait_for_handshake = [](int id) { auto wait_for_handshake = [](int id) {
ipc::channel cc { "my-ipc-channel" }; ipc::route cc { "my-ipc-route" };
std::string cfm = "copy:" + std::to_string(id), ack = "re-" + cfm; std::string cfm = "copy:" + std::to_string(id), ack = "re-" + cfm;
std::atomic_bool unmatched { true }; std::atomic_bool unmatched { true };
std::thread re {[&] { std::thread re {[&] {
@ -347,7 +347,7 @@ void Unit::test_channel() {
template <int N, int M, bool V = true, int Loops = LoopCount> template <int N, int M, bool V = true, int Loops = LoopCount>
void test_prod_cons() { void test_prod_cons() {
benchmark_prod_cons<N, M, Loops, std::conditional_t<V, ipc::channel, void>>((ipc::channel*)nullptr); benchmark_prod_cons<N, M, Loops, std::conditional_t<V, ipc::route, void>>((ipc::route*)nullptr);
} }
template <int P, int C> template <int P, int C>
@ -381,12 +381,9 @@ struct test_performance<1, 1> {
} }
}; };
void Unit::test_channel_performance() { void Unit::test_route_performance() {
test_prod_cons<1, 1>(); test_prod_cons<1, 1>();
test_performance<1, 10>::start();
// test_performance<1, 8>::start();
// test_performance<8, 1>::start();
// test_performance<8, 8>::start();
} }
} // internal-linkage } // internal-linkage

View File

@ -62,8 +62,8 @@ void Unit::test_get() {
memset(buf, 0, sizeof(buf)); memset(buf, 0, sizeof(buf));
QVERIFY(memcmp(mem, buf, sizeof(buf)) == 0); QVERIFY(memcmp(mem, buf, sizeof(buf)) == 0);
shm_hd__.close(); handle shm_other(shm_hd__.name(), shm_hd__.size());
QVERIFY(mem == shm_hd__.get()); QVERIFY(shm_other.get() != shm_hd__.get());
} }
void Unit::test_hello() { void Unit::test_hello() {
@ -72,7 +72,6 @@ void Unit::test_hello() {
constexpr char hello[] = "hello!"; constexpr char hello[] = "hello!";
std::memcpy(mem, hello, sizeof(hello)); std::memcpy(mem, hello, sizeof(hello));
shm_hd__.close();
QCOMPARE((char*)shm_hd__.get(), hello); QCOMPARE((char*)shm_hd__.get(), hello);
} }
@ -81,7 +80,6 @@ void Unit::test_mt() {
[] { [] {
handle shm_mt(shm_hd__.name(), shm_hd__.size()); handle shm_mt(shm_hd__.name(), shm_hd__.size());
shm_hd__.close();
shm_hd__.release(); shm_hd__.release();
constexpr char hello[] = "hello!"; constexpr char hello[] = "hello!";