From 7982eb94c9a797cf1c2544337e7e9d70faabc339 Mon Sep 17 00:00:00 2001 From: mutouyun Date: Sun, 16 Dec 2018 10:33:00 +0800 Subject: [PATCH] fix some bugs --- src/ipc.cpp | 33 ++++++++++++++++++++++++--------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/src/ipc.cpp b/src/ipc.cpp index 81a33e9..148b75e 100644 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include "def.h" #include "circ_queue.h" @@ -21,15 +22,22 @@ using namespace ipc; using data_t = byte_t[data_length]; +#pragma pack(1) struct msg_t { - int remain_; - unsigned id_; - data_t data_; + int remain_; + std::size_t id_; + data_t data_; }; +#pragma pack() using queue_t = circ::queue; using guard_t = std::unique_ptr, void(*)(handle_t)>; +struct shm_info_t { + std::atomic_size_t id_acc_; // message id accumulator + queue_t::array_t elems_; // the circ_elem_array in shm +}; + /* * thread_local stl object's destructor causing crash * See: https://sourceforge.net/p/mingw-w64/bugs/527/ @@ -41,7 +49,7 @@ tls::pointer>> recv std::unordered_map h2q__; rw_lock h2q_lc__; -queue_t* queue_of(handle_t h) { +inline queue_t* queue_of(handle_t h) { if (h == nullptr) { return nullptr; } @@ -56,17 +64,22 @@ queue_t* queue_of(handle_t h) { return &(it->second); } +inline std::atomic_size_t* acc_of(queue_t* queue) { + auto elems = queue->elems(); + return reinterpret_cast(elems) - 1; +} + } // internal-linkage namespace ipc { handle_t connect(char const * name) { - auto h = shm::acquire(name, sizeof(queue_t)); + auto h = shm::acquire(name, sizeof(shm_info_t)); if (h == nullptr) { return nullptr; } guard_t h_guard { - h, [](handle_t h) { shm::release(h, sizeof(queue_t)); } + h, [](handle_t h) { shm::release(h, sizeof(shm_info_t)); } }; auto mem = shm::open(h); if (mem == nullptr) { @@ -74,7 +87,7 @@ handle_t connect(char const * name) { } { std::unique_lock guard { h2q_lc__ }; - h2q__[h].attach(static_cast(mem)); + h2q__[h].attach(&(static_cast(mem)->elems_)); } h_guard.release(); return h; @@ -105,8 +118,9 @@ bool send(handle_t h, void* data, std::size_t size) { if (queue == nullptr) { return false; } - static unsigned msg_id = 0; - ++msg_id; // calc a new message id, atomic is unnecessary + // calc a new message id + auto msg_id = acc_of(queue)->fetch_add(1, std::memory_order_relaxed); + // push message fragment, one fragment size is data_length int offset = 0; for (int i = 0; i < static_cast(size / data_length); ++i, offset += data_length) { msg_t msg { @@ -116,6 +130,7 @@ bool send(handle_t h, void* data, std::size_t size) { std::memcpy(msg.data_, static_cast(data) + offset, data_length); queue->push(msg); } + // if remain > 0, this is the last message fragment int remain = static_cast(size) - offset; if (remain > 0) { msg_t msg {