try semaphore

This commit is contained in:
mutouyun 2019-01-25 01:22:56 +08:00
parent a4b93f60cf
commit 3dc97ab6a6
3 changed files with 54 additions and 65 deletions

View File

@ -1,11 +1,14 @@
#pragma once #pragma once
#include <pthread.h> #include <fcntl.h>
#include <sys/stat.h>
#include <semaphore.h>
#include <cstring> #include <cstring>
#include <atomic> #include <atomic>
#include "def.h" #include "def.h"
#include "rw_lock.h"
#include "platform/detail.h" #include "platform/detail.h"
@ -13,92 +16,70 @@ namespace ipc {
namespace detail { namespace detail {
class waiter { class waiter {
pthread_mutex_t mutex_ = PTHREAD_MUTEX_INITIALIZER; std::atomic<unsigned> rc_ { 0 };
pthread_cond_t cond_ = PTHREAD_COND_INITIALIZER;
std::atomic<unsigned> counter_ { 0 }; std::atomic<unsigned> counter_ { 0 };
spin_lock lc_;
public: public:
using handle_t = bool; using handle_t = sem_t*;
private:
bool post(handle_t h) {
for (unsigned k = 0;;) {
auto c = counter_.load(std::memory_order_acquire);
if (c == 0) return false;
if (counter_.compare_exchange_weak(c, c - 1, std::memory_order_relaxed)) {
break;
}
ipc::yield(k);
}
return ::sem_post(h) == 0;
}
public: public:
constexpr static handle_t invalid() { constexpr static handle_t invalid() {
return false; return SEM_FAILED;
} }
handle_t open(char const * name) { handle_t open(char const * name) {
if (name == nullptr || name[0] == '\0') return invalid(); if (name == nullptr || name[0] == '\0') return invalid();
if (counter_.fetch_add(1, std::memory_order_acq_rel) == 0) { rc_.fetch_add(1, std::memory_order_relaxed);
// init mutex std::atomic_thread_fence(std::memory_order_release);
pthread_mutexattr_t mutex_attr; return ::sem_open(name, O_CREAT | O_RDWR,
if (::pthread_mutexattr_init(&mutex_attr) != 0) { S_IRUSR | S_IWUSR |
::printf("fail pthread_mutexattr_init\n"); S_IRGRP | S_IWGRP |
return invalid(); S_IROTH | S_IWOTH, 0);
}
IPC_UNUSED_ auto guard_mutex_attr = unique_ptr(&mutex_attr, ::pthread_mutexattr_destroy);
if (::pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED) != 0) {
::printf("fail pthread_mutexattr_setpshared\n");
return invalid();
}
if (::pthread_mutex_init(&mutex_, &mutex_attr) != 0) {
::printf("fail pthread_mutex_init\n");
return invalid();
}
auto guard_mutex = unique_ptr(&mutex_, ::pthread_mutex_destroy);
// init condition
pthread_condattr_t cond_attr;
if (::pthread_condattr_init(&cond_attr) != 0) {
::printf("fail pthread_condattr_init\n");
return invalid();
}
IPC_UNUSED_ auto guard_cond_attr = unique_ptr(&cond_attr, ::pthread_condattr_destroy);
if (::pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED) != 0) {
::printf("fail pthread_condattr_setpshared\n");
return invalid();
}
if (::pthread_cond_init(&cond_, &cond_attr) != 0) {
::printf("fail pthread_cond_init\n");
return invalid();
}
// no need to guard condition
// release guards
guard_mutex.release();
}
return true;
} }
void close(handle_t h) { void close(handle_t h, char const * name) {
if (h == invalid()) return; if (h == invalid()) return;
::printf("closing...\n"); if (name == nullptr || name[0] == '\0') return;
if (counter_.fetch_sub(1, std::memory_order_acq_rel) == 1) { ::sem_close(h);
::pthread_cond_destroy(&cond_); if (rc_.fetch_sub(1, std::memory_order_acquire) == 1) {
::pthread_mutex_destroy(&mutex_); ::sem_unlink(name);
::printf("destroy end...\n");
} }
} }
bool wait(handle_t h) { bool wait(handle_t h) {
if (h == invalid()) return false; if (h == invalid()) return false;
::printf("wait...\n"); {
if (::pthread_mutex_lock(&mutex_) != 0) { IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lc_);
return false; counter_.fetch_add(1, std::memory_order_relaxed);
} }
IPC_UNUSED_ auto guard = unique_ptr(&mutex_, ::pthread_mutex_unlock); bool ret = (::sem_wait(h) == 0);
if (::pthread_cond_wait(&cond_, &mutex_) != 0) { return ret;
return false;
}
return true;
} }
void notify(handle_t h) { void notify(handle_t h) {
if (h == invalid()) return; if (h == invalid()) return;
::printf("notify...\n"); post(h);
::pthread_cond_signal(&cond_);
} }
void broadcast(handle_t h) { void broadcast(handle_t h) {
if (h == invalid()) return; if (h == invalid()) return;
::printf("broadcast...\n"); IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lc_);
::pthread_cond_broadcast(&cond_); while (post(h)) ;
} }
}; };

