优化ut,修正tls中的bug

This commit is contained in:
mutouyun 2020-09-13 15:06:47 +08:00 committed by wood.zhang
parent 64f4104b74
commit 326bc10b2d
31 changed files with 1710 additions and 1452 deletions

22
3rdparty/capo/make.hpp vendored Executable file
View File

@ -0,0 +1,22 @@
/*
The Capo Library
Code covered by the MIT License
Author: mutouyun (http://orzz.org)
*/
#pragma once
#include <type_traits> // std::decay
#include <utility> // std::forward
namespace capo
{
template <template <typename...> class Ret, typename... T>
using return_t = Ret<typename std::decay<T>::type...>;
template <template <typename...> class Ret, typename... T>
inline return_t<Ret, T...> make(T&&... args)
{
return return_t<Ret, T...>(std::forward<T>(args)...);
}
}

25
3rdparty/capo/noncopyable.hpp vendored Executable file
View File

@ -0,0 +1,25 @@
/*
The Capo Library
Code covered by the MIT License
Author: mutouyun (http://orzz.org)
*/
#pragma once
namespace capo {
////////////////////////////////////////////////////////////////
class noncopyable
{
protected:
noncopyable(void) = default;
~noncopyable(void) = default;
public:
noncopyable(const noncopyable&) = delete;
noncopyable& operator=(const noncopyable&) = delete;
};
////////////////////////////////////////////////////////////////
} // namespace capo

103
3rdparty/capo/scope_guard.hpp vendored Executable file
View File

@ -0,0 +1,103 @@
/*
The Capo Library
Code covered by the MIT License
Author: mutouyun (http://orzz.org)
*/
#pragma once
#include "capo/noncopyable.hpp"
#include "capo/unused.hpp"
#include "capo/make.hpp"
#include <utility> // std::forward, std::move
#include <algorithm> // std::swap
#include <functional> // std::function
namespace capo {
////////////////////////////////////////////////////////////////
/// Execute guard function when the enclosing scope exits
////////////////////////////////////////////////////////////////
template <typename F = std::function<void()>>
class scope_guard : capo::noncopyable
{
F destructor_;
mutable bool dismiss_;
public:
template <typename F_>
scope_guard(F_&& destructor)
: destructor_(std::forward<F_>(destructor))
, dismiss_(false)
{}
scope_guard(scope_guard&& rhs)
: destructor_(std::move(rhs.destructor_))
, dismiss_(true) // dismiss rhs
{
std::swap(dismiss_, rhs.dismiss_);
}
~scope_guard(void)
{
try { do_exit(); }
/*
In the realm of exceptions, it is fundamental that you can do nothing
if your "undo/recover" action fails.
*/
catch(...) { /* Do nothing */ }
}
void dismiss(void) const noexcept
{
dismiss_ = true;
}
void do_exit(void)
{
if (!dismiss_)
{
dismiss_ = true;
destructor_();
}
}
void swap(scope_guard& rhs)
{
std::swap(destructor_, rhs.destructor_);
std::swap(dismiss_, rhs.dismiss_);
}
};
namespace detail_scope_guard {
struct helper
{
template <typename F>
auto operator=(F&& destructor) -> decltype(capo::make<scope_guard>(std::forward<F>(destructor)))
{
return capo::make<scope_guard>(std::forward<F>(destructor));
}
};
} // namespace detail_scope_guard
#define CAPO_SCOPE_GUARD_V_(L) CAPO_UNUSED_ scope_guard_##L##__
#define CAPO_SCOPE_GUARD_L_(L) auto CAPO_SCOPE_GUARD_V_(L) = capo::detail_scope_guard::helper{}
/*
Do things like this:
-->
CAPO_SCOPE_GUARD_ = [ptr]
{
if (ptr) free(ptr);
};
*/
#define CAPO_SCOPE_GUARD_ CAPO_SCOPE_GUARD_L_(__LINE__)
////////////////////////////////////////////////////////////////
} // namespace capo

320
3rdparty/capo/type_name.hpp vendored Executable file
View File

