add waiter for long-time wait. (TBD)

This commit is contained in:
mutouyun 2019-01-15 21:22:09 +08:00
parent 8af8f99df3
commit d9e24236af
15 changed files with 413 additions and 63 deletions

View File

@ -12,33 +12,39 @@ DESTDIR = ../output
INCLUDEPATH += \
../include \
../src \
../src/platform
../src
HEADERS += \
../include/export.h \
../include/shm.h \
../include/circ_elems_array.h \
../include/circ_elem_array.h \
../include/circ_queue.h \
../include/ipc.h \
../include/def.h \
../include/shm.h \
../include/elem_def.h \
../include/elem_circ.h \
../include/elem_link.h \
../include/waiter.h \
../include/queue.h \
../include/ipc.h \
../include/rw_lock.h \
../include/tls_pointer.h \
../include/pool_alloc.h \
../include/buffer.h \
../src/memory/alloc.hpp \
../src/memory/wrapper.hpp \
../src/memory/resource.hpp
../src/memory/resource.hpp \
../src/platform/waiter.h
SOURCES += \
../src/shm.cpp \
../src/ipc.cpp \
../src/pool_alloc.cpp \
../src/buffer.cpp
../src/buffer.cpp \
../src/waiter.cpp
unix {
HEADERS += \
../src/platform/waiter_linux.h
SOURCES += \
../src/platform/shm_linux.cpp \
../src/platform/tls_pointer_linux.cpp
@ -52,6 +58,10 @@ INSTALLS += target
else:win32 {
HEADERS += \
../src/platform/to_tchar.h \
../src/platform/waiter_win.h
SOURCES += \
../src/platform/shm_win.cpp \
../src/platform/tls_pointer_win.cpp

View File

@ -264,6 +264,9 @@ public:
elem_array(const elem_array&) = delete;
elem_array& operator=(const elem_array&) = delete;
auto & waiter() { return head_.waiter_; }
auto const & waiter() const { return head_.waiter_; }
std::size_t connect () noexcept { return head_.connect (); }
std::size_t disconnect() noexcept { return head_.disconnect(); }
std::size_t conn_count() const noexcept { return head_.conn_count(); }

View File

@ -4,10 +4,13 @@
#include <cstddef>
#include <cstdint>
#include "platform/waiter.h"
namespace ipc {
template <typename U2>
struct conn_head {
ipc::detail::waiter waiter_;
std::atomic<U2> cc_ { 0 }; // connection counter
std::size_t connect() noexcept {

View File

@ -30,32 +30,32 @@ struct IPC_EXPORT channel_detail {
};
template <typename Detail>
class channel_ipml {
class channel_impl {
private:
handle_t h_ = nullptr;
std::string n_;
public:
channel_ipml() = default;
channel_impl() = default;
explicit channel_ipml(char const * name) {
explicit channel_impl(char const * name) {
this->connect(name);
}
channel_ipml(channel_ipml&& rhs) {
channel_impl(channel_impl&& rhs) {
swap(rhs);
}
~channel_ipml() {
~channel_impl() {
disconnect();
}
void swap(channel_ipml& rhs) {
void swap(channel_impl& rhs) {
std::swap(h_, rhs.h_);
n_.swap(rhs.n_);
}
channel_ipml& operator=(channel_ipml rhs) {
channel_impl& operator=(channel_impl rhs) {
swap(rhs);
return *this;
}
@ -72,8 +72,8 @@ public:
return (handle() != nullptr);
}
channel_ipml clone() const {
return channel_ipml { name() };
channel_impl clone() const {
return channel_impl { name() };
}
bool connect(char const * name) {
@ -99,7 +99,7 @@ public:
}
static void wait_for_recv(char const * name, std::size_t r_count) {
return channel_ipml(name).wait_for_recv(r_count);
return channel_impl(name).wait_for_recv(r_count);
}
void clear_recv() {
@ -137,7 +137,7 @@ public:
* A route could only be used in 1 to N
* (one producer/server/sender to multi consumers/clients/receivers)
*/
using route = channel_ipml<channel_detail<
using route = channel_impl<channel_detail<
ipc::queue, ipc::prod_cons_circ<relat::single, relat::multi, trans::broadcast>
>>;
@ -149,7 +149,7 @@ using route = channel_ipml<channel_detail<
* would receive your sent messages.
*/
using channel = channel_ipml<channel_detail<
using channel = channel_impl<channel_detail<
ipc::queue, ipc::prod_cons_circ<relat::multi, relat::multi, trans::broadcast>
>>;

View File

@ -8,11 +8,14 @@
#include <tuple>
#include <thread>
#include <chrono>
#include <string>
#include "def.h"
#include "rw_lock.h"
#include "elem_circ.h"
#include "platform/waiter.h"
namespace ipc {
template <typename T,
@ -24,14 +27,15 @@ public:
private:
elems_t* elems_ = nullptr;
ipc::detail::waiter_impl wi_;
decltype(std::declval<elems_t>().cursor()) cursor_ = 0;
std::atomic_bool connected_ { false };
public:
queue() = default;
explicit queue(elems_t* els) : queue() {
attach(els);
explicit queue(elems_t* els, char const * name = nullptr) : queue() {
attach(els, name);
}
queue(const queue&) = delete;
@ -73,10 +77,18 @@ public:
return connected_.load(std::memory_order_acquire);
}
elems_t* attach(elems_t* els) noexcept {
elems_t* attach(elems_t* els, char const * name = nullptr) noexcept {
if (els == nullptr) return nullptr;
auto old = elems_;
elems_ = els;
if (name == nullptr) {
wi_.close();
wi_.attach(nullptr);
}
else {
wi_.attach(&(elems_->waiter()));
wi_.open((std::string{ "__IPC_WAITER__" } +name).c_str());
}
cursor_ = elems_->cursor();
return old;
}
@ -91,9 +103,13 @@ public:
template <typename... P>
auto push(P&&... params) noexcept {
if (elems_ == nullptr) return false;
return elems_->push([&](void* p) {
if (elems_->push([&](void* p) {
::new (p) T(std::forward<P>(params)...);
});
})) {
wi_.notify();
return true;
}
return false;
}
T pop() noexcept {
@ -107,7 +123,7 @@ public:
})) {
return item;
}
ipc::sleep(k);
ipc::sleep(k, [this] { return wi_.wait(); });
}
}
};

View File

@ -5,6 +5,7 @@
#include <chrono>
#include <limits>
#include <type_traits>
#include <utility>
////////////////////////////////////////////////////////////////
/// Gives hint to processor that improves performance of spin-wait loops.
@ -69,11 +70,14 @@ inline void yield(K& k) noexcept {
++k;
}
template <std::size_t N = 4096, typename K>
inline void sleep(K& k) noexcept {
template <std::size_t N = 4096, typename K, typename F>
inline void sleep(K& k, F&& f) noexcept {
if (k < static_cast<K>(N)) {
std::this_thread::yield();
}
else if (std::forward<F>(f)()) {
return;
}
else {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
return;
@ -81,6 +85,11 @@ inline void sleep(K& k) noexcept {
++k;
}
template <std::size_t N = 4096, typename K>
inline void sleep(K& k) noexcept {
sleep<N>(k, []() constexpr { return false; });
}
} // namespace ipc
#pragma pop_macro("IPC_LOCK_PAUSE_")

33
include/waiter.h Normal file
View File

@ -0,0 +1,33 @@
#pragma once
#include "export.h"
namespace ipc {
class IPC_EXPORT waiter {
public:
waiter();
explicit waiter(char const * name);
waiter(waiter&& rhs);
~waiter();
void swap(waiter& rhs);
waiter& operator=(waiter rhs);
bool valid() const;
char const * name () const;
bool open (char const * name);
void close();
bool wait();
bool notify();
bool broadcast();
private:
class waiter_;
waiter_* p_;
};
} // namespace ipc

View File

@ -24,10 +24,10 @@ inline auto acc_of_msg() {
}
template <template <typename...> class Queue, typename Policy>
struct detail;
struct detail_impl;
template <typename Policy>
struct detail<ipc::queue, Policy> {
struct detail_impl<ipc::queue, Policy> {
#pragma pack(1)
struct msg_t {
@ -98,7 +98,7 @@ static handle_t connect(char const * name) {
if (mem == nullptr) {
return nullptr;
}
return new queue_t { &(static_cast<shm_info_t*>(mem)->elems_) };
return new queue_t { &(static_cast<shm_info_t*>(mem)->elems_), name };
}
static void disconnect(handle_t h) {
@ -215,7 +215,7 @@ static buff_t recv(handle_t h) {
}
}
}; // detail<ipc::queue>
}; // detail_impl<ipc::queue>
} // internal-linkage
@ -223,42 +223,42 @@ namespace ipc {
template <template <typename...> class Queue, typename Policy>
handle_t channel_detail<Queue, Policy>::connect(char const * name) {
return detail<Queue, Policy>::connect(name);
return detail_impl<Queue, Policy>::connect(name);
}
template <template <typename...> class Queue, typename Policy>
void channel_detail<Queue, Policy>::disconnect(handle_t h) {
detail<Queue, Policy>::disconnect(h);
detail_impl<Queue, Policy>::disconnect(h);
}
template <template <typename...> class Queue, typename Policy>
std::size_t channel_detail<Queue, Policy>::recv_count(handle_t h) {
return detail<Queue, Policy>::recv_count(h);
return detail_impl<Queue, Policy>::recv_count(h);
}
template <template <typename...> class Queue, typename Policy>
void channel_detail<Queue, Policy>::wait_for_recv(handle_t h, std::size_t r_count) {
return detail<Queue, Policy>::wait_for_recv(h, r_count);
return detail_impl<Queue, Policy>::wait_for_recv(h, r_count);
}
template <template <typename...> class Queue, typename Policy>
void channel_detail<Queue, Policy>::clear_recv(handle_t h) {
detail<Queue, Policy>::clear_recv(h);
detail_impl<Queue, Policy>::clear_recv(h);
}
template <template <typename...> class Queue, typename Policy>
void channel_detail<Queue, Policy>::clear_recv(char const * name) {
detail<Queue, Policy>::clear_recv(name);
detail_impl<Queue, Policy>::clear_recv(name);
}
template <template <typename...> class Queue, typename Policy>
bool channel_detail<Queue, Policy>::send(handle_t h, void const * data, std::size_t size) {
return detail<Queue, Policy>::send(h, data, size);
return detail_impl<Queue, Policy>::send(h, data, size);
}
template <template <typename...> class Queue, typename Policy>
buff_t channel_detail<Queue, Policy>::recv(handle_t h) {
return detail<Queue, Policy>::recv(h);
return detail_impl<Queue, Policy>::recv(h);
}
template struct channel_detail<ipc::queue, ipc::prod_cons_circ<relat::single, relat::single, trans::unicast >>;

View File

@ -38,7 +38,8 @@ void* acquire(char const * name, std::size_t size) {
if (name == nullptr || name[0] == '\0' || size == 0) {
return nullptr;
}
int fd = ::shm_open(name, O_CREAT | O_RDWR,
int fd = ::shm_open((std::string{"__IPC_SHM__"} + name).c_str(),
O_CREAT | O_RDWR,
S_IRUSR | S_IWUSR |
S_IRGRP | S_IWGRP |
S_IROTH | S_IWOTH);

View File

@ -2,31 +2,16 @@
#include <Windows.h>
#include <type_traits>
#include <string>
#include <locale>
#include <codecvt>
#include <utility>
#include "def.h"
#include "tls_pointer.h"
#include "platform/to_tchar.h"
#include "memory/resource.hpp"
namespace {
template <typename T, typename S, typename R = S>
using IsSame = ipc::Requires<std::is_same<T, typename S::value_type>::value, R>;
template <typename T = TCHAR>
constexpr auto to_tchar(std::string && str) -> IsSame<T, std::string, std::string &&> {
return std::move(str);
}
template <typename T = TCHAR>
constexpr auto to_tchar(std::string && str) -> IsSame<T, std::wstring> {
return std::wstring_convert<std::codecvt_utf8_utf16<wchar_t>>{}.from_bytes(std::move(str));
}
inline auto& m2h() {
static ipc::tls::pointer<ipc::mem::unordered_map<void*, HANDLE>> cache;
return *cache.create();
@ -44,7 +29,7 @@ void* acquire(char const * name, std::size_t size) {
HANDLE h = ::CreateFileMapping(INVALID_HANDLE_VALUE, NULL,
PAGE_READWRITE | SEC_COMMIT,
0, static_cast<DWORD>(size),
to_tchar(std::string{"__SHM__"} + name).c_str());
ipc::detail::to_tchar(std::string{"__IPC_SHM__"} + name).c_str());
if (h == NULL) {
return nullptr;
}

55
src/platform/to_tchar.h Normal file
View File

@ -0,0 +1,55 @@
#pragma once
#include <tchar.h>
#include <type_traits>
#include <string>
#include <locale>
#include <codecvt>
#include <algorithm>
#include <cstring>
#include "def.h"
namespace ipc::detail {
struct has_value_type_ {
template <typename T> static std::true_type check(typename T::value_type *);
template <typename T> static std::false_type check(...);
};
template <typename T, typename U, typename = decltype(has_value_type_::check<U>(nullptr))>
struct is_same_char : std::is_same<T, U> {};
template <typename T, typename U>
struct is_same_char<T, U, std::true_type> : std::is_same<T, typename U::value_type> {};
template <typename T, typename S, typename R = S>
using IsSameChar = ipc::Requires<is_same_char<T, S>::value, R>;
////////////////////////////////////////////////////////////////
/// to_tchar implementation
////////////////////////////////////////////////////////////////
template <typename T = TCHAR>
constexpr auto to_tchar(std::string && str) -> IsSameChar<T, std::string, std::string &&> {
return std::move(str);
}
template <typename T = TCHAR>
constexpr auto to_tchar(std::string && str) -> IsSameChar<T, std::wstring> {
return std::wstring_convert<std::codecvt_utf8_utf16<wchar_t>>{}.from_bytes(std::move(str));
}
template <typename T>
inline auto to_tchar(T* dst, char const * src, std::size_t size) -> IsSameChar<T, char, void> {
std::memcpy(dst, src, size);
}
template <typename T>
inline auto to_tchar(T* dst, char const * src, std::size_t size) -> IsSameChar<T, wchar_t, void> {
auto wstr = std::wstring_convert<std::codecvt_utf8_utf16<wchar_t>>{}.from_bytes(src, src + size);
std::memcpy(dst, wstr.data(), (std::min)(wstr.size(), size));
}
} // namespace ipc::detail

77
src/platform/waiter.h Normal file
View File

@ -0,0 +1,77 @@
#pragma once
#if defined(WIN64) || defined(_WIN64) || defined(__WIN64__) || \
defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) || \
defined(WINCE) || defined(_WIN32_WCE)
#include "platform/waiter_win.h"
#else
#include "platform/waiter_linux.h"
#endif
namespace ipc::detail {
class waiter_impl {
public:
using waiter_t = detail::waiter;
private:
waiter_t* w_ = nullptr;
waiter_t::handle_t h_ = waiter_t::invalid();
std::string n_;
public:
waiter_impl() = default;
explicit waiter_impl(waiter_t* w) {
attach(w);
}
waiter_impl(const waiter_impl&) = delete;
waiter_impl& operator=(const waiter_impl&) = delete;
waiter_t * waiter() { return w_; }
waiter_t const * waiter() const { return w_; }
void attach(waiter_t* w) {
w_ = w;
}
bool valid() const {
return (w_ != nullptr) && (h_ != waiter_t::invalid());
}
char const * name() const {
return n_.c_str();
}
bool open(char const * name) {
if (w_ == nullptr) return false;
close();
h_ = w_->open(name);
return valid();
}
void close() {
if (!valid()) return;
w_->close(h_);
h_ = waiter_t::invalid();
n_.clear();
}
bool wait() {
if (!valid()) return false;
return w_->wait(h_);
}
bool notify() {
if (!valid()) return false;
w_->notify(h_);
return true;
}
bool broadcast() {
if (!valid()) return false;
w_->broadcast(h_);
return true;
}
};
} // namespace ipc::detail

View File

@ -0,0 +1,32 @@
#pragma once
namespace ipc::detail {
class waiter {
public:
using handle_t = void*;
constexpr static handle_t invalid() {
return nullptr;
}
handle_t open(char const * name) {
if (name == nullptr || name[0] == '\0') return invalid();
return invalid();
}
static void close(handle_t /*h*/) {
}
bool wait(handle_t /*h*/) {
return false;
}
void notify(handle_t /*h*/) {
}
void broadcast(handle_t /*h*/) {
}
};
} // namespace ipc::detail

55
src/platform/waiter_win.h Normal file
View File

@ -0,0 +1,55 @@
#pragma once
#include <Windows.h>
#include <algorithm>
#include <iterator>
#include <atomic>
#include "rw_lock.h"
#include "platform/to_tchar.h"
namespace ipc::detail {
class waiter {
std::atomic<long> counter_ { 0 };
public:
using handle_t = HANDLE;
constexpr static handle_t invalid() {
return NULL;
}
handle_t open(char const * name) {
if (name == nullptr || name[0] == '\0') return invalid();
return ::CreateSemaphore(NULL, 0, LONG_MAX, ipc::detail::to_tchar(name).c_str());
}
static void close(handle_t h) {
::CloseHandle(h);
}
bool wait(handle_t h) {
counter_.fetch_add(1, std::memory_order_release);
return ::WaitForSingleObject(h, INFINITE) == WAIT_OBJECT_0;
}
void notify(handle_t h) {
for (unsigned k = 0;;) {
auto c = counter_.load(std::memory_order_acquire);
if (c == 0) return;
if (counter_.compare_exchange_weak(c, c - 1, std::memory_order_relaxed)) {
break;
}
ipc::yield(k);
}
::ReleaseSemaphore(h, 1, NULL);
}
void broadcast(handle_t h) {
::ReleaseSemaphore(h, counter_.exchange(0, std::memory_order_acquire), NULL);
}
};
} // namespace ipc::detail

71
src/waiter.cpp Normal file
View File

@ -0,0 +1,71 @@
#include "waiter.h"
#include <string>
#include "def.h"
#include "platform/waiter.h"
namespace ipc {
class waiter::waiter_ : public pimpl<waiter_> {
public:
detail::waiter_impl w_ { new detail::waiter };
~waiter_() { delete w_.waiter(); }
};
waiter::waiter()
: p_(p_->make()) {
}
waiter::waiter(char const * name)
: waiter() {
open(name);
}
waiter::waiter(waiter&& rhs)
: waiter() {
swap(rhs);
}
waiter::~waiter() {
p_->clear();
}
void waiter::swap(waiter& rhs) {
std::swap(p_, rhs.p_);
}
waiter& waiter::operator=(waiter rhs) {
swap(rhs);
return *this;
}
bool waiter::valid() const {
return impl(p_)->w_.valid();
}
char const * waiter::name() const {
return impl(p_)->w_.name();
}
bool waiter::open(char const * name) {
return impl(p_)->w_.open(name);
}
void waiter::close() {
impl(p_)->w_.close();
}
bool waiter::wait() {
return impl(p_)->w_.wait();
}
bool waiter::notify() {
return impl(p_)->w_.notify();
}
bool waiter::broadcast() {
return impl(p_)->w_.broadcast();
}
} // namespace ipc