use rw_lock for ipc with multi-thread

This commit is contained in:
mutouyun 2018-12-12 23:31:22 +08:00
parent c40dddcc06
commit 5ffc820983
3 changed files with 45 additions and 13 deletions

View File

@ -34,8 +34,9 @@ public:
} }
void swap(queue& rhs) { void swap(queue& rhs) {
std::swap(elems_ , rhs.elems_ ); std::swap(elems_ , rhs.elems_ );
std::swap(cursor_, rhs.cursor_); std::swap(cursor_ , rhs.cursor_ );
std::swap(connected_, rhs.connected_);
} }
queue& operator=(queue rhs) { queue& operator=(queue rhs) {

View File

@ -7,6 +7,8 @@
#include <string> #include <string>
#include <algorithm> #include <algorithm>
#include <utility> #include <utility>
#include <shared_mutex>
#include <mutex>
#include "circ_queue.h" #include "circ_queue.h"
#include "rw_lock.h" #include "rw_lock.h"
@ -33,6 +35,7 @@ queue_t* queue_of(handle_t h) {
if (h == nullptr) { if (h == nullptr) {
return nullptr; return nullptr;
} }
std::shared_lock<rw_lock> guard { h2q_lc__ };
auto it = h2q__.find(h); auto it = h2q__.find(h);
if (it == h2q__.end()) { if (it == h2q__.end()) {
return nullptr; return nullptr;
@ -59,18 +62,26 @@ handle_t connect(char const * name) {
if (mem == nullptr) { if (mem == nullptr) {
return nullptr; return nullptr;
} }
h2q__[h].attach(static_cast<queue_t::array_t*>(mem)); {
std::unique_lock<rw_lock> guard { h2q_lc__ };
h2q__[h].attach(static_cast<queue_t::array_t*>(mem));
}
h_guard.release(); h_guard.release();
return h; return h;
} }
void disconnect(handle_t h) { void disconnect(handle_t h) {
auto it = h2q__.find(h); void* mem = nullptr;
if (it == h2q__.end()) return; {
it->second.disconnect(); std::unique_lock<rw_lock> guard { h2q_lc__ };
shm::close(it->second.detach()); auto it = h2q__.find(h);
if (it == h2q__.end()) return;
it->second.disconnect();
mem = it->second.elems(); // needn't to detach
h2q__.erase(it);
}
shm::close(mem);
shm::release(h, sizeof(queue_t)); shm::release(h, sizeof(queue_t));
h2q__.erase(it);
} }
bool send(handle_t h, void* data, int size) { bool send(handle_t h, void* data, int size) {

View File

@ -5,6 +5,11 @@
#include <shared_mutex> #include <shared_mutex>
#include <mutex> #include <mutex>
#include <typeinfo> #include <typeinfo>
#include <memory>
#if defined(__GNUC__)
# include <cxxabi.h> // abi::__cxa_demangle
#endif/*__GNUC__*/
#include "ipc.h" #include "ipc.h"
#include "rw_lock.h" #include "rw_lock.h"
@ -49,7 +54,17 @@ void benchmark() {
Lc lc; Lc lc;
test_stopwatch sw; test_stopwatch sw;
#if defined(__GNUC__)
{
const char* typeid_name = typeid(Lc).name();
const char* real_name = abi::__cxa_demangle(typeid_name, nullptr, nullptr, nullptr);
std::unique_ptr<void, decltype(::free)*> guard { (void*)real_name, ::free };
if (real_name == nullptr) real_name = typeid_name;
std::cout << std::endl << real_name << std::endl;
}
#else
std::cout << std::endl << typeid(Lc).name() << std::endl; std::cout << std::endl << typeid(Lc).name() << std::endl;
#endif/*__GNUC__*/
for (auto& t : r_trd) { for (auto& t : r_trd) {
t = std::thread([&] { t = std::thread([&] {
@ -58,7 +73,7 @@ void benchmark() {
while (1) { while (1) {
int x = -1; int x = -1;
{ {
[[maybe_unused]] std::shared_lock<Lc> guard { lc }; std::shared_lock<Lc> guard { lc };
if (cnt < datas.size()) { if (cnt < datas.size()) {
x = datas[cnt]; x = datas[cnt];
} }
@ -73,9 +88,9 @@ void benchmark() {
if (++fini == std::extent<decltype(r_trd)>::value) { if (++fini == std::extent<decltype(r_trd)>::value) {
sw.print_elapsed(R, W, Loops); sw.print_elapsed(R, W, Loops);
} }
std::uint64_t sum = 0; std::int64_t sum = 0;
for (int i : seq) sum += i; for (int i : seq) sum += i;
QCOMPARE(sum, acc<std::uint64_t>(1, Loops) * std::extent<decltype(w_trd)>::value); QCOMPARE(sum, acc<std::int64_t>(1, Loops) * std::extent<decltype(w_trd)>::value);
}); });
} }
@ -84,7 +99,7 @@ void benchmark() {
sw.start(); sw.start();
for (int i = 1; i <= Loops; ++i) { for (int i = 1; i <= Loops; ++i) {
{ {
[[maybe_unused]] std::unique_lock<Lc> guard { lc }; std::unique_lock<Lc> guard { lc };
datas.push_back(i); datas.push_back(i);
} }
std::this_thread::yield(); std::this_thread::yield();
@ -101,10 +116,15 @@ void benchmark() {
template <int R, int W> template <int R, int W>
void test_performance() { void test_performance() {
std::cout << std::endl
<< "test_performance: [" << R << ":" << W << "]"
<< std::endl;
benchmark<ipc::rw_lock , R, W>(); benchmark<ipc::rw_lock , R, W>();
benchmark<lc_wrapper<capo::spin_lock>, R, W>(); benchmark<lc_wrapper<capo::spin_lock>, R, W>();
benchmark<lc_wrapper<std::mutex> , R, W>(); benchmark<lc_wrapper<std::mutex> , R, W>();
benchmark<std::shared_mutex , R, W>(); benchmark<std::shared_timed_mutex , R, W>();
} }
void Unit::test_rw_lock() { void Unit::test_rw_lock() {