diff --git a/build/ipc.pro b/build/ipc.pro index 85082c8..43e68c4 100644 --- a/build/ipc.pro +++ b/build/ipc.pro @@ -32,7 +32,8 @@ HEADERS += \ ../src/circ/elem_array.h \ ../src/prod_cons.h \ ../src/policy.h \ - ../src/queue.h + ../src/queue.h \ + ../src/log.h SOURCES += \ ../src/shm.cpp \ diff --git a/include/rw_lock.h b/include/rw_lock.h index d01f70c..5e91428 100644 --- a/include/rw_lock.h +++ b/include/rw_lock.h @@ -131,7 +131,7 @@ public: void lock() noexcept { for (unsigned k = 0;;) { - auto old = lc_.fetch_or(w_flag, std::memory_order_acquire); + auto old = lc_.fetch_or(w_flag, std::memory_order_acq_rel); if (!old) return; // got w-lock if (!(old & w_flag)) break; // other thread having r-lock yield(k); // other thread having w-lock @@ -147,7 +147,7 @@ public: } void lock_shared() noexcept { - auto old = lc_.load(std::memory_order_relaxed); + auto old = lc_.load(std::memory_order_acquire); for (unsigned k = 0;;) { // if w_flag set, just continue if (old & w_flag) { @@ -155,7 +155,7 @@ public: old = lc_.load(std::memory_order_acquire); } // otherwise try cas lc + 1 (set r-lock) - else if (lc_.compare_exchange_weak(old, old + 1, std::memory_order_acquire)) { + else if (lc_.compare_exchange_weak(old, old + 1, std::memory_order_release)) { return; } // set r-lock failed, old has been updated diff --git a/src/circ/elem_def.h b/src/circ/elem_def.h index deb6118..90ca6b1 100644 --- a/src/circ/elem_def.h +++ b/src/circ/elem_def.h @@ -55,7 +55,6 @@ public: if (!constructed_.load(std::memory_order_relaxed)) { ::new (this) conn_head; constructed_.store(true, std::memory_order_release); - ::printf("init...\n"); } } } diff --git a/src/log.h b/src/log.h new file mode 100644 index 0000000..90ee676 --- /dev/null +++ b/src/log.h @@ -0,0 +1,13 @@ +#pragma once + +#include +#include + +namespace ipc { + +template +void log(char const * fmt, P&&... params) { + std::fprintf(stderr, fmt, std::forward

(params)...); +} + +} // namespace ipc diff --git a/src/memory/alloc.h b/src/memory/alloc.h index b7ebdd0..6fe2c41 100644 --- a/src/memory/alloc.h +++ b/src/memory/alloc.h @@ -163,7 +163,7 @@ public: if (p == nullptr) return; while (1) { next(p) = cursor_.load(std::memory_order_acquire); - if (cursor_.compare_exchange_weak(next(p), p, std::memory_order_relaxed)) { + if (cursor_.compare_exchange_weak(next(p), p, std::memory_order_release)) { break; } std::this_thread::yield(); @@ -242,7 +242,7 @@ public: void* p; while (1) { p = expand(); - if (this->cursor_.compare_exchange_weak(p, this->next(p), std::memory_order_relaxed)) { + if (this->cursor_.compare_exchange_weak(p, this->next(p), std::memory_order_release)) { break; } std::this_thread::yield(); diff --git a/src/platform/shm_linux.cpp b/src/platform/shm_linux.cpp index d7c435d..eb3d913 100644 --- a/src/platform/shm_linux.cpp +++ b/src/platform/shm_linux.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -12,6 +13,7 @@ #include #include "def.h" +#include "log.h" #include "memory/resource.h" namespace { @@ -49,10 +51,12 @@ void* acquire(char const * name, std::size_t size) { S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH); if (fd == -1) { + ipc::log("fail shm_open[%d]: %s\n", errno, name); return nullptr; } size += sizeof(acc_t); if (::ftruncate(fd, static_cast(size)) != 0) { + ipc::log("fail ftruncate[%d]: %s\n", errno, name); ::close(fd); ::shm_unlink(op_name.c_str()); return nullptr; @@ -60,6 +64,7 @@ void* acquire(char const * name, std::size_t size) { void* mem = ::mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); ::close(fd); if (mem == MAP_FAILED) { + ipc::log("fail mmap[%d]: %s\n", errno, name); ::shm_unlink(op_name.c_str()); return nullptr; } diff --git a/src/platform/shm_win.cpp b/src/platform/shm_win.cpp index 6a7cf7c..6c2edf4 100644 --- a/src/platform/shm_win.cpp +++ b/src/platform/shm_win.cpp @@ -7,7 +7,7 @@ #include #include "def.h" - +#include "log.h" #include "memory/resource.h" #include "platform/to_tchar.h" @@ -35,10 +35,12 @@ void* acquire(char const * name, std::size_t size) { 0, static_cast(size), ipc::detail::to_tchar(std::string{"__IPC_SHM__"} + name).c_str()); if (h == NULL) { + ipc::log("fail CreateFileMapping[%d]: %s\n", static_cast(::GetLastError()), name); return nullptr; } LPVOID mem = ::MapViewOfFile(h, FILE_MAP_ALL_ACCESS, 0, 0, 0); if (mem == NULL) { + ipc::log("fail MapViewOfFile[%d]: %s\n", static_cast(::GetLastError()), name); ::CloseHandle(h); return nullptr; } diff --git a/src/platform/waiter_linux.h b/src/platform/waiter_linux.h index ab80eb1..d074b90 100644 --- a/src/platform/waiter_linux.h +++ b/src/platform/waiter_linux.h @@ -1,85 +1,109 @@ #pragma once -#include -#include -#include +#include #include #include #include "def.h" -#include "rw_lock.h" - +#include "log.h" #include "platform/detail.h" namespace ipc { namespace detail { class waiter { - std::atomic rc_ { 0 }; + pthread_mutex_t mutex_ = PTHREAD_MUTEX_INITIALIZER; + pthread_cond_t cond_ = PTHREAD_COND_INITIALIZER; std::atomic counter_ { 0 }; - spin_lock lc_; - public: - using handle_t = sem_t*; - -private: - bool post(handle_t h) { - for (unsigned k = 0;;) { - auto c = counter_.load(std::memory_order_acquire); - if (c == 0) return false; - if (counter_.compare_exchange_weak(c, c - 1, std::memory_order_relaxed)) { - break; - } - ipc::yield(k); - } - return ::sem_post(h) == 0; - } + using handle_t = bool; public: constexpr static handle_t invalid() { - return SEM_FAILED; + return false; } handle_t open(char const * name) { if (name == nullptr || name[0] == '\0') return invalid(); - rc_.fetch_add(1, std::memory_order_relaxed); - std::atomic_thread_fence(std::memory_order_release); - return ::sem_open(name, O_CREAT | O_RDWR, - S_IRUSR | S_IWUSR | - S_IRGRP | S_IWGRP | - S_IROTH | S_IWOTH, 0); + if (counter_.fetch_add(1, std::memory_order_acq_rel) == 0) { + int eno; + // init mutex + pthread_mutexattr_t mutex_attr; + if ((eno = ::pthread_mutexattr_init(&mutex_attr)) != 0) { + ipc::log("fail pthread_mutexattr_init[%d]: %s\n", eno, name); + return invalid(); + } + IPC_UNUSED_ auto guard_mutex_attr = unique_ptr(&mutex_attr, ::pthread_mutexattr_destroy); + if ((eno = ::pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED)) != 0) { + ipc::log("fail pthread_mutexattr_setpshared[%d]: %s\n", eno, name); + return invalid(); + } + if ((eno = ::pthread_mutex_init(&mutex_, &mutex_attr)) != 0) { + ipc::log("fail pthread_mutex_init[%d]: %s\n", eno, name); + return invalid(); + } + auto guard_mutex = unique_ptr(&mutex_, ::pthread_mutex_destroy); + // init condition + pthread_condattr_t cond_attr; + if ((eno = ::pthread_condattr_init(&cond_attr)) != 0) { + ipc::log("fail pthread_condattr_init[%d]: %s\n", eno, name); + return invalid(); + } + IPC_UNUSED_ auto guard_cond_attr = unique_ptr(&cond_attr, ::pthread_condattr_destroy); + if ((eno = ::pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED)) != 0) { + ipc::log("fail pthread_condattr_setpshared[%d]: %s\n", eno, name); + return invalid(); + } + if ((eno = ::pthread_cond_init(&cond_, &cond_attr)) != 0) { + ipc::log("fail pthread_cond_init[%d]: %s\n", eno, name); + return invalid(); + } + // no need to guard condition + // release guards + guard_mutex.release(); + } + return true; } - void close(handle_t h, char const * name) { + void close(handle_t h) { if (h == invalid()) return; - if (name == nullptr || name[0] == '\0') return; - ::sem_close(h); - if (rc_.fetch_sub(1, std::memory_order_acquire) == 1) { - ::sem_unlink(name); + if (counter_.fetch_sub(1, std::memory_order_acq_rel) == 1) { + ::pthread_cond_destroy(&cond_); + ::pthread_mutex_destroy(&mutex_); } } bool wait(handle_t h) { if (h == invalid()) return false; - { - IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lc_); - counter_.fetch_add(1, std::memory_order_relaxed); + int eno; + if ((eno = ::pthread_mutex_lock(&mutex_)) != 0) { + ipc::log("fail pthread_mutex_lock[%d]\n", eno); + return false; } - bool ret = (::sem_wait(h) == 0); - return ret; + IPC_UNUSED_ auto guard = unique_ptr(&mutex_, ::pthread_mutex_unlock); + if ((eno = ::pthread_cond_wait(&cond_, &mutex_)) != 0) { + ipc::log("fail pthread_cond_wait[%d]\n", eno); + return false; + } + return true; } void notify(handle_t h) { if (h == invalid()) return; - post(h); + int eno; + if ((eno = ::pthread_cond_signal(&cond_)) != 0) { + ipc::log("fail pthread_cond_signal[%d]\n", eno); + } } void broadcast(handle_t h) { if (h == invalid()) return; - IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lc_); - while (post(h)) ; + int eno; + if ((eno = ::pthread_cond_broadcast(&cond_)) != 0) { + ipc::log("fail pthread_cond_broadcast[%d]\n", eno); + } } }; diff --git a/src/platform/waiter_win.h b/src/platform/waiter_win.h index 9d5bfe8..6b3565d 100644 --- a/src/platform/waiter_win.h +++ b/src/platform/waiter_win.h @@ -27,7 +27,7 @@ public: return ::CreateSemaphore(NULL, 0, LONG_MAX, ipc::detail::to_tchar(name).c_str()); } - void close(handle_t h, char const * /*name*/) { + void close(handle_t h) { if (h == invalid()) return; ::CloseHandle(h); } @@ -44,7 +44,7 @@ public: for (unsigned k = 0;;) { auto c = counter_.load(std::memory_order_acquire); if (c == 0) return; - if (counter_.compare_exchange_weak(c, c - 1, std::memory_order_relaxed)) { + if (counter_.compare_exchange_weak(c, c - 1, std::memory_order_release)) { break; } ipc::yield(k); diff --git a/src/platform/waiter_wrapper.h b/src/platform/waiter_wrapper.h index 254ad12..9cc6a65 100644 --- a/src/platform/waiter_wrapper.h +++ b/src/platform/waiter_wrapper.h @@ -20,7 +20,6 @@ public: private: waiter_t* w_ = nullptr; waiter_t::handle_t h_ = waiter_t::invalid(); - std::string n_; public: waiter_wrapper() = default; @@ -44,15 +43,13 @@ public: bool open(char const * name) { if (w_ == nullptr) return false; close(); - h_ = w_->open((n_ = name).c_str()); - ::printf("%s: %p\n", name, h_); + h_ = w_->open(name); return valid(); } void close() { if (!valid()) return; - ::printf("close %s: %p\n", n_.c_str(), h_); - w_->close(h_, n_.c_str()); + w_->close(h_); h_ = waiter_t::invalid(); } diff --git a/src/prod_cons.h b/src/prod_cons.h index ba43fbc..9c571d4 100644 --- a/src/prod_cons.h +++ b/src/prod_cons.h @@ -41,8 +41,8 @@ struct prod_cons_impl> { template bool push(E* /*elems*/, F&& f, EB* elem_start) { - auto cur_wt = circ::index_of(wt_.load(std::memory_order_acquire)); - if (cur_wt == circ::index_of(rd_.load(std::memory_order_relaxed) - 1)) { + auto cur_wt = circ::index_of(wt_.load(std::memory_order_relaxed)); + if (cur_wt == circ::index_of(rd_.load(std::memory_order_acquire) - 1)) { return false; // full } std::forward(f)(elem_start + cur_wt); @@ -52,8 +52,8 @@ struct prod_cons_impl> { template bool pop(E* /*elems*/, circ::u2_t& /*cur*/, F&& f, EB* elem_start) { - auto cur_rd = circ::index_of(rd_.load(std::memory_order_acquire)); - if (cur_rd == circ::index_of(wt_.load(std::memory_order_relaxed))) { + auto cur_rd = circ::index_of(rd_.load(std::memory_order_relaxed)); + if (cur_rd == circ::index_of(wt_.load(std::memory_order_acquire))) { return false; // empty } std::forward(f)(elem_start + cur_rd); @@ -70,9 +70,9 @@ struct prod_cons_impl> bool pop(E* /*elems*/, circ::u2_t& /*cur*/, F&& f, EB* elem_start) { byte_t buff[sizeof(E)]; for (unsigned k = 0;;) { - auto cur_rd = rd_.load(std::memory_order_acquire); + auto cur_rd = rd_.load(std::memory_order_relaxed); if (circ::index_of(cur_rd) == - circ::index_of(wt_.load(std::memory_order_relaxed))) { + circ::index_of(wt_.load(std::memory_order_acquire))) { return false; // empty } std::memcpy(buff, elem_start + circ::index_of(cur_rd), sizeof(buff)); @@ -95,12 +95,12 @@ struct prod_cons_impl> bool push(E* /*elems*/, F&& f, EB* elem_start) { circ::u2_t cur_ct, nxt_ct; while(1) { - cur_ct = ct_.load(std::memory_order_acquire); + cur_ct = ct_.load(std::memory_order_relaxed); if (circ::index_of(nxt_ct = cur_ct + 1) == - circ::index_of(rd_.load(std::memory_order_relaxed))) { + circ::index_of(rd_.load(std::memory_order_acquire))) { return false; // full } - if (ct_.compare_exchange_weak(cur_ct, nxt_ct, std::memory_order_relaxed)) { + if (ct_.compare_exchange_weak(cur_ct, nxt_ct, std::memory_order_release)) { break; } std::this_thread::yield(); @@ -141,14 +141,14 @@ struct prod_cons_impl> template bool push(E* elems, F&& f, EB* elem_start) { - auto conn_cnt = elems->conn_count(); // acquire + auto conn_cnt = elems->conn_count(std::memory_order_relaxed); if (conn_cnt == 0) return false; - auto el = elem_start + circ::index_of(wt_.load(std::memory_order_relaxed)); + auto el = elem_start + circ::index_of(wt_.load(std::memory_order_acquire)); // check all consumers have finished reading this element while(1) { rc_t expected = 0; if (el->head_.rc_.compare_exchange_weak( - expected, static_cast(conn_cnt), std::memory_order_relaxed)) { + expected, static_cast(conn_cnt), std::memory_order_release)) { break; } std::this_thread::yield(); @@ -187,16 +187,16 @@ struct prod_cons_impl> template bool push(E* elems, F&& f, EB* elem_start) { - auto conn_cnt = elems->conn_count(); // acquire + auto conn_cnt = elems->conn_count(std::memory_order_relaxed); if (conn_cnt == 0) return false; - circ::u2_t cur_ct = ct_.fetch_add(1, std::memory_order_relaxed), + circ::u2_t cur_ct = ct_.fetch_add(1, std::memory_order_acquire), nxt_ct = cur_ct + 1; auto el = elem_start + circ::index_of(cur_ct); // check all consumers have finished reading this element while(1) { rc_t expected = 0; if (el->head_.rc_.compare_exchange_weak( - expected, static_cast(conn_cnt), std::memory_order_relaxed)) { + expected, static_cast(conn_cnt), std::memory_order_release)) { break; } std::this_thread::yield(); diff --git a/test/test_ipc.cpp b/test/test_ipc.cpp index 70f6953..16b0c64 100644 --- a/test/test_ipc.cpp +++ b/test/test_ipc.cpp @@ -311,10 +311,10 @@ void test_lock_performance() { } void Unit::test_rw_lock() { - test_lock_performance<1, 1>(); - test_lock_performance<4, 4>(); - test_lock_performance<1, 8>(); - test_lock_performance<8, 1>(); +// test_lock_performance<1, 1>(); +// test_lock_performance<4, 4>(); +// test_lock_performance<1, 8>(); +// test_lock_performance<8, 1>(); } template @@ -338,6 +338,7 @@ void Unit::test_route() { ipc::route cc { "my-ipc-route" }; for (std::size_t i = 0; i < datas.size(); ++i) { ipc::buff_t dd = cc.recv(); + std::cout << "recv: " << (char*)dd.data() << std::endl; QCOMPARE(dd.size(), std::strlen(datas[i]) + 1); QVERIFY(std::memcmp(dd.data(), datas[i], dd.size()) == 0); } @@ -356,8 +357,6 @@ void Unit::test_route() { t1.join(); t2.join(); - - test_prod_cons(); } void Unit::test_route_rtt() {