mirror of
https://github.com/mutouyun/cpp-ipc.git
synced 2025-12-06 16:56:45 +08:00
fix some bugs
This commit is contained in:
parent
d6afba1d7a
commit
7982eb94c9
33
src/ipc.cpp
33
src/ipc.cpp
@ -9,6 +9,7 @@
|
||||
#include <utility>
|
||||
#include <shared_mutex>
|
||||
#include <mutex>
|
||||
#include <atomic>
|
||||
|
||||
#include "def.h"
|
||||
#include "circ_queue.h"
|
||||
@ -21,15 +22,22 @@ using namespace ipc;
|
||||
|
||||
using data_t = byte_t[data_length];
|
||||
|
||||
#pragma pack(1)
|
||||
struct msg_t {
|
||||
int remain_;
|
||||
unsigned id_;
|
||||
data_t data_;
|
||||
int remain_;
|
||||
std::size_t id_;
|
||||
data_t data_;
|
||||
};
|
||||
#pragma pack()
|
||||
|
||||
using queue_t = circ::queue<msg_t>;
|
||||
using guard_t = std::unique_ptr<std::remove_pointer_t<handle_t>, void(*)(handle_t)>;
|
||||
|
||||
struct shm_info_t {
|
||||
std::atomic_size_t id_acc_; // message id accumulator
|
||||
queue_t::array_t elems_; // the circ_elem_array in shm
|
||||
};
|
||||
|
||||
/*
|
||||
* thread_local stl object's destructor causing crash
|
||||
* See: https://sourceforge.net/p/mingw-w64/bugs/527/
|
||||
@ -41,7 +49,7 @@ tls::pointer<std::unordered_map<decltype(msg_t::id_), std::vector<byte_t>>> recv
|
||||
std::unordered_map<handle_t, queue_t> h2q__;
|
||||
rw_lock h2q_lc__;
|
||||
|
||||
queue_t* queue_of(handle_t h) {
|
||||
inline queue_t* queue_of(handle_t h) {
|
||||
if (h == nullptr) {
|
||||
return nullptr;
|
||||
}
|
||||
@ -56,17 +64,22 @@ queue_t* queue_of(handle_t h) {
|
||||
return &(it->second);
|
||||
}
|
||||
|
||||
inline std::atomic_size_t* acc_of(queue_t* queue) {
|
||||
auto elems = queue->elems();
|
||||
return reinterpret_cast<std::atomic_size_t*>(elems) - 1;
|
||||
}
|
||||
|
||||
} // internal-linkage
|
||||
|
||||
namespace ipc {
|
||||
|
||||
handle_t connect(char const * name) {
|
||||
auto h = shm::acquire(name, sizeof(queue_t));
|
||||
auto h = shm::acquire(name, sizeof(shm_info_t));
|
||||
if (h == nullptr) {
|
||||
return nullptr;
|
||||
}
|
||||
guard_t h_guard {
|
||||
h, [](handle_t h) { shm::release(h, sizeof(queue_t)); }
|
||||
h, [](handle_t h) { shm::release(h, sizeof(shm_info_t)); }
|
||||
};
|
||||
auto mem = shm::open(h);
|
||||
if (mem == nullptr) {
|
||||
@ -74,7 +87,7 @@ handle_t connect(char const * name) {
|
||||
}
|
||||
{
|
||||
std::unique_lock<rw_lock> guard { h2q_lc__ };
|
||||
h2q__[h].attach(static_cast<queue_t::array_t*>(mem));
|
||||
h2q__[h].attach(&(static_cast<shm_info_t*>(mem)->elems_));
|
||||
}
|
||||
h_guard.release();
|
||||
return h;
|
||||
@ -105,8 +118,9 @@ bool send(handle_t h, void* data, std::size_t size) {
|
||||
if (queue == nullptr) {
|
||||
return false;
|
||||
}
|
||||
static unsigned msg_id = 0;
|
||||
++msg_id; // calc a new message id, atomic is unnecessary
|
||||
// calc a new message id
|
||||
auto msg_id = acc_of(queue)->fetch_add(1, std::memory_order_relaxed);
|
||||
// push message fragment, one fragment size is data_length
|
||||
int offset = 0;
|
||||
for (int i = 0; i < static_cast<int>(size / data_length); ++i, offset += data_length) {
|
||||
msg_t msg {
|
||||
@ -116,6 +130,7 @@ bool send(handle_t h, void* data, std::size_t size) {
|
||||
std::memcpy(msg.data_, static_cast<byte_t*>(data) + offset, data_length);
|
||||
queue->push(msg);
|
||||
}
|
||||
// if remain > 0, this is the last message fragment
|
||||
int remain = static_cast<int>(size) - offset;
|
||||
if (remain > 0) {
|
||||
msg_t msg {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user