View File

@ -27,16 +27,20 @@ public:
return ::CreateSemaphore(NULL, 0, LONG_MAX, ipc::detail::to_tchar(name).c_str()); return ::CreateSemaphore(NULL, 0, LONG_MAX, ipc::detail::to_tchar(name).c_str());
} }
void close(handle_t h) { void close(handle_t h, char const * /*name*/) {
if (h == invalid()) return;
::CloseHandle(h); ::CloseHandle(h);
} }
bool wait(handle_t h) { bool wait(handle_t h) {
counter_.fetch_add(1, std::memory_order_release); if (h == invalid()) return false;
counter_.fetch_add(1, std::memory_order_relaxed);
std::atomic_thread_fence(std::memory_order_release);
return ::WaitForSingleObject(h, INFINITE) == WAIT_OBJECT_0; return ::WaitForSingleObject(h, INFINITE) == WAIT_OBJECT_0;
} }
void notify(handle_t h) { void notify(handle_t h) {
if (h == invalid()) return;
for (unsigned k = 0;;) { for (unsigned k = 0;;) {
auto c = counter_.load(std::memory_order_acquire); auto c = counter_.load(std::memory_order_acquire);
if (c == 0) return; if (c == 0) return;
@ -49,6 +53,7 @@ public:
} }
void broadcast(handle_t h) { void broadcast(handle_t h) {
if (h == invalid()) return;
::ReleaseSemaphore(h, counter_.exchange(0, std::memory_order_acquire), NULL); ::ReleaseSemaphore(h, counter_.exchange(0, std::memory_order_acquire), NULL);
} }
}; };

View File

@ -1,5 +1,7 @@
#pragma once #pragma once
#include <string>
#if defined(WIN64) || defined(_WIN64) || defined(__WIN64__) || \ #if defined(WIN64) || defined(_WIN64) || defined(__WIN64__) || \
defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) || \ defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) || \
defined(WINCE) || defined(_WIN32_WCE) defined(WINCE) || defined(_WIN32_WCE)
@ -18,6 +20,7 @@ public:
private: private:
waiter_t* w_ = nullptr; waiter_t* w_ = nullptr;
waiter_t::handle_t h_ = waiter_t::invalid(); waiter_t::handle_t h_ = waiter_t::invalid();
std::string n_;
public: public:
waiter_wrapper() = default; waiter_wrapper() = default;
@ -41,15 +44,15 @@ public:
bool open(char const * name) { bool open(char const * name) {
if (w_ == nullptr) return false; if (w_ == nullptr) return false;
close(); close();
h_ = w_->open(name); h_ = w_->open((n_ = name).c_str());
::printf("%s: %p\n", name, h_); ::printf("%s: %p\n", name, h_);
return valid(); return valid();
} }
void close() { void close() {
if (!valid()) return; if (!valid()) return;
::printf("close %p\n", h_); ::printf("close %s: %p\n", n_.c_str(), h_);
w_->close(h_); w_->close(h_, n_.c_str());
h_ = waiter_t::invalid(); h_ = waiter_t::invalid();
} }