This commit is contained in:
zhangyi 2019-03-26 11:10:22 +08:00
parent 1d2f6d13ea
commit 6730fa578d
3 changed files with 44 additions and 50 deletions

View File

@ -75,6 +75,19 @@ struct cache_t {
}
};
template <typename W, typename F>
void wait_for(W& waiter, F&& pred) {
for (unsigned k = 0; pred();) {
bool ret = true;
ipc::sleep(k, [&ret, &waiter, &pred] {
return waiter.wait_if([&ret, &pred] {
return ret = pred();
});
});
if (!ret) break;
}
}
template <typename Policy>
struct detail_impl {
@ -148,13 +161,9 @@ static bool wait_for_recv(ipc::handle_t h, std::size_t r_count) {
if (que == nullptr) {
return false;
}
for (unsigned k = 0; que->conn_count() < r_count;) {
ipc::sleep(k, [h, que, r_count] {
return info_of(h)->cc_waiter_.wait_if([que, r_count] {
wait_for(info_of(h)->cc_waiter_, [que, r_count] {
return que->conn_count() < r_count;
});
});
}
return true;
}
@ -202,15 +211,9 @@ static buff_t recv(ipc::handle_t h) {
while (1) {
// pop a new message
typename queue_t::value_t msg;
for (unsigned k = 0; !que->pop(msg);) {
bool succ = false;
ipc::sleep(k, [h, que, &msg, &succ] {
return info_of(h)->rd_waiter_.wait_if([que, &msg, &succ] {
return !(succ = que->pop(msg));
wait_for(info_of(h)->rd_waiter_, [que, &msg] {
return !que->pop(msg);
});
});
if (succ) break;
}
if (msg.head_.que_ == nullptr) return {};
if (msg.head_.que_ == que) continue; // pop next
// msg.head_.remain_ may minus & abs(msg.head_.remain_) < data_length

View File

@ -20,10 +20,6 @@ class semaphore {
HANDLE h_ = NULL;
public:
friend bool operator==(semaphore const & s1, semaphore const & s2) {
return s1.h_ == s2.h_;
}
bool open(std::string && name, long count = 0, long limit = LONG_MAX) {
h_ = ::CreateSemaphore(NULL, count, limit, ipc::detail::to_tchar(std::move(name)).c_str());
if (h_ == NULL) {
@ -72,27 +68,23 @@ class condition {
mutex lock_;
semaphore sema_, handshake_;
ipc::shm::handle waiting_; // std::atomic<unsigned>
std::atomic<unsigned> * waiting_ = nullptr;
long * counter_ = nullptr;
auto waiting_cnt() {
return static_cast<std::atomic<unsigned>*>(waiting_.get());
}
public:
friend bool operator==(condition const & c1, condition const & c2) {
return c1.counter_ == c2.counter_;
return (c1.waiting_ == c2.waiting_) && (c1.counter_ == c2.counter_);
}
friend bool operator!=(condition const & c1, condition const & c2) {
return !(c1 == c2);
}
bool open(std::string const & name, long * counter) {
if (lock_ .open ("__COND_MTX__" + name) &&
sema_ .open ("__COND_SEM__" + name) &&
handshake_.open ("__COND_HAN__" + name) &&
waiting_ .acquire(("__COND_WAITING_CNT__" + name).c_str(), sizeof(std::atomic<unsigned>))) {
bool open(std::string const & name, std::atomic<unsigned> * waiting, long * counter) {
if (lock_ .open("__COND_MTX__" + name) &&
sema_ .open("__COND_SEM__" + name) &&
handshake_.open("__COND_HAN__" + name)) {
waiting_ = waiting;
counter_ = counter;
return true;
}
@ -100,7 +92,6 @@ public:
}
void close() {
waiting_ .release();
handshake_.close();
sema_ .close();
lock_ .close();
@ -108,10 +99,7 @@ public:
template <typename Mutex, typename F>
bool wait_if(Mutex& mtx, F&& pred) {
auto cnt = waiting_cnt();
if (cnt != nullptr) {
cnt->fetch_add(1, std::memory_order_release);
}
waiting_->fetch_add(1, std::memory_order_release);
{
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_);
if (!std::forward<F>(pred)()) return true;
@ -119,9 +107,7 @@ public:
}
mtx.unlock();
bool ret = sema_.wait();
if (cnt != nullptr) {
cnt->fetch_sub(1, std::memory_order_release);
}
waiting_->fetch_sub(1, std::memory_order_release);
ret = handshake_.post() && ret;
mtx.lock();
return ret;
@ -129,8 +115,7 @@ public:
bool notify() {
std::atomic_thread_fence(std::memory_order_acq_rel);
if (waiting_cnt() != nullptr &&
waiting_cnt()->load(std::memory_order_relaxed) == 0) {
if (waiting_->load(std::memory_order_relaxed) == 0) {
return true;
}
bool ret = true;
@ -145,8 +130,7 @@ public:
bool broadcast() {
std::atomic_thread_fence(std::memory_order_acq_rel);
if (waiting_cnt() != nullptr &&
waiting_cnt()->load(std::memory_order_relaxed) == 0) {
if (waiting_->load(std::memory_order_relaxed) == 0) {
return true;
}
bool ret = true;
@ -163,6 +147,8 @@ public:
};
class waiter {
std::atomic<unsigned> waiting_ { 0 };
long counter_ = 0;
public:
@ -177,7 +163,7 @@ public:
return invalid();
}
condition cond;
if (cond.open(name, &counter_)) {
if (cond.open(name, &waiting_, &counter_)) {
return cond;
}
return invalid();

View File

@ -20,19 +20,24 @@ using mutex_impl = ipc::detail::mutex;
using semaphore_impl = ipc::detail::semaphore;
class condition_impl : public ipc::detail::condition {
ipc::shm::handle h_;
ipc::shm::handle wait_h_, cnt_h_;
public:
bool open(std::string const & name) {
if (h_.acquire((name + "__COND_CNT__").c_str(), sizeof(long))) {
return ipc::detail::condition::open(name, static_cast<long *>(h_.get()));
if (wait_h_.acquire((name + "__COND_WAIT__").c_str(), sizeof(std::atomic<unsigned>)) &&
cnt_h_ .acquire((name + "__COND_CNT__" ).c_str(), sizeof(long))) {
return ipc::detail::condition::open(name,
static_cast<std::atomic<unsigned> *>(wait_h_.get()),
static_cast<long *>(cnt_h_.get()));
}
return false;
}
void close() {
ipc::detail::condition::close();
h_.release();
wait_h_.release();
cnt_h_ .release();
}
bool wait(mutex_impl& mtx) {