add error log (TBD); use pthread api for waiter

This commit is contained in:
mutouyun 2019-01-25 11:59:53 +08:00
parent 3c6ba58b41
commit 99926581d1
12 changed files with 118 additions and 78 deletions

View File

@ -32,7 +32,8 @@ HEADERS += \
../src/circ/elem_array.h \
../src/prod_cons.h \
../src/policy.h \
../src/queue.h
../src/queue.h \
../src/log.h
SOURCES += \
../src/shm.cpp \

View File

@ -131,7 +131,7 @@ public:
void lock() noexcept {
for (unsigned k = 0;;) {
auto old = lc_.fetch_or(w_flag, std::memory_order_acquire);
auto old = lc_.fetch_or(w_flag, std::memory_order_acq_rel);
if (!old) return; // got w-lock
if (!(old & w_flag)) break; // other thread having r-lock
yield(k); // other thread having w-lock
@ -147,7 +147,7 @@ public:
}
void lock_shared() noexcept {
auto old = lc_.load(std::memory_order_relaxed);
auto old = lc_.load(std::memory_order_acquire);
for (unsigned k = 0;;) {
// if w_flag set, just continue
if (old & w_flag) {
@ -155,7 +155,7 @@ public:
old = lc_.load(std::memory_order_acquire);
}
// otherwise try cas lc + 1 (set r-lock)
else if (lc_.compare_exchange_weak(old, old + 1, std::memory_order_acquire)) {
else if (lc_.compare_exchange_weak(old, old + 1, std::memory_order_release)) {
return;
}
// set r-lock failed, old has been updated

View File

@ -55,7 +55,6 @@ public:
if (!constructed_.load(std::memory_order_relaxed)) {
::new (this) conn_head;
constructed_.store(true, std::memory_order_release);
::printf("init...\n");
}
}
}

13
src/log.h Normal file
View File

@ -0,0 +1,13 @@
#pragma once
#include <cstdio>
#include <utility>
namespace ipc {
template <typename... P>
void log(char const * fmt, P&&... params) {
std::fprintf(stderr, fmt, std::forward<P>(params)...);
}
} // namespace ipc

View File

@ -163,7 +163,7 @@ public:
if (p == nullptr) return;
while (1) {
next(p) = cursor_.load(std::memory_order_acquire);
if (cursor_.compare_exchange_weak(next(p), p, std::memory_order_relaxed)) {
if (cursor_.compare_exchange_weak(next(p), p, std::memory_order_release)) {
break;
}
std::this_thread::yield();
@ -242,7 +242,7 @@ public:
void* p;
while (1) {
p = expand();
if (this->cursor_.compare_exchange_weak(p, this->next(p), std::memory_order_relaxed)) {
if (this->cursor_.compare_exchange_weak(p, this->next(p), std::memory_order_release)) {
break;
}
std::this_thread::yield();

View File

@ -5,6 +5,7 @@
#include <sys/mman.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <atomic>
#include <string>
@ -12,6 +13,7 @@
#include <mutex>
#include "def.h"
#include "log.h"
#include "memory/resource.h"
namespace {
@ -49,10 +51,12 @@ void* acquire(char const * name, std::size_t size) {
S_IRGRP | S_IWGRP |
S_IROTH | S_IWOTH);
if (fd == -1) {
ipc::log("fail shm_open[%d]: %s\n", errno, name);
return nullptr;
}
size += sizeof(acc_t);
if (::ftruncate(fd, static_cast<off_t>(size)) != 0) {
ipc::log("fail ftruncate[%d]: %s\n", errno, name);
::close(fd);
::shm_unlink(op_name.c_str());
return nullptr;
@ -60,6 +64,7 @@ void* acquire(char const * name, std::size_t size) {
void* mem = ::mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
::close(fd);
if (mem == MAP_FAILED) {
ipc::log("fail mmap[%d]: %s\n", errno, name);
::shm_unlink(op_name.c_str());
return nullptr;
}

View File

@ -7,7 +7,7 @@
#include <mutex>
#include "def.h"
#include "log.h"
#include "memory/resource.h"
#include "platform/to_tchar.h"
@ -35,10 +35,12 @@ void* acquire(char const * name, std::size_t size) {
0, static_cast<DWORD>(size),
ipc::detail::to_tchar(std::string{"__IPC_SHM__"} + name).c_str());
if (h == NULL) {
ipc::log("fail CreateFileMapping[%d]: %s\n", static_cast<int>(::GetLastError()), name);
return nullptr;
}
LPVOID mem = ::MapViewOfFile(h, FILE_MAP_ALL_ACCESS, 0, 0, 0);
if (mem == NULL) {
ipc::log("fail MapViewOfFile[%d]: %s\n", static_cast<int>(::GetLastError()), name);
::CloseHandle(h);
return nullptr;
}

View File

@ -1,85 +1,109 @@
#pragma once
#include <fcntl.h>
#include <sys/stat.h>
#include <semaphore.h>
#include <pthread.h>
#include <cstring>
#include <atomic>
#include "def.h"
#include "rw_lock.h"
#include "log.h"
#include "platform/detail.h"
namespace ipc {
namespace detail {
class waiter {
std::atomic<unsigned> rc_ { 0 };
pthread_mutex_t mutex_ = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond_ = PTHREAD_COND_INITIALIZER;
std::atomic<unsigned> counter_ { 0 };
spin_lock lc_;
public:
using handle_t = sem_t*;
private:
bool post(handle_t h) {
for (unsigned k = 0;;) {
auto c = counter_.load(std::memory_order_acquire);
if (c == 0) return false;
if (counter_.compare_exchange_weak(c, c - 1, std::memory_order_relaxed)) {
break;
}
ipc::yield(k);
}
return ::sem_post(h) == 0;
}
using handle_t = bool;
public:
constexpr static handle_t invalid() {
return SEM_FAILED;
return false;
}
handle_t open(char const * name) {
if (name == nullptr || name[0] == '\0') return invalid();
rc_.fetch_add(1, std::memory_order_relaxed);
std::atomic_thread_fence(std::memory_order_release);
return ::sem_open(name, O_CREAT | O_RDWR,
S_IRUSR | S_IWUSR |
S_IRGRP | S_IWGRP |
S_IROTH | S_IWOTH, 0);
if (counter_.fetch_add(1, std::memory_order_acq_rel) == 0) {
int eno;
// init mutex
pthread_mutexattr_t mutex_attr;
if ((eno = ::pthread_mutexattr_init(&mutex_attr)) != 0) {
ipc::log("fail pthread_mutexattr_init[%d]: %s\n", eno, name);
return invalid();
}
IPC_UNUSED_ auto guard_mutex_attr = unique_ptr(&mutex_attr, ::pthread_mutexattr_destroy);
if ((eno = ::pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED)) != 0) {
ipc::log("fail pthread_mutexattr_setpshared[%d]: %s\n", eno, name);
return invalid();
}
if ((eno = ::pthread_mutex_init(&mutex_, &mutex_attr)) != 0) {
ipc::log("fail pthread_mutex_init[%d]: %s\n", eno, name);
return invalid();
}
auto guard_mutex = unique_ptr(&mutex_, ::pthread_mutex_destroy);
// init condition
pthread_condattr_t cond_attr;
if ((eno = ::pthread_condattr_init(&cond_attr)) != 0) {
ipc::log("fail pthread_condattr_init[%d]: %s\n", eno, name);
return invalid();
}
IPC_UNUSED_ auto guard_cond_attr = unique_ptr(&cond_attr, ::pthread_condattr_destroy);
if ((eno = ::pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED)) != 0) {
ipc::log("fail pthread_condattr_setpshared[%d]: %s\n", eno, name);
return invalid();
}
if ((eno = ::pthread_cond_init(&cond_, &cond_attr)) != 0) {
ipc::log("fail pthread_cond_init[%d]: %s\n", eno, name);
return invalid();
}
// no need to guard condition
// release guards
guard_mutex.release();
}
return true;
}
void close(handle_t h, char const * name) {
void close(handle_t h) {
if (h == invalid()) return;
if (name == nullptr || name[0] == '\0') return;
::sem_close(h);
if (rc_.fetch_sub(1, std::memory_order_acquire) == 1) {
::sem_unlink(name);
if (counter_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
::pthread_cond_destroy(&cond_);
::pthread_mutex_destroy(&mutex_);
}
}
bool wait(handle_t h) {
if (h == invalid()) return false;
{
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lc_);
counter_.fetch_add(1, std::memory_order_relaxed);
int eno;
if ((eno = ::pthread_mutex_lock(&mutex_)) != 0) {
ipc::log("fail pthread_mutex_lock[%d]\n", eno);
return false;
}
bool ret = (::sem_wait(h) == 0);
return ret;
IPC_UNUSED_ auto guard = unique_ptr(&mutex_, ::pthread_mutex_unlock);
if ((eno = ::pthread_cond_wait(&cond_, &mutex_)) != 0) {
ipc::log("fail pthread_cond_wait[%d]\n", eno);
return false;
}
return true;
}
void notify(handle_t h) {
if (h == invalid()) return;
post(h);
int eno;
if ((eno = ::pthread_cond_signal(&cond_)) != 0) {
ipc::log("fail pthread_cond_signal[%d]\n", eno);
}
}
void broadcast(handle_t h) {
if (h == invalid()) return;
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lc_);
while (post(h)) ;
int eno;
if ((eno = ::pthread_cond_broadcast(&cond_)) != 0) {
ipc::log("fail pthread_cond_broadcast[%d]\n", eno);
}
}
};

View File

@ -27,7 +27,7 @@ public:
return ::CreateSemaphore(NULL, 0, LONG_MAX, ipc::detail::to_tchar(name).c_str());
}
void close(handle_t h, char const * /*name*/) {
void close(handle_t h) {
if (h == invalid()) return;
::CloseHandle(h);
}
@ -44,7 +44,7 @@ public:
for (unsigned k = 0;;) {
auto c = counter_.load(std::memory_order_acquire);
if (c == 0) return;
if (counter_.compare_exchange_weak(c, c - 1, std::memory_order_relaxed)) {
if (counter_.compare_exchange_weak(c, c - 1, std::memory_order_release)) {
break;
}
ipc::yield(k);

View File

@ -20,7 +20,6 @@ public:
private:
waiter_t* w_ = nullptr;
waiter_t::handle_t h_ = waiter_t::invalid();
std::string n_;
public:
waiter_wrapper() = default;
@ -44,15 +43,13 @@ public:
bool open(char const * name) {
if (w_ == nullptr) return false;
close();
h_ = w_->open((n_ = name).c_str());
::printf("%s: %p\n", name, h_);
h_ = w_->open(name);
return valid();
}
void close() {
if (!valid()) return;
::printf("close %s: %p\n", n_.c_str(), h_);
w_->close(h_, n_.c_str());
w_->close(h_);
h_ = waiter_t::invalid();
}

View File

@ -41,8 +41,8 @@ struct prod_cons_impl<prod_cons<relat::single, relat::single, trans::unicast>> {
template <typename E, typename F, typename EB>
bool push(E* /*elems*/, F&& f, EB* elem_start) {
auto cur_wt = circ::index_of(wt_.load(std::memory_order_acquire));
if (cur_wt == circ::index_of(rd_.load(std::memory_order_relaxed) - 1)) {
auto cur_wt = circ::index_of(wt_.load(std::memory_order_relaxed));
if (cur_wt == circ::index_of(rd_.load(std::memory_order_acquire) - 1)) {
return false; // full
}
std::forward<F>(f)(elem_start + cur_wt);
@ -52,8 +52,8 @@ struct prod_cons_impl<prod_cons<relat::single, relat::single, trans::unicast>> {
template <typename E, typename F, typename EB>
bool pop(E* /*elems*/, circ::u2_t& /*cur*/, F&& f, EB* elem_start) {
auto cur_rd = circ::index_of(rd_.load(std::memory_order_acquire));
if (cur_rd == circ::index_of(wt_.load(std::memory_order_relaxed))) {
auto cur_rd = circ::index_of(rd_.load(std::memory_order_relaxed));
if (cur_rd == circ::index_of(wt_.load(std::memory_order_acquire))) {
return false; // empty
}
std::forward<F>(f)(elem_start + cur_rd);
@ -70,9 +70,9 @@ struct prod_cons_impl<prod_cons<relat::single, relat::multi , trans::unicast>>
bool pop(E* /*elems*/, circ::u2_t& /*cur*/, F&& f, EB* elem_start) {
byte_t buff[sizeof(E)];
for (unsigned k = 0;;) {
auto cur_rd = rd_.load(std::memory_order_acquire);
auto cur_rd = rd_.load(std::memory_order_relaxed);
if (circ::index_of(cur_rd) ==
circ::index_of(wt_.load(std::memory_order_relaxed))) {
circ::index_of(wt_.load(std::memory_order_acquire))) {
return false; // empty
}
std::memcpy(buff, elem_start + circ::index_of(cur_rd), sizeof(buff));
@ -95,12 +95,12 @@ struct prod_cons_impl<prod_cons<relat::multi , relat::multi, trans::unicast>>
bool push(E* /*elems*/, F&& f, EB* elem_start) {
circ::u2_t cur_ct, nxt_ct;
while(1) {
cur_ct = ct_.load(std::memory_order_acquire);
cur_ct = ct_.load(std::memory_order_relaxed);
if (circ::index_of(nxt_ct = cur_ct + 1) ==
circ::index_of(rd_.load(std::memory_order_relaxed))) {
circ::index_of(rd_.load(std::memory_order_acquire))) {
return false; // full
}
if (ct_.compare_exchange_weak(cur_ct, nxt_ct, std::memory_order_relaxed)) {
if (ct_.compare_exchange_weak(cur_ct, nxt_ct, std::memory_order_release)) {
break;
}
std::this_thread::yield();
@ -141,14 +141,14 @@ struct prod_cons_impl<prod_cons<relat::single, relat::multi, trans::broadcast>>
template <typename E, typename F, typename EB>
bool push(E* elems, F&& f, EB* elem_start) {
auto conn_cnt = elems->conn_count(); // acquire
auto conn_cnt = elems->conn_count(std::memory_order_relaxed);
if (conn_cnt == 0) return false;
auto el = elem_start + circ::index_of(wt_.load(std::memory_order_relaxed));
auto el = elem_start + circ::index_of(wt_.load(std::memory_order_acquire));
// check all consumers have finished reading this element
while(1) {
rc_t expected = 0;
if (el->head_.rc_.compare_exchange_weak(
expected, static_cast<rc_t>(conn_cnt), std::memory_order_relaxed)) {
expected, static_cast<rc_t>(conn_cnt), std::memory_order_release)) {
break;
}
std::this_thread::yield();
@ -187,16 +187,16 @@ struct prod_cons_impl<prod_cons<relat::multi , relat::multi, trans::broadcast>>
template <typename E, typename F, typename EB>
bool push(E* elems, F&& f, EB* elem_start) {
auto conn_cnt = elems->conn_count(); // acquire
auto conn_cnt = elems->conn_count(std::memory_order_relaxed);
if (conn_cnt == 0) return false;
circ::u2_t cur_ct = ct_.fetch_add(1, std::memory_order_relaxed),
circ::u2_t cur_ct = ct_.fetch_add(1, std::memory_order_acquire),
nxt_ct = cur_ct + 1;
auto el = elem_start + circ::index_of(cur_ct);
// check all consumers have finished reading this element
while(1) {
rc_t expected = 0;
if (el->head_.rc_.compare_exchange_weak(
expected, static_cast<rc_t>(conn_cnt), std::memory_order_relaxed)) {
expected, static_cast<rc_t>(conn_cnt), std::memory_order_release)) {
break;
}
std::this_thread::yield();

View File

@ -311,10 +311,10 @@ void test_lock_performance() {
}
void Unit::test_rw_lock() {
test_lock_performance<1, 1>();
test_lock_performance<4, 4>();
test_lock_performance<1, 8>();
test_lock_performance<8, 1>();
// test_lock_performance<1, 1>();
// test_lock_performance<4, 4>();
// test_lock_performance<1, 8>();
// test_lock_performance<8, 1>();
}
template <typename T, int N, int M, bool V = true, int Loops = LoopCount>
@ -338,6 +338,7 @@ void Unit::test_route() {
ipc::route cc { "my-ipc-route" };
for (std::size_t i = 0; i < datas.size(); ++i) {
ipc::buff_t dd = cc.recv();
std::cout << "recv: " << (char*)dd.data() << std::endl;
QCOMPARE(dd.size(), std::strlen(datas[i]) + 1);
QVERIFY(std::memcmp(dd.data(), datas[i], dd.size()) == 0);
}
@ -356,8 +357,6 @@ void Unit::test_route() {
t1.join();
t2.join();
test_prod_cons<ipc::route, 1, 1>();
}
void Unit::test_route_rtt() {