mirror of
https://github.com/mutouyun/cpp-ipc.git
synced 2025-12-07 01:06:45 +08:00
use tls::pointer instead of thread_local
This commit is contained in:
parent
aae13eba02
commit
64fc26733e
@ -1,8 +1,7 @@
|
|||||||
# cpp-ipc
|
# cpp-ipc - C++ IPC Library
|
||||||
|
|
||||||
[](https://github.com/mutouyun/cpp-ipc/blob/master/LICENSE) [](https://travis-ci.org/mutouyun/cpp-ipc)
|
[](https://github.com/mutouyun/cpp-ipc/blob/master/LICENSE) [](https://travis-ci.org/mutouyun/cpp-ipc)
|
||||||
|
|
||||||
C++ IPC Library:
|
|
||||||
A high-performance inter-process communication using shared memory on Linux/Windows.
|
A high-performance inter-process communication using shared memory on Linux/Windows.
|
||||||
使用共享内存的跨平台(Linux/Windows,x86/x64/ARM)高性能IPC通讯库。
|
使用共享内存的跨平台(Linux/Windows,x86/x64/ARM)高性能IPC通讯库。
|
||||||
|
|
||||||
@ -10,7 +9,7 @@ A high-performance inter-process communication using shared memory on Linux/Wind
|
|||||||
* 除STL外,无其他依赖
|
* 除STL外,无其他依赖
|
||||||
* 无锁(lock-free)或轻量级shared-spin-lock(`ipc::channel::connect`/`disconnect`)
|
* 无锁(lock-free)或轻量级shared-spin-lock(`ipc::channel::connect`/`disconnect`)
|
||||||
* 底层数据结构为循环数组(circular array),无动态内存分配
|
* 底层数据结构为循环数组(circular array),无动态内存分配
|
||||||
* `ipc::route`支持单生产者多消费者(1vN),`ipc::channel`支持多生产者多消费者
|
* `ipc::route`支持单生产者多消费者(1vN),`ipc::channel`支持多生产者多消费者(NvM)
|
||||||
|
|
||||||
## Performance
|
## Performance
|
||||||
|
|
||||||
|
|||||||
@ -8,7 +8,7 @@ CONFIG -= app_bundle
|
|||||||
DESTDIR = ../output
|
DESTDIR = ../output
|
||||||
|
|
||||||
msvc:QMAKE_CXXFLAGS += /std:c++17
|
msvc:QMAKE_CXXFLAGS += /std:c++17
|
||||||
else:QMAKE_CXXFLAGS += -std=gnu++1z
|
else:QMAKE_CXXFLAGS += -std=gnu++1z -Wno-unused-function
|
||||||
|
|
||||||
INCLUDEPATH += \
|
INCLUDEPATH += \
|
||||||
../test \
|
../test \
|
||||||
|
|||||||
28
src/ipc.cpp
28
src/ipc.cpp
@ -10,6 +10,7 @@
|
|||||||
#include "def.h"
|
#include "def.h"
|
||||||
#include "circ_queue.h"
|
#include "circ_queue.h"
|
||||||
#include "shm.h"
|
#include "shm.h"
|
||||||
|
#include "tls_pointer.h"
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
@ -46,11 +47,21 @@ constexpr queue_t* queue_of(handle_t h) {
|
|||||||
|
|
||||||
inline auto& recv_cache() {
|
inline auto& recv_cache() {
|
||||||
/*
|
/*
|
||||||
* the performance of tls::pointer is not good enough
|
<Remarks> thread_local may have some bugs.
|
||||||
* so regardless of the mingw-crash-problem for the moment
|
See: https://sourceforge.net/p/mingw-w64/bugs/727/
|
||||||
|
https://sourceforge.net/p/mingw-w64/bugs/527/
|
||||||
|
https://github.com/Alexpux/MINGW-packages/issues/2519
|
||||||
|
https://github.com/ChaiScript/ChaiScript/issues/402
|
||||||
|
https://developercommunity.visualstudio.com/content/problem/124121/thread-local-variables-fail-to-be-initialized-when.html
|
||||||
|
https://software.intel.com/en-us/forums/intel-c-compiler/topic/684827
|
||||||
*/
|
*/
|
||||||
thread_local std::unordered_map<msg_id_t, buff_t> rc;
|
static tls::pointer<std::unordered_map<msg_id_t, buff_t>> rc;
|
||||||
return rc;
|
return *rc.create();
|
||||||
|
}
|
||||||
|
|
||||||
|
inline auto& queues_cache() {
|
||||||
|
static tls::pointer<std::vector<queue_t*>> qc;
|
||||||
|
return *qc.create();
|
||||||
}
|
}
|
||||||
|
|
||||||
} // internal-linkage
|
} // internal-linkage
|
||||||
@ -176,7 +187,7 @@ buff_t multi_recv(F&& upd) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
buff_t recv(handle_t const * hs, std::size_t size) {
|
buff_t recv(handle_t const * hs, std::size_t size) {
|
||||||
thread_local std::vector<queue_t*> q_arr(size);
|
auto& q_arr = queues_cache();
|
||||||
q_arr.clear(); // make the size to 0
|
q_arr.clear(); // make the size to 0
|
||||||
for (size_t i = 0; i < size; ++i) {
|
for (size_t i = 0; i < size; ++i) {
|
||||||
auto que = queue_of(hs[i]);
|
auto que = queue_of(hs[i]);
|
||||||
@ -193,7 +204,12 @@ buff_t recv(handle_t const * hs, std::size_t size) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
buff_t recv(handle_t h) {
|
buff_t recv(handle_t h) {
|
||||||
return recv(&h, 1);
|
auto que = queue_of(h);
|
||||||
|
if (que == nullptr) return {};
|
||||||
|
que->connect(); // wouldn't connect twice
|
||||||
|
return multi_recv([&que] {
|
||||||
|
return std::make_tuple(&que, 1);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace ipc
|
} // namespace ipc
|
||||||
|
|||||||
@ -10,6 +10,8 @@
|
|||||||
#include <atomic>
|
#include <atomic>
|
||||||
#include <string>
|
#include <string>
|
||||||
|
|
||||||
|
#include "tls_pointer.h"
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
using acc_t = std::atomic_size_t;
|
using acc_t = std::atomic_size_t;
|
||||||
@ -23,8 +25,8 @@ constexpr void* mem_of(void* mem) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
inline auto& m2h() {
|
inline auto& m2h() {
|
||||||
thread_local std::unordered_map<void*, std::string> cache;
|
static ipc::tls::pointer<std::unordered_map<void*, std::string>> cache;
|
||||||
return cache;
|
return *cache.create();
|
||||||
}
|
}
|
||||||
|
|
||||||
} // internal-linkage
|
} // internal-linkage
|
||||||
|
|||||||
@ -10,6 +10,7 @@
|
|||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
|
|
||||||
#include "def.h"
|
#include "def.h"
|
||||||
|
#include "tls_pointer.h"
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
@ -27,8 +28,8 @@ constexpr auto to_tchar(std::string && str) -> IsSame<T, std::wstring> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
inline auto& m2h() {
|
inline auto& m2h() {
|
||||||
thread_local std::unordered_map<void*, HANDLE> cache;
|
static ipc::tls::pointer<std::unordered_map<void*, HANDLE>> cache;
|
||||||
return cache;
|
return *cache.create();
|
||||||
}
|
}
|
||||||
|
|
||||||
} // internal-linkage
|
} // internal-linkage
|
||||||
|
|||||||
22
test/test.h
22
test/test.h
@ -6,13 +6,15 @@
|
|||||||
#include <atomic>
|
#include <atomic>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <cstdio>
|
#include <memory>
|
||||||
|
#include <mutex>
|
||||||
|
|
||||||
#if defined(__GNUC__)
|
#if defined(__GNUC__)
|
||||||
# include <cxxabi.h> // abi::__cxa_demangle
|
# include <cxxabi.h> // abi::__cxa_demangle
|
||||||
#endif/*__GNUC__*/
|
#endif/*__GNUC__*/
|
||||||
|
|
||||||
#include "stopwatch.hpp"
|
#include "stopwatch.hpp"
|
||||||
|
#include "spin_lock.hpp"
|
||||||
|
|
||||||
class TestSuite : public QObject
|
class TestSuite : public QObject
|
||||||
{
|
{
|
||||||
@ -87,6 +89,8 @@ void benchmark_prod_cons(T* cq) {
|
|||||||
test_stopwatch sw;
|
test_stopwatch sw;
|
||||||
test_verify<V> vf { M };
|
test_verify<V> vf { M };
|
||||||
|
|
||||||
|
capo::spin_lock lc;
|
||||||
|
|
||||||
int cid = 0;
|
int cid = 0;
|
||||||
for (auto& t : consumers) {
|
for (auto& t : consumers) {
|
||||||
t = std::thread{[&, cid] {
|
t = std::thread{[&, cid] {
|
||||||
@ -95,15 +99,20 @@ void benchmark_prod_cons(T* cq) {
|
|||||||
int i = 0;
|
int i = 0;
|
||||||
tcq.recv(cn, [&](auto&& msg) {
|
tcq.recv(cn, [&](auto&& msg) {
|
||||||
// if (i % ((Loops * N) / 10) == 0) {
|
// if (i % ((Loops * N) / 10) == 0) {
|
||||||
// std::printf("%d-recving: %d%%\n", cid, (i * 100) / (Loops * N));
|
// std::unique_lock<capo::spin_lock> guard { lc };
|
||||||
|
// std::cout << cid << "-recving: " << (i * 100) / (Loops * N) << "%" << std::endl;
|
||||||
// }
|
// }
|
||||||
vf.push_data(cid, msg);
|
vf.push_data(cid, msg);
|
||||||
++i;
|
++i;
|
||||||
});
|
});
|
||||||
// std::printf("%d-consumer-disconnect\n", cid);
|
// {
|
||||||
|
// std::unique_lock<capo::spin_lock> guard { lc };
|
||||||
|
// std::cout << cid << "-consumer-disconnect" << std::endl;
|
||||||
|
// }
|
||||||
tcq.disconnect(cn);
|
tcq.disconnect(cn);
|
||||||
if (++fini_c != std::extent<decltype(consumers)>::value) {
|
if (++fini_c != std::extent<decltype(consumers)>::value) {
|
||||||
// std::printf("%d-consumer-end\n", cid);
|
// std::unique_lock<capo::spin_lock> guard { lc };
|
||||||
|
// std::cout << cid << "-consumer-end" << std::endl;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
sw.print_elapsed(N, M, Loops);
|
sw.print_elapsed(N, M, Loops);
|
||||||
@ -121,10 +130,11 @@ void benchmark_prod_cons(T* cq) {
|
|||||||
auto cn = tcq.connect_send();
|
auto cn = tcq.connect_send();
|
||||||
sw.start();
|
sw.start();
|
||||||
for (int i = 0; i < Loops; ++i) {
|
for (int i = 0; i < Loops; ++i) {
|
||||||
tcq.send(cn, { pid, i });
|
|
||||||
// if (i % (Loops / 10) == 0) {
|
// if (i % (Loops / 10) == 0) {
|
||||||
// std::printf("%d-sending: %d%%\n", pid, i * 100 / Loops);
|
// std::unique_lock<capo::spin_lock> guard { lc };
|
||||||
|
// std::cout << pid << "-sending: " << (i * 100 / Loops) << "%" << std::endl;
|
||||||
// }
|
// }
|
||||||
|
tcq.send(cn, { pid, i });
|
||||||
}
|
}
|
||||||
if (++fini_p != std::extent<decltype(producers)>::value) return;
|
if (++fini_p != std::extent<decltype(producers)>::value) return;
|
||||||
// quit
|
// quit
|
||||||
|
|||||||
@ -28,18 +28,11 @@ bool operator==(msg_t const & m1, msg_t const & m2) {
|
|||||||
|
|
||||||
template <>
|
template <>
|
||||||
struct test_verify<cq_t> {
|
struct test_verify<cq_t> {
|
||||||
std::unordered_map<int, std::vector<int>>* list_;
|
std::vector<std::unordered_map<int, std::vector<int>>> list_;
|
||||||
int lcount_;
|
|
||||||
|
|
||||||
test_verify(int M) {
|
test_verify(int M)
|
||||||
list_ = new std::remove_reference_t<decltype(*list_)>[
|
: list_(static_cast<std::size_t>(M))
|
||||||
static_cast<std::size_t>(lcount_ = M)
|
{}
|
||||||
];
|
|
||||||
}
|
|
||||||
|
|
||||||
~test_verify() {
|
|
||||||
delete [] list_;
|
|
||||||
}
|
|
||||||
|
|
||||||
void prepare(void* pt) {
|
void prepare(void* pt) {
|
||||||
std::cout << "start consumer: " << pt << std::endl;
|
std::cout << "start consumer: " << pt << std::endl;
|
||||||
@ -51,8 +44,8 @@ struct test_verify<cq_t> {
|
|||||||
|
|
||||||
void verify(int N, int Loops) {
|
void verify(int N, int Loops) {
|
||||||
std::cout << "verifying..." << std::endl;
|
std::cout << "verifying..." << std::endl;
|
||||||
for (int m = 0; m < lcount_; ++m) {
|
for (auto& c_dats : list_) {
|
||||||
auto& cons_vec = list_[m];
|
auto& cons_vec = c_dats;
|
||||||
for (int n = 0; n < N; ++n) {
|
for (int n = 0; n < N; ++n) {
|
||||||
auto& vec = cons_vec[n];
|
auto& vec = cons_vec[n];
|
||||||
QCOMPARE(vec.size(), static_cast<std::size_t>(Loops));
|
QCOMPARE(vec.size(), static_cast<std::size_t>(Loops));
|
||||||
|
|||||||
@ -12,7 +12,6 @@
|
|||||||
#include <array>
|
#include <array>
|
||||||
#include <limits>
|
#include <limits>
|
||||||
#include <utility>
|
#include <utility>
|
||||||
#include <unordered_map>
|
|
||||||
|
|
||||||
#include "stopwatch.hpp"
|
#include "stopwatch.hpp"
|
||||||
#include "spin_lock.hpp"
|
#include "spin_lock.hpp"
|
||||||
@ -35,14 +34,13 @@ constexpr int LoopCount = 100000;
|
|||||||
|
|
||||||
template <typename T>
|
template <typename T>
|
||||||
struct test_verify {
|
struct test_verify {
|
||||||
std::unordered_map<int, std::vector<ipc::buff_t>> list_;
|
std::vector<std::vector<ipc::buff_t>> list_;
|
||||||
int lcount_;
|
|
||||||
|
|
||||||
test_verify(int M) : lcount_{ M } {}
|
test_verify(int M)
|
||||||
|
: list_(static_cast<std::size_t>(M))
|
||||||
|
{}
|
||||||
|
|
||||||
void prepare(void* pt) {
|
void prepare(void* /*pt*/) {}
|
||||||
std::cout << "start consumer: " << pt << std::endl;
|
|
||||||
}
|
|
||||||
|
|
||||||
void push_data(int cid, ipc::buff_t const & msg) {
|
void push_data(int cid, ipc::buff_t const & msg) {
|
||||||
list_[cid].emplace_back(std::move(msg));
|
list_[cid].emplace_back(std::move(msg));
|
||||||
@ -50,8 +48,8 @@ struct test_verify {
|
|||||||
|
|
||||||
void verify(int /*N*/, int /*Loops*/) {
|
void verify(int /*N*/, int /*Loops*/) {
|
||||||
std::cout << "verifying..." << std::endl;
|
std::cout << "verifying..." << std::endl;
|
||||||
for (int m = 0; m < lcount_; ++m) {
|
for (auto& c_dats : list_) {
|
||||||
QCOMPARE(datas__, list_[m]);
|
QCOMPARE(datas__, c_dats);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -101,9 +99,9 @@ struct test_cq<ipc::route> {
|
|||||||
void send(cn_t& cn, const std::array<int, 2>& info) {
|
void send(cn_t& cn, const std::array<int, 2>& info) {
|
||||||
int n = info[1];
|
int n = info[1];
|
||||||
if (n < 0) {
|
if (n < 0) {
|
||||||
cn.send(ipc::buff_t { '\0' });
|
QVERIFY(cn.send(ipc::buff_t { '\0' }));
|
||||||
}
|
}
|
||||||
else cn.send(datas__[static_cast<decltype(datas__)::size_type>(n)]);
|
else QVERIFY(cn.send(datas__[static_cast<decltype(datas__)::size_type>(n)]));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -158,15 +156,15 @@ struct test_cq<ipc::channel> {
|
|||||||
void send(cn_t* cn, const std::array<int, 2>& info) {
|
void send(cn_t* cn, const std::array<int, 2>& info) {
|
||||||
thread_local struct s_dummy {
|
thread_local struct s_dummy {
|
||||||
s_dummy(cn_t* cn, int m) {
|
s_dummy(cn_t* cn, int m) {
|
||||||
cn->wait_for_recv(m);
|
cn->wait_for_recv(static_cast<std::size_t>(m));
|
||||||
// std::printf("start to send: %d.\n", m);
|
// std::printf("start to send: %d.\n", m);
|
||||||
}
|
}
|
||||||
} _(cn, m_);
|
} _(cn, m_);
|
||||||
int n = info[1];
|
int n = info[1];
|
||||||
if (n < 0) {
|
if (n < 0) {
|
||||||
cn->send(ipc::buff_t { '\0' });
|
QVERIFY(cn->send(ipc::buff_t { '\0' }));
|
||||||
}
|
}
|
||||||
else cn->send(datas__[static_cast<decltype(datas__)::size_type>(n)]);
|
else QVERIFY(cn->send(datas__[static_cast<decltype(datas__)::size_type>(n)]));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user