diff --git a/include/ipc.h b/include/ipc.h index f910744..99a8a65 100644 --- a/include/ipc.h +++ b/include/ipc.h @@ -10,12 +10,18 @@ namespace ipc { using shm::handle_t; +using buff_t = std::vector; + +IPC_EXPORT buff_t make_buff(void const * data, std::size_t size); + +template +buff_t make_buff(byte_t const (& data)[N]) { return make_buff(data, N); } IPC_EXPORT handle_t connect (char const * name); IPC_EXPORT void disconnect(handle_t h); -IPC_EXPORT bool send(handle_t h, void const * data, std::size_t size); -IPC_EXPORT std::vector recv(handle_t h); +IPC_EXPORT bool send(handle_t h, void const * data, std::size_t size); +IPC_EXPORT buff_t recv(handle_t h); class IPC_EXPORT channel { public: @@ -37,10 +43,10 @@ public: void disconnect(void); bool send(void const * data, std::size_t size); - bool send(std::vector const & buff); + bool send(buff_t const & buff); bool send(std::string const & str); - std::vector recv(); + buff_t recv(); private: class channel_; diff --git a/include/rw_lock.h b/include/rw_lock.h index aa8351f..d14a114 100644 --- a/include/rw_lock.h +++ b/include/rw_lock.h @@ -107,7 +107,7 @@ public: } // otherwise try cas lc + 1 (set r-lock) else if (lc_.compare_exchange_weak(old, old + 1, std::memory_order_acquire)) { - break; + return; } // set r-lock failed, old has been updated } diff --git a/src/ipc.cpp b/src/ipc.cpp index 13be34f..ce2f89f 100644 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -18,7 +18,6 @@ namespace { using namespace ipc; - using data_t = byte_t[data_length]; #pragma pack(1) @@ -43,7 +42,7 @@ struct shm_info_t { * https://sourceforge.net/p/mingw-w64/bugs/727/ */ /*thread_local*/ -tls::pointer>> recv_caches__; +tls::pointer> recv_caches__; std::unordered_map h2q__; rw_lock h2q_lc__; @@ -72,6 +71,13 @@ inline std::atomic_size_t* acc_of(queue_t* queue) { namespace ipc { +buff_t make_buff(void const * data, std::size_t size) { + return { + static_cast(data), + static_cast(data) + size + }; +} + handle_t connect(char const * name) { auto h = shm::acquire(name, sizeof(shm_info_t)); if (h == nullptr) { @@ -137,13 +143,13 @@ bool send(handle_t h, void const * data, std::size_t size) { msg_id, { 0 } }; std::memcpy(msg.data_, static_cast(data) + offset, - static_cast(remain)); + static_cast(remain)); queue->push(msg); } return true; } -std::vector recv(handle_t h) { +buff_t recv(handle_t h) { auto queue = queue_of(h); if (queue == nullptr) { return {}; @@ -155,30 +161,32 @@ std::vector recv(handle_t h) { while(1) { // pop a new message auto msg = queue->pop(); - // remain_ may minus & abs(remain_) < data_length + // msg.remain_ may minus & abs(msg.remain_) < data_length std::size_t remain = static_cast( static_cast(data_length) + msg.remain_); + // find cache with msg.id_ auto cache_it = rcs->find(msg.id_); if (cache_it == rcs->end()) { - std::vector buf(remain); - std::memcpy(buf.data(), msg.data_, remain); - return buf; + if (remain <= data_length) { + return make_buff(msg.data_, remain); + } + // cache the first message fragment + else rcs->emplace(msg.id_, make_buff(msg.data_)); } - // has cache before this message - auto& cache = cache_it->second; - auto last_size = cache.size(); - // this is the last message fragment - if (msg.remain_ <= 0) { - cache.resize(last_size + remain); - std::memcpy(cache.data() + last_size, msg.data_, remain); - // finish this message, erase it from cache - auto buf = std::move(cache); - rcs->erase(cache_it); - return buf; + // has cached before this message + else { + auto& cache = cache_it->second; + // this is the last message fragment + if (msg.remain_ <= 0) { + cache.insert(cache.end(), msg.data_, msg.data_ + remain); + // finish this message, erase it from cache + auto buf = std::move(cache); + rcs->erase(cache_it); + return buf; + } + // there are remain datas after this message + cache.insert(cache.end(), msg.data_, msg.data_ + data_length); } - // there are remain datas after this message - cache.resize(last_size + data_length); - std::memcpy(cache.data() + last_size, msg.data_, data_length); } } @@ -243,7 +251,7 @@ bool channel::send(void const *data, std::size_t size) { return ipc::send(impl(p_)->h_, data, size); } -bool channel::send(std::vector const & buff) { +bool channel::send(buff_t const & buff) { return channel::send(buff.data(), buff.size()); } @@ -251,7 +259,7 @@ bool channel::send(std::string const & str) { return channel::send(str.c_str(), str.size() + 1); } -std::vector channel::recv() { +buff_t channel::recv() { return ipc::recv(impl(p_)->h_); } diff --git a/test/test_circ.cpp b/test/test_circ.cpp index 61cc68a..b42d2a5 100644 --- a/test/test_circ.cpp +++ b/test/test_circ.cpp @@ -31,7 +31,7 @@ private slots: void test_prod_cons_performance(); void test_queue(); -} /*unit__*/; +} unit__; #include "test_circ.moc" diff --git a/test/test_ipc.cpp b/test/test_ipc.cpp index c0a9988..5cd8722 100644 --- a/test/test_ipc.cpp +++ b/test/test_ipc.cpp @@ -138,10 +138,10 @@ void test_performance() { } void Unit::test_rw_lock() { -// test_performance<1, 1>(); -// test_performance<4, 4>(); -// test_performance<1, 8>(); -// test_performance<8, 1>(); + test_performance<1, 1>(); + test_performance<4, 4>(); + test_performance<1, 8>(); + test_performance<8, 1>(); } void Unit::test_send_recv() { @@ -155,33 +155,37 @@ void Unit::test_send_recv() { } void Unit::test_channel() { - auto wait_for_connected = [](int id) { + auto wait_for_handshake = [](int id) { std::string ack = "copy:" + std::to_string(id); ipc::channel cc { "my-ipc-channel" }; std::atomic_bool unmatched { true }; std::thread re {[&] { + bool re_ack = false; do { auto dd = cc.recv(); - std::cout << id << "-recv: " - << std::string { reinterpret_cast(dd.data()), dd.size() } - << "[" << dd.size() << "]" << std::endl; - if ((unmatched = (std::memcmp(dd.data(), ack.c_str(), (std::min)(dd.size(), ack.size())) != 0))) { + QVERIFY(!dd.empty()); + std::string got { reinterpret_cast(dd.data()), dd.size() - 1 }; + std::cout << id << "-recv: " << got << "[" << dd.size() << "]" << std::endl; + if (ack != got) { const char cp[] = "copy:"; if (std::memcmp(dd.data(), cp, sizeof(cp) - 1) == 0) { - std::cout << "cc.send(dd)" << std::endl; - cc.send(dd); + std::cout << id << " re_ack cc.send(dd)" << std::endl; + QVERIFY(re_ack = cc.send(dd)); } } - else std::cout << id << " matched!" << std::endl; - } while (unmatched.load(std::memory_order_relaxed)); + else if (unmatched.load(std::memory_order_relaxed)) { + unmatched.store(false, std::memory_order_release); + std::cout << id << " matched!" << std::endl; + } + } while (!re_ack || unmatched.load(std::memory_order_relaxed)); }}; - while (unmatched.load(std::memory_order_relaxed)) { + while (unmatched.load(std::memory_order_acquire)) { if (!cc.send(ack)) { std::cout << "send failed!" << std::endl; unmatched = false; break; } - std::this_thread::sleep_for(std::chrono::milliseconds(1)); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); } re.join(); std::cout << "fini conn " << id << std::endl; @@ -200,16 +204,22 @@ void Unit::test_channel() { }; std::thread t1 {[&] { - auto cc = wait_for_connected(1); - for (std::size_t i = 0; i < datas.size(); ++i) { - auto dd = cc.recv(); + auto cc = wait_for_handshake(1); + const char cp[] = "copy:"; + bool unchecked = true; + for (std::size_t i = 0; i < datas.size(); ++i, unchecked = false) { + ipc::buff_t dd; + do { + dd = cc.recv(); + } while (unchecked && (dd.size() > sizeof(cp)) && + std::memcmp(dd.data(), cp, sizeof(cp) - 1) == 0); QCOMPARE(dd.size(), std::strlen(datas[i]) + 1); QVERIFY(std::memcmp(dd.data(), datas[i], dd.size()) == 0); } }}; std::thread t2 {[&] { - auto cc = wait_for_connected(2); + auto cc = wait_for_handshake(2); for (std::size_t i = 0; i < datas.size(); ++i) { std::cout << "sending: " << datas[i] << std::endl; cc.send(datas[i]);