mirror of
https://github.com/mutouyun/cpp-ipc.git
synced 2025-12-06 16:56:45 +08:00
fix clang-test crash
This commit is contained in:
parent
4c75b44547
commit
69449c0d4f
@ -21,6 +21,8 @@ IPC_EXPORT handle_t connect (char const * name);
|
||||
IPC_EXPORT void disconnect(handle_t h);
|
||||
|
||||
IPC_EXPORT std::size_t recv_count(handle_t h);
|
||||
IPC_EXPORT void clear_recv(handle_t h);
|
||||
IPC_EXPORT void clear_recv(char const * name);
|
||||
|
||||
IPC_EXPORT bool send(handle_t h, void const * data, std::size_t size);
|
||||
IPC_EXPORT buff_t recv(handle_t h);
|
||||
|
||||
44
src/ipc.cpp
44
src/ipc.cpp
@ -38,8 +38,8 @@ constexpr queue_t* queue_of(handle_t h) {
|
||||
return static_cast<queue_t*>(h);
|
||||
}
|
||||
|
||||
inline std::atomic_size_t* acc_of(queue_t* queue) {
|
||||
return reinterpret_cast<std::atomic_size_t*>(queue->elems()) - 1;
|
||||
inline std::atomic_size_t* acc_of(queue_t* que) {
|
||||
return reinterpret_cast<std::atomic_size_t*>(que->elems()) - 1;
|
||||
}
|
||||
|
||||
} // internal-linkage
|
||||
@ -72,11 +72,25 @@ void disconnect(handle_t h) {
|
||||
}
|
||||
|
||||
std::size_t recv_count(handle_t h) {
|
||||
auto queue = queue_of(h);
|
||||
if (queue == nullptr) {
|
||||
auto que = queue_of(h);
|
||||
if (que == nullptr) {
|
||||
return error_count;
|
||||
}
|
||||
return queue->conn_count();
|
||||
return que->conn_count();
|
||||
}
|
||||
|
||||
void clear_recv(handle_t h) {
|
||||
auto* head = acc_of(queue_of(h));
|
||||
if (head == nullptr) {
|
||||
return;
|
||||
}
|
||||
std::memset(head, 0, sizeof(shm_info_t));
|
||||
}
|
||||
|
||||
void clear_recv(char const * name) {
|
||||
auto h = ipc::connect(name);
|
||||
ipc::clear_recv(h);
|
||||
ipc::disconnect(h);
|
||||
}
|
||||
|
||||
bool send(handle_t h, void const * data, std::size_t size) {
|
||||
@ -86,12 +100,12 @@ bool send(handle_t h, void const * data, std::size_t size) {
|
||||
if (size == 0) {
|
||||
return false;
|
||||
}
|
||||
auto queue = queue_of(h);
|
||||
if (queue == nullptr) {
|
||||
auto que = queue_of(h);
|
||||
if (que == nullptr) {
|
||||
return false;
|
||||
}
|
||||
// calc a new message id
|
||||
auto msg_id = acc_of(queue)->fetch_add(1, std::memory_order_relaxed);
|
||||
auto msg_id = acc_of(que)->fetch_add(1, std::memory_order_relaxed);
|
||||
// push message fragment, one fragment size is data_length
|
||||
int offset = 0;
|
||||
for (int i = 0; i < static_cast<int>(size / data_length); ++i, offset += data_length) {
|
||||
@ -100,7 +114,7 @@ bool send(handle_t h, void const * data, std::size_t size) {
|
||||
msg_id, { 0 }
|
||||
};
|
||||
std::memcpy(msg.data_, static_cast<byte_t const *>(data) + offset, data_length);
|
||||
queue->push(msg);
|
||||
que->push(msg);
|
||||
}
|
||||
// if remain > 0, this is the last message fragment
|
||||
int remain = static_cast<int>(size) - offset;
|
||||
@ -111,7 +125,7 @@ bool send(handle_t h, void const * data, std::size_t size) {
|
||||
};
|
||||
std::memcpy(msg.data_, static_cast<byte_t const *>(data) + offset,
|
||||
static_cast<std::size_t>(remain));
|
||||
queue->push(msg);
|
||||
que->push(msg);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
@ -124,10 +138,10 @@ buff_t recv(handle_t const * hs, std::size_t size) {
|
||||
thread_local std::vector<queue_t*> q_arr(size);
|
||||
q_arr.clear(); // make the size to 0
|
||||
for (size_t i = 0; i < size; ++i) {
|
||||
auto queue = queue_of(hs[i]);
|
||||
if (queue == nullptr) continue;
|
||||
queue->connect(); // wouldn't connect twice
|
||||
q_arr.push_back(queue);
|
||||
auto que = queue_of(hs[i]);
|
||||
if (que == nullptr) continue;
|
||||
que->connect(); // wouldn't connect twice
|
||||
q_arr.push_back(que);
|
||||
}
|
||||
if (q_arr.empty()) {
|
||||
return {};
|
||||
@ -190,6 +204,7 @@ route::route(route&& rhs)
|
||||
}
|
||||
|
||||
route::~route() {
|
||||
disconnect();
|
||||
p_->clear();
|
||||
}
|
||||
|
||||
@ -223,6 +238,7 @@ bool route::connect(char const * name) {
|
||||
|
||||
void route::disconnect() {
|
||||
ipc::disconnect(impl(p_)->h_);
|
||||
impl(p_)->h_ = nullptr;
|
||||
}
|
||||
|
||||
std::size_t route::recv_count() const {
|
||||
|
||||
13
src/shm.cpp
13
src/shm.cpp
@ -10,20 +10,14 @@ namespace shm {
|
||||
|
||||
class handle::handle_ : public pimpl<handle_> {
|
||||
public:
|
||||
handle* t_ = nullptr;
|
||||
void* m_ = nullptr;
|
||||
void* m_ = nullptr;
|
||||
|
||||
std::string n_ {};
|
||||
std::string n_;
|
||||
std::size_t s_ = 0;
|
||||
|
||||
handle_() = default;
|
||||
handle_(handle* t) : t_{t} {}
|
||||
|
||||
~handle_() { t_->release(); }
|
||||
};
|
||||
|
||||
handle::handle()
|
||||
: p_(p_->make(this)) {
|
||||
: p_(p_->make()) {
|
||||
}
|
||||
|
||||
handle::handle(char const * name, std::size_t size)
|
||||
@ -37,6 +31,7 @@ handle::handle(handle&& rhs)
|
||||
}
|
||||
|
||||
handle::~handle() {
|
||||
release();
|
||||
p_->clear();
|
||||
}
|
||||
|
||||
|
||||
@ -47,8 +47,10 @@ template <>
|
||||
struct test_verify<void> {
|
||||
test_verify (int) {}
|
||||
void prepare (void*) {}
|
||||
void push_data(int, ...) {}
|
||||
void verify (int, int) {}
|
||||
|
||||
template <typename U>
|
||||
void push_data(int, U&&) {}
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
@ -70,10 +72,7 @@ void benchmark_prod_cons(T* cq) {
|
||||
t = std::thread{[&, cid] {
|
||||
vf.prepare(&t);
|
||||
auto cn = tcq.connect();
|
||||
|
||||
using namespace std::placeholders;
|
||||
tcq.recv(cn, std::bind(&test_verify<V>::push_data, &vf, cid, _1));
|
||||
|
||||
tcq.recv(cn, [&](auto&& msg) { vf.push_data(cid, msg); });
|
||||
tcq.disconnect(cn);
|
||||
if (++fini != std::extent<decltype(consumers)>::value) return;
|
||||
sw.print_elapsed(N, M, Loops);
|
||||
|
||||
@ -5,7 +5,6 @@
|
||||
#include <new>
|
||||
#include <vector>
|
||||
#include <unordered_map>
|
||||
#include <functional>
|
||||
|
||||
#include "circ_elem_array.h"
|
||||
#include "circ_queue.h"
|
||||
|
||||
@ -45,8 +45,7 @@ struct test_cq<ipc::route> {
|
||||
|
||||
test_cq(void*)
|
||||
: conn_name_("test-ipc-route") {
|
||||
auto watcher = connect();
|
||||
QCOMPARE(watcher.recv_count(), static_cast<std::size_t>(0));
|
||||
ipc::clear_recv(conn_name_.c_str());
|
||||
}
|
||||
|
||||
cn_t connect() {
|
||||
@ -58,8 +57,8 @@ struct test_cq<ipc::route> {
|
||||
}
|
||||
|
||||
void wait_start(int M) {
|
||||
auto watcher = connect();
|
||||
while (watcher.recv_count() != static_cast<std::size_t>(M)) {
|
||||
auto watcher = ipc::connect(conn_name_.c_str());
|
||||
while (ipc::recv_count(watcher) != static_cast<std::size_t>(M)) {
|
||||
std::this_thread::yield();
|
||||
}
|
||||
}
|
||||
@ -257,6 +256,7 @@ void Unit::test_rw_lock() {
|
||||
void Unit::test_send_recv() {
|
||||
auto h = ipc::connect("my-ipc");
|
||||
QVERIFY(h != nullptr);
|
||||
ipc::clear_recv(h);
|
||||
char data[] = "hello ipc!";
|
||||
QVERIFY(ipc::send(h, data, sizeof(data)));
|
||||
auto got = ipc::recv(h);
|
||||
@ -265,6 +265,8 @@ void Unit::test_send_recv() {
|
||||
}
|
||||
|
||||
void Unit::test_route() {
|
||||
ipc::clear_recv("my-ipc-route");
|
||||
|
||||
auto wait_for_handshake = [](int id) {
|
||||
ipc::route cc { "my-ipc-route" };
|
||||
std::string cfm = "copy:" + std::to_string(id), ack = "re-" + cfm;
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user