diff --git a/include/ipc.h b/include/ipc.h index a3e5480..b0eb043 100644 --- a/include/ipc.h +++ b/include/ipc.h @@ -21,6 +21,8 @@ IPC_EXPORT handle_t connect (char const * name); IPC_EXPORT void disconnect(handle_t h); IPC_EXPORT std::size_t recv_count(handle_t h); +IPC_EXPORT void clear_recv(handle_t h); +IPC_EXPORT void clear_recv(char const * name); IPC_EXPORT bool send(handle_t h, void const * data, std::size_t size); IPC_EXPORT buff_t recv(handle_t h); diff --git a/src/ipc.cpp b/src/ipc.cpp index 65907df..33f09c1 100644 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -38,8 +38,8 @@ constexpr queue_t* queue_of(handle_t h) { return static_cast(h); } -inline std::atomic_size_t* acc_of(queue_t* queue) { - return reinterpret_cast(queue->elems()) - 1; +inline std::atomic_size_t* acc_of(queue_t* que) { + return reinterpret_cast(que->elems()) - 1; } } // internal-linkage @@ -72,11 +72,25 @@ void disconnect(handle_t h) { } std::size_t recv_count(handle_t h) { - auto queue = queue_of(h); - if (queue == nullptr) { + auto que = queue_of(h); + if (que == nullptr) { return error_count; } - return queue->conn_count(); + return que->conn_count(); +} + +void clear_recv(handle_t h) { + auto* head = acc_of(queue_of(h)); + if (head == nullptr) { + return; + } + std::memset(head, 0, sizeof(shm_info_t)); +} + +void clear_recv(char const * name) { + auto h = ipc::connect(name); + ipc::clear_recv(h); + ipc::disconnect(h); } bool send(handle_t h, void const * data, std::size_t size) { @@ -86,12 +100,12 @@ bool send(handle_t h, void const * data, std::size_t size) { if (size == 0) { return false; } - auto queue = queue_of(h); - if (queue == nullptr) { + auto que = queue_of(h); + if (que == nullptr) { return false; } // calc a new message id - auto msg_id = acc_of(queue)->fetch_add(1, std::memory_order_relaxed); + auto msg_id = acc_of(que)->fetch_add(1, std::memory_order_relaxed); // push message fragment, one fragment size is data_length int offset = 0; for (int i = 0; i < static_cast(size / data_length); ++i, offset += data_length) { @@ -100,7 +114,7 @@ bool send(handle_t h, void const * data, std::size_t size) { msg_id, { 0 } }; std::memcpy(msg.data_, static_cast(data) + offset, data_length); - queue->push(msg); + que->push(msg); } // if remain > 0, this is the last message fragment int remain = static_cast(size) - offset; @@ -111,7 +125,7 @@ bool send(handle_t h, void const * data, std::size_t size) { }; std::memcpy(msg.data_, static_cast(data) + offset, static_cast(remain)); - queue->push(msg); + que->push(msg); } return true; } @@ -124,10 +138,10 @@ buff_t recv(handle_t const * hs, std::size_t size) { thread_local std::vector q_arr(size); q_arr.clear(); // make the size to 0 for (size_t i = 0; i < size; ++i) { - auto queue = queue_of(hs[i]); - if (queue == nullptr) continue; - queue->connect(); // wouldn't connect twice - q_arr.push_back(queue); + auto que = queue_of(hs[i]); + if (que == nullptr) continue; + que->connect(); // wouldn't connect twice + q_arr.push_back(que); } if (q_arr.empty()) { return {}; @@ -190,6 +204,7 @@ route::route(route&& rhs) } route::~route() { + disconnect(); p_->clear(); } @@ -223,6 +238,7 @@ bool route::connect(char const * name) { void route::disconnect() { ipc::disconnect(impl(p_)->h_); + impl(p_)->h_ = nullptr; } std::size_t route::recv_count() const { diff --git a/src/shm.cpp b/src/shm.cpp index a59a57c..69e1635 100644 --- a/src/shm.cpp +++ b/src/shm.cpp @@ -10,20 +10,14 @@ namespace shm { class handle::handle_ : public pimpl { public: - handle* t_ = nullptr; - void* m_ = nullptr; + void* m_ = nullptr; - std::string n_ {}; + std::string n_; std::size_t s_ = 0; - - handle_() = default; - handle_(handle* t) : t_{t} {} - - ~handle_() { t_->release(); } }; handle::handle() - : p_(p_->make(this)) { + : p_(p_->make()) { } handle::handle(char const * name, std::size_t size) @@ -37,6 +31,7 @@ handle::handle(handle&& rhs) } handle::~handle() { + release(); p_->clear(); } diff --git a/test/test.h b/test/test.h index cdc7765..696a219 100644 --- a/test/test.h +++ b/test/test.h @@ -47,8 +47,10 @@ template <> struct test_verify { test_verify (int) {} void prepare (void*) {} - void push_data(int, ...) {} void verify (int, int) {} + + template + void push_data(int, U&&) {} }; template @@ -70,10 +72,7 @@ void benchmark_prod_cons(T* cq) { t = std::thread{[&, cid] { vf.prepare(&t); auto cn = tcq.connect(); - - using namespace std::placeholders; - tcq.recv(cn, std::bind(&test_verify::push_data, &vf, cid, _1)); - + tcq.recv(cn, [&](auto&& msg) { vf.push_data(cid, msg); }); tcq.disconnect(cn); if (++fini != std::extent::value) return; sw.print_elapsed(N, M, Loops); diff --git a/test/test_circ.cpp b/test/test_circ.cpp index 1d87e8d..2ba10b6 100644 --- a/test/test_circ.cpp +++ b/test/test_circ.cpp @@ -5,7 +5,6 @@ #include #include #include -#include #include "circ_elem_array.h" #include "circ_queue.h" diff --git a/test/test_ipc.cpp b/test/test_ipc.cpp index d3b2621..dbff110 100644 --- a/test/test_ipc.cpp +++ b/test/test_ipc.cpp @@ -45,8 +45,7 @@ struct test_cq { test_cq(void*) : conn_name_("test-ipc-route") { - auto watcher = connect(); - QCOMPARE(watcher.recv_count(), static_cast(0)); + ipc::clear_recv(conn_name_.c_str()); } cn_t connect() { @@ -58,8 +57,8 @@ struct test_cq { } void wait_start(int M) { - auto watcher = connect(); - while (watcher.recv_count() != static_cast(M)) { + auto watcher = ipc::connect(conn_name_.c_str()); + while (ipc::recv_count(watcher) != static_cast(M)) { std::this_thread::yield(); } } @@ -257,6 +256,7 @@ void Unit::test_rw_lock() { void Unit::test_send_recv() { auto h = ipc::connect("my-ipc"); QVERIFY(h != nullptr); + ipc::clear_recv(h); char data[] = "hello ipc!"; QVERIFY(ipc::send(h, data, sizeof(data))); auto got = ipc::recv(h); @@ -265,6 +265,8 @@ void Unit::test_send_recv() { } void Unit::test_route() { + ipc::clear_recv("my-ipc-route"); + auto wait_for_handshake = [](int id) { ipc::route cc { "my-ipc-route" }; std::string cfm = "copy:" + std::to_string(id), ack = "re-" + cfm;