diff --git a/include/buffer.h b/include/buffer.h index 1835ba5..27fcd4c 100644 --- a/include/buffer.h +++ b/include/buffer.h @@ -3,6 +3,7 @@ #include #include #include +#include #include "export.h" #include "def.h" @@ -35,6 +36,16 @@ public: void * data() noexcept; void const * data() const noexcept; + template + auto data() noexcept -> std::enable_if_t, T*> { + return static_cast(data()); + } + + template + auto data() const noexcept -> std::enable_if_t, T*> { + return static_cast(data()); + } + std::size_t size() const noexcept; std::tuple to_tuple() { diff --git a/include/circ_queue.h b/include/circ_queue.h index 5d5b04f..58da813 100644 --- a/include/circ_queue.h +++ b/include/circ_queue.h @@ -99,27 +99,19 @@ public: }); } - static auto pop(queue* que) noexcept { - std::tuple ret; - if (que == nullptr) { - return ret; + T pop() noexcept { + if (elems_ == nullptr) { + return {}; } - if (que->elems_ == nullptr) { - return ret; - } - for (unsigned k = 0; que->elems_->cursor() == que->cursor_;) { + T item; + for (unsigned k = 0;;) { + if (elems_->pop(cursor_, [&item](void* p) { + ::new (&item) T(std::move(*static_cast(p))); + })) { + return item; + } ipc::sleep(k); } - if (!que->elems_->pop(que->cursor_, [&ret](void* p) { - ::new (&std::get<1>(ret)) T(std::move(*static_cast(p))); - })) { - return ret; - } - return ret; - } - - T pop() noexcept { - return std::get<1>(pop(this)); } }; diff --git a/src/ipc.cpp b/src/ipc.cpp index 6c06907..ae1c641 100644 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -183,10 +183,9 @@ static buff_t recv(handle_t h) { auto& rc = recv_cache(); while (1) { // pop a new message - auto tp = que->pop(que); - auto& msg = std::get<1>(tp); - if (msg.que_ == std::get<0>(tp)) return {}; + auto msg = que->pop(); if (msg.id_ == 0) return {}; + if (msg.que_ == que) continue; // pop next // msg.remain_ may minus & abs(msg.remain_) < data_length std::size_t remain = static_cast( static_cast(data_length) + msg.remain_); diff --git a/test/test_ipc.cpp b/test/test_ipc.cpp index f6c1031..768250b 100644 --- a/test/test_ipc.cpp +++ b/test/test_ipc.cpp @@ -30,6 +30,7 @@ std::vector datas__; constexpr int DataMin = 2; constexpr int DataMax = 256; constexpr int LoopCount = 100000; +//constexpr int LoopCount = 10000; } // internal-linkage @@ -439,10 +440,11 @@ void Unit::test_channel_rtt() { for (std::size_t i = 0;; ++i) { auto dd = cc.recv(); if (dd.size() < 2) return; -// std::cout << "recving: " << i << "-[" << dd.size() << "]" << std::endl; + //std::cout << "recving: " << i << "-[" << dd.size() << "]" << std::endl; while (!cc.send(ipc::buff_t('a'))) { cc.wait_for_recv(1); } + //std::cout << "sent ack." << std::endl; } }}; @@ -451,12 +453,13 @@ void Unit::test_channel_rtt() { cc.wait_for_recv(1); sw.start(); for (std::size_t i = 0; i < LoopCount; ++i) { -// std::cout << "sending: " << i << "-[" << datas__[i].size() << "]" << std::endl; + //std::cout << "sending: " << i << "-[" << datas__[i].size() << "]" << std::endl; cc.send(datas__[i]); /*auto dd = */cc.recv(); -// if (dd.size() != 1 || dd[0] != 'a') { -// QVERIFY(false); -// } + //if (dd.size() != 1 || dd.data()[0] != 'a') { + // std::cout << "recv ack fail: " << i << "-[" << dd.size() << "]" << std::endl; + // QVERIFY(false); + //} } cc.send(ipc::buff_t('\0')); t1.join();