mirror of
https://github.com/mutouyun/cpp-ipc.git
synced 2025-12-07 01:06:45 +08:00
clear codes
This commit is contained in:
parent
f7f06ab052
commit
1cf69038bb
@ -2,18 +2,8 @@
|
|||||||
|
|
||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
|
|
||||||
#include <cstring>
|
|
||||||
#include <atomic>
|
|
||||||
#include <string>
|
|
||||||
#include <utility>
|
|
||||||
#include <tuple>
|
|
||||||
|
|
||||||
#include "def.h"
|
#include "def.h"
|
||||||
#include "log.h"
|
#include "log.h"
|
||||||
#include "shm.h"
|
|
||||||
#include "rw_lock.h"
|
|
||||||
#include "id_pool.h"
|
|
||||||
#include "pool_alloc.h"
|
|
||||||
|
|
||||||
#include "platform/detail.h"
|
#include "platform/detail.h"
|
||||||
|
|
||||||
@ -115,9 +105,9 @@ public:
|
|||||||
#pragma pop_macro("IPC_PTHREAD_FUNC_")
|
#pragma pop_macro("IPC_PTHREAD_FUNC_")
|
||||||
|
|
||||||
class semaphore {
|
class semaphore {
|
||||||
mutex lock_;
|
mutex lock_;
|
||||||
condition cond_;
|
condition cond_;
|
||||||
long counter_ = 0;
|
long volatile counter_ = 0;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
bool open() {
|
bool open() {
|
||||||
|
|||||||
@ -18,7 +18,8 @@ namespace ipc {
|
|||||||
namespace detail {
|
namespace detail {
|
||||||
|
|
||||||
class waiter {
|
class waiter {
|
||||||
std::atomic<long> counter_ { 0 };
|
long volatile counter_ = 0;
|
||||||
|
spin_lock lock_;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
using handle_t = HANDLE;
|
using handle_t = HANDLE;
|
||||||
@ -37,51 +38,32 @@ public:
|
|||||||
::CloseHandle(h);
|
::CloseHandle(h);
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename F>
|
|
||||||
static bool multi_wait_if(std::tuple<waiter*, handle_t> const * all, std::size_t size, F&& pred) {
|
|
||||||
if (all == nullptr || size == 0) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
if (!std::forward<F>(pred)()) return true;
|
|
||||||
auto hs = static_cast<handle_t*>(mem::alloc(sizeof(handle_t) * size));
|
|
||||||
IPC_UNUSED_ auto guard = unique_ptr(hs, [size](void* p) { mem::free(p, sizeof(handle_t) * size); });
|
|
||||||
std::size_t i = 0;
|
|
||||||
for (; i < size; ++i) {
|
|
||||||
auto& info = all[i];
|
|
||||||
if ((std::get<0>(all[i]) == nullptr) ||
|
|
||||||
(std::get<1>(all[i]) == invalid())) continue;
|
|
||||||
std::get<0>(info)->counter_.fetch_add(1, std::memory_order_relaxed);
|
|
||||||
hs[i] = std::get<1>(all[i]);
|
|
||||||
}
|
|
||||||
std::atomic_thread_fence(std::memory_order_release);
|
|
||||||
return ::WaitForMultipleObjects(static_cast<DWORD>(i), hs, FALSE, INFINITE) != WAIT_FAILED;
|
|
||||||
}
|
|
||||||
|
|
||||||
template <typename F>
|
template <typename F>
|
||||||
bool wait_if(handle_t h, F&& pred) {
|
bool wait_if(handle_t h, F&& pred) {
|
||||||
if (h == invalid()) return false;
|
if (h == invalid()) return false;
|
||||||
if (!std::forward<F>(pred)()) return true;
|
{
|
||||||
counter_.fetch_add(1, std::memory_order_relaxed);
|
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_);
|
||||||
std::atomic_thread_fence(std::memory_order_release);
|
if (!std::forward<F>(pred)()) return true;
|
||||||
|
++ counter_;
|
||||||
|
}
|
||||||
return ::WaitForSingleObject(h, INFINITE) == WAIT_OBJECT_0;
|
return ::WaitForSingleObject(h, INFINITE) == WAIT_OBJECT_0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void notify(handle_t h) {
|
void notify(handle_t h) {
|
||||||
if (h == invalid()) return;
|
if (h == invalid()) return;
|
||||||
for (unsigned k = 0;;) {
|
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_);
|
||||||
auto c = counter_.load(std::memory_order_acquire);
|
if (counter_ == 0) return;
|
||||||
if (c == 0) return;
|
-- counter_;
|
||||||
if (counter_.compare_exchange_weak(c, c - 1, std::memory_order_release)) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
ipc::yield(k);
|
|
||||||
}
|
|
||||||
::ReleaseSemaphore(h, 1, NULL);
|
::ReleaseSemaphore(h, 1, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
void broadcast(handle_t h) {
|
void broadcast(handle_t h) {
|
||||||
if (h == invalid()) return;
|
if (h == invalid()) return;
|
||||||
::ReleaseSemaphore(h, counter_.exchange(0, std::memory_order_acquire), NULL);
|
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_);
|
||||||
|
if (counter_ == 0) return;
|
||||||
|
long all_count = counter_;
|
||||||
|
counter_ = 0;
|
||||||
|
::ReleaseSemaphore(h, all_count, NULL);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@ -1,10 +1,7 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <string>
|
|
||||||
#include <tuple>
|
|
||||||
#include <type_traits>
|
#include <type_traits>
|
||||||
|
#include <atomic>
|
||||||
#include "pool_alloc.h"
|
|
||||||
|
|
||||||
#if defined(WIN64) || defined(_WIN64) || defined(__WIN64__) || \
|
#if defined(WIN64) || defined(_WIN64) || defined(__WIN64__) || \
|
||||||
defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) || \
|
defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) || \
|
||||||
@ -26,10 +23,6 @@ private:
|
|||||||
waiter_t* w_ = nullptr;
|
waiter_t* w_ = nullptr;
|
||||||
waiter_t::handle_t h_ = waiter_t::invalid();
|
waiter_t::handle_t h_ = waiter_t::invalid();
|
||||||
|
|
||||||
auto to_w_info() {
|
|
||||||
return std::make_tuple(w_, h_);
|
|
||||||
}
|
|
||||||
|
|
||||||
public:
|
public:
|
||||||
waiter_wrapper() = default;
|
waiter_wrapper() = default;
|
||||||
explicit waiter_wrapper(waiter_t* w) {
|
explicit waiter_wrapper(waiter_t* w) {
|
||||||
|
|||||||
@ -94,7 +94,7 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::unicast>>
|
|||||||
template <typename E, typename F, typename EB>
|
template <typename E, typename F, typename EB>
|
||||||
bool push(E* /*elems*/, F&& f, EB* elem_start) {
|
bool push(E* /*elems*/, F&& f, EB* elem_start) {
|
||||||
circ::u2_t cur_ct, nxt_ct;
|
circ::u2_t cur_ct, nxt_ct;
|
||||||
while(1) {
|
while (1) {
|
||||||
cur_ct = ct_.load(std::memory_order_relaxed);
|
cur_ct = ct_.load(std::memory_order_relaxed);
|
||||||
if (circ::index_of(nxt_ct = cur_ct + 1) ==
|
if (circ::index_of(nxt_ct = cur_ct + 1) ==
|
||||||
circ::index_of(rd_.load(std::memory_order_acquire))) {
|
circ::index_of(rd_.load(std::memory_order_acquire))) {
|
||||||
@ -106,7 +106,7 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::unicast>>
|
|||||||
std::this_thread::yield();
|
std::this_thread::yield();
|
||||||
}
|
}
|
||||||
std::forward<F>(f)(elem_start + circ::index_of(cur_ct));
|
std::forward<F>(f)(elem_start + circ::index_of(cur_ct));
|
||||||
while(1) {
|
while (1) {
|
||||||
auto exp_wt = cur_ct;
|
auto exp_wt = cur_ct;
|
||||||
if (wt_.compare_exchange_weak(exp_wt, nxt_ct, std::memory_order_release)) {
|
if (wt_.compare_exchange_weak(exp_wt, nxt_ct, std::memory_order_release)) {
|
||||||
break;
|
break;
|
||||||
@ -145,7 +145,7 @@ struct prod_cons_impl<wr<relat::single, relat::multi, trans::broadcast>> {
|
|||||||
if (conn_cnt == 0) return false;
|
if (conn_cnt == 0) return false;
|
||||||
auto el = elem_start + circ::index_of(wt_.load(std::memory_order_acquire));
|
auto el = elem_start + circ::index_of(wt_.load(std::memory_order_acquire));
|
||||||
// check all consumers have finished reading this element
|
// check all consumers have finished reading this element
|
||||||
while(1) {
|
while (1) {
|
||||||
rc_t expected = 0;
|
rc_t expected = 0;
|
||||||
if (el->head_.rc_.compare_exchange_weak(
|
if (el->head_.rc_.compare_exchange_weak(
|
||||||
expected, static_cast<rc_t>(conn_cnt), std::memory_order_release)) {
|
expected, static_cast<rc_t>(conn_cnt), std::memory_order_release)) {
|
||||||
@ -193,7 +193,7 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::broadcast>>
|
|||||||
nxt_ct = cur_ct + 1;
|
nxt_ct = cur_ct + 1;
|
||||||
auto el = elem_start + circ::index_of(cur_ct);
|
auto el = elem_start + circ::index_of(cur_ct);
|
||||||
// check all consumers have finished reading this element
|
// check all consumers have finished reading this element
|
||||||
while(1) {
|
while (1) {
|
||||||
rc_t expected = 0;
|
rc_t expected = 0;
|
||||||
if (el->head_.rc_.compare_exchange_weak(
|
if (el->head_.rc_.compare_exchange_weak(
|
||||||
expected, static_cast<rc_t>(conn_cnt), std::memory_order_release)) {
|
expected, static_cast<rc_t>(conn_cnt), std::memory_order_release)) {
|
||||||
@ -204,7 +204,7 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::broadcast>>
|
|||||||
if (conn_cnt == 0) return false;
|
if (conn_cnt == 0) return false;
|
||||||
}
|
}
|
||||||
std::forward<F>(f)(el->data_);
|
std::forward<F>(f)(el->data_);
|
||||||
while(1) {
|
while (1) {
|
||||||
auto exp_wt = cur_ct;
|
auto exp_wt = cur_ct;
|
||||||
if (wt_.compare_exchange_weak(exp_wt, nxt_ct, std::memory_order_release)) {
|
if (wt_.compare_exchange_weak(exp_wt, nxt_ct, std::memory_order_release)) {
|
||||||
break;
|
break;
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user