@ -0,0 +1,320 @@
/*
The Capo Library
Code covered by the MIT License
Author: mutouyun (http://orzz.org)
*/
#pragma once
#include "capo/scope_guard.hpp"
#include <typeinfo> // typeid
#include <sstream> // std::ostringstream, std::string
#include <type_traits> // std::is_array
#if defined(__GNUC__)
# include <cxxabi.h> // abi::__cxa_demangle
#endif/*__GNUC__*/
namespace capo {
namespace detail_type_name {
////////////////////////////////////////////////////////////////
/// Ready for output
////////////////////////////////////////////////////////////////
// Forward declarations
template <typename T, bool IsBase = false>
struct check;
/*
Output state management
*/
class output
{
bool is_compact_ = true;
template <typename T>
bool check_empty(const T&) { return false; }
bool check_empty(const char* val)
{
return (!val) || (val[0] == 0);
}
template <typename T>
void out(const T& val)
{
if (check_empty(val)) return;
if (!is_compact_) sr_ += " ";
std::ostringstream ss;
ss << val;
sr_ += ss.str();
is_compact_ = false;
}
std::string& sr_;
public:
output(std::string& sr) : sr_(sr) {}
output& operator()(void) { return (*this); }
template <typename T1, typename... T>
output& operator()(const T1& val, const T&... args)
{
out(val);
return operator()(args...);
}
output& compact(void)
{
is_compact_ = true;
return (*this);
}
};
// ()
template <bool>
struct bracket
{
output& out_;
bracket(output& out, const char* = nullptr) : out_(out)
{ out_("(").compact(); }
~bracket(void)
{ out_.compact()(")"); }
};
template <>
struct bracket<false>
{
bracket(output& out, const char* str = nullptr)
{ out(str); }
};
// [N]
template <size_t N = 0>
struct bound
{
output& out_;
bound(output& out) : out_(out) {}
~bound(void)
{
if (N == 0) out_("[]");
else out_("[").compact()
( N ).compact()
("]");
}
};
// (P1, P2, ...)
template <bool, typename... P>
struct parameter;
template <bool IsStart>
struct parameter<IsStart>
{
output& out_;
parameter(output& out) : out_(out) {}
~parameter(void)
{ bracket<IsStart> { out_ }; }
};
template <bool IsStart, typename P1, typename... P>
struct parameter<IsStart, P1, P...>
{
output& out_;
parameter(output& out) : out_(out) {}
~parameter(void)
{
[this](bracket<IsStart>&&)
{
check<P1> { out_ };
parameter<false, P...> { out_.compact() };
} (bracket<IsStart> { out_, "," });
}
};
////////////////////////////////////////////////////////////////
/// Template specializations for checking
////////////////////////////////////////////////////////////////
/*
CV-qualifiers, references, pointers
*/
template <typename T, bool IsBase>
struct check
{
output out_;
check(const output& out) : out_(out)
{
# if defined(__GNUC__)
const char* typeid_name = typeid(T).name();
char* real_name = abi::__cxa_demangle(typeid_name, nullptr, nullptr, nullptr);
CAPO_SCOPE_GUARD_ = [real_name]
{
if (real_name) ::free(real_name);
};
out_(real_name ? real_name : typeid_name);
# else /*__GNUC__*/
out_(typeid(T).name());
# endif/*__GNUC__*/
}
};
#pragma push_macro("CAPO_CHECK_TYPE__")
#undef CAPO_CHECK_TYPE__
#define CAPO_CHECK_TYPE__(OPT) \
template <typename T, bool IsBase> \
struct check<T OPT, IsBase> : check<T, true> \
{ \
using base_t = check<T, true>; \
using base_t::out_; \
check(const output& out) : base_t(out) { out_(#OPT); } \
};
CAPO_CHECK_TYPE__(const)
CAPO_CHECK_TYPE__(volatile)
CAPO_CHECK_TYPE__(const volatile)
CAPO_CHECK_TYPE__(&)
CAPO_CHECK_TYPE__(&&)
CAPO_CHECK_TYPE__(*)
#pragma pop_macro("CAPO_CHECK_TYPE__")
/*
Arrays
*/
#pragma push_macro("CAPO_CHECK_TYPE_ARRAY__")
#pragma push_macro("CAPO_CHECK_TYPE_ARRAY_CV__")
#pragma push_macro("CAPO_CHECK_TYPE_PLACEHOLDER__")
#undef CAPO_CHECK_TYPE_ARRAY__
#undef CAPO_CHECK_TYPE_ARRAY_CV__
#undef CAPO_CHECK_TYPE_PLACEHOLDER__
#define CAPO_CHECK_TYPE_ARRAY__(CV_OPT, BOUND_OPT, ...) \
template <typename T, bool IsBase __VA_ARGS__> \
struct check<T CV_OPT [BOUND_OPT], IsBase> : check<T CV_OPT, !std::is_array<T>::value> \
{ \
using base_t = check<T CV_OPT, !std::is_array<T>::value>; \
using base_t::out_; \
\
bound<BOUND_OPT> bound_ = out_; \
bracket<IsBase> bracket_ = out_; \
\
check(const output& out) : base_t(out) {} \
};
#define CAPO_CHECK_TYPE_ARRAY_CV__(BOUND_OPT, ...) \
CAPO_CHECK_TYPE_ARRAY__(, BOUND_OPT, ,##__VA_ARGS__) \
CAPO_CHECK_TYPE_ARRAY__(const, BOUND_OPT, ,##__VA_ARGS__) \
CAPO_CHECK_TYPE_ARRAY__(volatile, BOUND_OPT, ,##__VA_ARGS__) \
CAPO_CHECK_TYPE_ARRAY__(const volatile, BOUND_OPT, ,##__VA_ARGS__)
#define CAPO_CHECK_TYPE_PLACEHOLDER__
CAPO_CHECK_TYPE_ARRAY_CV__(CAPO_CHECK_TYPE_PLACEHOLDER__)
#if defined(__GNUC__)
CAPO_CHECK_TYPE_ARRAY_CV__(0)
#endif/*__GNUC__*/
CAPO_CHECK_TYPE_ARRAY_CV__(N, size_t N)
#pragma pop_macro("CAPO_CHECK_TYPE_PLACEHOLDER__")
#pragma pop_macro("CAPO_CHECK_TYPE_ARRAY_CV__")
#pragma pop_macro("CAPO_CHECK_TYPE_ARRAY__")
/*
Functions
*/
template <typename T, bool IsBase, typename... P>
struct check<T(P...), IsBase> : check<T, true>
{
using base_t = check<T, true>;
using base_t::out_;
parameter<true, P...> parameter_ = out_;
bracket<IsBase> bracket_ = out_;
check(const output& out) : base_t(out) {}
};
/*
Pointers to members
*/
template <typename T, bool IsBase, typename C>
struct check<T C::*, IsBase> : check<T, true>
{
using base_t = check<T, true>;
using base_t::out_;
check(const output& out) : base_t(out)
{
check<C> { out_ };
out_.compact()("::*");
}
};
/*
Pointers to member functions
*/
#pragma push_macro("CAPO_CHECK_TYPE_MEM_FUNC__")
#undef CAPO_CHECK_TYPE_MEM_FUNC__
#define CAPO_CHECK_TYPE_MEM_FUNC__(...) \
template <typename T, bool IsBase, typename C, typename... P> \
struct check<T(C::*)(P...) __VA_ARGS__, IsBase> \
{ \
scope_guard<> at_destruct_cv_; \
check<T(P...), true> base_; \
output& out_ = base_.out_; \
\
check(const output& out) \
: at_destruct_cv_([this]{ out_(#__VA_ARGS__); }) \
, base_(out) \
{ \
check<C> { out_ }; \
out_.compact()("::*"); \
} \
};
CAPO_CHECK_TYPE_MEM_FUNC__()
CAPO_CHECK_TYPE_MEM_FUNC__(const)
CAPO_CHECK_TYPE_MEM_FUNC__(volatile)
CAPO_CHECK_TYPE_MEM_FUNC__(const volatile)
#pragma pop_macro("CAPO_CHECK_TYPE_MEM_FUNC__")
} // namespace detail_type_name
////////////////////////////////////////////////////////////////
/// Get the name of the given type
////////////////////////////////////////////////////////////////
/*
type_name<const volatile void *>()
-->
void const volatile *
*/
template <typename T>
inline std::string type_name(void)
{
std::string str;
detail_type_name::check<T> { str };
return str;
}
} // namespace capo

23
3rdparty/capo/unused.hpp vendored Executable file
View File

@ -0,0 +1,23 @@
/*
The Capo Library
Code covered by the MIT License
Author: mutouyun (http://orzz.org)
*/
#pragma once
////////////////////////////////////////////////////////////////
#ifdef CAPO_UNUSED_
# error "CAPO_UNUSED_ has been defined."
#endif
#if defined(_MSC_VER)
# define CAPO_UNUSED_ __pragma(warning(suppress: 4100 4101 4189))
#elif defined(__GNUC__)
# define CAPO_UNUSED_ __attribute__((__unused__))
#else
# define CAPO_UNUSED_
#endif
////////////////////////////////////////////////////////////////

3
demo/chat/CMakeLists.txt Normal file → Executable file
View File

@ -6,6 +6,3 @@ file(GLOB HEAD_FILES ./*.h)
add_executable(${PROJECT_NAME} ${SRC_FILES} ${HEAD_FILES})
target_link_libraries(${PROJECT_NAME} ipc)
if(NOT MSVC)
target_link_libraries(${PROJECT_NAME} pthread rt)
endif()

42
include/ipc.h Normal file → Executable file
View File

@ -27,10 +27,10 @@ struct IPC_EXPORT chan_impl {
static std::size_t recv_count(handle_t h);
static bool wait_for_recv(handle_t h, std::size_t r_count, std::size_t tm);
static bool send(handle_t h, void const * data, std::size_t size);
static bool send(handle_t h, void const * data, std::size_t size, std::size_t tm);
static buff_t recv(handle_t h, std::size_t tm);
static bool try_send(handle_t h, void const * data, std::size_t size);
static bool try_send(handle_t h, void const * data, std::size_t size, std::size_t tm);
static buff_t try_recv(handle_t h);
};
@ -38,16 +38,18 @@ template <typename Flag>
class chan_wrapper {
private:
using detail_t = chan_impl<Flag>;
handle_t h_ = nullptr;
handle_t h_ = nullptr;
unsigned mode_ = ipc::sender;
public:
chan_wrapper() = default;
explicit chan_wrapper(char const * name, unsigned mode = sender) {
explicit chan_wrapper(char const * name, unsigned mode = ipc::sender) {
this->connect(name, mode);
}
chan_wrapper(chan_wrapper&& rhs) {
chan_wrapper(chan_wrapper&& rhs) noexcept {
swap(rhs);
}
@ -55,7 +57,7 @@ public:
disconnect();
}
void swap(chan_wrapper& rhs) {
void swap(chan_wrapper& rhs) noexcept {
std::swap(h_, rhs.h_);
}
@ -68,22 +70,26 @@ public:
return detail_t::name(h_);
}
handle_t handle() const {
handle_t handle() const noexcept {
return h_;
}
bool valid() const {
bool valid() const noexcept {
return (handle() != nullptr);
}
chan_wrapper clone() const {
return chan_wrapper { name() };
unsigned mode() const noexcept {
return mode_;
}
bool connect(char const * name, unsigned mode = sender | receiver) {
chan_wrapper clone() const {
return chan_wrapper { name(), mode_ };
}
bool connect(char const * name, unsigned mode = ipc::sender | ipc::receiver) {
if (name == nullptr || name[0] == '\0') return false;
this->disconnect();
h_ = detail_t::connect(name, mode);
h_ = detail_t::connect(name, mode_ = mode);
return valid();
}
@ -105,13 +111,13 @@ public:
return chan_wrapper(name).wait_for_recv(r_count, tm);
}
bool send (void const * data, std::size_t size) { return detail_t::send(h_, data, size) ; }
bool send (buff_t const & buff) { return this->send(buff.data(), buff.size()) ; }
bool send (std::string const & str) { return this->send(str.c_str(), str.size() + 1); }
bool send (void const * data, std::size_t size, std::size_t tm = default_timeout) { return detail_t::send(h_, data, size, tm) ; }
bool send (buff_t const & buff , std::size_t tm = default_timeout) { return this->send(buff.data(), buff.size(), tm) ; }
bool send (std::string const & str , std::size_t tm = default_timeout) { return this->send(str.c_str(), str.size() + 1, tm); }
bool try_send(void const * data, std::size_t size) { return detail_t::try_send(h_, data, size) ; }
bool try_send(buff_t const & buff) { return this->try_send(buff.data(), buff.size()) ; }
bool try_send(std::string const & str) { return this->try_send(str.c_str(), str.size() + 1); }
bool try_send(void const * data, std::size_t size, std::size_t tm = default_timeout) { return detail_t::try_send(h_, data, size, tm) ; }
bool try_send(buff_t const & buff , std::size_t tm = default_timeout) { return this->try_send(buff.data(), buff.size(), tm) ; }
bool try_send(std::string const & str , std::size_t tm = default_timeout) { return this->try_send(str.c_str(), str.size() + 1, tm); }
buff_t recv(std::size_t tm = invalid_value) {
return detail_t::recv(h_, tm);

4
include/rw_lock.h Normal file → Executable file
View File

@ -63,8 +63,10 @@ inline void yield(K& k) noexcept {
if (k < 4) { /* Do nothing */ }
else
if (k < 16) { IPC_LOCK_PAUSE_(); }
else
if (k < 32) { std::this_thread::yield(); }
else {
std::this_thread::yield();
std::this_thread::sleep_for(std::chrono::milliseconds(1));
return;
}
++k;

View File

@ -1,87 +1,112 @@
#pragma once
#include <cstdint>
#include <utility>
#include <limits>
#include <memory> // std::unique_ptr
#include <utility> // std::forward
#include <cstddef> // std::size_t
#include "export.h"
#include "platform/detail.h"
namespace ipc {
namespace tls {
using key_t = std::uint64_t;
using destructor_t = void (*)(void*);
using key_t = std::size_t;
using destructor_t = void (*)(void *);
enum : key_t {
invalid_value = (std::numeric_limits<key_t>::max)()
struct key_info {
key_t key_;
};
IPC_EXPORT key_t create (destructor_t destructor = nullptr);
IPC_EXPORT void release(key_t key);
IPC_EXPORT bool create (key_info * pkey, destructor_t destructor = nullptr);
IPC_EXPORT void release(key_info const * pkey);
IPC_EXPORT bool set(key_t key, void* ptr);
IPC_EXPORT void* get(key_t key);
IPC_EXPORT bool set(key_info const * pkey, void * ptr);
IPC_EXPORT void * get(key_info const * pkey);
////////////////////////////////////////////////////////////////
/// Thread-local pointer
////////////////////////////////////////////////////////////////
/*
* <Remarks>
*
/**
* @remarks
* You need to set the ipc::tls::pointer's storage manually:
* ```
* tls::pointer<int> p;
* if (!p) p = new int(123);
* ```
* It would be like an ordinary pointer.
* Or you could just call create to 'new' this pointer automatically.
* Or you could just call create_once to 'new' this pointer automatically.
* ```
* tls::pointer<int> p;
* p.create(123);
* p.create_once(123);
* ```
*/
template <typename T>
class pointer {
key_t key_;
class pointer : public key_info {
pointer(pointer const &) = delete;
pointer & operator=(pointer const &) = delete;
void destruct() const {
delete static_cast<T*>(tls::get(this));
}
public:
using value_type = T;
pointer()
: key_(tls::create([](void* p) { delete static_cast<T*>(p); })) {
pointer() {
tls::create(this, [](void* p) {
delete static_cast<T*>(p);
});
}
~pointer() {
tls::release(key_);
destruct();
tls::release(this);
}
template <typename... P>
T* create(P&&... params) {
auto ptr = static_cast<T*>(get(key_));
if (ptr == nullptr) {
ptr = new T(std::forward<P>(params)...);
if (!set(key_, ptr)) {
delete ptr;
destruct();
auto ptr = detail::unique_ptr(new T(std::forward<P>(params)...));
if (!tls::set(this, ptr.get())) {
return nullptr;
}
return ptr.release();
}
template <typename... P>
T* create_once(P&&... params) {
auto p = static_cast<T*>(tls::get(this));
if (p == nullptr) {
auto ptr = detail::unique_ptr(new T(std::forward<P>(params)...));
if (!tls::set(this, ptr.get())) {
return nullptr;
}
p = ptr.release();
}
return ptr;
return p;
}
T* operator=(T* ptr) {
set(key_, ptr);
return ptr;
T* operator=(T* p) {
set(this, p);
return p;
}
operator T*() const { return static_cast<T*>(get(key_)); }
explicit operator T *() const {
return static_cast<T *>(tls::get(this));
}
T& operator* () { return *static_cast<T*>(*this); }
const T& operator* () const { return *static_cast<T*>(*this); }
explicit operator bool() const {
return tls::get(this) != nullptr;
}
T* operator->() { return static_cast<T*>(*this); }
const T* operator->() const { return static_cast<T*>(*this); }
T & operator* () { return *static_cast<T *>(*this); }
const T & operator* () const { return *static_cast<T *>(*this); }
T * operator->() { return static_cast<T *>(*this); }
const T * operator->() const { return static_cast<T *>(*this); }
};
} // namespace tls

12
src/CMakeLists.txt Normal file → Executable file
View File

@ -2,10 +2,13 @@ project(ipc)
add_compile_options(-D__IPC_LIBRARY__)
if(NOT MSVC)
add_compile_options(-fPIC)
endif()
include_directories(
${CMAKE_SOURCE_DIR}/include
${CMAKE_SOURCE_DIR}/src
)
${CMAKE_SOURCE_DIR}/src)
if(UNIX)
file(GLOB SRC_FILES ${CMAKE_SOURCE_DIR}/src/platform/*_linux.cpp)
@ -16,3 +19,8 @@ aux_source_directory(${CMAKE_SOURCE_DIR}/src SRC_FILES)
file(GLOB HEAD_FILES ${CMAKE_SOURCE_DIR}/include/*.h ${CMAKE_SOURCE_DIR}/src/*.inc ${CMAKE_SOURCE_DIR}/src/memory/*.hpp)
add_library(${PROJECT_NAME} SHARED ${SRC_FILES} ${HEAD_FILES})
if(NOT MSVC)
target_link_libraries(${PROJECT_NAME} PUBLIC
pthread
$<$<NOT:$<STREQUAL:${CMAKE_SYSTEM_NAME},Windows>>:rt>)
endif()

4
src/concept.h Normal file → Executable file
View File

@ -1,13 +1,13 @@
#pragma once
#include <type_traits>
#include <type_traits> // std::enable_if
namespace ipc {
// concept helpers
template <bool Cond, typename R = void>
using require = std::enable_if_t<Cond, R>;
using require = typename std::enable_if<Cond, R>::type;
#ifdef IPC_CONCEPT_
# error "IPC_CONCEPT_ has been defined."

40
src/ipc.cpp Normal file → Executable file
View File

@ -283,7 +283,7 @@ struct conn_info_head {
}
auto& recv_cache() {
return *recv_cache_.create();
return *recv_cache_.create_once();
}
};
@ -440,21 +440,21 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s
return true;
}
static bool send(ipc::handle_t h, void const * data, std::size_t size) {
return send([](auto info, auto que, auto msg_id) {
return [info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) {
static bool send(ipc::handle_t h, void const * data, std::size_t size, std::size_t tm) {
return send([tm](auto info, auto que, auto msg_id) {
return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) {
if (!wait_for(info->wt_waiter_, [&] {
return !que->push(info->cc_id_, msg_id, remain, data, size);
}, default_timeout)) {
}, tm)) {
ipc::log("force_push: msg_id = %zd, remain = %d, size = %zd\n", msg_id, remain, size);
if (!que->force_push([](void* p) {
auto tmp_msg = static_cast<typename queue_t::value_t*>(p);
if (tmp_msg->storage_) {
clear_storage(*reinterpret_cast<std::size_t*>(&tmp_msg->data_),
static_cast<std::int32_t>(data_length) + tmp_msg->remain_);
}
return true;
}, info->cc_id_, msg_id, remain, data, size)) {
auto tmp_msg = static_cast<typename queue_t::value_t*>(p);
if (tmp_msg->storage_) {
clear_storage(*reinterpret_cast<std::size_t*>(&tmp_msg->data_),
static_cast<std::int32_t>(data_length) + tmp_msg->remain_);
}
return true;
}, info->cc_id_, msg_id, remain, data, size)) {
return false;
}
}
@ -464,12 +464,12 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size) {
}, h, data, size);
}
static bool try_send(ipc::handle_t h, void const * data, std::size_t size) {
return send([](auto info, auto que, auto msg_id) {
return [info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) {
static bool try_send(ipc::handle_t h, void const * data, std::size_t size, std::size_t tm) {
return send([tm](auto info, auto que, auto msg_id) {
return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) {
if (!wait_for(info->wt_waiter_, [&] {
return !que->push(info->cc_id_, msg_id, remain, data, size);
}, 0)) {
}, tm)) {
return false;
}
info->rd_waiter_.broadcast();
@ -587,8 +587,8 @@ bool chan_impl<Flag>::wait_for_recv(ipc::handle_t h, std::size_t r_count, std::s
}
template <typename Flag>
bool chan_impl<Flag>::send(ipc::handle_t h, void const * data, std::size_t size) {
return detail_impl<policy_t<Flag>>::send(h, data, size);
bool chan_impl<Flag>::send(ipc::handle_t h, void const * data, std::size_t size, std::size_t tm) {
return detail_impl<policy_t<Flag>>::send(h, data, size, tm);
}
template <typename Flag>
@ -597,8 +597,8 @@ buff_t chan_impl<Flag>::recv(ipc::handle_t h, std::size_t tm) {
}
template <typename Flag>
bool chan_impl<Flag>::try_send(ipc::handle_t h, void const * data, std::size_t size) {
return detail_impl<policy_t<Flag>>::try_send(h, data, size);
bool chan_impl<Flag>::try_send(ipc::handle_t h, void const * data, std::size_t size, std::size_t tm) {
return detail_impl<policy_t<Flag>>::try_send(h, data, size, tm);
}
template <typename Flag>

View File

@ -166,7 +166,7 @@ public:
template <typename ... P>
async_wrapper(P ... pars) {
get_alloc_ = [this, pars ...]()->ref_t {
return *tls_.create(this, pars ...);
return *tls_.create_once(this, pars ...);
};
}
@ -229,8 +229,8 @@ struct default_mapping_policy {
};
template <typename F, typename ... P>
IPC_CONSTEXPR_ static void foreach(F f, P ... params) {
for (std::size_t i = 0; i < classes_size; ++i) f(i, params...);
IPC_CONSTEXPR_ static void foreach(F && f, P && ... params) {
for (std::size_t i = 0; i < classes_size; ++i) f(i, std::forward<P>(params)...);
}
IPC_CONSTEXPR_ static std::size_t block_size(std::size_t id) noexcept {
@ -238,9 +238,9 @@ struct default_mapping_policy {
}
template <typename F, typename D, typename ... P>
IPC_CONSTEXPR_ static auto classify(F f, D d, std::size_t size, P ... params) {
IPC_CONSTEXPR_ static auto classify(F && f, D && d, std::size_t size, P && ... params) {
std::size_t id = (size - base_size - 1) / iter_size;
return (id < classes_size) ? f(id, size, params...) : d(size, params...);
return (id < classes_size) ? f(id, size, std::forward<P>(params)...) : d(size, std::forward<P>(params)...);
}
};
@ -251,41 +251,49 @@ class variable_wrapper {
struct initiator {
std::aligned_storage_t< sizeof(FixedAlloc),
alignof(FixedAlloc)> arr_[MappingP::classes_size];
using falc_t = std::aligned_storage_t<sizeof(FixedAlloc), alignof(FixedAlloc)>;
falc_t arr_[MappingP::classes_size];
initiator() {
MappingP::foreach([](std::size_t id, initiator* t) {
::new (&(t->arr_[id])) FixedAlloc(MappingP::block_size(id));
}, this);
MappingP::foreach([](std::size_t id, falc_t * a) {
::new (&(a[id])) FixedAlloc(MappingP::block_size(id));
}, arr_);
}
FixedAlloc& at(std::size_t id) {
return reinterpret_cast<FixedAlloc&>(arr_[id]);
~initiator() {
MappingP::foreach([](std::size_t id, falc_t * a) {
initiator::at(a, id).~FixedAlloc();
}, arr_);
}
static FixedAlloc & at(falc_t * arr, std::size_t id) noexcept {
return reinterpret_cast<FixedAlloc&>(arr[id]);
}
} init_;
using falc_t = typename initiator::falc_t;
public:
void swap(variable_wrapper & other) {
MappingP::foreach([](std::size_t id, initiator* t, initiator* o) {
t->at(id).swap(o->at(id));
}, &init_, &other.init_);
MappingP::foreach([](std::size_t id, falc_t * in, falc_t * ot) {
initiator::at(in, id).swap(initiator::at(ot, id));
}, init_.arr_, other.init_.arr_);
}
void* alloc(std::size_t size) {
return MappingP::classify([](std::size_t id, std::size_t size, initiator* t) {
return t->at(id).alloc(size);
}, [](std::size_t size, initiator*) {
return MappingP::classify([](std::size_t id, std::size_t size, falc_t * a) {
return initiator::at(a, id).alloc(size);
}, [](std::size_t size, falc_t *) {
return DefaultAlloc::alloc(size);
}, size, &init_);
}, size, init_.arr_);
}
void free(void* p, std::size_t size) {
MappingP::classify([](std::size_t id, std::size_t size, void* p, initiator* t) {
t->at(id).free(p, size);
}, [](std::size_t size, void* p, initiator*) {
MappingP::classify([](std::size_t id, std::size_t size, void* p, falc_t * a) {
initiator::at(a, id).free(p, size);
}, [](std::size_t size, void* p, falc_t *) {
DefaultAlloc::free(p, size);
}, size, p, &init_);
}, size, p, init_.arr_);
}
};

View File

@ -5,13 +5,7 @@
#include <shared_mutex>
#include <type_traits>
#include <tuple>
#include <atomic>
#include <algorithm>
#include <utility>
#include <new>
#include "def.h"
#include "export.h"
// pre-defined
@ -61,6 +55,8 @@
namespace std {
// deduction guides for std::unique_ptr
template <typename T>
unique_ptr(T* p) -> unique_ptr<T>;
template <typename T, typename D>
unique_ptr(T* p, D&& d) -> unique_ptr<T, std::decay_t<D>>;
@ -81,20 +77,25 @@ namespace ipc {
namespace detail {
// deduction guides for std::unique_ptr
template <typename T>
constexpr auto unique_ptr(T* p) noexcept {
return std::unique_ptr<T> { p };
}
template <typename T, typename D>
constexpr auto unique_ptr(T* p, D&& d) {
constexpr auto unique_ptr(T* p, D&& d) noexcept {
return std::unique_ptr<T, std::decay_t<D>> { p, std::forward<D>(d) };
}
// deduction guides for std::unique_lock
template <typename T>
constexpr auto unique_lock(T&& lc) {
constexpr auto unique_lock(T&& lc) noexcept {
return std::unique_lock<std::decay_t<T>> { std::forward<T>(lc) };
}
// deduction guides for std::shared_lock
template <typename T>
constexpr auto shared_lock(T&& lc) {
constexpr auto shared_lock(T&& lc) noexcept {
return std::shared_lock<std::decay_t<T>> { std::forward<T>(lc) };
}
@ -110,40 +111,5 @@ constexpr const T& (min)(const T& a, const T& b) {
#endif/*__cplusplus < 201703L*/
template <typename F, typename D>
constexpr decltype(auto) static_switch(std::size_t /*i*/, std::index_sequence<>, F&& /*f*/, D&& def) {
return std::forward<D>(def)();
}
template <typename F, typename D, std::size_t N, std::size_t...I>
constexpr decltype(auto) static_switch(std::size_t i, std::index_sequence<N, I...>, F&& f, D&& def) {
return (i == N) ? std::forward<F>(f)(std::integral_constant<std::size_t, N>{}) :
static_switch(i, std::index_sequence<I...>{}, std::forward<F>(f), std::forward<D>(def));
}
template <std::size_t N, typename F, typename D>
constexpr decltype(auto) static_switch(std::size_t i, F&& f, D&& def) {
return static_switch(i, std::make_index_sequence<N>{}, std::forward<F>(f), std::forward<D>(def));
}
template <typename F, std::size_t...I>
IPC_CONSTEXPR_ void static_for(std::index_sequence<I...>, F&& f) {
IPC_UNUSED_ auto expand = { (std::forward<F>(f)(std::integral_constant<std::size_t, I>{}), 0)... };
}
template <std::size_t N, typename F>
IPC_CONSTEXPR_ void static_for(F&& f) {
static_for(std::make_index_sequence<N>{}, std::forward<F>(f));
}
// Minimum offset between two objects to avoid false sharing.
enum {
// #if __cplusplus >= 201703L
// cache_line_size = std::hardware_destructive_interference_size
// #else /*__cplusplus < 201703L*/
cache_line_size = 64
// #endif/*__cplusplus < 201703L*/
};
} // namespace detail
} // namespace ipc

View File

@ -1,29 +0,0 @@
#include "tls_pointer.h"
#include <pthread.h> // pthread_...
namespace ipc {
namespace tls {
key_t create(destructor_t destructor) {
pthread_key_t k;
if (pthread_key_create(&k, destructor) == 0) {
return static_cast<key_t>(k);
}
return invalid_value;
}
void release(key_t key) {
pthread_key_delete(static_cast<pthread_key_t>(key));
}
bool set(key_t key, void* ptr) {
return pthread_setspecific(static_cast<pthread_key_t>(key), ptr) == 0;
}
void* get(key_t key) {
return pthread_getspecific(static_cast<pthread_key_t>(key));
}
} // namespace tls
} // namespace ipc

View File

@ -0,0 +1,82 @@
#include "tls_pointer.h"
#include <pthread.h> // pthread_...
#include <atomic> // std::atomic_thread_fence
#include <cassert> // assert
#include "log.h"
#include "utility.h"
namespace ipc {
namespace tls {
namespace {
namespace native {
using key_t = pthread_key_t;
bool create(key_t * pkey, void (*destructor)(void*)) {
assert(pkey != nullptr);
int err = ::pthread_key_create(pkey, destructor);
if (err != 0) {
ipc::error("[native::create] pthread_key_create failed [%d].\n", err);
return false;
}
return true;
}
bool release(key_t key) {
int err = ::pthread_key_delete(key);
if (err != 0) {
ipc::error("[native::release] pthread_key_delete failed [%d].\n", err);
return false;
}
return true;
}
bool set(key_t key, void * ptr) {
int err = ::pthread_setspecific(key, ptr);
if (err != 0) {
ipc::error("[native::set] pthread_setspecific failed [%d].\n", err);
return false;
}
return true;
}
void * get(key_t key) {
return ::pthread_getspecific(key);
}
} // namespace native
} // internal-linkage
bool create(key_info * pkey, destructor_t destructor) {
assert(pkey != nullptr);
native::key_t k;
if (!native::create(&k, destructor)) {
return false;
}
pkey->key_ = horrible_cast<key_t>(k);
std::atomic_thread_fence(std::memory_order_seq_cst);
return true;
}
void release(key_info const * pkey) {
assert(pkey != nullptr);
static_cast<void>(
native::release(horrible_cast<native::key_t>(pkey->key_)));
}
bool set(key_info const * pkey, void* ptr) {
assert(pkey != nullptr);
return native::set(horrible_cast<native::key_t>(pkey->key_), ptr);
}
void * get(key_info const * pkey) {
assert(pkey != nullptr);
return native::get(horrible_cast<native::key_t>(pkey->key_));
}
} // namespace tls
} // namespace ipc

192
src/platform/tls_pointer_win.cpp Normal file → Executable file
View File

@ -1,18 +1,14 @@
#include "tls_pointer.h"
#include "log.h"
#include <Windows.h> // ::Tls...
#include <atomic>
#include <Windows.h>
namespace ipc {
/*
* <Remarks>
*
/**
* @remarks
* Windows doesn't support a per-thread destructor with its TLS primitives.
* So, here will build it manually by inserting a function to be called on each thread's exit.
*
* <Reference>
* @see
* - https://www.codeproject.com/Articles/8113/Thread-Local-Storage-The-C-Way
* - https://src.chromium.org/viewvc/chrome/trunk/src/base/threading/thread_local_storage_win.cc
* - https://github.com/mirror/mingw-org-wsl/blob/master/src/libcrt/crt/tlssup.c
@ -20,186 +16,17 @@ namespace ipc {
* - http://svn.boost.org/svn/boost/trunk/libs/thread/src/win32/tss_pe.cpp
*/
namespace {
struct tls_data {
using destructor_t = void(*)(void*);
unsigned index_;
DWORD win_key_;
destructor_t destructor_;
bool valid() const noexcept {
return win_key_ != TLS_OUT_OF_INDEXES;
}
void* get() const {
return ::TlsGetValue(win_key_);
}
bool set(void* p) {
return TRUE == ::TlsSetValue(win_key_, static_cast<LPVOID>(p));
}
void destruct() {
void* data = valid() ? get() : nullptr;
if (data != nullptr) {
if (destructor_ != nullptr) destructor_(data);
set(nullptr);
}
}
void clear_self() {
if (valid()) {
destruct();
::TlsFree(win_key_);
}
delete this;
}
};
struct tls_recs {
tls_data* recs_[TLS_MINIMUM_AVAILABLE] {};
unsigned index_ = 0;
bool insert(tls_data* data) noexcept {
if (index_ >= TLS_MINIMUM_AVAILABLE) {
ipc::error("[tls_recs] insert tls_data failed[index_ >= TLS_MINIMUM_AVAILABLE].\n");
return false;
}
recs_[(data->index_ = index_++)] = data;
return true;
}
bool erase(tls_data* data) noexcept {
if (data->index_ >= TLS_MINIMUM_AVAILABLE) return false;
recs_[data->index_] = nullptr;
return true;
}
tls_data* * begin() noexcept { return &recs_[0]; }
tls_data* const * begin() const noexcept { return &recs_[0]; }
tls_data* * end () noexcept { return &recs_[index_]; }
tls_data* const * end () const noexcept { return &recs_[index_]; }
};
struct key_gen {
DWORD rec_key_;
key_gen() : rec_key_(::TlsAlloc()) {
if (rec_key_ == TLS_OUT_OF_INDEXES) {
ipc::error("[record_key] TlsAlloc failed[%lu].\n", ::GetLastError());
}
}
~key_gen() {
::TlsFree(rec_key_);
rec_key_ = TLS_OUT_OF_INDEXES;
}
} gen__;
DWORD& record_key() noexcept {
return gen__.rec_key_;
}
bool record(tls_data* tls_dat) {
if (record_key() == TLS_OUT_OF_INDEXES) return false;
auto rec = static_cast<tls_recs*>(::TlsGetValue(record_key()));
if (rec == nullptr) {
if (FALSE == ::TlsSetValue(record_key(), static_cast<LPVOID>(rec = new tls_recs))) {
ipc::error("[record] TlsSetValue failed[%lu].\n", ::GetLastError());
return false;
}
}
return rec->insert(tls_dat);
}
void erase_record(tls_data* tls_dat) {
if (tls_dat == nullptr) return;
if (record_key() == TLS_OUT_OF_INDEXES) return;
auto rec = static_cast<tls_recs*>(::TlsGetValue(record_key()));
if (rec == nullptr) return;
rec->erase(tls_dat);
tls_dat->clear_self();
}
void clear_all_records() {
if (record_key() == TLS_OUT_OF_INDEXES) return;
auto rec = static_cast<tls_recs*>(::TlsGetValue(record_key()));
if (rec == nullptr) return;
for (auto tls_dat : *rec) {
if (tls_dat != nullptr) tls_dat->destruct();
}
delete rec;
::TlsSetValue(record_key(), static_cast<LPVOID>(nullptr));
}
} // internal-linkage
namespace ipc {
namespace tls {
key_t create(destructor_t destructor) {
record_key(); // gen record-key
auto tls_dat = new tls_data { unsigned(-1), ::TlsAlloc(), destructor };
std::atomic_thread_fence(std::memory_order_seq_cst);
if (!tls_dat->valid()) {
ipc::error("[tls::create] TlsAlloc failed[%lu].\n", ::GetLastError());
tls_dat->clear_self();
return invalid_value;
}
return reinterpret_cast<key_t>(tls_dat);
}
void release(key_t tls_key) {
if (tls_key == invalid_value) {
ipc::error("[tls::release] tls_key is invalid_value.\n");
return;
}
auto tls_dat = reinterpret_cast<tls_data*>(tls_key);
if (tls_dat == nullptr) {
ipc::error("[tls::release] tls_dat is nullptr.\n");
return;
}
erase_record(tls_dat);
}
bool set(key_t tls_key, void* ptr) {
if (tls_key == invalid_value) {
ipc::error("[tls::set] tls_key is invalid_value.\n");
return false;
}
auto tls_dat = reinterpret_cast<tls_data*>(tls_key);
if (tls_dat == nullptr) {
ipc::error("[tls::set] tls_dat is nullptr.\n");
return false;
}
if (!tls_dat->set(ptr)) {
ipc::error("[tls::set] TlsSetValue failed[%lu].\n", ::GetLastError());
return false;
}
record(tls_dat);
return true;
}
void* get(key_t tls_key) {
if (tls_key == invalid_value) {
ipc::error("[tls::get] tls_key is invalid_value.\n");
return nullptr;
}
auto tls_dat = reinterpret_cast<tls_data*>(tls_key);
if (tls_dat == nullptr) {
ipc::error("[tls::get] tls_dat is nullptr.\n");
return nullptr;
}
return tls_dat->get();
}
} // namespace tls
void at_thread_exit();
namespace {
void NTAPI OnTlsCallback(PVOID, DWORD dwReason, PVOID) {
if (dwReason == DLL_THREAD_DETACH) clear_all_records();
if (dwReason == DLL_THREAD_DETACH) {
ipc::tls::at_thread_exit();
}
}
} // internal-linkage
@ -271,4 +98,5 @@ extern "C" NX_CRTALLOC_(".tls") const IMAGE_TLS_DIRECTORY _tls_used = {
#endif/*_MSC_VER, __GNUC__*/
} // namespace tls
} // namespace ipc

76
src/platform/tls_pointer_win.h Executable file
View File

@ -0,0 +1,76 @@
#pragma once
#include <unordered_map> // std::unordered_map
#include <atomic> // std::atomic_thread_fence
#include <cassert> // assert
#include "log.h"
#include "utility.h"
namespace ipc {
namespace tls {
namespace {
inline void tls_destruct(key_info const * pkey, void * p) {
assert(pkey != nullptr);
auto destructor = horrible_cast<destructor_t>(pkey->key_);
if (destructor != nullptr) destructor(p);
}
struct tls_recs : public std::unordered_map<key_info const *, void *> {
~tls_recs() {
for (auto & pair : *this) {
tls_destruct(pair.first, pair.second);
}
}
};
inline tls_recs * tls_get_recs() {
thread_local tls_recs * recs_ptr = nullptr;
if (recs_ptr == nullptr) {
recs_ptr = new tls_recs;
}
assert(recs_ptr != nullptr);
return recs_ptr;
}
} // internal-linkage
void at_thread_exit() {
delete tls_get_recs();
}
bool create(key_info * pkey, destructor_t destructor) {
assert(pkey != nullptr);
pkey->key_ = horrible_cast<key_t>(destructor);
std::atomic_thread_fence(std::memory_order_seq_cst);
return true;
}
void release(key_info const * pkey) {
assert(pkey != nullptr);
assert(tls_get_recs() != nullptr);
tls_get_recs()->erase(pkey);
}
bool set(key_info const * pkey, void * ptr) {
assert(pkey != nullptr);
assert(tls_get_recs() != nullptr);
(*tls_get_recs())[pkey] = ptr;
return true;
}
void * get(key_info const * pkey) {
assert(pkey != nullptr);
assert(tls_get_recs() != nullptr);
auto const recs = tls_get_recs();
auto it = recs->find(pkey);
if (it == recs->end()) {
return nullptr;
}
return it->second;
}
} // namespace tls
} // namespace ipc

12
src/prod_cons.h Normal file → Executable file
View File

@ -8,6 +8,8 @@
#include "def.h"
#include "platform/detail.h"
#include "circ/elem_def.h"
#include "log.h"
#include "utility.h"
namespace ipc {
@ -26,8 +28,8 @@ struct prod_cons_impl<wr<relat::single, relat::single, trans::unicast>> {
std::aligned_storage_t<DataSize, AlignSize> data_ {};
};
alignas(detail::cache_line_size) std::atomic<circ::u2_t> rd_; // read index
alignas(detail::cache_line_size) std::atomic<circ::u2_t> wt_; // write index
alignas(cache_line_size) std::atomic<circ::u2_t> rd_; // read index
alignas(cache_line_size) std::atomic<circ::u2_t> wt_; // write index
constexpr circ::u2_t cursor() const noexcept {
return 0;
@ -110,7 +112,7 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::unicast>>
std::atomic<flag_t> f_ct_ { 0 }; // commit flag
};
alignas(detail::cache_line_size) std::atomic<circ::u2_t> ct_; // commit index
alignas(cache_line_size) std::atomic<circ::u2_t> ct_; // commit index
template <typename W, typename F, typename E>
bool push(W* /*wrapper*/, F&& f, E* elems) {
@ -196,7 +198,7 @@ struct prod_cons_impl<wr<relat::single, relat::multi, trans::broadcast>> {
std::atomic<rc_t> rc_ { 0 }; // read-counter
};
alignas(detail::cache_line_size) std::atomic<circ::u2_t> wt_; // write index
alignas(cache_line_size) std::atomic<circ::u2_t> wt_; // write index
circ::u2_t cursor() const noexcept {
return wt_.load(std::memory_order_acquire);
@ -290,7 +292,7 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::broadcast>> {
std::atomic<flag_t> f_ct_ { 0 }; // commit flag
};
alignas(detail::cache_line_size) std::atomic<circ::u2_t> ct_; // commit index
alignas(cache_line_size) std::atomic<circ::u2_t> ct_; // commit index
circ::u2_t cursor() const noexcept {
return ct_.load(std::memory_order_acquire);

15
src/queue.h Normal file → Executable file
View File

@ -9,6 +9,7 @@
#include <thread>
#include <chrono>
#include <string>
#include <cassert> // assert
#include "def.h"
#include "shm.h"
@ -103,6 +104,12 @@ public:
elems_ = open<elems_t>(name);
}
explicit queue_base(elems_t * elems)
: queue_base() {
assert(elems != nullptr);
elems_ = elems;
}
/* not virtual */ ~queue_base() {
base_t::close();
}
@ -136,7 +143,7 @@ public:
}
template <typename T, typename... P>
auto push(P&&... params) {
bool push(P&&... params) {
if (elems_ == nullptr) return false;
return elems_->push(this, [&](void* p) {
::new (p) T(std::forward<P>(params)...);
@ -144,7 +151,7 @@ public:
}
template <typename T, typename F, typename... P>
auto force_push(F&& prep, P&&... params) {
bool force_push(F&& prep, P&&... params) {
if (elems_ == nullptr) return false;
return elems_->force_push(this, [&](void* p) {
if (prep(p)) ::new (p) T(std::forward<P>(params)...);
@ -174,12 +181,12 @@ public:
using base_t::base_t;
template <typename... P>
auto push(P&&... params) {
bool push(P&&... params) {
return base_t::template push<T>(std::forward<P>(params)...);
}
template <typename... P>
auto force_push(P&&... params) {
bool force_push(P&&... params) {
return base_t::template force_push<T>(std::forward<P>(params)...);
}

9
src/tls_pointer.cpp Executable file
View File

@ -0,0 +1,9 @@
#include "tls_pointer.h"
#if defined(WIN64) || defined(_WIN64) || defined(__WIN64__) || \
defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) || \
defined(WINCE) || defined(_WIN32_WCE)
#include "platform/tls_pointer_win.h"
#else /*!WIN*/
#include "platform/tls_pointer_linux.h"
#endif/*!WIN*/

56
src/utility.h Executable file
View File

@ -0,0 +1,56 @@
#pragma once
#include <utility> // std::forward, std::integer_sequence
#include <cstddef> // std::size_t
#include <new> // std::hardware_destructive_interference_size
#include "platform/detail.h"
namespace ipc {
template <typename F, typename D>
constexpr decltype(auto) static_switch(std::size_t /*i*/, std::index_sequence<>, F&& /*f*/, D&& def) {
return std::forward<D>(def)();
}
template <typename F, typename D, std::size_t N, std::size_t...I>
constexpr decltype(auto) static_switch(std::size_t i, std::index_sequence<N, I...>, F&& f, D&& def) {
return (i == N) ? std::forward<F>(f)(std::integral_constant<std::size_t, N>{}) :
static_switch(i, std::index_sequence<I...>{}, std::forward<F>(f), std::forward<D>(def));
}
template <std::size_t N, typename F, typename D>
constexpr decltype(auto) static_switch(std::size_t i, F&& f, D&& def) {
return static_switch(i, std::make_index_sequence<N>{}, std::forward<F>(f), std::forward<D>(def));
}
template <typename F, std::size_t...I>
IPC_CONSTEXPR_ void static_for(std::index_sequence<I...>, F&& f) {
IPC_UNUSED_ auto expand = { (std::forward<F>(f)(std::integral_constant<std::size_t, I>{}), 0)... };
}
template <std::size_t N, typename F>
IPC_CONSTEXPR_ void static_for(F&& f) {
static_for(std::make_index_sequence<N>{}, std::forward<F>(f));
}
// Minimum offset between two objects to avoid false sharing.
enum {
// #if __cplusplus >= 201703L
// cache_line_size = std::hardware_destructive_interference_size
// #else /*__cplusplus < 201703L*/
cache_line_size = 64
// #endif/*__cplusplus < 201703L*/
};
template <typename T, typename U>
T horrible_cast(U val) {
union {
T out;
U in;
} u;
u.in = val;
return u.out;
}
} // namespace ipc

6
test/CMakeLists.txt Normal file → Executable file
View File

@ -13,8 +13,7 @@ include_directories(
${CMAKE_SOURCE_DIR}/src
${CMAKE_SOURCE_DIR}/test
${CMAKE_SOURCE_DIR}/3rdparty
${CMAKE_SOURCE_DIR}/3rdparty/gtest/include
)
${CMAKE_SOURCE_DIR}/3rdparty/gtest/include)
file(GLOB SRC_FILES ${CMAKE_SOURCE_DIR}/test/*.cpp)
file(GLOB HEAD_FILES ${CMAKE_SOURCE_DIR}/test/*.h)
@ -24,6 +23,3 @@ add_executable(${PROJECT_NAME} ${SRC_FILES} ${HEAD_FILES})
link_directories(${CMAKE_SOURCE_DIR}/3rdparty/gperftools)
target_link_libraries(${PROJECT_NAME} gtest gtest_main ipc)
#target_link_libraries(${PROJECT_NAME} tcmalloc_minimal)
if(NOT MSVC)
target_link_libraries(${PROJECT_NAME} pthread rt)
endif()

View File

@ -8,14 +8,40 @@
#include <mutex>
#include <utility>
#if defined(__GNUC__)
# include <cxxabi.h> // abi::__cxa_demangle
#endif/*__GNUC__*/
#include "gtest/gtest.h"
#include "capo/stopwatch.hpp"
#include "capo/spin_lock.hpp"
#include "thread_pool.h"
namespace ipc_ut {
template <typename Dur>
struct unit;
template <> struct unit<std::chrono::nanoseconds> {
constexpr static char const * str() noexcept {
return "ns";
}
};
template <> struct unit<std::chrono::microseconds> {
constexpr static char const * str() noexcept {
return "us";
}
};
template <> struct unit<std::chrono::milliseconds> {
constexpr static char const * str() noexcept {
return "ms";
}
};
template <> struct unit<std::chrono::seconds> {
constexpr static char const * str() noexcept {
return "sec";
}
};
struct test_stopwatch {
capo::stopwatch<> sw_;
@ -27,115 +53,34 @@ struct test_stopwatch {
}
}
template <int Factor, typename ToDur = std::chrono::microseconds>
void print_elapsed(int N, int M, int Loops, const char * unit = " us/d") {
template <typename ToDur = std::chrono::nanoseconds>
void print_elapsed(int N, int Loops, char const * message = "") {
auto ts = sw_.elapsed<ToDur>();
std::cout << "[" << N << ":" << M << ", " << Loops << "] "
<< (double(ts) / double(Factor ? (Loops * Factor) : (Loops * N))) << unit << std::endl;
std::cout << "[" << N << ", \t" << Loops << "] " << message << "\t"
<< (double(ts) / double(Loops)) << " " << unit<ToDur>::str() << std::endl;
}
void print_elapsed(int N, int M, int Loops) {
print_elapsed<0>(N, M, Loops);
template <int Factor, typename ToDur = std::chrono::nanoseconds>
void print_elapsed(int N, int M, int Loops, char const * message = "") {
auto ts = sw_.elapsed<ToDur>();
std::cout << "[" << N << "-" << M << ", \t" << Loops << "] " << message << "\t"
<< (double(ts) / double(Factor ? (Loops * Factor) : (Loops * N))) << " " << unit<ToDur>::str() << std::endl;
}
template <typename ToDur = std::chrono::nanoseconds>
void print_elapsed(int N, int M, int Loops, char const * message = "") {
print_elapsed<0, ToDur>(N, M, Loops, message);
}
};
template <typename V>
struct test_verify;
template <>
struct test_verify<void> {
test_verify (int) {}
void prepare (void*) {}
void verify (int, int) {}
template <typename U>
void push_data(int, U&&) {}
};
template <typename T>
struct test_cq;
template <typename T>
std::string type_name() {
#if defined(__GNUC__)
const char* typeid_name = typeid(T).name();
const char* real_name = abi::__cxa_demangle(typeid_name, nullptr, nullptr, nullptr);
std::unique_ptr<void, decltype(::free)*> guard { (void*)real_name, ::free };
if (real_name == nullptr) real_name = typeid_name;
return real_name;
#else
return typeid(T).name();
#endif/*__GNUC__*/
inline static thread_pool & sender() {
static thread_pool pool;
return pool;
}
template <int N, int M, int Loops, typename V = void, typename T>
void benchmark_prod_cons(T* cq) {
std::cout << "benchmark_prod_cons " << type_name<T>() << " [" << N << ":" << M << ", " << Loops << "]" << std::endl;
test_cq<T> tcq { cq };
std::thread producers[N];
std::thread consumers[M];
std::atomic_int fini_p { 0 }, fini_c { 0 };
test_stopwatch sw;
test_verify<V> vf { M };
// capo::spin_lock lc;
int cid = 0;
for (auto& t : consumers) {
t = std::thread{[&, cid] {
vf.prepare(&t);
auto cn = tcq.connect();
int i = 0;
tcq.recv(cn, [&](auto&& msg) {
// if (i % ((Loops * N) / 10) == 0) {
// std::unique_lock<capo::spin_lock> guard { lc };
// std::cout << cid << "-recving: " << (i * 100) / (Loops * N) << "%" << std::endl;
// }
vf.push_data(cid, std::forward<decltype(msg)>(msg));
++i;
});
// {
// std::unique_lock<capo::spin_lock> guard { lc };
// std::cout << cid << "-consumer-disconnect" << std::endl;
// }
tcq.disconnect(cn);
if ((fini_c.fetch_add(1, std::memory_order_relaxed) + 1) != M) {
// std::unique_lock<capo::spin_lock> guard { lc };
// std::cout << cid << "-consumer-end" << std::endl;
return;
}
sw.print_elapsed(N, M, Loops);
vf.verify(N, Loops);
}};
++cid;
}
tcq.wait_start(M);
std::cout << "start producers..." << std::endl;
int pid = 0;
for (auto& t : producers) {
t = std::thread{[&, pid] {
auto cn = tcq.connect_send();
sw.start();
for (int i = 0; i < Loops; ++i) {
// if (i % (Loops / 10) == 0) {
// std::unique_lock<capo::spin_lock> guard { lc };
// std::cout << pid << "-sending: " << (i * 100 / Loops) << "%" << std::endl;
// }
tcq.send(cn, { pid, i });
}
if ((fini_p.fetch_add(1, std::memory_order_relaxed) + 1) != N) {
return;
}
// quit
tcq.send(cn, { -1, -1 });
tcq.disconnect(cn);
}};
++pid;
}
for (auto& t : producers) t.join();
for (auto& t : consumers) t.join();
inline static thread_pool & reader() {
static thread_pool pool;
return pool;
}
} // namespace ipc_ut

View File

@ -1,385 +0,0 @@
#include <iostream>
#include <string>
#include <type_traits>
#include <memory>
#include <new>
#include <vector>
#include <unordered_map>
#include "queue.h"
#include "prod_cons.h"
#include "policy.h"
#include "circ/elem_array.h"
#include "memory/resource.h"
#include "test.h"
namespace {
struct msg_t {
int pid_;
int dat_;
};
template <ipc::relat Rp, ipc::relat Rc, ipc::trans Ts>
using pc_t = ipc::prod_cons_impl<ipc::wr<Rp, Rc, Ts>>;
template <std::size_t DataSize, typename Policy>
struct ea_t : public ipc::circ::elem_array<Policy, DataSize, 1> {
ea_t() { std::memset(this, 0, sizeof(ipc::circ::elem_array<Policy, DataSize, 1>)); }
};
using cq_t = ea_t<
sizeof(msg_t),
pc_t<ipc::relat::single, ipc::relat::multi, ipc::trans::broadcast>
>;
bool operator==(msg_t const & m1, msg_t const & m2) {
return (m1.pid_ == m2.pid_) && (m1.dat_ == m2.dat_);
}
} // internal-linkage
template <std::size_t D, typename P>
struct test_verify<ea_t<D, P>> {
std::vector<std::unordered_map<int, std::vector<int>>> list_;
test_verify(int M)
: list_(static_cast<std::size_t>(M))
{}
void prepare(void* pt) {
std::cout << "start consumer: " << pt << std::endl;
}
void push_data(int cid, msg_t const & msg) {
list_[cid][msg.pid_].push_back(msg.dat_);
}
void verify(int N, int Loops) {
std::cout << "verifying..." << std::endl;
for (auto& c_dats : list_) {
for (int n = 0; n < N; ++n) {
auto& vec = c_dats[n];
//for (int d : vec) {
// std::cout << d << " ";
//}
//std::cout << std::endl;
EXPECT_EQ(vec.size(), static_cast<std::size_t>(Loops));
int i = 0;
for (int d : vec) {
EXPECT_EQ(i, d);
++i;
}
}
}
}
};
template <ipc::relat Rp>
struct test_verify<pc_t<Rp, ipc::relat::multi, ipc::trans::unicast>> : test_verify<cq_t> {
using test_verify<cq_t>::test_verify;
void verify(int N, int Loops) {
std::cout << "verifying..." << std::endl;
for (int n = 0; n < N; ++n) {
std::vector<int> datas;
std::uint64_t sum = 0;
for (auto& c_dats : list_) {
for (int d : c_dats[n]) {
datas.push_back(d);
sum += d;
}
}
EXPECT_EQ(datas.size(), static_cast<std::size_t>(Loops));
EXPECT_EQ(sum, (Loops * std::uint64_t(Loops - 1)) / 2);
}
}
};
template <typename P>
struct quit_mode;
template <ipc::relat Rp, ipc::relat Rc>
struct quit_mode<pc_t<Rp, Rc, ipc::trans::unicast>> {
using type = volatile bool;
};
template <ipc::relat Rp, ipc::relat Rc>
struct quit_mode<pc_t<Rp, Rc, ipc::trans::broadcast>> {
struct type {
constexpr type(bool) {}
constexpr operator bool() const { return false; }
};
};
template <std::size_t D, typename P>
struct test_cq<ea_t<D, P>> {
using ca_t = ea_t<D, P>;
using cn_t = decltype(std::declval<ca_t>().connect());
using cr_t = decltype(std::declval<ca_t>().cursor());
typename quit_mode<P>::type quit_ = false;
ca_t* ca_;
std::unordered_map<cn_t, cr_t> conns_;
test_cq(ca_t* ca) : ca_(ca) {}
cn_t connect() {
auto it = conns_.emplace(ca_->connect(), ca_->cursor());
return it.first->first;
}
ca_t* connect_send() {
return ca_;
}
void disconnect(cn_t cc_id) {
ca_->disconnect(cc_id);
}
void disconnect(ca_t*) {}
ca_t * elems() noexcept { return ca_; }
ca_t const * elems() const noexcept { return ca_; }
void wait_start(int M) {
while (ca_->conn_count() != static_cast<std::size_t>(M)) {
std::this_thread::yield();
}
}
template <typename F>
void recv(cn_t cn, F&& proc) {
struct que {
cn_t cn_;
// for ca_->pop
cn_t connected_id() const noexcept { return cn_; }
} q { cn };
cr_t& cur = conns_[cn];
while (1) {
msg_t msg;
while (ca_->pop(&q, &cur, [&msg](void* p) {
msg = *static_cast<msg_t*>(p);
})) {
if (msg.pid_ < 0) {
quit_ = true;
return;
}
proc(msg);
}
if (quit_) return;
std::this_thread::yield();
}
}
void send(ca_t* ca, msg_t const & msg) {
while (!ca->push(this, [&msg](void* p) {
(*static_cast<msg_t*>(p)) = msg;
})) {
std::this_thread::yield();
}
}
};
template <typename... T>
struct test_cq<ipc::queue<T...>> {
using cn_t = ipc::queue<T...>;
test_cq(void*) {}
cn_t* connect() {
cn_t* queue = new cn_t { "test-ipc-queue" };
[&] { EXPECT_TRUE(queue->connect()); } ();
return queue;
}
void disconnect(cn_t* queue) {
queue->disconnect();
delete queue;
}
void wait_start(int M) {
cn_t que("test-ipc-queue");
while (que.conn_count() != static_cast<std::size_t>(M)) {
std::this_thread::yield();
}
}
template <typename F>
void recv(cn_t* queue, F&& proc) {
while(1) {
typename cn_t::value_t msg;
while (!queue->pop(msg)) {
std::this_thread::yield();
}
if (msg.pid_ < 0) return;
proc(msg);
}
}
cn_t* connect_send() {
return new cn_t { "test-ipc-queue" };
}
void send(cn_t* cn, msg_t const & msg) {
while (!cn->push(msg)) {
std::this_thread::yield();
}
}
};
namespace {
constexpr int LoopCount = 1000000;
//constexpr int LoopCount = 1000/*0000*/;
TEST(Circ, init) {
std::cout << "cq_t::head_size = " << cq_t::head_size << std::endl;
std::cout << "cq_t::data_size = " << cq_t::data_size << std::endl;
std::cout << "cq_t::elem_size = " << cq_t::elem_size << std::endl;
std::cout << "cq_t::block_size = " << cq_t::block_size << std::endl;
EXPECT_EQ(static_cast<std::size_t>(cq_t::data_size), sizeof(msg_t));
std::cout << "sizeof(ea_t<sizeof(msg_t)>) = " << sizeof(cq_t) << std::endl;
}
TEST(Circ, prod_cons_1v1) {
ea_t<
sizeof(msg_t),
pc_t<ipc::relat::single, ipc::relat::single, ipc::trans::unicast>
> el_arr_ssu;
benchmark_prod_cons<1, 1, LoopCount, cq_t>(&el_arr_ssu);
benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_ssu);
ea_t<
sizeof(msg_t),
pc_t<ipc::relat::single, ipc::relat::multi, ipc::trans::unicast>
> el_arr_smu;
benchmark_prod_cons<1, 1, LoopCount, decltype(el_arr_smu)::policy_t>(&el_arr_smu);
benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_smu);
ea_t<
sizeof(msg_t),
pc_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::unicast>
> el_arr_mmu;
benchmark_prod_cons<1, 1, LoopCount, decltype(el_arr_mmu)::policy_t>(&el_arr_mmu);
benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_mmu);
ea_t<
sizeof(msg_t),
pc_t<ipc::relat::single, ipc::relat::multi, ipc::trans::broadcast>
> el_arr_smb;
benchmark_prod_cons<1, 1, LoopCount, cq_t>(&el_arr_smb);
benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_smb);
ea_t<
sizeof(msg_t),
pc_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::broadcast>
> el_arr_mmb;
benchmark_prod_cons<1, 1, LoopCount, cq_t>(&el_arr_mmb);
benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_mmb);
benchmark_prod_cons<2, 1, LoopCount, void>(&el_arr_mmb);
}
TEST(Circ, prod_cons_1v3) {
ea_t<
sizeof(msg_t),
pc_t<ipc::relat::single, ipc::relat::multi, ipc::trans::unicast>
> el_arr_smu;
benchmark_prod_cons<1, 3, LoopCount, decltype(el_arr_smu)::policy_t>(&el_arr_smu);
benchmark_prod_cons<1, 3, LoopCount, void>(&el_arr_smu);
ea_t<
sizeof(msg_t),
pc_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::unicast>
> el_arr_mmu;
benchmark_prod_cons<1, 3, LoopCount, decltype(el_arr_mmu)::policy_t>(&el_arr_mmu);
benchmark_prod_cons<1, 3, LoopCount, void>(&el_arr_mmu);
ea_t<
sizeof(msg_t),
pc_t<ipc::relat::single, ipc::relat::multi, ipc::trans::broadcast>
> el_arr_smb;
benchmark_prod_cons<1, 3, LoopCount, cq_t>(&el_arr_smb);
benchmark_prod_cons<1, 3, LoopCount, void>(&el_arr_smb);
ea_t<
sizeof(msg_t),
pc_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::broadcast>
> el_arr_mmb;
benchmark_prod_cons<1, 3, LoopCount, cq_t>(&el_arr_mmb);
benchmark_prod_cons<1, 3, LoopCount, void>(&el_arr_mmb);
}
TEST(Circ, prod_cons_performance) {
ea_t<
sizeof(msg_t),
pc_t<ipc::relat::single, ipc::relat::multi, ipc::trans::unicast>
> el_arr_smu;
ipc::detail::static_for<8>([&el_arr_smu](auto index) {
benchmark_prod_cons<1, decltype(index)::value + 1, LoopCount, void>(&el_arr_smu);
});
ea_t<
sizeof(msg_t),
pc_t<ipc::relat::single, ipc::relat::multi, ipc::trans::broadcast>
> el_arr_smb;
ipc::detail::static_for<8>([&el_arr_smb](auto index) {
benchmark_prod_cons<1, decltype(index)::value + 1, LoopCount, void>(&el_arr_smb);
});
// test & verify
ipc::detail::static_for<8>([&el_arr_smb](auto index) {
benchmark_prod_cons<1, decltype(index)::value + 1, LoopCount, cq_t>(&el_arr_smb);
});
ea_t<
sizeof(msg_t),
pc_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::unicast>
> el_arr_mmu;
ipc::detail::static_for<8>([&el_arr_mmu](auto index) {
benchmark_prod_cons<1, decltype(index)::value + 1, LoopCount, void>(&el_arr_mmu);
});
ipc::detail::static_for<8>([&el_arr_mmu](auto index) {
benchmark_prod_cons<decltype(index)::value + 1, 1, LoopCount, void>(&el_arr_mmu);
});
ipc::detail::static_for<8>([&el_arr_mmu](auto index) {
benchmark_prod_cons<decltype(index)::value + 1, decltype(index)::value + 1, LoopCount, void>(&el_arr_mmu);
});
ea_t<
sizeof(msg_t),
pc_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::broadcast>
> el_arr_mmb;
ipc::detail::static_for<8>([&el_arr_mmb](auto index) {
benchmark_prod_cons<1, decltype(index)::value + 1, LoopCount, void>(&el_arr_mmb);
});
ipc::detail::static_for<8>([&el_arr_mmb](auto index) {
benchmark_prod_cons<decltype(index)::value + 1, 1, LoopCount, void>(&el_arr_mmb);
});
ipc::detail::static_for<8>([&el_arr_mmb](auto index) {
benchmark_prod_cons<decltype(index)::value + 1, decltype(index)::value + 1, LoopCount, void>(&el_arr_mmb);
});
}
TEST(Circ, queue) {
using queue_t = ipc::queue<msg_t, ipc::policy::choose<
ipc::circ::elem_array,
ipc::wr<ipc::relat::single, ipc::relat::multi, ipc::trans::broadcast>
>>;
queue_t queue;
EXPECT_TRUE(!queue.push(msg_t { 1, 2 }));
msg_t msg {};
EXPECT_TRUE(!queue.pop(msg));
EXPECT_EQ(msg, (msg_t {}));
EXPECT_TRUE(sizeof(decltype(queue)::elems_t) <= sizeof(cq_t));
ipc::detail::static_for<16>([](auto index) {
benchmark_prod_cons<1, decltype(index)::value + 1, LoopCount>((queue_t*)nullptr);
});
}
} // internal-linkage

View File

@ -1,472 +1,150 @@
#include <thread>
#include <vector>
#include <type_traits>
#include <iostream>
#include <shared_mutex>
#include <mutex>
#include <typeinfo>
#include <memory>
#include <string>
#include <cstring>
#include <algorithm>
#include <array>
#include <limits>
#include <utility>
#include "capo/stopwatch.hpp"
#include "capo/spin_lock.hpp"
#include "capo/random.hpp"
#include <vector>
#include <iostream>
#include <cstring>
#include "ipc.h"
#include "rw_lock.h"
#include "buffer.h"
#include "memory/resource.h"
#include "test.h"
#include "thread_pool.h"
#include "capo/random.hpp"
using namespace ipc;
namespace {
std::vector<ipc::buff_t> datas__;
constexpr int DataMin = 2;
constexpr int DataMax = 256;
constexpr int LoopCount = 100000;
// constexpr int LoopCount = 1000;
} // internal-linkage
template <typename T>
struct test_verify {
std::vector<std::vector<ipc::buff_t>> list_;
test_verify(int M)
: list_(static_cast<std::size_t>(M))
{}
void prepare(void* /*pt*/) {}
void push_data(int cid, ipc::buff_t && msg) {
list_[cid].emplace_back(std::move(msg));
}
void verify(int /*N*/, int /*Loops*/) {
std::cout << "verifying..." << std::endl;
for (auto& c_dats : list_) {
EXPECT_EQ(datas__.size(), c_dats.size());
std::size_t i = 0;
for (auto& d : c_dats) {
EXPECT_EQ(datas__[i++], d);
}
}
}
};
template <>
struct test_cq<ipc::route> {
using cn_t = ipc::route;
std::string conn_name_;
test_cq(void*)
: conn_name_("test-ipc-route") {
}
cn_t connect() {
return cn_t { conn_name_.c_str() };
}
void disconnect(cn_t& cn) {
cn.disconnect();
}
void wait_start(int M) {
cn_t::wait_for_recv(conn_name_.c_str(), static_cast<std::size_t>(M));
}
template <typename F>
void recv(cn_t& cn, F&& proc) {
do {
auto msg = cn.recv();
if (msg.size() < 2) {
EXPECT_EQ(msg, ipc::buff_t('\0'));
return;
}
proc(std::move(msg));
} while(1);
}
cn_t connect_send() {
return connect();
}
void send(cn_t& cn, const std::array<int, 2>& info) {
int n = info[1];
if (n < 0) {
/*EXPECT_TRUE*/(cn.send(ipc::buff_t('\0')));
}
else /*EXPECT_TRUE*/(cn.send(datas__[static_cast<decltype(datas__)::size_type>(n)]));
}
};
template <>
struct test_cq<ipc::channel> {
using cn_t = ipc::channel;
std::string conn_name_;
int m_ = 0;
test_cq(void*)
: conn_name_("test-ipc-channel") {
}
cn_t connect() {
return cn_t { conn_name_.c_str() };
}
void disconnect(cn_t& cn) {
cn.disconnect();
}
void wait_start(int M) { m_ = M; }
template <typename F>
void recv(cn_t& cn, F&& proc) {
do {
auto msg = cn.recv();
if (msg.size() < 2) {
EXPECT_EQ(msg, ipc::buff_t('\0'));
return;
}
proc(std::move(msg));
} while(1);
}
cn_t connect_send() {
return connect();
}
void send(cn_t& cn, const std::array<int, 2>& info) {
thread_local struct s_dummy {
s_dummy(cn_t& cn, int m) {
cn.wait_for_recv(static_cast<std::size_t>(m));
// std::printf("start to send: %d.\n", m);
}
} _(cn, m_);
int n = info[1];
if (n < 0) {
/*EXPECT_TRUE*/(cn.send(ipc::buff_t('\0')));
}
else /*EXPECT_TRUE*/(cn.send(datas__[static_cast<decltype(datas__)::size_type>(n)]));
}
};
namespace {
struct Init {
Init() {
capo::random<> rdm { DataMin, DataMax };
capo::random<> bit { 0, (std::numeric_limits<ipc::byte_t>::max)() };
for (int i = 0; i < LoopCount; ++i) {
std::size_t n = static_cast<std::size_t>(rdm());
ipc::buff_t buff {
new ipc::byte_t[n], n,
[](void* p, std::size_t) {
delete [] static_cast<ipc::byte_t*>(p);
}
};
for (std::size_t k = 0; k < buff.size(); ++k) {
static_cast<ipc::byte_t*>(buff.data())[k] = static_cast<ipc::byte_t>(bit());
}
datas__.emplace_back(std::move(buff));
}
}
} init__;
template <typename T>
constexpr T acc(T b, T e) noexcept {
return (e + b) * (e - b + 1) / 2;
}
template <typename Mutex>
struct lc_wrapper : Mutex {
void lock_shared () { Mutex::lock (); }
void unlock_shared() { Mutex::unlock(); }
};
template <typename Lc, int W, int R, int Loops = LoopCount>
void benchmark_lc() {
std::thread w_trd[W];
std::thread r_trd[R];
std::atomic_int fini { 0 };
// std::atomic_bool wf { false };
std::vector<int> datas;
Lc lc;
test_stopwatch sw;
std::cout << std::endl << type_name<Lc>() << std::endl;
for (auto& t : r_trd) {
t = std::thread([&] {
std::vector<int> seq;
std::size_t cnt = 0;
while (1) {
int x = -1;
{
std::shared_lock<Lc> guard { lc };
// EXPECT_TRUE(!wf);
if (cnt < datas.size()) {
x = datas[cnt];
}
// std::this_thread::sleep_for(std::chrono::milliseconds(1));
if (x == 0) break; // quit
if (x != -1) {
seq.push_back(x);
++cnt;
}
}
std::this_thread::yield();
}
if ((fini.fetch_add(1, std::memory_order_relaxed) + 1) == R) {
sw.print_elapsed(W, R, Loops);
}
std::uint64_t sum = 0;
for (int i : seq) sum += static_cast<std::uint64_t>(i);
EXPECT_EQ(sum, acc<std::uint64_t>(1, Loops) * W);
class rand_buf : public buffer {
public:
rand_buf() {
int size = capo::random{1, 65536}();
*this = buffer(new char[size], size, [](void * p, std::size_t) {
delete [] static_cast<char *>(p);
});
}
for (auto& t : w_trd) {
t = std::thread([&] {
rand_buf(rand_buf &&) = default;
rand_buf(rand_buf const & rhs) {
if (rhs.empty()) return;
void * mem = new char[rhs.size()];
std::memcpy(mem, rhs.data(), rhs.size());
*this = buffer(mem, rhs.size(), [](void * p, std::size_t) {
delete [] static_cast<char *>(p);
});
}
rand_buf(buffer && rhs)
: buffer(std::move(rhs)) {
}
void set_id(int k) noexcept {
*get<char *>() = static_cast<char>(k);
}
int get_id() const noexcept {
return static_cast<int>(*get<char *>());
}
using buffer::operator=;
};
template <relat Rp, relat Rc, trans Ts>
void test_basic(char const * name) {
using que_t = chan<wr<Rp, Rc, Ts>>;
rand_buf test1, test2;
que_t que1 { name };
EXPECT_FALSE(que1.send(test1));
EXPECT_FALSE(que1.try_send(test2));
que_t que2 { que1.name(), ipc::receiver };
EXPECT_TRUE(que1.send(test1));
EXPECT_TRUE(que1.try_send(test2));
EXPECT_EQ(que2.recv(), test1);
EXPECT_EQ(que2.recv(), test2);
}
template <relat Rp, relat Rc, trans Ts>
void test_sr(char const * name, int size, int s_cnt, int r_cnt) {
using que_t = chan<wr<Rp, Rc, Ts>>;
ipc_ut::sender().start(static_cast<std::size_t>(s_cnt));
ipc_ut::reader().start(static_cast<std::size_t>(r_cnt));
ipc_ut::test_stopwatch sw;
std::vector<rand_buf> tests(size);
for (int k = 0; k < s_cnt; ++k) {
ipc_ut::sender() << [name, &tests, &sw, r_cnt, k] {
que_t que1 { name };
EXPECT_TRUE(que1.wait_for_recv(r_cnt));
sw.start();
for (int i = 1; i <= Loops; ++i) {
{
std::unique_lock<Lc> guard { lc };
// wf = true;
datas.push_back(i);
// std::this_thread::sleep_for(std::chrono::milliseconds(1));
// wf = false;
}
std::this_thread::yield();
for (auto & buf : tests) {
rand_buf data { buf };
data.set_id(k);
EXPECT_TRUE(que1.send(data));
}
});
};
}
for (auto& t : w_trd) t.join();
lc.lock();
datas.push_back(0);
lc.unlock();
for (auto& t : r_trd) t.join();
}
template <int W, int R>
void test_lock_performance() {
std::cout << std::endl
<< "test_lock_performance: [" << W << ":" << R << "]"
<< std::endl;
benchmark_lc<ipc::rw_lock , W, R>();
benchmark_lc<lc_wrapper< ipc::spin_lock>, W, R>();
benchmark_lc<lc_wrapper<capo::spin_lock>, W, R>();
benchmark_lc<lc_wrapper<std::mutex> , W, R>();
benchmark_lc<std::shared_timed_mutex , W, R>();
}
TEST(IPC, rw_lock) {
// test_lock_performance<1, 1>();
// test_lock_performance<4, 4>();
// test_lock_performance<1, 8>();
// test_lock_performance<8, 1>();
}
template <typename T, int N, int M, bool V = true, int Loops = LoopCount>
void test_prod_cons() {
benchmark_prod_cons<N, M, Loops, std::conditional_t<V, T, void>>((T*)nullptr);
}
TEST(IPC, route) {
// return;
std::vector<char const *> const datas = {
"hello!",
"foo",
"bar",
"ISO/IEC",
"14882:2011",
"ISO/IEC 14882:2017 Information technology - Programming languages - C++",
"ISO/IEC 14882:2020",
"Modern C++ Design: Generic Programming and Design Patterns Applied"
};
std::thread t1 {[&] {
ipc::route cc { "my-ipc-route" };
for (std::size_t i = 0; i < datas.size(); ++i) {
ipc::buff_t dd = cc.recv();
std::cout << "recv: " << (char*)dd.data() << std::endl;
EXPECT_EQ(dd.size(), std::strlen(datas[i]) + 1);
EXPECT_TRUE(std::memcmp(dd.data(), datas[i], dd.size()) == 0);
}
}};
std::thread t2 {[&] {
ipc::route cc { "my-ipc-route" };
while (cc.recv_count() == 0) {
std::this_thread::yield();
}
for (std::size_t i = 0; i < datas.size(); ++i) {
std::cout << "sending: " << datas[i] << std::endl;
EXPECT_TRUE(cc.send(datas[i]));
}
}};
t1.join();
t2.join();
test_prod_cons<ipc::route, 1, 1>(); // test & verify
}
TEST(IPC, route_rtt) {
// return;
test_stopwatch sw;
std::thread t1 {[&] {
ipc::route cc { "my-ipc-route-1" };
ipc::route cr { "my-ipc-route-2" };
for (std::size_t i = 0;; ++i) {
auto dd = cc.recv();
if (dd.size() < 2) return;
//std::cout << "recv: " << i << "-[" << dd.size() << "]" << std::endl;
while (!cr.send(ipc::buff_t('a'))) {
std::this_thread::yield();
for (int k = 0; k < r_cnt; ++k) {
ipc_ut::reader() << [name, &tests, s_cnt] {
que_t que2 { name, ipc::receiver };
std::vector<int> cursors(s_cnt);
for (;;) {
rand_buf got { que2.recv() };
ASSERT_FALSE(got.empty());
int & cur = cursors.at(got.get_id());
ASSERT_TRUE((cur >= 0) && (cur < static_cast<int>(tests.size())));
rand_buf buf { tests.at(cur++) };
buf.set_id(got.get_id());
EXPECT_EQ(got, buf);
int n = 0;
for (; n < static_cast<int>(cursors.size()); ++n) {
if (cursors[n] < static_cast<int>(tests.size())) break;
}
if (n == static_cast<int>(cursors.size())) break;
}
}
}};
};
}
std::thread t2 {[&] {
ipc::route cc { "my-ipc-route-1" };
ipc::route cr { "my-ipc-route-2" };
while (cc.recv_count() == 0) {
std::this_thread::yield();
}
sw.start();
for (std::size_t i = 0; i < LoopCount; ++i) {
cc.send(datas__[i]);
//std::cout << "sent: " << i << "-[" << datas__[i].size() << "]" << std::endl;
/*auto dd = */cr.recv();
// if (dd.size() != 1 || dd[0] != 'a') {
// EXPECT_TRUE(false);
// }
}
cc.send(ipc::buff_t('\0'));
t1.join();
sw.print_elapsed(1, 1, LoopCount);
}};
t2.join();
ipc_ut::sender().wait_for_done();
ipc_ut::reader().wait_for_done();
sw.print_elapsed<std::chrono::microseconds>(s_cnt, r_cnt, size, name);
}
TEST(IPC, route_performance) {
// return;
ipc::detail::static_for<8>([](auto index) {
test_prod_cons<ipc::route, 1, decltype(index)::value + 1, false>();
});
// test_prod_cons<ipc::route, 1, 8>(); // test & verify
}
TEST(IPC, channel) {
// return;
int fail_v = 0;
std::thread t1 {[&] {
ipc::channel cc { "my-ipc-channel" };
for (std::size_t i = 0;; ++i) {
ipc::buff_t dd = cc.recv();
if (dd.size() < 2) return;
if (dd != datas__[i]) {
std::printf("fail recv: %zd-[%zd, %zd]\n", i, dd.size(), datas__[i].size());
// for (std::size_t k = 0; k < dd.size(); ++k) {
// if (dd.data<ipc::byte_t>()[k] != datas__[i].data<ipc::byte_t>()[k]) {
// std::printf("fail check: %zd-%zd, %02x != %02x\n",
// i, k, (unsigned)dd .data<ipc::byte_t>()[k],
// (unsigned)datas__[i].data<ipc::byte_t>()[k]);
// }
// }
++fail_v;
}
}
}};
std::thread t2 {[&] {
ipc::channel cc { "my-ipc-channel" };
cc.wait_for_recv(1);
for (std::size_t i = 0; i < static_cast<std::size_t>((std::min)(100, LoopCount)); ++i) {
std::printf("sending: %zd-[%zd]\n", i, datas__[i].size());
cc.send(datas__[i]);
}
cc.send(ipc::buff_t('\0'));
t1.join();
}};
t2.join();
EXPECT_EQ(fail_v, 0);
}
TEST(IPC, channel_rtt) {
// return;
test_stopwatch sw;
std::thread t1 {[&] {
ipc::channel cc { "my-ipc-channel" };
bool recv_2 = false;
for (std::size_t i = 0;; ++i) {
auto dd = cc.recv();
if (dd.size() < 2) return;
//if (i % 1000 == 0) {
// std::cout << "recv: " << i << "-[" << dd.size() << "]" << std::endl;
//}
while (!recv_2) {
recv_2 = cc.wait_for_recv(2);
}
cc.send(ipc::buff_t('a'));
}
}};
std::thread t2 {[&] {
ipc::channel cc { "my-ipc-channel" };
cc.wait_for_recv(1);
sw.start();
for (std::size_t i = 0; i < LoopCount; ++i) {
//if (i % 1000 == 0) {
// std::cout << "send: " << i << "-[" << datas__[i].size() << "]" << std::endl;
//}
cc.send(datas__[i]);
/*auto dd = */cc.recv();
//if (dd.size() != 1 || dd.data<char>()[0] != 'a') {
// std::cout << "recv ack fail: " << i << "-[" << dd.size() << "]" << std::endl;
// EXPECT_TRUE(false);
//}
}
cc.send(ipc::buff_t('\0'));
t1.join();
sw.print_elapsed(1, 1, LoopCount);
}};
t2.join();
}
TEST(IPC, channel_performance) {
// return;
ipc::detail::static_for<8>([](auto index) {
test_prod_cons<ipc::channel, 1, decltype(index)::value + 1, false>();
});
ipc::detail::static_for<8>([](auto index) {
test_prod_cons<ipc::channel, decltype(index)::value + 1, 1, false>();
});
ipc::detail::static_for<8>([](auto index) {
test_prod_cons<ipc::channel, decltype(index)::value + 1,
decltype(index)::value + 1, false>();
});
}
constexpr int LoopCount = 10000;
constexpr int MultiMax = 8;
} // internal-linkage
TEST(IPC, basic) {
test_basic<relat::single, relat::single, trans::unicast >("ssu");
test_basic<relat::single, relat::multi , trans::unicast >("smu");
test_basic<relat::multi , relat::multi , trans::unicast >("mmu");
test_basic<relat::single, relat::multi , trans::broadcast>("smb");
test_basic<relat::multi , relat::multi , trans::broadcast>("mmb");
}
TEST(IPC, 1v1) {
test_sr<relat::single, relat::single, trans::unicast >("ssu", LoopCount, 1, 1);
test_sr<relat::single, relat::multi , trans::unicast >("smu", LoopCount, 1, 1);
test_sr<relat::multi , relat::multi , trans::unicast >("mmu", LoopCount, 1, 1);
test_sr<relat::single, relat::multi , trans::broadcast>("smb", LoopCount, 1, 1);
test_sr<relat::multi , relat::multi , trans::broadcast>("mmb", LoopCount, 1, 1);
}
TEST(IPC, 1vN) {
test_sr<relat::single, relat::multi , trans::broadcast>("smb", LoopCount, 1, MultiMax);
test_sr<relat::multi , relat::multi , trans::broadcast>("mmb", LoopCount, 1, MultiMax);
}
TEST(IPC, Nv1) {
test_sr<relat::multi , relat::multi , trans::broadcast>("mmb", LoopCount, MultiMax, 1);
}
TEST(IPC, NvN) {
test_sr<relat::multi , relat::multi , trans::broadcast>("mmb", LoopCount, MultiMax, MultiMax);
}

View File

@ -12,18 +12,21 @@
// #include "gperftools/tcmalloc.h"
#include "test.h"
#include "thread_pool.h"
namespace {
constexpr int DataMin = 4;
constexpr int DataMax = 256;
constexpr int LoopCount = 4194304;
constexpr int LoopCount = 8388608;
constexpr int ThreadMax = 16;
// constexpr int DataMin = 256;
// constexpr int DataMax = 512;
// constexpr int LoopCount = 2097152;
std::vector<std::size_t> sizes__;
std::vector<void*> ptr_cache__[ThreadMax];
template <typename M>
struct alloc_ix_t {
@ -33,10 +36,11 @@ struct alloc_ix_t {
alloc_ix_t() {
if (inited_) return;
inited_ = true;
M::init(ix_);
M::init();
}
int index(std::size_t /*pid*/, std::size_t /*k*/, std::size_t n) {
template <int ThreadsN>
static int index(std::size_t /*pid*/, std::size_t /*k*/, std::size_t n) {
return ix_[n];
}
};
@ -46,38 +50,36 @@ std::vector<int> alloc_ix_t<M>::ix_(LoopCount);
template <typename M>
bool alloc_ix_t<M>::inited_ = false;
template <std::size_t N>
struct alloc_FIFO : alloc_ix_t<alloc_FIFO<N>> {
static void init(std::vector<int>& ix) {
struct alloc_FIFO : alloc_ix_t<alloc_FIFO> {
static void init() {
for (int i = 0; i < LoopCount; ++i) {
ix[static_cast<std::size_t>(i)] = i;
ix_[static_cast<std::size_t>(i)] = i;
}
}
};
template <std::size_t N>
struct alloc_LIFO : alloc_ix_t<alloc_LIFO<N>> {
static void init(std::vector<int>& ix) {
struct alloc_LIFO : alloc_ix_t<alloc_LIFO> {
static void init() {
for (int i = 0; i < LoopCount; ++i) {
ix[static_cast<std::size_t>(i)] = i;
ix_[static_cast<std::size_t>(i)] = i;
}
}
int index(std::size_t pid, std::size_t k, std::size_t n) {
constexpr static int CacheSize = LoopCount / N;
template <int ThreadsN>
static int index(std::size_t pid, std::size_t k, std::size_t n) {
constexpr static int CacheSize = LoopCount / ThreadsN;
if (k) {
return this->ix_[(CacheSize * (2 * pid + 1)) - 1 - n];
return ix_[(CacheSize * (2 * pid + 1)) - 1 - n];
}
else return this->ix_[n];
else return ix_[n];
}
};
template <std::size_t N>
struct alloc_random : alloc_ix_t<alloc_random<N>> {
static void init(std::vector<int>& ix) {
struct alloc_Random : alloc_ix_t<alloc_Random> {
static void init() {
capo::random<> rdm_index(0, LoopCount - 1);
for (int i = 0; i < LoopCount; ++i) {
ix[static_cast<std::size_t>(i)] = rdm_index();
ix_[static_cast<std::size_t>(i)] = rdm_index();
}
}
};
@ -88,69 +90,50 @@ struct Init {
for (int i = 0; i < LoopCount; ++i) {
sizes__.emplace_back(static_cast<std::size_t>(rdm()));
}
for (auto& vec : ptr_cache__) {
vec.resize(LoopCount, nullptr);
}
}
} init__;
template <typename AllocT, int ThreadsN>
void benchmark_alloc() {
std::cout << std::endl
<< "[Threads: " << ThreadsN << "] "
<< type_name<AllocT>() << std::endl;
void benchmark_alloc(char const * message) {
std::string msg = std::to_string(ThreadsN) + "\t" + message;
constexpr static int CacheSize = LoopCount / ThreadsN;
ipc_ut::sender().start(static_cast<std::size_t>(ThreadsN));
ipc_ut::test_stopwatch sw;
std::atomic_int fini { 0 };
test_stopwatch sw;
std::thread works[ThreadsN];
int pid = 0;
for (auto& w : works) {
w = std::thread {[&, pid] {
for (int pid = 0; pid < ThreadsN; ++pid) {
ipc_ut::sender() << [&, pid] {
sw.start();
for (std::size_t k = 0; k < 100; ++k)
for (int n = (CacheSize * pid); n < (CacheSize * (pid + 1)); ++n) {
std::size_t s = sizes__[n];
AllocT::free(AllocT::alloc(s), s);
}
if ((fini.fetch_add(1, std::memory_order_relaxed) + 1) == ThreadsN) {
sw.print_elapsed<1, std::chrono::nanoseconds>(DataMin, DataMax, LoopCount * 100, " ns/d");
}
}};
++pid;
};
}
for (auto& w : works) w.join();
ipc_ut::sender().wait_for_done();
sw.print_elapsed<1>(DataMin, DataMax, LoopCount, msg.c_str());
}
template <typename AllocT, template <std::size_t> class ModeT, int ThreadsN>
void benchmark_alloc() {
std::cout << std::endl
<< "[Threads: " << ThreadsN << ", Mode: " << type_name<ModeT<ThreadsN>>() << "] "
<< type_name<AllocT>() << std::endl;
template <typename AllocT, typename ModeT, int ThreadsN>
void benchmark_alloc(char const * message) {
std::string msg = std::to_string(ThreadsN) + "\t" + message;
constexpr static int CacheSize = LoopCount / ThreadsN;
ModeT mode;
ipc_ut::sender().start(static_cast<std::size_t>(ThreadsN));
ipc_ut::test_stopwatch sw;
std::vector<void*> ptrs[ThreadsN];
for (auto& vec : ptrs) {
vec.resize(LoopCount);
}
ModeT<ThreadsN> mode;
std::atomic_int fini { 0 };
test_stopwatch sw;
std::thread works[ThreadsN];
int pid = 0;
for (auto& w : works) {
w = std::thread {[&, pid] {
auto& vec = ptrs[pid];
for (int pid = 0; pid < ThreadsN; ++pid) {
ipc_ut::sender() << [&, pid] {
auto& vec = ptr_cache__[pid];
sw.start();
for (std::size_t k = 0; k < 2; ++k)
for (int n = (CacheSize * pid); n < (CacheSize * (pid + 1)); ++n) {
int m = mode.index(pid, k, n);
int m = mode.template index<ThreadsN>(pid, k, n);
void*& p = vec[static_cast<std::size_t>(m)];
std::size_t s = sizes__[static_cast<std::size_t>(m)];
if (p == nullptr) {
@ -161,45 +144,40 @@ void benchmark_alloc() {
p = nullptr;
}
}
if ((fini.fetch_add(1, std::memory_order_relaxed) + 1) == ThreadsN) {
sw.print_elapsed<1>(DataMin, DataMax, LoopCount);
}
}};
++pid;
};
}
for (auto& w : works) w.join();
ipc_ut::sender().wait_for_done();
sw.print_elapsed<1>(DataMin, DataMax, LoopCount, msg.c_str());
}
template <typename AllocT, template <std::size_t> class ModeT, int ThreadsN>
template <typename AllocT, typename ModeT, int ThreadsN>
struct test_performance {
static void start() {
test_performance<AllocT, ModeT, ThreadsN / 2>::start();
benchmark_alloc<AllocT, ModeT, ThreadsN>();
static void start(char const * message) {
test_performance<AllocT, ModeT, ThreadsN / 2>::start(message);
benchmark_alloc<AllocT, ModeT, ThreadsN>(message);
}
};
template <typename AllocT, template <std::size_t> class ModeT>
template <typename AllocT, typename ModeT>
struct test_performance<AllocT, ModeT, 1> {
static void start() {
benchmark_alloc<AllocT, ModeT, 1>();
static void start(char const * message) {
benchmark_alloc<AllocT, ModeT, 1>(message);
}
};
template <std::size_t> struct dummy;
template <typename AllocT, int ThreadsN>
struct test_performance<AllocT, dummy, ThreadsN> {
static void start() {
test_performance<AllocT, dummy, ThreadsN / 2>::start();
benchmark_alloc<AllocT, ThreadsN>();
struct test_performance<AllocT, void, ThreadsN> {
static void start(char const * message) {
test_performance<AllocT, void, ThreadsN / 2>::start(message);
benchmark_alloc<AllocT, ThreadsN>(message);
}
};
template <typename AllocT>
struct test_performance<AllocT, dummy, 1> {
static void start() {
benchmark_alloc<AllocT, 1>();
struct test_performance<AllocT, void, 1> {
static void start(char const * message) {
benchmark_alloc<AllocT, 1>(message);
}
};
@ -217,27 +195,24 @@ struct test_performance<AllocT, dummy, 1> {
// };
TEST(Memory, static_alloc) {
// test_performance<ipc::mem::static_alloc, dummy , 128>::start();
// test_performance<ipc::mem::static_alloc, alloc_FIFO , 128>::start();
// test_performance<ipc::mem::static_alloc, alloc_LIFO , 128>::start();
// test_performance<ipc::mem::static_alloc, alloc_random, 128>::start();
test_performance<ipc::mem::static_alloc, void , ThreadMax>::start("alloc-free");
test_performance<ipc::mem::static_alloc, alloc_FIFO , ThreadMax>::start("alloc-FIFO");
test_performance<ipc::mem::static_alloc, alloc_LIFO , ThreadMax>::start("alloc-LIFO");
test_performance<ipc::mem::static_alloc, alloc_Random, ThreadMax>::start("alloc-Rand");
}
TEST(Memory, pool_alloc) {
//test_performance<ipc::mem::async_pool_alloc, dummy , 128>::start();
//test_performance<ipc::mem::async_pool_alloc, alloc_FIFO , 128>::start();
//test_performance<ipc::mem::async_pool_alloc, dummy , 128>::start();
//test_performance<ipc::mem::async_pool_alloc, alloc_FIFO , 128>::start();
//test_performance<ipc::mem::async_pool_alloc, alloc_LIFO , 128>::start();
//test_performance<ipc::mem::async_pool_alloc, alloc_random, 128>::start();
test_performance<ipc::mem::async_pool_alloc, void , ThreadMax>::start("alloc-free");
test_performance<ipc::mem::async_pool_alloc, alloc_FIFO , ThreadMax>::start("alloc-FIFO");
test_performance<ipc::mem::async_pool_alloc, alloc_LIFO , ThreadMax>::start("alloc-LIFO");
test_performance<ipc::mem::async_pool_alloc, alloc_Random, ThreadMax>::start("alloc-Rand");
}
TEST(Memory, tc_alloc) {
// test_performance<tc_alloc, dummy , 128>::start();
// test_performance<tc_alloc, alloc_FIFO , 128>::start();
// test_performance<tc_alloc, alloc_LIFO , 128>::start();
// test_performance<tc_alloc, alloc_random, 128>::start();
}
// TEST(Memory, tc_alloc) {
// test_performance<tc_alloc, void , ThreadMax>::start();
// test_performance<tc_alloc, alloc_FIFO , ThreadMax>::start();
// test_performance<tc_alloc, alloc_LIFO , ThreadMax>::start();
// test_performance<tc_alloc, alloc_Random, ThreadMax>::start();
// }
} // internal-linkage

181
test/test_queue.cpp Executable file
View File

@ -0,0 +1,181 @@
#include <iostream>
#include <string>
#include <type_traits>
#include <memory>
#include <new>
#include <vector>
#include <unordered_map>
#include "prod_cons.h"
#include "policy.h"
#include "circ/elem_array.h"
#include "queue.h"
#include "test.h"
namespace {
struct msg_t {
int pid_;
int dat_;
msg_t() = default;
msg_t(int p, int d) : pid_(p), dat_(d) {}
};
template <ipc::relat Rp, ipc::relat Rc, ipc::trans Ts>
using queue_t = ipc::queue<msg_t, ipc::policy::choose<ipc::circ::elem_array, ipc::wr<Rp, Rc, Ts>>>;
template <ipc::relat Rp, ipc::relat Rc, ipc::trans Ts>
struct elems_t : public queue_t<Rp, Rc, Ts>::elems_t {};
bool operator==(msg_t const & m1, msg_t const & m2) noexcept {
return (m1.pid_ == m2.pid_) && (m1.dat_ == m2.dat_);
}
bool operator!=(msg_t const & m1, msg_t const & m2) noexcept {
return !(m1 == m2);
}
constexpr int LoopCount = 1000000;
constexpr int PushRetry = 1000000;
constexpr int ThreadMax = 8;
template <typename Que>
void push(Que & que, int p, int d) {
for (int n = 0; !que.push(p, d); ++n) {
ASSERT_NE(n, PushRetry);
std::this_thread::yield();
}
}
template <typename Que>
msg_t pop(Que & que) {
msg_t msg;
while (!que.pop(msg)) {
std::this_thread::yield();
}
return msg;
}
template <ipc::trans Ts>
struct quitter;
template <>
struct quitter<ipc::trans::unicast> {
template <typename Que>
static void emit(Que && que, int r_cnt) {
for (int k = 0; k < r_cnt; ++k) {
push(que, -1, -1);
}
}
};
template <>
struct quitter<ipc::trans::broadcast> {
template <typename Que>
static void emit(Que && que, int /*r_cnt*/) {
push(que, -1, -1);
}
};
template <ipc::relat Rp, ipc::relat Rc, ipc::trans Ts>
void test_sr(elems_t<Rp, Rc, Ts> && elems, int s_cnt, int r_cnt, char const * message) {
ipc_ut::sender().start(static_cast<std::size_t>(s_cnt));
ipc_ut::reader().start(static_cast<std::size_t>(r_cnt));
ipc_ut::test_stopwatch sw;
for (int k = 0; k < s_cnt; ++k) {
ipc_ut::sender() << [&elems, &sw, r_cnt, k] {
queue_t<Rp, Rc, Ts> que { &elems };
while (que.conn_count() != static_cast<std::size_t>(r_cnt)) {
std::this_thread::yield();
}
sw.start();
for (int i = 0; i < LoopCount; ++i) {
push(que, k, i);
}
};
}
for (int k = 0; k < r_cnt; ++k) {
ipc_ut::reader() << [&elems, k] {
queue_t<Rp, Rc, Ts> que { &elems };
ASSERT_TRUE(que.connect());
while (pop(que).pid_ >= 0) ;
EXPECT_TRUE(que.disconnect());
};
}
ipc_ut::sender().wait_for_done();
quitter<Ts>::emit(queue_t<Rp, Rc, Ts> { &elems }, r_cnt);
ipc_ut::reader().wait_for_done();
sw.print_elapsed(s_cnt, r_cnt, LoopCount, message);
}
} // internal-linkage
TEST(Queue, check_size) {
using el_t = elems_t<ipc::relat::single, ipc::relat::multi, ipc::trans::broadcast>;
std::cout << "cq_t::head_size = " << el_t::head_size << std::endl;
std::cout << "cq_t::data_size = " << el_t::data_size << std::endl;
std::cout << "cq_t::elem_size = " << el_t::elem_size << std::endl;
std::cout << "cq_t::block_size = " << el_t::block_size << std::endl;
EXPECT_EQ(static_cast<std::size_t>(el_t::data_size), sizeof(msg_t));
std::cout << "sizeof(elems_t<s, m, b>) = " << sizeof(el_t) << std::endl;
}
TEST(Queue, prod_cons_1v1_unicast) {
test_sr(elems_t<ipc::relat::single, ipc::relat::single, ipc::trans::unicast> {}, 1, 1, "ssu");
test_sr(elems_t<ipc::relat::single, ipc::relat::multi , ipc::trans::unicast> {}, 1, 1, "smu");
test_sr(elems_t<ipc::relat::multi , ipc::relat::multi , ipc::trans::unicast> {}, 1, 1, "mmu");
}
TEST(Queue, prod_cons_1v1_broadcast) {
test_sr(elems_t<ipc::relat::single, ipc::relat::multi , ipc::trans::broadcast> {}, 1, 1, "smb");
test_sr(elems_t<ipc::relat::multi , ipc::relat::multi , ipc::trans::broadcast> {}, 1, 1, "mmb");
}
TEST(Queue, prod_cons_1vN_unicast) {
for (int i = 1; i <= ThreadMax; ++i) {
test_sr(elems_t<ipc::relat::single, ipc::relat::multi , ipc::trans::unicast> {}, 1, i, "smu");
}
for (int i = 1; i <= ThreadMax; ++i) {
test_sr(elems_t<ipc::relat::multi , ipc::relat::multi , ipc::trans::unicast> {}, 1, i, "mmu");
}
}
TEST(Queue, prod_cons_1vN_broadcast) {
for (int i = 1; i <= ThreadMax; ++i) {
test_sr(elems_t<ipc::relat::single, ipc::relat::multi , ipc::trans::broadcast> {}, 1, i, "smb");
}
for (int i = 1; i <= ThreadMax; ++i) {
test_sr(elems_t<ipc::relat::multi , ipc::relat::multi , ipc::trans::broadcast> {}, 1, i, "mmb");
}
}
TEST(Queue, prod_cons_NvN_unicast) {
for (int i = 1; i <= ThreadMax; ++i) {
test_sr(elems_t<ipc::relat::multi , ipc::relat::multi , ipc::trans::unicast> {}, 1, i, "mmu");
}
for (int i = 1; i <= ThreadMax; ++i) {
test_sr(elems_t<ipc::relat::multi , ipc::relat::multi , ipc::trans::unicast> {}, i, 1, "mmu");
}
for (int i = 1; i <= ThreadMax; ++i) {
test_sr(elems_t<ipc::relat::multi , ipc::relat::multi , ipc::trans::unicast> {}, i, i, "mmu");
}
}
TEST(Queue, prod_cons_NvN_broadcast) {
for (int i = 1; i <= ThreadMax; ++i) {
test_sr(elems_t<ipc::relat::multi , ipc::relat::multi , ipc::trans::broadcast> {}, 1, i, "mmb");
}
for (int i = 1; i <= ThreadMax; ++i) {
test_sr(elems_t<ipc::relat::multi , ipc::relat::multi , ipc::trans::broadcast> {}, i, 1, "mmb");
}
for (int i = 1; i <= ThreadMax; ++i) {
test_sr(elems_t<ipc::relat::multi , ipc::relat::multi , ipc::trans::broadcast> {}, i, i, "mmb");
}
}

View File

@ -9,83 +9,94 @@ using namespace ipc::shm;
namespace {
handle shm_hd__;
TEST(SHM, acquire) {
EXPECT_TRUE(!shm_hd__.valid());
handle shm_hd;
EXPECT_FALSE(shm_hd.valid());
EXPECT_TRUE(shm_hd__.acquire("my-test-1", 1024));
EXPECT_TRUE(shm_hd__.valid());
EXPECT_STREQ(shm_hd__.name(), "my-test-1");
EXPECT_TRUE(shm_hd.acquire("my-test-1", 1024));
EXPECT_TRUE(shm_hd.valid());
EXPECT_STREQ(shm_hd.name(), "my-test-1");
EXPECT_TRUE(shm_hd__.acquire("my-test-2", 2048));
EXPECT_TRUE(shm_hd__.valid());
EXPECT_STREQ(shm_hd__.name(), "my-test-2");
EXPECT_TRUE(shm_hd.acquire("my-test-2", 2048));
EXPECT_TRUE(shm_hd.valid());
EXPECT_STREQ(shm_hd.name(), "my-test-2");
EXPECT_TRUE(shm_hd__.acquire("my-test-3", 4096));
EXPECT_TRUE(shm_hd__.valid());
EXPECT_STREQ(shm_hd__.name(), "my-test-3");
EXPECT_TRUE(shm_hd.acquire("my-test-3", 4096));
EXPECT_TRUE(shm_hd.valid());
EXPECT_STREQ(shm_hd.name(), "my-test-3");
}
TEST(SHM, release) {
EXPECT_TRUE(shm_hd__.valid());
shm_hd__.release();
EXPECT_TRUE(!shm_hd__.valid());
handle shm_hd;
EXPECT_FALSE(shm_hd.valid());
shm_hd.release();
EXPECT_FALSE(shm_hd.valid());
EXPECT_TRUE(shm_hd.acquire("release-test-1", 512));
EXPECT_TRUE(shm_hd.valid());
shm_hd.release();
EXPECT_FALSE(shm_hd.valid());
}
TEST(SHM, get) {
EXPECT_TRUE(shm_hd__.get() == nullptr);
EXPECT_TRUE(shm_hd__.acquire("my-test", 1024));
handle shm_hd;
EXPECT_TRUE(shm_hd.get() == nullptr);
EXPECT_TRUE(shm_hd.acquire("get-test", 2048));
auto mem = shm_hd__.get();
auto mem = shm_hd.get();
EXPECT_TRUE(mem != nullptr);
EXPECT_TRUE(mem == shm_hd__.get());
EXPECT_TRUE(mem == shm_hd.get());
std::uint8_t buf[1024] = {};
EXPECT_TRUE(memcmp(mem, buf, sizeof(buf)) == 0);
handle shm_other(shm_hd__.name(), shm_hd__.size());
EXPECT_TRUE(shm_other.get() != shm_hd__.get());
handle shm_other(shm_hd.name(), shm_hd.size());
EXPECT_TRUE(shm_other.get() != shm_hd.get());
}
TEST(SHM, hello) {
auto mem = shm_hd__.get();
handle shm_hd;
EXPECT_TRUE(shm_hd.acquire("hello-test", 128));
auto mem = shm_hd.get();
EXPECT_TRUE(mem != nullptr);
constexpr char hello[] = "hello!";
std::memcpy(mem, hello, sizeof(hello));
EXPECT_STREQ((char const *)shm_hd__.get(), hello);
EXPECT_STREQ((char const *)shm_hd.get(), hello);
shm_hd__.release();
EXPECT_TRUE(shm_hd__.get() == nullptr);
EXPECT_TRUE(shm_hd__.acquire("my-test", 1024));
shm_hd.release();
EXPECT_TRUE(shm_hd.get() == nullptr);
EXPECT_TRUE(shm_hd.acquire("hello-test", 1024));
mem = shm_hd__.get();
mem = shm_hd.get();
EXPECT_TRUE(mem != nullptr);
std::uint8_t buf[1024] = {};
EXPECT_TRUE(memcmp(mem, buf, sizeof(buf)) == 0);
std::memcpy(mem, hello, sizeof(hello));
EXPECT_STREQ((char const *)shm_hd__.get(), hello);
EXPECT_STREQ((char const *)shm_hd.get(), hello);
}
TEST(SHM, mt) {
handle shm_hd;
EXPECT_TRUE(shm_hd.acquire("mt-test", 256));
constexpr char hello[] = "hello!";
std::memcpy(shm_hd.get(), hello, sizeof(hello));
std::thread {
[] {
handle shm_mt(shm_hd__.name(), shm_hd__.size());
shm_hd__.release();
[&shm_hd] {
handle shm_mt(shm_hd.name(), shm_hd.size());
shm_hd.release();
constexpr char hello[] = "hello!";
EXPECT_STREQ((char const *)shm_mt.get(), hello);
}
}.join();
EXPECT_TRUE(shm_hd__.get() == nullptr);
EXPECT_TRUE(!shm_hd__.valid());
EXPECT_TRUE(shm_hd__.acquire("my-test", 1024));
EXPECT_TRUE(shm_hd.get() == nullptr);
EXPECT_FALSE(shm_hd.valid());
EXPECT_TRUE(shm_hd.acquire("mt-test", 1024));
std::uint8_t buf[1024] = {};
EXPECT_TRUE(memcmp(shm_hd__.get(), buf, sizeof(buf)) == 0);
EXPECT_TRUE(memcmp(shm_hd.get(), buf, sizeof(buf)) == 0);
}
} // internal-linkage

202
test/test_thread_utility.cpp Executable file
View File

@ -0,0 +1,202 @@
#include <type_traits>
#include <iostream>
#include <shared_mutex> // std::shared_lock
#include <utility>
#include <thread>
#include <future> // std::async
#include <atomic>
#include <string> // std::string
#include "capo/spin_lock.hpp"
#include "capo/type_name.hpp"
#include "rw_lock.h"
#include "tls_pointer.h"
#include "test.h"
#include "thread_pool.h"
namespace {
constexpr int LoopCount = 100000;
constexpr int ThreadMax = 8;
template <typename T>
constexpr T acc(T b, T e) noexcept {
return (e + b) * (e - b + 1) / 2;
}
template <typename Mutex>
struct lc_wrapper : Mutex {
void lock_shared () { Mutex::lock (); }
void unlock_shared() { Mutex::unlock(); }
};
template <typename Lc, int Loops = LoopCount>
void benchmark_lc(int w, int r, char const * message) {
ipc_ut::sender().start(static_cast<std::size_t>(w));
ipc_ut::reader().start(static_cast<std::size_t>(r));
ipc_ut::test_stopwatch sw;
std::uint64_t data = 0;
std::uint64_t sum = acc<std::uint64_t>(1, Loops) * w;
Lc lc;
for (int k = 0; k < r; ++k) {
ipc_ut::reader() << [sum, &lc, &data] {
while (1) {
{
std::shared_lock<Lc> guard { lc };
if (data == sum) break;
}
std::this_thread::yield();
}
};
}
for (int k = 0; k < w; ++k) {
ipc_ut::sender() << [&sw, &lc, &data] {
sw.start();
for (int i = 1; i <= Loops; ++i) {
{
std::lock_guard<Lc> guard { lc };
data += i;
}
std::this_thread::yield();
}
};
}
ipc_ut::sender().wait_for_done();
sw.print_elapsed(w, r, Loops, message);
ipc_ut::reader().wait_for_done();
}
void test_lock_performance(int w, int r) {
std::cout << "test_lock_performance: [" << w << "-" << r << "]" << std::endl;
benchmark_lc<ipc::rw_lock >(w, r, "ipc::rw_lock");
benchmark_lc<lc_wrapper< ipc::spin_lock>>(w, r, "ipc::spin_lock");
benchmark_lc<lc_wrapper<capo::spin_lock>>(w, r, "capo::spin_lock");
benchmark_lc<lc_wrapper<std::mutex> >(w, r, "std::mutex");
// benchmark_lc<std::shared_mutex >(w, r, "std::shared_mutex");
}
} // internal-linkage
//TEST(Thread, rw_lock) {
// for (int i = 1; i <= ThreadMax; ++i) test_lock_performance(i, 0);
// for (int i = 1; i <= ThreadMax; ++i) test_lock_performance(1, i);
// for (int i = 2; i <= ThreadMax; ++i) test_lock_performance(i, i);
//}
TEST(Thread, tls_main_thread) {
ipc::tls::pointer<int> p;
EXPECT_FALSE(p);
EXPECT_NE(p.create(123), nullptr);
EXPECT_EQ(*p, 123);
}
namespace {
struct Foo {
std::atomic<int> * px_;
Foo(std::atomic<int> * px) : px_(px) {}
~Foo() {
px_->fetch_add(1, std::memory_order_relaxed);
}
};
} // namespace
TEST(Thread, tls_create_once) {
std::atomic<int> x { 0 };
Foo { &x };
EXPECT_EQ(x, 1);
{
ipc::tls::pointer<Foo> foo;
EXPECT_NE(foo.create(&x), nullptr);
EXPECT_EQ(x, 1);
std::thread {[&foo, &x] {
auto foo1 = foo.create_once(&x);
auto foo2 = foo.create_once(&x);
EXPECT_EQ(foo1, foo2);
}}.join();
EXPECT_EQ(x, 2);
}
EXPECT_EQ(x, 3);
}
TEST(Thread, tls_multi_thread) {
std::atomic<int> x { 0 };
{
ipc::tls::pointer<Foo> foo; // no create
EXPECT_EQ(x, 0);
ipc_ut::thread_pool pool { 10 };
for (int i = 0; i < 1000; ++i) {
pool << [&foo, &x] {
// foo.create_once(&x);
std::this_thread::sleep_for(std::chrono::milliseconds(1));
};
}
pool.wait_for_done();
EXPECT_EQ(x, 0); // thread_pool hasn't destructed.
}
// EXPECT_EQ(x, 10);
}
namespace {
template <typename Tls, typename T>
void benchmark_tls(char const * message) {
ipc_ut::thread_pool pool { static_cast<std::size_t>(ThreadMax) };
ipc_ut::test_stopwatch sw;
pool.wait_for_started();
for (int k = 0; k < ThreadMax; ++k) {
pool << [&sw] {
sw.start();
for (int i = 0; i < LoopCount * 10; ++i) {
*Tls::template get<T>() = i;
}
};
}
pool.wait_for_done();
sw.print_elapsed(ThreadMax, LoopCount * 10, message);
}
struct ipc_tls {
template <typename T>
static T * get() {
static ipc::tls::pointer<T> p;
return p.create_once();
}
};
struct std_tls {
template <typename T>
static T * get() {
thread_local T p;
return &p;
}
};
struct Str {
std::string str_;
Str & operator=(int i) {
str_ = std::to_string(i);
return *this;
}
};
} // internal-linkage
TEST(Thread, tls_benchmark) {
benchmark_tls<std_tls, int>("std_tls: int");
benchmark_tls<ipc_tls, int>("ipc_tls: int");
benchmark_tls<std_tls, Str>("std_tls: Str");
benchmark_tls<ipc_tls, Str>("ipc_tls: Str");
}

119
test/thread_pool.h Executable file
View File

@ -0,0 +1,119 @@
#pragma once
#include <thread> // std::thread
#include <mutex> // std::mutex
#include <condition_variable> // std::condition_variable
#include <deque> // std::deque
#include <functional> // std::function
#include <utility> // std::forward, std::move
#include <cstddef> // std::size_t
#include <cassert> // assert
namespace ipc_ut {
class thread_pool final {
std::deque<std::thread> workers_;
std::deque<std::function<void()>> jobs_;
std::mutex lock_;
std::condition_variable cv_jobs_;
std::condition_variable cv_empty_;
std::size_t waiting_cnt_ = 0;
bool quit_ = false;
static void proc(thread_pool * pool) {
assert(pool != nullptr);
std::function<void()> job;
for (;;) {
{
std::unique_lock<std::mutex> guard { pool->lock_ };
if (pool->quit_) return;
if (pool->jobs_.empty()) {
pool->waiting_cnt_ += 1;
if (pool->waiting_cnt_ == pool->workers_.size()) {
pool->cv_empty_.notify_all();
}
assert(pool->waiting_cnt_ <= pool->workers_.size());
do {
pool->cv_jobs_.wait(guard);
if (pool->quit_) return;
} while (pool->jobs_.empty());
pool->waiting_cnt_ -= 1;
}
assert(!pool->jobs_.empty());
job = std::move(pool->jobs_.front());
pool->jobs_.pop_front();
}
if (job) job();
}
}
public:
thread_pool() = default;
~thread_pool() {
{
std::lock_guard<std::mutex> guard { lock_ };
static_cast<void>(guard);
quit_ = true;
}
cv_jobs_.notify_all();
cv_empty_.notify_all();
for (auto & trd : workers_) trd.join();
}
explicit thread_pool(std::size_t n) : thread_pool() {
start(n);
}
void start(std::size_t n) {
if (n <= workers_.size()) return;
for (std::size_t i = workers_.size(); i < n; ++i) {
workers_.push_back(std::thread { &thread_pool::proc, this });
}
}
std::size_t size() const noexcept {
return workers_.size();
}
std::size_t jobs_size() const noexcept {
return jobs_.size();
}
void wait_for_started() {
std::unique_lock<std::mutex> guard { lock_ };
if (quit_) return;
while (!workers_.empty() && (waiting_cnt_ != workers_.size())) {
cv_empty_.wait(guard);
if (quit_) return;
}
}
void wait_for_done() {
std::unique_lock<std::mutex> guard { lock_ };
if (quit_) return;
while (!jobs_.empty() || (waiting_cnt_ != workers_.size())) {
assert(waiting_cnt_ <= workers_.size());
cv_empty_.wait(guard);
if (quit_) return;
}
}
template <typename F>
thread_pool & operator<<(F && job) {
{
std::lock_guard<std::mutex> guard { lock_ };
static_cast<void>(guard);
jobs_.emplace_back(std::forward<F>(job));
}
cv_jobs_.notify_one();
return *this;
}
};
} // namespace ipc_ut