mirror of
https://github.com/mutouyun/cpp-ipc.git
synced 2025-12-07 01:06:45 +08:00
fix bugs; modify test case; add some helper using & functions
This commit is contained in:
parent
59a9df60c4
commit
d61aa5e5df
@ -10,12 +10,18 @@
|
|||||||
namespace ipc {
|
namespace ipc {
|
||||||
|
|
||||||
using shm::handle_t;
|
using shm::handle_t;
|
||||||
|
using buff_t = std::vector<byte_t>;
|
||||||
|
|
||||||
|
IPC_EXPORT buff_t make_buff(void const * data, std::size_t size);
|
||||||
|
|
||||||
|
template <std::size_t N>
|
||||||
|
buff_t make_buff(byte_t const (& data)[N]) { return make_buff(data, N); }
|
||||||
|
|
||||||
IPC_EXPORT handle_t connect (char const * name);
|
IPC_EXPORT handle_t connect (char const * name);
|
||||||
IPC_EXPORT void disconnect(handle_t h);
|
IPC_EXPORT void disconnect(handle_t h);
|
||||||
|
|
||||||
IPC_EXPORT bool send(handle_t h, void const * data, std::size_t size);
|
IPC_EXPORT bool send(handle_t h, void const * data, std::size_t size);
|
||||||
IPC_EXPORT std::vector<byte_t> recv(handle_t h);
|
IPC_EXPORT buff_t recv(handle_t h);
|
||||||
|
|
||||||
class IPC_EXPORT channel {
|
class IPC_EXPORT channel {
|
||||||
public:
|
public:
|
||||||
@ -37,10 +43,10 @@ public:
|
|||||||
void disconnect(void);
|
void disconnect(void);
|
||||||
|
|
||||||
bool send(void const * data, std::size_t size);
|
bool send(void const * data, std::size_t size);
|
||||||
bool send(std::vector<byte_t> const & buff);
|
bool send(buff_t const & buff);
|
||||||
bool send(std::string const & str);
|
bool send(std::string const & str);
|
||||||
|
|
||||||
std::vector<byte_t> recv();
|
buff_t recv();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
class channel_;
|
class channel_;
|
||||||
|
|||||||
@ -107,7 +107,7 @@ public:
|
|||||||
}
|
}
|
||||||
// otherwise try cas lc + 1 (set r-lock)
|
// otherwise try cas lc + 1 (set r-lock)
|
||||||
else if (lc_.compare_exchange_weak(old, old + 1, std::memory_order_acquire)) {
|
else if (lc_.compare_exchange_weak(old, old + 1, std::memory_order_acquire)) {
|
||||||
break;
|
return;
|
||||||
}
|
}
|
||||||
// set r-lock failed, old has been updated
|
// set r-lock failed, old has been updated
|
||||||
}
|
}
|
||||||
|
|||||||
56
src/ipc.cpp
56
src/ipc.cpp
@ -18,7 +18,6 @@
|
|||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
using namespace ipc;
|
using namespace ipc;
|
||||||
|
|
||||||
using data_t = byte_t[data_length];
|
using data_t = byte_t[data_length];
|
||||||
|
|
||||||
#pragma pack(1)
|
#pragma pack(1)
|
||||||
@ -43,7 +42,7 @@ struct shm_info_t {
|
|||||||
* https://sourceforge.net/p/mingw-w64/bugs/727/
|
* https://sourceforge.net/p/mingw-w64/bugs/727/
|
||||||
*/
|
*/
|
||||||
/*thread_local*/
|
/*thread_local*/
|
||||||
tls::pointer<std::unordered_map<decltype(msg_t::id_), std::vector<byte_t>>> recv_caches__;
|
tls::pointer<std::unordered_map<decltype(msg_t::id_), buff_t>> recv_caches__;
|
||||||
|
|
||||||
std::unordered_map<handle_t, queue_t> h2q__;
|
std::unordered_map<handle_t, queue_t> h2q__;
|
||||||
rw_lock h2q_lc__;
|
rw_lock h2q_lc__;
|
||||||
@ -72,6 +71,13 @@ inline std::atomic_size_t* acc_of(queue_t* queue) {
|
|||||||
|
|
||||||
namespace ipc {
|
namespace ipc {
|
||||||
|
|
||||||
|
buff_t make_buff(void const * data, std::size_t size) {
|
||||||
|
return {
|
||||||
|
static_cast<buff_t::value_type const *>(data),
|
||||||
|
static_cast<buff_t::value_type const *>(data) + size
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
handle_t connect(char const * name) {
|
handle_t connect(char const * name) {
|
||||||
auto h = shm::acquire(name, sizeof(shm_info_t));
|
auto h = shm::acquire(name, sizeof(shm_info_t));
|
||||||
if (h == nullptr) {
|
if (h == nullptr) {
|
||||||
@ -137,13 +143,13 @@ bool send(handle_t h, void const * data, std::size_t size) {
|
|||||||
msg_id, { 0 }
|
msg_id, { 0 }
|
||||||
};
|
};
|
||||||
std::memcpy(msg.data_, static_cast<byte_t const *>(data) + offset,
|
std::memcpy(msg.data_, static_cast<byte_t const *>(data) + offset,
|
||||||
static_cast<std::size_t>(remain));
|
static_cast<std::size_t>(remain));
|
||||||
queue->push(msg);
|
queue->push(msg);
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::vector<byte_t> recv(handle_t h) {
|
buff_t recv(handle_t h) {
|
||||||
auto queue = queue_of(h);
|
auto queue = queue_of(h);
|
||||||
if (queue == nullptr) {
|
if (queue == nullptr) {
|
||||||
return {};
|
return {};
|
||||||
@ -155,30 +161,32 @@ std::vector<byte_t> recv(handle_t h) {
|
|||||||
while(1) {
|
while(1) {
|
||||||
// pop a new message
|
// pop a new message
|
||||||
auto msg = queue->pop();
|
auto msg = queue->pop();
|
||||||
// remain_ may minus & abs(remain_) < data_length
|
// msg.remain_ may minus & abs(msg.remain_) < data_length
|
||||||
std::size_t remain = static_cast<std::size_t>(
|
std::size_t remain = static_cast<std::size_t>(
|
||||||
static_cast<int>(data_length) + msg.remain_);
|
static_cast<int>(data_length) + msg.remain_);
|
||||||
|
// find cache with msg.id_
|
||||||
auto cache_it = rcs->find(msg.id_);
|
auto cache_it = rcs->find(msg.id_);
|
||||||
if (cache_it == rcs->end()) {
|
if (cache_it == rcs->end()) {
|
||||||
std::vector<byte_t> buf(remain);
|
if (remain <= data_length) {
|
||||||
std::memcpy(buf.data(), msg.data_, remain);
|
return make_buff(msg.data_, remain);
|
||||||
return buf;
|
}
|
||||||
|
// cache the first message fragment
|
||||||
|
else rcs->emplace(msg.id_, make_buff(msg.data_));
|
||||||
}
|
}
|
||||||
// has cache before this message
|
// has cached before this message
|
||||||
auto& cache = cache_it->second;
|
else {
|
||||||
auto last_size = cache.size();
|
auto& cache = cache_it->second;
|
||||||
// this is the last message fragment
|
// this is the last message fragment
|
||||||
if (msg.remain_ <= 0) {
|
if (msg.remain_ <= 0) {
|
||||||
cache.resize(last_size + remain);
|
cache.insert(cache.end(), msg.data_, msg.data_ + remain);
|
||||||
std::memcpy(cache.data() + last_size, msg.data_, remain);
|
// finish this message, erase it from cache
|
||||||
// finish this message, erase it from cache
|
auto buf = std::move(cache);
|
||||||
auto buf = std::move(cache);
|
rcs->erase(cache_it);
|
||||||
rcs->erase(cache_it);
|
return buf;
|
||||||
return buf;
|
}
|
||||||
|
// there are remain datas after this message
|
||||||
|
cache.insert(cache.end(), msg.data_, msg.data_ + data_length);
|
||||||
}
|
}
|
||||||
// there are remain datas after this message
|
|
||||||
cache.resize(last_size + data_length);
|
|
||||||
std::memcpy(cache.data() + last_size, msg.data_, data_length);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -243,7 +251,7 @@ bool channel::send(void const *data, std::size_t size) {
|
|||||||
return ipc::send(impl(p_)->h_, data, size);
|
return ipc::send(impl(p_)->h_, data, size);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool channel::send(std::vector<byte_t> const & buff) {
|
bool channel::send(buff_t const & buff) {
|
||||||
return channel::send(buff.data(), buff.size());
|
return channel::send(buff.data(), buff.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -251,7 +259,7 @@ bool channel::send(std::string const & str) {
|
|||||||
return channel::send(str.c_str(), str.size() + 1);
|
return channel::send(str.c_str(), str.size() + 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::vector<byte_t> channel::recv() {
|
buff_t channel::recv() {
|
||||||
return ipc::recv(impl(p_)->h_);
|
return ipc::recv(impl(p_)->h_);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -31,7 +31,7 @@ private slots:
|
|||||||
void test_prod_cons_performance();
|
void test_prod_cons_performance();
|
||||||
|
|
||||||
void test_queue();
|
void test_queue();
|
||||||
} /*unit__*/;
|
} unit__;
|
||||||
|
|
||||||
#include "test_circ.moc"
|
#include "test_circ.moc"
|
||||||
|
|
||||||
|
|||||||
@ -138,10 +138,10 @@ void test_performance() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void Unit::test_rw_lock() {
|
void Unit::test_rw_lock() {
|
||||||
// test_performance<1, 1>();
|
test_performance<1, 1>();
|
||||||
// test_performance<4, 4>();
|
test_performance<4, 4>();
|
||||||
// test_performance<1, 8>();
|
test_performance<1, 8>();
|
||||||
// test_performance<8, 1>();
|
test_performance<8, 1>();
|
||||||
}
|
}
|
||||||
|
|
||||||
void Unit::test_send_recv() {
|
void Unit::test_send_recv() {
|
||||||
@ -155,33 +155,37 @@ void Unit::test_send_recv() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void Unit::test_channel() {
|
void Unit::test_channel() {
|
||||||
auto wait_for_connected = [](int id) {
|
auto wait_for_handshake = [](int id) {
|
||||||
std::string ack = "copy:" + std::to_string(id);
|
std::string ack = "copy:" + std::to_string(id);
|
||||||
ipc::channel cc { "my-ipc-channel" };
|
ipc::channel cc { "my-ipc-channel" };
|
||||||
std::atomic_bool unmatched { true };
|
std::atomic_bool unmatched { true };
|
||||||
std::thread re {[&] {
|
std::thread re {[&] {
|
||||||
|
bool re_ack = false;
|
||||||
do {
|
do {
|
||||||
auto dd = cc.recv();
|
auto dd = cc.recv();
|
||||||
std::cout << id << "-recv: "
|
QVERIFY(!dd.empty());
|
||||||
<< std::string { reinterpret_cast<char*>(dd.data()), dd.size() }
|
std::string got { reinterpret_cast<char*>(dd.data()), dd.size() - 1 };
|
||||||
<< "[" << dd.size() << "]" << std::endl;
|
std::cout << id << "-recv: " << got << "[" << dd.size() << "]" << std::endl;
|
||||||
if ((unmatched = (std::memcmp(dd.data(), ack.c_str(), (std::min)(dd.size(), ack.size())) != 0))) {
|
if (ack != got) {
|
||||||
const char cp[] = "copy:";
|
const char cp[] = "copy:";
|
||||||
if (std::memcmp(dd.data(), cp, sizeof(cp) - 1) == 0) {
|
if (std::memcmp(dd.data(), cp, sizeof(cp) - 1) == 0) {
|
||||||
std::cout << "cc.send(dd)" << std::endl;
|
std::cout << id << " re_ack cc.send(dd)" << std::endl;
|
||||||
cc.send(dd);
|
QVERIFY(re_ack = cc.send(dd));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else std::cout << id << " matched!" << std::endl;
|
else if (unmatched.load(std::memory_order_relaxed)) {
|
||||||
} while (unmatched.load(std::memory_order_relaxed));
|
unmatched.store(false, std::memory_order_release);
|
||||||
|
std::cout << id << " matched!" << std::endl;
|
||||||
|
}
|
||||||
|
} while (!re_ack || unmatched.load(std::memory_order_relaxed));
|
||||||
}};
|
}};
|
||||||
while (unmatched.load(std::memory_order_relaxed)) {
|
while (unmatched.load(std::memory_order_acquire)) {
|
||||||
if (!cc.send(ack)) {
|
if (!cc.send(ack)) {
|
||||||
std::cout << "send failed!" << std::endl;
|
std::cout << "send failed!" << std::endl;
|
||||||
unmatched = false;
|
unmatched = false;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
std::this_thread::sleep_for(std::chrono::milliseconds(1));
|
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||||
}
|
}
|
||||||
re.join();
|
re.join();
|
||||||
std::cout << "fini conn " << id << std::endl;
|
std::cout << "fini conn " << id << std::endl;
|
||||||
@ -200,16 +204,22 @@ void Unit::test_channel() {
|
|||||||
};
|
};
|
||||||
|
|
||||||
std::thread t1 {[&] {
|
std::thread t1 {[&] {
|
||||||
auto cc = wait_for_connected(1);
|
auto cc = wait_for_handshake(1);
|
||||||
for (std::size_t i = 0; i < datas.size(); ++i) {
|
const char cp[] = "copy:";
|
||||||
auto dd = cc.recv();
|
bool unchecked = true;
|
||||||
|
for (std::size_t i = 0; i < datas.size(); ++i, unchecked = false) {
|
||||||
|
ipc::buff_t dd;
|
||||||
|
do {
|
||||||
|
dd = cc.recv();
|
||||||
|
} while (unchecked && (dd.size() > sizeof(cp)) &&
|
||||||
|
std::memcmp(dd.data(), cp, sizeof(cp) - 1) == 0);
|
||||||
QCOMPARE(dd.size(), std::strlen(datas[i]) + 1);
|
QCOMPARE(dd.size(), std::strlen(datas[i]) + 1);
|
||||||
QVERIFY(std::memcmp(dd.data(), datas[i], dd.size()) == 0);
|
QVERIFY(std::memcmp(dd.data(), datas[i], dd.size()) == 0);
|
||||||
}
|
}
|
||||||
}};
|
}};
|
||||||
|
|
||||||
std::thread t2 {[&] {
|
std::thread t2 {[&] {
|
||||||
auto cc = wait_for_connected(2);
|
auto cc = wait_for_handshake(2);
|
||||||
for (std::size_t i = 0; i < datas.size(); ++i) {
|
for (std::size_t i = 0; i < datas.size(); ++i) {
|
||||||
std::cout << "sending: " << datas[i] << std::endl;
|
std::cout << "sending: " << datas[i] << std::endl;
|
||||||
cc.send(datas[i]);
|
cc.send(datas[i]);
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user