diff --git a/build/test.pro b/build/test.pro index f0caf3a..0a4a077 100644 --- a/build/test.pro +++ b/build/test.pro @@ -1,6 +1,7 @@ TEMPLATE = app QT += core testlib +QT -= gui CONFIG += console c++14 CONFIG -= app_bundle @@ -17,6 +18,7 @@ HEADERS += \ SOURCES += \ ../test/main.cpp \ ../test/test_shm.cpp \ - ../test/test_circ.cpp + ../test/test_circ.cpp \ + ../test/test_ipc.cpp LIBS += -L$${DESTDIR} -lipc diff --git a/include/circ_elem_array.h b/include/circ_elem_array.h index 1cc6339..df838ec 100644 --- a/include/circ_elem_array.h +++ b/include/circ_elem_array.h @@ -65,7 +65,7 @@ private: } static ui_t index_of(uc_t c) { - return static_cast(c & std::numeric_limits::max()); + return static_cast(c); } ui_t index_of(elem_t* el) { @@ -96,7 +96,7 @@ public: while (lc_.exchange(1, std::memory_order_acquire)) { std::this_thread::yield(); } - elem_t* el = elem(wt_.load(std::memory_order_relaxed)); + elem_t* el = elem(index_of(wt_.load(std::memory_order_relaxed))); // check all consumers have finished reading while(1) { std::uint32_t expected = 0; diff --git a/include/ipc.h b/include/ipc.h index dd5a4e4..4c2e7da 100644 --- a/include/ipc.h +++ b/include/ipc.h @@ -13,7 +13,24 @@ using shm::handle_t; IPC_EXPORT handle_t connect (std::string const & name); IPC_EXPORT void disconnect(handle_t h); -IPC_EXPORT bool send(handle_t h, byte_t* data, int size); +IPC_EXPORT bool send(handle_t h, void* data, int size); IPC_EXPORT std::vector recv(handle_t h); +class channel_; +class IPC_EXPORT channel { +public: + channel(void); + channel(std::string const & name); + channel(channel&& rhs); + + ~channel(void); + + void swap(channel& rhs); + channel& operator=(channel rhs); + +private: + friend class channel_; + channel_* p_; +}; + } // namespace ipc diff --git a/src/ipc.cpp b/src/ipc.cpp index 5b388bf..a21d7fa 100644 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include "ipc.h" #include "circ_queue.h" @@ -14,8 +15,9 @@ using namespace ipc; using data_t = byte_t[data_length]; struct msg_t { - int remain_; - data_t data_; + int remain_; + unsigned id_; + data_t data_; }; using queue_t = circ::queue; @@ -67,7 +69,7 @@ void disconnect(handle_t h) { h2q__.erase(it); } -bool send(handle_t h, byte_t* data, int size) { +bool send(handle_t h, void* data, int size) { if (data == nullptr) { return false; } @@ -78,51 +80,91 @@ bool send(handle_t h, byte_t* data, int size) { if (queue == nullptr) { return false; } - queue_t drop_box { queue->elems() }; + static unsigned msg_id = 0; + ++msg_id; // calc a new message id int offset = 0; for (int i = 0; i < (size / static_cast(data_length)); ++i, offset += data_length) { msg_t msg { size - offset - static_cast(data_length), - { 0 } + msg_id, { 0 } }; - std::memcpy(msg.data_, data + offset, data_length); - drop_box.push(msg); + std::memcpy(msg.data_, static_cast(data) + offset, data_length); + queue->push(msg); } int remain = size - offset; if (remain > 0) { - msg_t msg { remain - static_cast(data_length), { 0 } }; - std::memcpy(msg.data_, data + offset, static_cast(remain)); - drop_box.push(msg); + msg_t msg { + remain - static_cast(data_length), + msg_id, { 0 } + }; + std::memcpy(msg.data_, static_cast(data) + offset, + static_cast(remain)); + queue->push(msg); } return true; } std::vector recv(handle_t h) { - std::vector all; auto queue = queue_of(h); if (queue == nullptr) { - return all; + return {}; } if (!queue->connected()) { queue->connect(); } + static thread_local std::unordered_map> all; do { auto msg = queue->pop(); - auto last_size = all.size(); + // here comes a new message + auto& cache = all[msg.id_]; // find the cache using message id + auto last_size = cache.size(); if (msg.remain_ > 0) { - all.resize(last_size + data_length); - std::memcpy(all.data() + last_size, msg.data_, data_length); + cache.resize(last_size + data_length); + std::memcpy(cache.data() + last_size, msg.data_, data_length); } else { // remain_ is minus & abs(remain_) < data_length std::size_t remain = static_cast( static_cast(data_length) + msg.remain_); - all.resize(last_size + remain); - std::memcpy(all.data() + last_size, msg.data_, remain); - break; + cache.resize(last_size + remain); + std::memcpy(cache.data() + last_size, msg.data_, remain); + // finish this message, erase it from cache + auto ret { std::move(cache) }; + all.erase(msg.id_); + return std::move(ret); } } while(1); - return all; +} + +class channel_ { +public: +}; + +channel::channel(void) + : p_(new channel_) { +} + +channel::channel(std::string const & /*name*/) + : channel() { + +} + +channel::channel(channel&& rhs) + : channel() { + swap(rhs); +} + +channel::~channel(void) { + delete p_; +} + +void channel::swap(channel& rhs) { + std::swap(p_, rhs.p_); +} + +channel& channel::operator=(channel rhs) { + swap(rhs); + return *this; } } // namespace ipc diff --git a/test/main.cpp b/test/main.cpp index 4d5d5ac..c50fafe 100644 --- a/test/main.cpp +++ b/test/main.cpp @@ -1,6 +1,8 @@ #include #include #include +#include +#include #include "test.h" @@ -18,6 +20,14 @@ TestSuite::TestSuite(void) { _.suites_ << this; } +const char* TestSuite::name(void) const { + return ""; +} + +void TestSuite::initTestCase(void) { + qDebug() << QString("#### Start: %1 ####").arg(name()); +} + int main(int argc, char* argv[]) { QCoreApplication app(argc, argv); Q_UNUSED(app) diff --git a/test/test.h b/test/test.h index 260b2df..5fee10f 100644 --- a/test/test.h +++ b/test/test.h @@ -8,4 +8,10 @@ class TestSuite : public QObject public: explicit TestSuite(void); + +protected: + virtual const char* name(void) const; + +protected slots: + virtual void initTestCase(void); }; diff --git a/test/test_circ.cpp b/test/test_circ.cpp index 95fc5cf..f8f953d 100644 --- a/test/test_circ.cpp +++ b/test/test_circ.cpp @@ -9,14 +9,18 @@ #include "circ_elem_array.h" #include "circ_queue.h" -#include "test.h" #include "stopwatch.hpp" +#include "test.h" namespace { class Unit : public TestSuite { Q_OBJECT + const char* name(void) const { + return "test_circ"; + } + private slots: void initTestCase(void); void cleanupTestCase(void); @@ -36,6 +40,7 @@ using cq_t = ipc::circ::elem_array<12>; cq_t* cq__; void Unit::initTestCase(void) { + TestSuite::initTestCase(); cq__ = new cq_t; } diff --git a/test/test_ipc.cpp b/test/test_ipc.cpp new file mode 100644 index 0000000..31d1748 --- /dev/null +++ b/test/test_ipc.cpp @@ -0,0 +1,29 @@ +#include "ipc.h" +#include "test.h" + +namespace { + +class Unit : public TestSuite { + Q_OBJECT + + const char* name(void) const { + return "test_ipc"; + } + +private slots: + void test_send_recv(void); +} unit__; + +#include "test_ipc.moc" + +void Unit::test_send_recv(void) { + auto h = ipc::connect("my-ipc"); + QVERIFY(h != nullptr); + char data[] = "hello ipc!"; + QVERIFY(ipc::send(h, data, sizeof(data))); + auto got = ipc::recv(h); + QCOMPARE((char*)got.data(), data); + ipc::disconnect(h); +} + +} // internal-linkage diff --git a/test/test_shm.cpp b/test/test_shm.cpp index 6f2d722..9d91a81 100644 --- a/test/test_shm.cpp +++ b/test/test_shm.cpp @@ -12,6 +12,10 @@ namespace { class Unit : public TestSuite { Q_OBJECT + const char* name(void) const { + return "test_shm"; + } + private slots: void test_acquire(void); void test_release(void);