From 478cb62c35ac1e8ed37932af5f64916528ee6c02 Mon Sep 17 00:00:00 2001 From: mutouyun Date: Thu, 14 Feb 2019 15:56:01 +0800 Subject: [PATCH] impl ipc objects (mutex, condition, semaphore) --- include/waiter.h | 76 ++++++++++++++++++----- src/platform/waiter_linux.h | 11 +++- src/platform/waiter_wrapper.h | 111 +++++++++++++++++++++++++++++++++- src/waiter.cpp | 110 +++++++++++++++------------------ src/waiter_template.inc | 65 ++++++++++++++++++++ 5 files changed, 293 insertions(+), 80 deletions(-) create mode 100644 src/waiter_template.inc diff --git a/include/waiter.h b/include/waiter.h index c7e2ded..cc3b10d 100644 --- a/include/waiter.h +++ b/include/waiter.h @@ -4,16 +4,17 @@ namespace ipc { -class IPC_EXPORT waiter { +class condition; +class IPC_EXPORT mutex { public: - waiter(); - explicit waiter(char const * name); - waiter(waiter&& rhs); + mutex(); + explicit mutex(char const * name); + mutex(mutex&& rhs); - virtual ~waiter(); + ~mutex(); - void swap(waiter& rhs); - waiter& operator=(waiter rhs); + void swap(mutex& rhs); + mutex& operator=(mutex rhs); bool valid() const; char const * name () const; @@ -21,18 +22,65 @@ public: bool open (char const * name); void close(); - bool wait(); - bool wait_if_pred(); + bool lock(); + bool unlock(); +private: + class mutex_; + mutex_* p_; + + friend class condition; +}; + +class IPC_EXPORT semaphore { +public: + semaphore(); + explicit semaphore(char const * name); + semaphore(semaphore&& rhs); + + ~semaphore(); + + void swap(semaphore& rhs); + semaphore& operator=(semaphore rhs); + + bool valid() const; + char const * name () const; + + bool open (char const * name, long count = 0); + void close(); + + bool wait(); + bool post(long count = 1); + +private: + class semaphore_; + semaphore_* p_; +}; + +class IPC_EXPORT condition { +public: + condition(); + explicit condition(char const * name); + condition(condition&& rhs); + + ~condition(); + + void swap(condition& rhs); + condition& operator=(condition rhs); + + bool valid() const; + char const * name () const; + + bool open (char const * name); + void close(); + + bool wait(mutex&); bool notify(); bool broadcast(); -protected: - virtual bool pred(); - private: - class waiter_; - waiter_* p_; + class condition_; + condition_* p_; }; } // namespace ipc diff --git a/src/platform/waiter_linux.h b/src/platform/waiter_linux.h index e967a87..5c9d7e1 100644 --- a/src/platform/waiter_linux.h +++ b/src/platform/waiter_linux.h @@ -107,11 +107,16 @@ public: class semaphore { mutex lock_; condition cond_; - long volatile counter_ = 0; + long volatile counter_; public: - bool open() { - return lock_.open() && cond_.open(); + bool open(long count = 0) { + if (lock_.open() && cond_.open()) { + IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); + counter_ = count; + return true; + } + return false; } void close() { diff --git a/src/platform/waiter_wrapper.h b/src/platform/waiter_wrapper.h index 80b7288..d1545a5 100644 --- a/src/platform/waiter_wrapper.h +++ b/src/platform/waiter_wrapper.h @@ -2,15 +2,120 @@ #include #include +#include +#include "shm.h" + +#include "platform/detail.h" #if defined(WIN64) || defined(_WIN64) || defined(__WIN64__) || \ defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) || \ defined(WINCE) || defined(_WIN32_WCE) + #include "platform/waiter_win.h" -#else + +namespace ipc { +namespace detail { + +using mutex_impl = ipc::detail::mutex; +using semaphore_impl = ipc::detail::semaphore; + +class condition_impl : public ipc::detail::condition { + ipc::shm::handle h_; + +public: + bool open(std::string const & name) { + if (h_.acquire((name + "__COND_CNT__").c_str(), sizeof(long volatile))) { + return ipc::detail::condition::open(name, static_cast(h_.get())); + } + return false; + } + + void close() { + ipc::detail::condition::close(); + h_.release(); + } + + bool wait(mutex_impl& mtx) { + return ipc::detail::condition::wait_if(mtx, [] { return true; }); + } +}; + +} // namespace detail +} // namespace ipc + +#else /*!WIN*/ + #include "platform/waiter_linux.h" -#endif -#include "platform/detail.h" + +namespace ipc { +namespace detail { + +template +class object_impl { + ipc::shm::handle h_; + + struct info_t { + T object_; + std::atomic opened_; + }; + +public: + T& object() { + return static_cast(h_.get())->object_; + } + + template + bool open(char const * name, P&&... params) { + if (!h_.acquire(name, sizeof(info_t))) { + return false; + } + if ((static_cast(h_.get())->opened_.fetch_add(1, std::memory_order_acq_rel) == 0) && + !static_cast(h_.get())->object_.open(std::forward

(params)...)) { + return false; + } + return true; + } + + void close() { + if (!h_.valid()) return; + if (static_cast(h_.get())->opened_.fetch_sub(1, std::memory_order_release) == 1) { + static_cast(h_.get())->object_.close(); + } + h_.release(); + } +}; + +class mutex_impl : public object_impl { +public: + bool lock () { return object().lock (); } + bool unlock() { return object().unlock(); } +}; + +class semaphore_impl : public object_impl { +public: + bool wait() { + return object().wait_if([] { return true; }); + } + + bool post(long count) { + return object().post([count] { return count; }); + } +}; + +class condition_impl : public object_impl { +public: + bool wait(mutex_impl& mtx) { + return object().wait(mtx.object()); + } + + bool notify () { return object().notify (); } + bool broadcast() { return object().broadcast(); } +}; + +} // namespace detail +} // namespace ipc + +#endif/*!WIN*/ namespace ipc { namespace detail { diff --git a/src/waiter.cpp b/src/waiter.cpp index 8616064..5ff6cd2 100644 --- a/src/waiter.cpp +++ b/src/waiter.cpp @@ -5,82 +5,72 @@ #include "def.h" #include "platform/waiter_wrapper.h" +#undef IPC_PP_CAT_ +#undef IPC_PP_JOIN_T__ +#undef IPC_PP_JOIN_ + +#define IPC_PP_CAT_(X, ...) X##__VA_ARGS__ +#define IPC_PP_JOIN_T__(X, ...) IPC_PP_CAT_(X, __VA_ARGS__) +#define IPC_PP_JOIN_(X, ...) IPC_PP_JOIN_T__(X, __VA_ARGS__) + namespace ipc { -class waiter::waiter_ : public pimpl { -public: - std::string n_; +#undef IPC_OBJECT_TYPE_ +#undef IPC_OBJECT_TYPE_OPEN_PARS_ +#undef IPC_OBJECT_TYPE_OPEN_ARGS_ - detail::waiter_wrapper w_ { new detail::waiter }; - ~waiter_() { delete w_.waiter(); } -}; +#define IPC_OBJECT_TYPE_ mutex +#define IPC_OBJECT_TYPE_OPEN_PARS_ +#define IPC_OBJECT_TYPE_OPEN_ARGS_ -waiter::waiter() - : p_(p_->make()) { +#include "waiter_template.inc" + +bool mutex::lock() { + return impl(p_)->h_.lock(); } -waiter::waiter(char const * name) - : waiter() { - open(name); +bool mutex::unlock() { + return impl(p_)->h_.unlock(); } -waiter::waiter(waiter&& rhs) - : waiter() { - swap(rhs); +#undef IPC_OBJECT_TYPE_ +#undef IPC_OBJECT_TYPE_OPEN_PARS_ +#undef IPC_OBJECT_TYPE_OPEN_ARGS_ + +#define IPC_OBJECT_TYPE_ semaphore +#define IPC_OBJECT_TYPE_OPEN_PARS_ , long count +#define IPC_OBJECT_TYPE_OPEN_ARGS_ , count + +#include "waiter_template.inc" + +bool semaphore::wait() { + return impl(p_)->h_.wait(); } -waiter::~waiter() { - p_->clear(); +bool semaphore::post(long count) { + return impl(p_)->h_.post(count); } -void waiter::swap(waiter& rhs) { - std::swap(p_, rhs.p_); +#undef IPC_OBJECT_TYPE_ +#undef IPC_OBJECT_TYPE_OPEN_PARS_ +#undef IPC_OBJECT_TYPE_OPEN_ARGS_ + +#define IPC_OBJECT_TYPE_ condition +#define IPC_OBJECT_TYPE_OPEN_PARS_ +#define IPC_OBJECT_TYPE_OPEN_ARGS_ + +#include "waiter_template.inc" + +bool condition::wait(mutex& mtx) { + return impl(p_)->h_.wait(impl(mtx.p_)->h_); } -waiter& waiter::operator=(waiter rhs) { - swap(rhs); - return *this; +bool condition::notify() { + return impl(p_)->h_.notify(); } -bool waiter::valid() const { - return impl(p_)->w_.valid(); -} - -char const * waiter::name() const { - return impl(p_)->n_.c_str(); -} - -bool waiter::open(char const * name) { - if (impl(p_)->w_.open(name)) { - impl(p_)->n_ = name; - return true; - } - return false; -} - -void waiter::close() { - impl(p_)->w_.close(); - impl(p_)->n_.clear(); -} - -bool waiter::wait() { - return impl(p_)->w_.wait_if([] { return true; }); -} - -bool waiter::pred() { - return true; -} - -bool waiter::wait_if_pred() { - return impl(p_)->w_.wait_if([this] { return pred(); }); -} - -bool waiter::notify() { - return impl(p_)->w_.notify(); -} - -bool waiter::broadcast() { - return impl(p_)->w_.broadcast(); +bool condition::broadcast() { + return impl(p_)->h_.broadcast(); } } // namespace ipc diff --git a/src/waiter_template.inc b/src/waiter_template.inc new file mode 100644 index 0000000..49e84ea --- /dev/null +++ b/src/waiter_template.inc @@ -0,0 +1,65 @@ + +#undef IPC_OBJECT_TYPE_P_ +#undef IPC_OBJECT_TYPE_I_ + +#define IPC_OBJECT_TYPE_P_ IPC_PP_JOIN_(IPC_OBJECT_TYPE_, _) +#define IPC_OBJECT_TYPE_I_ IPC_PP_JOIN_(IPC_OBJECT_TYPE_, _impl) + +class IPC_OBJECT_TYPE_::IPC_OBJECT_TYPE_P_ : public pimpl { +public: + std::string n_; + ipc::detail::IPC_OBJECT_TYPE_I_ h_; +}; + +IPC_OBJECT_TYPE_::IPC_OBJECT_TYPE_() + : p_(p_->make()) { +} + +IPC_OBJECT_TYPE_::IPC_OBJECT_TYPE_(char const * name) + : IPC_OBJECT_TYPE_() { + open(name); +} + +IPC_OBJECT_TYPE_::IPC_OBJECT_TYPE_(IPC_OBJECT_TYPE_&& rhs) + : IPC_OBJECT_TYPE_() { + swap(rhs); +} + +IPC_OBJECT_TYPE_::~IPC_OBJECT_TYPE_() { + p_->clear(); +} + +void IPC_OBJECT_TYPE_::swap(IPC_OBJECT_TYPE_& rhs) { + std::swap(p_, rhs.p_); +} + +IPC_OBJECT_TYPE_& IPC_OBJECT_TYPE_::operator=(IPC_OBJECT_TYPE_ rhs) { + swap(rhs); + return *this; +} + +bool IPC_OBJECT_TYPE_::valid() const { + return !impl(p_)->n_.empty(); +} + +char const * IPC_OBJECT_TYPE_::name() const { + return impl(p_)->n_.c_str(); +} + +bool IPC_OBJECT_TYPE_::open(char const * name IPC_OBJECT_TYPE_OPEN_PARS_) { + if (name == nullptr || name[0] == '\0') { + return false; + } + if (impl(p_)->n_ == name) return true; + close(); + if (impl(p_)->h_.open(name IPC_OBJECT_TYPE_OPEN_ARGS_)) { + impl(p_)->n_ = name; + return true; + } + return false; +} + +void IPC_OBJECT_TYPE_::close() { + impl(p_)->h_.close(); + impl(p_)->n_.clear(); +}