Merge pull request #59 from mutouyun/master

This commit is contained in:
木头云 2021-08-29 11:05:54 +08:00 committed by GitHub
commit 754661c467
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 484 additions and 679 deletions

View File

@ -2,7 +2,7 @@ name: C/C++ CI
on: on:
push: push:
branches: [ master, develop ] branches: [ master, develop, issue-* ]
pull_request: pull_request:
branches: [ master, develop ] branches: [ master, develop ]

3
.gitignore vendored
View File

@ -44,4 +44,7 @@ CMakeLists.txt.user*
# My output files # My output files
build build
# vs
.vs
.vscode .vscode

View File

@ -8,12 +8,16 @@
# ctest. You can select which tests to run using 'ctest -R regex'. # ctest. You can select which tests to run using 'ctest -R regex'.
# For more options, run 'ctest --help'. # For more options, run 'ctest --help'.
if (POLICY CMP0077)
cmake_policy(SET CMP0077 NEW)
endif (POLICY CMP0077)
# When other libraries are using a shared version of runtime libraries, # When other libraries are using a shared version of runtime libraries,
# Google Test also has to use one. # Google Test also has to use one.
option( option(
gtest_force_shared_crt gtest_force_shared_crt
"Use shared (DLL) run-time lib even when Google Test is built as static lib." "Use shared (DLL) run-time lib even when Google Test is built as static lib."
ON) OFF)
option(gtest_build_tests "Build all of gtest's own tests." OFF) option(gtest_build_tests "Build all of gtest's own tests." OFF)

View File

@ -1,25 +1,48 @@
cmake_minimum_required(VERSION 3.10) cmake_minimum_required(VERSION 3.10)
project(cpp-ipc) project(cpp-ipc)
option(LIBIPC_BUILD_TESTS "Build all of libipc's own tests." OFF) option(LIBIPC_BUILD_TESTS "Build all of libipc's own tests." OFF)
option(LIBIPC_BUILD_DEMOS "Build all of libipc's own demos." OFF) option(LIBIPC_BUILD_DEMOS "Build all of libipc's own demos." OFF)
option(LIBIPC_BUILD_SHARED_LIBS "Build shared libraries (DLLs)." OFF)
option(LIBIPC_USE_STATIC_CRT "Set to ON to build with static CRT on Windows (/MT)." OFF)
set(CMAKE_POSITION_INDEPENDENT_CODE ON)
set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -DNDEBUG") set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -DNDEBUG")
if(NOT MSVC) if(NOT MSVC)
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O2") set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O2")
endif() endif()
include_directories(${CMAKE_SOURCE_DIR}/include) if (MSVC AND LIBIPC_USE_STATIC_CRT)
set(CompilerFlags
CMAKE_CXX_FLAGS
CMAKE_CXX_FLAGS_DEBUG
CMAKE_CXX_FLAGS_RELEASE
CMAKE_C_FLAGS
CMAKE_C_FLAGS_DEBUG
CMAKE_C_FLAGS_RELEASE
)
foreach(CompilerFlag ${CompilerFlags})
string(REPLACE "/MD" "/MT" ${CompilerFlag} "${${CompilerFlag}}")
endforeach()
endif()
set(LIBRARY_OUTPUT_PATH ${CMAKE_BINARY_DIR}/bin) set(LIBRARY_OUTPUT_PATH ${CMAKE_BINARY_DIR}/bin)
set(EXECUTABLE_OUTPUT_PATH ${CMAKE_BINARY_DIR}/bin) set(EXECUTABLE_OUTPUT_PATH ${CMAKE_BINARY_DIR}/bin)
set(LIBIPC_PROJECT_DIR ${PROJECT_SOURCE_DIR}) set(LIBIPC_PROJECT_DIR ${PROJECT_SOURCE_DIR})
# Unicode Support
add_definitions(-DUNICODE -D_UNICODE)
add_subdirectory(src) add_subdirectory(src)
if (LIBIPC_BUILD_TESTS) if (LIBIPC_BUILD_TESTS)
set(GOOGLETEST_VERSION 1.10.0) set(GOOGLETEST_VERSION 1.10.0)
if (LIBIPC_USE_STATIC_CRT)
set(gtest_force_shared_crt OFF)
else()
set(gtest_force_shared_crt ON)
endif()
add_subdirectory(3rdparty/gtest) add_subdirectory(3rdparty/gtest)
add_subdirectory(test) add_subdirectory(test)
endif() endif()
@ -30,6 +53,6 @@ if (LIBIPC_BUILD_DEMOS)
endif() endif()
install( install(
DIRECTORY "include/" DIRECTORY "include/"
DESTINATION "include" DESTINATION "include"
) )

View File

@ -23,7 +23,7 @@ constexpr std::size_t const max_sz = 1024 * 16;
std::atomic<bool> is_quit__{ false }; std::atomic<bool> is_quit__{ false };
std::atomic<std::size_t> size_counter__{ 0 }; std::atomic<std::size_t> size_counter__{ 0 };
using msg_que_t = ipc::chan<ipc::relat::single, ipc::relat::single, ipc::trans::unicast>; using msg_que_t = ipc::chan<ipc::relat::single, ipc::relat::multi, ipc::trans::broadcast>;
msg_que_t que__{ name__ }; msg_que_t que__{ name__ };
ipc::byte_t buff__[max_sz]; ipc::byte_t buff__[max_sz];

View File

@ -33,7 +33,7 @@ enum : std::uint32_t {
enum : std::size_t { enum : std::size_t {
data_length = 64, data_length = 64,
large_msg_limit = data_length, large_msg_limit = data_length,
large_msg_align = 512, large_msg_align = 1024,
large_msg_cache = 32, large_msg_cache = 32,
}; };

View File

@ -44,9 +44,11 @@
*/ */
#ifndef IPC_EXPORT #ifndef IPC_EXPORT
#if defined(__IPC_LIBRARY__) #if defined(LIBIPC_LIBRARY_SHARED_BUILDING__)
# define IPC_EXPORT IPC_DECL_EXPORT # define IPC_EXPORT IPC_DECL_EXPORT
#else #elif defined(LIBIPC_LIBRARY_SHARED_USING__)
# define IPC_EXPORT IPC_DECL_IMPORT # define IPC_EXPORT IPC_DECL_IMPORT
#else
# define IPC_EXPORT
#endif #endif
#endif /*IPC_EXPORT*/ #endif /*IPC_EXPORT*/

View File

@ -94,6 +94,7 @@ inline void free(void* p, std::size_t size) {
template <typename T> template <typename T>
void free(T* p) { void free(T* p) {
if (p == nullptr) return;
destruct(p); destruct(p);
pool_alloc::free(p, sizeof(T)); pool_alloc::free(p, sizeof(T));
} }

View File

@ -1,112 +0,0 @@
#pragma once
#include <memory> // std::unique_ptr
#include <utility> // std::forward
#include <cstddef> // std::size_t
#include "libipc/export.h"
namespace ipc {
namespace tls {
using key_t = std::size_t;
using destructor_t = void (*)(void *);
struct key_info {
key_t key_;
};
IPC_EXPORT bool create (key_info * pkey, destructor_t destructor = nullptr);
IPC_EXPORT void release(key_info const * pkey);
IPC_EXPORT bool set(key_info const * pkey, void * ptr);
IPC_EXPORT void * get(key_info const * pkey);
////////////////////////////////////////////////////////////////
/// Thread-local pointer
////////////////////////////////////////////////////////////////
/**
* @remarks
* You need to set the ipc::tls::pointer's storage manually:
* ```
* tls::pointer<int> p;
* if (!p) p = new int(123);
* ```
* It would be like an ordinary pointer.
* Or you could just call create_once to 'new' this pointer automatically.
* ```
* tls::pointer<int> p;
* p.create_once(123);
* ```
*/
template <typename T>
class pointer : public key_info {
pointer(pointer const &) = delete;
pointer & operator=(pointer const &) = delete;
void destruct() const {
delete static_cast<T*>(tls::get(this));
}
public:
using value_type = T;
pointer() {
tls::create(this, [](void* p) {
delete static_cast<T*>(p);
});
}
~pointer() {
destruct();
tls::release(this);
}
template <typename... P>
T* create(P&&... params) {
destruct();
std::unique_ptr<T> ptr { new T(std::forward<P>(params)...) };
if (!tls::set(this, ptr.get())) {
return nullptr;
}
return ptr.release();
}
template <typename... P>
T* create_once(P&&... params) {
auto p = static_cast<T*>(tls::get(this));
if (p == nullptr) {
std::unique_ptr<T> ptr { new T(std::forward<P>(params)...) };
if (!tls::set(this, ptr.get())) {
return nullptr;
}
p = ptr.release();
}
return p;
}
T* operator=(T* p) {
set(this, p);
return p;
}
explicit operator T *() const {
return static_cast<T *>(tls::get(this));
}
explicit operator bool() const {
return tls::get(this) != nullptr;
}
T & operator* () { return *static_cast<T *>(*this); }
const T & operator* () const { return *static_cast<T *>(*this); }
T * operator->() { return static_cast<T *>(*this); }
const T * operator->() const { return static_cast<T *>(*this); }
};
} // namespace tls
} // namespace ipc

View File

@ -1,15 +1,5 @@
project(ipc) project(ipc)
add_compile_options(-D__IPC_LIBRARY__)
if(NOT MSVC)
add_compile_options(-fPIC)
endif()
include_directories(
${LIBIPC_PROJECT_DIR}/include
${LIBIPC_PROJECT_DIR}/src)
if(UNIX) if(UNIX)
file(GLOB SRC_FILES ${LIBIPC_PROJECT_DIR}/src/libipc/platform/*_linux.cpp) file(GLOB SRC_FILES ${LIBIPC_PROJECT_DIR}/src/libipc/platform/*_linux.cpp)
else() else()
@ -26,14 +16,44 @@ file(GLOB HEAD_FILES
${LIBIPC_PROJECT_DIR}/src/libipc/platform/*.h ${LIBIPC_PROJECT_DIR}/src/libipc/platform/*.h
${LIBIPC_PROJECT_DIR}/src/libipc/utility/*.h) ${LIBIPC_PROJECT_DIR}/src/libipc/utility/*.h)
add_library(${PROJECT_NAME} SHARED ${SRC_FILES} ${HEAD_FILES}) if (LIBIPC_BUILD_SHARED_LIBS)
add_library(${PROJECT_NAME} SHARED ${SRC_FILES} ${HEAD_FILES})
target_compile_definitions(${PROJECT_NAME}
INTERFACE
LIBIPC_LIBRARY_SHARED_USING__
PRIVATE
LIBIPC_LIBRARY_SHARED_BUILDING__)
else()
add_library(${PROJECT_NAME} STATIC ${SRC_FILES} ${HEAD_FILES})
endif()
# set output directory
set_target_properties(${PROJECT_NAME}
PROPERTIES
ARCHIVE_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/lib"
LIBRARY_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/lib"
RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/bin" )
# set version
set_target_properties(${PROJECT_NAME}
PROPERTIES
VERSION 1.0.0
SOVERSION 1)
target_include_directories(${PROJECT_NAME}
PUBLIC ${LIBIPC_PROJECT_DIR}/include
PRIVATE ${LIBIPC_PROJECT_DIR}/src
)
if(NOT MSVC) if(NOT MSVC)
target_link_libraries(${PROJECT_NAME} PUBLIC target_link_libraries(${PROJECT_NAME} PUBLIC
pthread pthread
$<$<NOT:$<STREQUAL:${CMAKE_SYSTEM_NAME},Windows>>:rt>) $<$<NOT:$<STREQUAL:${CMAKE_SYSTEM_NAME},Windows>>:rt>)
endif() endif()
install( install(
TARGETS ${PROJECT_NAME} TARGETS ${PROJECT_NAME}
DESTINATION "lib" RUNTIME DESTINATION bin
LIBRARY DESTINATION lib
ARCHIVE DESTINATION lib
) )

View File

@ -13,7 +13,6 @@
#include "libipc/ipc.h" #include "libipc/ipc.h"
#include "libipc/def.h" #include "libipc/def.h"
#include "libipc/shm.h" #include "libipc/shm.h"
#include "libipc/tls_pointer.h"
#include "libipc/pool_alloc.h" #include "libipc/pool_alloc.h"
#include "libipc/queue.h" #include "libipc/queue.h"
#include "libipc/policy.h" #include "libipc/policy.h"
@ -21,6 +20,7 @@
#include "libipc/utility/log.h" #include "libipc/utility/log.h"
#include "libipc/utility/id_pool.h" #include "libipc/utility/id_pool.h"
#include "libipc/utility/scope_guard.h"
#include "libipc/utility/utility.h" #include "libipc/utility/utility.h"
#include "libipc/memory/resource.h" #include "libipc/memory/resource.h"
@ -40,7 +40,7 @@ struct msg_t;
template <std::size_t AlignSize> template <std::size_t AlignSize>
struct msg_t<0, AlignSize> { struct msg_t<0, AlignSize> {
msg_id_t conn_; msg_id_t cc_id_;
msg_id_t id_; msg_id_t id_;
std::int32_t remain_; std::int32_t remain_;
bool storage_; bool storage_;
@ -51,15 +51,16 @@ struct msg_t : msg_t<0, AlignSize> {
std::aligned_storage_t<DataSize, AlignSize> data_ {}; std::aligned_storage_t<DataSize, AlignSize> data_ {};
msg_t() = default; msg_t() = default;
msg_t(msg_id_t c, msg_id_t i, std::int32_t r, void const * d, std::size_t s) msg_t(msg_id_t cc_id, msg_id_t id, std::int32_t remain, void const * data, std::size_t size)
: msg_t<0, AlignSize> { c, i, r, (d == nullptr) || (s == 0) } { : msg_t<0, AlignSize> {cc_id, id, remain, (data == nullptr) || (size == 0)} {
if (this->storage_) { if (this->storage_) {
if (d != nullptr) { if (data != nullptr) {
// copy storage-id // copy storage-id
*reinterpret_cast<std::size_t*>(&data_) = *static_cast<std::size_t const *>(d); *reinterpret_cast<ipc::storage_id_t*>(&data_) =
*static_cast<ipc::storage_id_t const *>(data);
} }
} }
else std::memcpy(&data_, d, s); else std::memcpy(&data_, data, size);
} }
}; };
@ -91,26 +92,46 @@ auto cc_acc() {
return static_cast<acc_t*>(acc_h.get()); return static_cast<acc_t*>(acc_h.get());
} }
IPC_CONSTEXPR_ std::size_t align_chunk_size(std::size_t size) noexcept {
return (((size - 1) / ipc::large_msg_align) + 1) * ipc::large_msg_align;
}
IPC_CONSTEXPR_ std::size_t calc_chunk_size(std::size_t size) noexcept {
return ipc::make_align(alignof(std::max_align_t), align_chunk_size(
ipc::make_align(alignof(std::max_align_t), sizeof(std::atomic<ipc::circ::cc_t>)) + size));
}
struct chunk_t {
std::atomic<ipc::circ::cc_t> &conns() noexcept {
return *reinterpret_cast<std::atomic<ipc::circ::cc_t> *>(this);
}
void *data() noexcept {
return reinterpret_cast<ipc::byte_t *>(this)
+ ipc::make_align(alignof(std::max_align_t), sizeof(std::atomic<ipc::circ::cc_t>));
}
};
struct chunk_info_t { struct chunk_info_t {
ipc::id_pool<> pool_; ipc::id_pool<> pool_;
ipc::spin_lock lock_; ipc::spin_lock lock_;
IPC_CONSTEXPR_ static std::size_t chunks_elem_size(std::size_t chunk_size) noexcept {
return ipc::make_align(alignof(std::max_align_t), chunk_size + sizeof(acc_t));
}
IPC_CONSTEXPR_ static std::size_t chunks_mem_size(std::size_t chunk_size) noexcept { IPC_CONSTEXPR_ static std::size_t chunks_mem_size(std::size_t chunk_size) noexcept {
return ipc::id_pool<>::max_count * chunks_elem_size(chunk_size); return ipc::id_pool<>::max_count * chunk_size;
} }
ipc::byte_t *at(std::size_t chunk_size, std::size_t id) noexcept { ipc::byte_t *chunks_mem() noexcept {
if (id == ipc::invalid_value) return nullptr; return reinterpret_cast<ipc::byte_t *>(this + 1);
return reinterpret_cast<ipc::byte_t *>(this + 1) + (chunks_elem_size(chunk_size) * id); }
chunk_t *at(std::size_t chunk_size, ipc::storage_id_t id) noexcept {
if (id < 0) return nullptr;
return reinterpret_cast<chunk_t *>(chunks_mem() + (chunk_size * id));
} }
}; };
auto& chunk_storages() { auto& chunk_storages() {
class chunk_t { class chunk_handle_t {
ipc::shm::handle handle_; ipc::shm::handle handle_;
public: public:
@ -129,31 +150,29 @@ auto& chunk_storages() {
return info; return info;
} }
}; };
static ipc::unordered_map<std::size_t, chunk_t> chunk_s; static ipc::map<std::size_t, chunk_handle_t> chunk_hs;
return chunk_s; return chunk_hs;
} }
auto& chunk_lock() { chunk_info_t *chunk_storage_info(std::size_t chunk_size) {
static ipc::spin_lock chunk_l; auto &storages = chunk_storages();
return chunk_l; std::decay_t<decltype(storages)>::iterator it;
{
static ipc::rw_lock lock;
IPC_UNUSED_ std::shared_lock<ipc::rw_lock> guard {lock};
if ((it = storages.find(chunk_size)) == storages.end()) {
using chunk_handle_t = std::decay_t<decltype(storages)>::value_type::second_type;
guard.unlock();
IPC_UNUSED_ std::lock_guard<ipc::rw_lock> guard {lock};
it = storages.emplace(chunk_size, chunk_handle_t{}).first;
}
}
return it->second.get_info(chunk_size);
} }
constexpr std::size_t calc_chunk_size(std::size_t size) noexcept { std::pair<ipc::storage_id_t, void*> acquire_storage(std::size_t size, ipc::circ::cc_t conns) {
return ( ((size - 1) / ipc::large_msg_align) + 1 ) * ipc::large_msg_align;
}
auto& chunk_storage(std::size_t chunk_size) {
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(chunk_lock());
return chunk_storages()[chunk_size];
}
std::pair<std::size_t, void*> apply_storage(std::size_t conn_count, std::size_t size) {
if (conn_count == 0) return {};
std::size_t chunk_size = calc_chunk_size(size); std::size_t chunk_size = calc_chunk_size(size);
auto & chunk_shm = chunk_storage(chunk_size); auto info = chunk_storage_info(chunk_size);
auto info = chunk_shm.get_info(chunk_size);
if (info == nullptr) return {}; if (info == nullptr) return {};
info->lock_.lock(); info->lock_.lock();
@ -162,90 +181,92 @@ std::pair<std::size_t, void*> apply_storage(std::size_t conn_count, std::size_t
auto id = info->pool_.acquire(); auto id = info->pool_.acquire();
info->lock_.unlock(); info->lock_.unlock();
auto ptr = info->at(chunk_size, id); auto chunk = info->at(chunk_size, id);
if (ptr == nullptr) return {}; if (chunk == nullptr) return {};
reinterpret_cast<acc_t*>(ptr + chunk_size)->store(static_cast<msg_id_t>(conn_count), std::memory_order_release); chunk->conns().store(conns, std::memory_order_relaxed);
return { id, ptr }; return { id, chunk->data() };
} }
void *find_storage(std::size_t id, std::size_t size) { void *find_storage(ipc::storage_id_t id, std::size_t size) {
if (id == ipc::invalid_value) { if (id < 0) {
ipc::error("[find_storage] id is invalid: id = %zd, size = %zd\n", id, size); ipc::error("[find_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size);
return nullptr; return nullptr;
} }
std::size_t chunk_size = calc_chunk_size(size); std::size_t chunk_size = calc_chunk_size(size);
auto & chunk_shm = chunk_storage(chunk_size); auto info = chunk_storage_info(chunk_size);
auto info = chunk_shm.get_info(chunk_size);
if (info == nullptr) return nullptr; if (info == nullptr) return nullptr;
return info->at(chunk_size, id)->data();
auto ptr = info->at(chunk_size, id);
if (ptr == nullptr) return nullptr;
if (reinterpret_cast<acc_t*>(ptr + chunk_size)->load(std::memory_order_acquire) == 0) {
ipc::error("[find_storage] cc test failed: id = %zd, chunk_size = %zd\n", id, chunk_size);
return nullptr;
}
return ptr;
} }
void recycle_storage(std::size_t id, std::size_t size) { void release_storage(ipc::storage_id_t id, std::size_t size) {
if (id == ipc::invalid_value) { if (id < 0) {
ipc::error("[recycle_storage] id is invalid: id = %zd, size = %zd\n", id, size); ipc::error("[release_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size);
return; return;
} }
std::size_t chunk_size = calc_chunk_size(size); std::size_t chunk_size = calc_chunk_size(size);
auto & chunk_shm = chunk_storage(chunk_size); auto info = chunk_storage_info(chunk_size);
auto info = chunk_shm.get_info(chunk_size);
if (info == nullptr) return; if (info == nullptr) return;
auto ptr = info->at(chunk_size, id);
if (ptr == nullptr) {
ipc::error("[recycle_storage] chunk_shm.mems[%zd] failed: chunk_size = %zd\n", id, chunk_size);
return;
}
if (reinterpret_cast<acc_t*>(ptr + chunk_size)->fetch_sub(1, std::memory_order_acq_rel) > 1) {
// not the last receiver, just return
return;
}
info->lock_.lock(); info->lock_.lock();
info->pool_.release(id); info->pool_.release(id);
info->lock_.unlock(); info->lock_.unlock();
} }
void clear_storage(std::size_t id, std::size_t size) { template <ipc::relat Rp, ipc::relat Rc>
if (id == ipc::invalid_value) { bool sub_rc(ipc::wr<Rp, Rc, ipc::trans::unicast>,
ipc::error("[clear_storage] id is invalid: id = %zd, size = %zd\n", id, size); std::atomic<ipc::circ::cc_t> &/*conns*/, ipc::circ::cc_t /*curr_conns*/, ipc::circ::cc_t /*conn_id*/) noexcept {
return; return true;
} }
std::size_t chunk_size = calc_chunk_size(size); template <ipc::relat Rp, ipc::relat Rc>
auto & chunk_shm = chunk_storage(chunk_size); bool sub_rc(ipc::wr<Rp, Rc, ipc::trans::broadcast>,
std::atomic<ipc::circ::cc_t> &conns, ipc::circ::cc_t curr_conns, ipc::circ::cc_t conn_id) noexcept {
auto info = chunk_shm.get_info(chunk_size); auto last_conns = curr_conns & ~conn_id;
if (info == nullptr) return;
auto ptr = info->at(chunk_size, id);
if (ptr == nullptr) return;
auto cc_flag = reinterpret_cast<acc_t*>(ptr + chunk_size);
for (unsigned k = 0;;) { for (unsigned k = 0;;) {
auto cc_curr = cc_flag->load(std::memory_order_acquire); auto chunk_conns = conns.load(std::memory_order_acquire);
if (cc_curr == 0) return; // means this id has been cleared if (conns.compare_exchange_weak(chunk_conns, chunk_conns & last_conns, std::memory_order_release)) {
if (cc_flag->compare_exchange_weak(cc_curr, 0, std::memory_order_release)) { return (chunk_conns & last_conns) == 0;
break;
} }
ipc::yield(k); ipc::yield(k);
} }
}
template <typename Flag>
void recycle_storage(ipc::storage_id_t id, std::size_t size, ipc::circ::cc_t curr_conns, ipc::circ::cc_t conn_id) {
if (id < 0) {
ipc::error("[recycle_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size);
return;
}
std::size_t chunk_size = calc_chunk_size(size);
auto info = chunk_storage_info(chunk_size);
if (info == nullptr) return;
auto chunk = info->at(chunk_size, id);
if (chunk == nullptr) return;
if (!sub_rc(Flag{}, chunk->conns(), curr_conns, conn_id)) {
return;
}
info->lock_.lock(); info->lock_.lock();
info->pool_.release(id); info->pool_.release(id);
info->lock_.unlock(); info->lock_.unlock();
} }
template <typename MsgT>
bool clear_message(void* p) {
auto msg = static_cast<MsgT*>(p);
if (msg->storage_) {
std::int32_t r_size = static_cast<std::int32_t>(ipc::data_length) + msg->remain_;
if (r_size <= 0) {
ipc::error("[clear_message] invalid msg size: %d\n", (int)r_size);
return true;
}
release_storage(
*reinterpret_cast<ipc::storage_id_t*>(&msg->data_),
static_cast<std::size_t>(r_size));
}
return true;
}
struct conn_info_head { struct conn_info_head {
ipc::string name_; ipc::string name_;
@ -253,19 +274,6 @@ struct conn_info_head {
ipc::waiter cc_waiter_, wt_waiter_, rd_waiter_; ipc::waiter cc_waiter_, wt_waiter_, rd_waiter_;
ipc::shm::handle acc_h_; ipc::shm::handle acc_h_;
/*
* <Remarks> thread_local may have some bugs.
*
* <Reference>
* - https://sourceforge.net/p/mingw-w64/bugs/727/
* - https://sourceforge.net/p/mingw-w64/bugs/527/
* - https://github.com/Alexpux/MINGW-packages/issues/2519
* - https://github.com/ChaiScript/ChaiScript/issues/402
* - https://developercommunity.visualstudio.com/content/problem/124121/thread-local-variables-fail-to-be-initialized-when.html
* - https://software.intel.com/en-us/forums/intel-c-compiler/topic/684827
*/
ipc::tls::pointer<ipc::unordered_map<msg_id_t, cache_t>> recv_cache_;
conn_info_head(char const * name) conn_info_head(char const * name)
: name_ {name} : name_ {name}
, cc_id_ {(cc_acc() == nullptr) ? 0 : cc_acc()->fetch_add(1, std::memory_order_relaxed)} , cc_id_ {(cc_acc() == nullptr) ? 0 : cc_acc()->fetch_add(1, std::memory_order_relaxed)}
@ -286,7 +294,8 @@ struct conn_info_head {
} }
auto& recv_cache() { auto& recv_cache() {
return *recv_cache_.create_once(); thread_local ipc::unordered_map<msg_id_t, cache_t> tls;
return tls;
} }
}; };
@ -337,8 +346,10 @@ struct queue_generator {
template <typename Policy> template <typename Policy>
struct detail_impl { struct detail_impl {
using queue_t = typename queue_generator<Policy>::queue_t; using policy_t = Policy;
using conn_info_t = typename queue_generator<Policy>::conn_info_t; using flag_t = typename policy_t::flag_t;
using queue_t = typename queue_generator<policy_t>::queue_t;
using conn_info_t = typename queue_generator<policy_t>::conn_info_t;
constexpr static conn_info_t* info_of(ipc::handle_t h) noexcept { constexpr static conn_info_t* info_of(ipc::handle_t h) noexcept {
return static_cast<conn_info_t*>(h); return static_cast<conn_info_t*>(h);
@ -432,7 +443,8 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s
ipc::error("fail: send, que->ready_sending() == false\n"); ipc::error("fail: send, que->ready_sending() == false\n");
return false; return false;
} }
if (que->elems()->connections(std::memory_order_relaxed) == 0) { ipc::circ::cc_t conns = que->elems()->connections(std::memory_order_relaxed);
if (conns == 0) {
ipc::error("fail: send, there is no receiver on this connection.\n"); ipc::error("fail: send, there is no receiver on this connection.\n");
return false; return false;
} }
@ -445,7 +457,7 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s
auto msg_id = acc->fetch_add(1, std::memory_order_relaxed); auto msg_id = acc->fetch_add(1, std::memory_order_relaxed);
auto try_push = std::forward<F>(gen_push)(info_of(h), que, msg_id); auto try_push = std::forward<F>(gen_push)(info_of(h), que, msg_id);
if (size > ipc::large_msg_limit) { if (size > ipc::large_msg_limit) {
auto dat = apply_storage(que->conn_count(), size); auto dat = acquire_storage(size, conns);
void * buf = dat.second; void * buf = dat.second;
if (buf != nullptr) { if (buf != nullptr) {
std::memcpy(buf, data, size); std::memcpy(buf, data, size);
@ -453,11 +465,11 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s
static_cast<std::int32_t>(ipc::data_length), &(dat.first), 0); static_cast<std::int32_t>(ipc::data_length), &(dat.first), 0);
} }
// try using message fragment // try using message fragment
// ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd\n", msg_id, size); //ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd\n", msg_id, size);
} }
// push message fragment // push message fragment
std::int32_t offset = 0; std::int32_t offset = 0;
for (int i = 0; i < static_cast<int>(size / ipc::data_length); ++i, offset += ipc::data_length) { for (std::int32_t i = 0; i < static_cast<std::int32_t>(size / ipc::data_length); ++i, offset += ipc::data_length) {
if (!try_push(static_cast<std::int32_t>(size) - offset - static_cast<std::int32_t>(ipc::data_length), if (!try_push(static_cast<std::int32_t>(size) - offset - static_cast<std::int32_t>(ipc::data_length),
static_cast<ipc::byte_t const *>(data) + offset, ipc::data_length)) { static_cast<ipc::byte_t const *>(data) + offset, ipc::data_length)) {
return false; return false;
@ -479,18 +491,14 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size, std::uint
return send([tm](auto info, auto que, auto msg_id) { return send([tm](auto info, auto que, auto msg_id) {
return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) { return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) {
if (!wait_for(info->wt_waiter_, [&] { if (!wait_for(info->wt_waiter_, [&] {
return !que->push(info->cc_id_, msg_id, remain, data, size); return !que->push(
[](void*) { return true; },
info->cc_id_, msg_id, remain, data, size);
}, tm)) { }, tm)) {
ipc::log("force_push: msg_id = %zd, remain = %d, size = %zd\n", msg_id, remain, size); ipc::log("force_push: msg_id = %zd, remain = %d, size = %zd\n", msg_id, remain, size);
if (!que->force_push([](void* p) { if (!que->force_push(
auto tmp_msg = static_cast<typename queue_t::value_t*>(p); clear_message<typename queue_t::value_t>,
if (tmp_msg->storage_) { info->cc_id_, msg_id, remain, data, size)) {
clear_storage(
*reinterpret_cast<std::size_t*>(&tmp_msg->data_),
static_cast<std::int32_t>(ipc::data_length) + tmp_msg->remain_);
}
return true;
}, info->cc_id_, msg_id, remain, data, size)) {
return false; return false;
} }
} }
@ -504,7 +512,9 @@ static bool try_send(ipc::handle_t h, void const * data, std::size_t size, std::
return send([tm](auto info, auto que, auto msg_id) { return send([tm](auto info, auto que, auto msg_id) {
return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) { return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) {
if (!wait_for(info->wt_waiter_, [&] { if (!wait_for(info->wt_waiter_, [&] {
return !que->push(info->cc_id_, msg_id, remain, data, size); return !que->push(
[](void*) { return true; },
info->cc_id_, msg_id, remain, data, size);
}, tm)) { }, tm)) {
return false; return false;
} }
@ -525,34 +535,60 @@ static ipc::buff_t recv(ipc::handle_t h, std::uint64_t tm) {
return {}; return {};
} }
auto& rc = info_of(h)->recv_cache(); auto& rc = info_of(h)->recv_cache();
while (1) { for (;;) {
// pop a new message // pop a new message
typename queue_t::value_t msg; typename queue_t::value_t msg;
if (!wait_for(info_of(h)->rd_waiter_, [que, &msg] { return !que->pop(msg); }, tm)) { if (!wait_for(info_of(h)->rd_waiter_, [que, &msg] {
return !que->pop(msg);
}, tm)) {
// pop failed, just return. // pop failed, just return.
return {}; return {};
} }
info_of(h)->wt_waiter_.broadcast(); info_of(h)->wt_waiter_.broadcast();
if ((info_of(h)->acc() != nullptr) && (msg.conn_ == info_of(h)->cc_id_)) { if ((info_of(h)->acc() != nullptr) && (msg.cc_id_ == info_of(h)->cc_id_)) {
continue; // ignore message to self continue; // ignore message to self
} }
// msg.remain_ may minus & abs(msg.remain_) < data_length // msg.remain_ may minus & abs(msg.remain_) < data_length
std::size_t remain = static_cast<std::int32_t>(ipc::data_length) + msg.remain_; std::int32_t r_size = static_cast<std::int32_t>(ipc::data_length) + msg.remain_;
if (r_size <= 0) {
ipc::error("fail: recv, r_size = %d\n", (int)r_size);
return {};
}
std::size_t msg_size = static_cast<std::size_t>(r_size);
// large message
if (msg.storage_) {
ipc::storage_id_t buf_id = *reinterpret_cast<ipc::storage_id_t*>(&msg.data_);
void* buf = find_storage(buf_id, msg_size);
if (buf != nullptr) {
struct recycle_t {
ipc::storage_id_t storage_id;
ipc::circ::cc_t curr_conns;
ipc::circ::cc_t conn_id;
} *r_info = ipc::mem::alloc<recycle_t>(recycle_t{
buf_id, que->elems()->connections(std::memory_order_relaxed), que->connected_id()
});
if (r_info == nullptr) {
ipc::log("fail: ipc::mem::alloc<recycle_t>.\n");
return ipc::buff_t{buf, msg_size}; // no recycle
} else {
return ipc::buff_t{buf, msg_size, [](void* p_info, std::size_t size) {
auto r_info = static_cast<recycle_t *>(p_info);
IPC_UNUSED_ auto finally = ipc::guard([r_info] {
ipc::mem::free(r_info);
});
recycle_storage<flag_t>(r_info->storage_id, size, r_info->curr_conns, r_info->conn_id);
}, r_info};
}
} else {
ipc::log("fail: shm::handle for large message. msg_id: %zd, buf_id: %zd, size: %zd\n", msg.id_, buf_id, msg_size);
continue;
}
}
// find cache with msg.id_ // find cache with msg.id_
auto cac_it = rc.find(msg.id_); auto cac_it = rc.find(msg.id_);
if (cac_it == rc.end()) { if (cac_it == rc.end()) {
if (remain <= ipc::data_length) { if (msg_size <= ipc::data_length) {
return make_cache(msg.data_, remain); return make_cache(msg.data_, msg_size);
}
if (msg.storage_) {
std::size_t buf_id = *reinterpret_cast<std::size_t*>(&msg.data_);
void * buf = find_storage(buf_id, remain);
if (buf != nullptr) {
return ipc::buff_t { buf, remain, [](void* ptr, std::size_t size) {
recycle_storage(reinterpret_cast<std::size_t>(ptr) - 1, size);
}, reinterpret_cast<void*>(buf_id + 1) };
}
else ipc::log("fail: shm::handle for big message. msg_id: %zd, buf_id: %zd, size: %zd\n", msg.id_, buf_id, remain);
} }
// gc // gc
if (rc.size() > 1024) { if (rc.size() > 1024) {
@ -566,14 +602,14 @@ static ipc::buff_t recv(ipc::handle_t h, std::uint64_t tm) {
for (auto id : need_del) rc.erase(id); for (auto id : need_del) rc.erase(id);
} }
// cache the first message fragment // cache the first message fragment
rc.emplace(msg.id_, cache_t { ipc::data_length, make_cache(msg.data_, remain) }); rc.emplace(msg.id_, cache_t { ipc::data_length, make_cache(msg.data_, msg_size) });
} }
// has cached before this message // has cached before this message
else { else {
auto& cac = cac_it->second; auto& cac = cac_it->second;
// this is the last message fragment // this is the last message fragment
if (msg.remain_ <= 0) { if (msg.remain_ <= 0) {
cac.append(&(msg.data_), remain); cac.append(&(msg.data_), msg_size);
// finish this message, erase it from cache // finish this message, erase it from cache
auto buff = std::move(cac.buff_); auto buff = std::move(cac.buff_);
rc.erase(cac_it); rc.erase(cac_it);

View File

@ -130,10 +130,10 @@ public:
return head_.force_push(que, std::forward<F>(f), block_); return head_.force_push(que, std::forward<F>(f), block_);
} }
template <typename Q, typename F> template <typename Q, typename F, typename R>
bool pop(Q* que, cursor_t* cur, F&& f) { bool pop(Q* que, cursor_t* cur, F&& f, R&& out) {
if (cur == nullptr) return false; if (cur == nullptr) return false;
return head_.pop(que, *cur, std::forward<F>(f), block_); return head_.pop(que, *cur, std::forward<F>(f), std::forward<R>(out), block_);
} }
}; };

View File

@ -16,7 +16,7 @@ namespace circ {
using u1_t = ipc::uint_t<8>; using u1_t = ipc::uint_t<8>;
using u2_t = ipc::uint_t<32>; using u2_t = ipc::uint_t<32>;
/** only supports max 32 connections */ /** only supports max 32 connections in broadcast mode */
using cc_t = u2_t; using cc_t = u2_t;
constexpr u1_t index_of(u2_t c) noexcept { constexpr u1_t index_of(u2_t c) noexcept {

View File

@ -5,6 +5,7 @@
#include <utility> #include <utility>
#include <functional> #include <functional>
#include <unordered_map> #include <unordered_map>
#include <map>
#include <string> #include <string>
#include <cstdio> #include <cstdio>
@ -49,6 +50,11 @@ using unordered_map = std::unordered_map<
Key, T, std::hash<Key>, std::equal_to<Key>, ipc::mem::allocator<std::pair<const Key, T>> Key, T, std::hash<Key>, std::equal_to<Key>, ipc::mem::allocator<std::pair<const Key, T>>
>; >;
template <typename Key, typename T>
using map = std::map<
Key, T, std::less<Key>, ipc::mem::allocator<std::pair<const Key, T>>
>;
template <typename Char> template <typename Char>
using basic_string = std::basic_string< using basic_string = std::basic_string<
Char, std::char_traits<Char>, ipc::mem::allocator<Char> Char, std::char_traits<Char>, ipc::mem::allocator<Char>

View File

@ -11,7 +11,6 @@
#include "libipc/def.h" #include "libipc/def.h"
#include "libipc/rw_lock.h" #include "libipc/rw_lock.h"
#include "libipc/tls_pointer.h"
#include "libipc/pool_alloc.h" #include "libipc/pool_alloc.h"
#include "libipc/utility/concept.h" #include "libipc/utility/concept.h"
@ -155,18 +154,16 @@ private:
}; };
friend class alloc_proxy; friend class alloc_proxy;
using ref_t = alloc_proxy&; using ref_t = alloc_proxy&;
using tls_t = tls::pointer<alloc_proxy>;
tls_t tls_;
std::function<ref_t()> get_alloc_; std::function<ref_t()> get_alloc_;
public: public:
template <typename ... P> template <typename ... P>
async_wrapper(P ... pars) { async_wrapper(P ... pars) {
get_alloc_ = [this, pars ...]()->ref_t { get_alloc_ = [this, pars ...]()->ref_t {
return *tls_.create_once(this, pars ...); thread_local alloc_proxy tls(pars ...);
return tls;
}; };
} }

View File

@ -123,5 +123,17 @@ constexpr const T& (min)(const T& a, const T& b) {
#endif/*__cplusplus < 201703L*/ #endif/*__cplusplus < 201703L*/
template <typename T, typename U>
auto horrible_cast(U rhs) noexcept
-> typename std::enable_if<std::is_trivially_copyable<T>::value
&& std::is_trivially_copyable<U>::value, T>::type {
union {
T t;
U u;
} r = {};
r.u = rhs;
return r.t;
}
} // namespace detail } // namespace detail
} // namespace ipc } // namespace ipc

View File

@ -1,41 +0,0 @@
#pragma once
#include <unordered_map> // std::unordered_map
#include <cassert> // assert
#include "libipc/tls_pointer.h"
#include "libipc/utility/utility.h"
namespace ipc {
namespace tls {
inline void tls_destruct(key_info const * pkey, void * p) {
assert(pkey != nullptr);
auto destructor = horrible_cast<destructor_t>(pkey->key_);
if (destructor != nullptr) destructor(p);
}
struct tls_recs : public std::unordered_map<key_info const *, void *> {
~tls_recs() {
for (auto & pair : *this) {
tls_destruct(pair.first, pair.second);
}
}
};
inline tls_recs * tls_get_recs() {
thread_local tls_recs * recs_ptr = nullptr;
if (recs_ptr == nullptr) {
recs_ptr = new tls_recs;
}
assert(recs_ptr != nullptr);
return recs_ptr;
}
inline void at_thread_exit() {
delete tls_get_recs();
}
} // namespace tls
} // namespace ipc

View File

@ -1,83 +0,0 @@
#include <pthread.h> // pthread_...
#include <atomic> // std::atomic_thread_fence
#include <cassert> // assert
#include "libipc/tls_pointer.h"
#include "libipc/utility/log.h"
#include "libipc/utility/utility.h"
namespace ipc {
namespace tls {
namespace {
namespace native {
using key_t = pthread_key_t;
bool create(key_t * pkey, void (*destructor)(void*)) {
assert(pkey != nullptr);
int err = ::pthread_key_create(pkey, destructor);
if (err != 0) {
ipc::error("[native::create] pthread_key_create failed [%d].\n", err);
return false;
}
return true;
}
bool release(key_t key) {
int err = ::pthread_key_delete(key);
if (err != 0) {
ipc::error("[native::release] pthread_key_delete failed [%d].\n", err);
return false;
}
return true;
}
bool set(key_t key, void * ptr) {
int err = ::pthread_setspecific(key, ptr);
if (err != 0) {
ipc::error("[native::set] pthread_setspecific failed [%d].\n", err);
return false;
}
return true;
}
void * get(key_t key) {
return ::pthread_getspecific(key);
}
} // namespace native
} // internal-linkage
bool create(key_info * pkey, destructor_t destructor) {
assert(pkey != nullptr);
native::key_t k;
if (!native::create(&k, destructor)) {
return false;
}
pkey->key_ = horrible_cast<key_t>(k);
std::atomic_thread_fence(std::memory_order_seq_cst);
return true;
}
void release(key_info const * pkey) {
assert(pkey != nullptr);
static_cast<void>(
native::release(horrible_cast<native::key_t>(pkey->key_)));
}
bool set(key_info const * pkey, void* ptr) {
assert(pkey != nullptr);
return native::set(horrible_cast<native::key_t>(pkey->key_), ptr);
}
void * get(key_info const * pkey) {
assert(pkey != nullptr);
return native::get(horrible_cast<native::key_t>(pkey->key_));
}
} // namespace tls
} // namespace ipc

View File

@ -1,100 +0,0 @@
#include <Windows.h>
#include "libipc/platform/tls_detail_win.h"
/**
* @remarks
* Windows doesn't support a per-thread destructor with its TLS primitives.
* So, here will build it manually by inserting a function to be called on each thread's exit.
*
* @see
* - https://www.codeproject.com/Articles/8113/Thread-Local-Storage-The-C-Way
* - https://src.chromium.org/viewvc/chrome/trunk/src/base/threading/thread_local_storage_win.cc
* - https://github.com/mirror/mingw-org-wsl/blob/master/src/libcrt/crt/tlssup.c
* - https://github.com/Alexpux/mingw-w64/blob/master/mingw-w64-crt/crt/tlssup.c
* - http://svn.boost.org/svn/boost/trunk/libs/thread/src/win32/tss_pe.cpp
*/
namespace ipc {
namespace tls {
namespace {
void NTAPI OnTlsCallback(PVOID, DWORD dwReason, PVOID) {
if (dwReason == DLL_THREAD_DETACH) {
ipc::tls::at_thread_exit();
}
}
} // internal-linkage
////////////////////////////////////////////////////////////////
/// Call destructors on thread exit
////////////////////////////////////////////////////////////////
#if defined(_MSC_VER)
#if defined(WIN64) || defined(_WIN64) || defined(__WIN64__)
#pragma comment(linker, "/INCLUDE:_tls_used")
#pragma comment(linker, "/INCLUDE:_tls_xl_b__")
extern "C" {
# pragma const_seg(".CRT$XLB")
extern const PIMAGE_TLS_CALLBACK _tls_xl_b__;
const PIMAGE_TLS_CALLBACK _tls_xl_b__ = OnTlsCallback;
# pragma const_seg()
}
#else /*!WIN64*/
#pragma comment(linker, "/INCLUDE:__tls_used")
#pragma comment(linker, "/INCLUDE:__tls_xl_b__")
extern "C" {
# pragma data_seg(".CRT$XLB")
PIMAGE_TLS_CALLBACK _tls_xl_b__ = OnTlsCallback;
# pragma data_seg()
}
#endif/*!WIN64*/
#elif defined(__GNUC__)
#define IPC_CRTALLOC__(x) __attribute__ ((section (x) ))
#if defined(__MINGW64__) || (__MINGW64_VERSION_MAJOR) || \
(__MINGW32_MAJOR_VERSION > 3) || ((__MINGW32_MAJOR_VERSION == 3) && (__MINGW32_MINOR_VERSION >= 18))
extern "C" {
IPC_CRTALLOC__(".CRT$XLB") PIMAGE_TLS_CALLBACK _tls_xl_b__ = OnTlsCallback;
}
#else /*!MINGW*/
extern "C" {
ULONG _tls_index__ = 0;
IPC_CRTALLOC__(".tls$AAA") char _tls_start__ = 0;
IPC_CRTALLOC__(".tls$ZZZ") char _tls_end__ = 0;
IPC_CRTALLOC__(".CRT$XLA") PIMAGE_TLS_CALLBACK _tls_xl_a__ = 0;
IPC_CRTALLOC__(".CRT$XLB") PIMAGE_TLS_CALLBACK _tls_xl_b__ = OnTlsCallback;
IPC_CRTALLOC__(".CRT$XLZ") PIMAGE_TLS_CALLBACK _tls_xl_z__ = 0;
}
extern "C" NX_CRTALLOC_(".tls") const IMAGE_TLS_DIRECTORY _tls_used = {
(ULONG_PTR)(&_tls_start__ + 1),
(ULONG_PTR) &_tls_end__,
(ULONG_PTR) &_tls_index__,
(ULONG_PTR) &_tls_xl_b__,
(ULONG)0, (ULONG)0
}
#endif/*!MINGW*/
#endif/*_MSC_VER, __GNUC__*/
} // namespace tls
} // namespace ipc

View File

@ -1,46 +0,0 @@
#pragma once
#include <atomic> // std::atomic_thread_fence
#include <cassert> // assert
#include "libipc/tls_pointer.h"
#include "libipc/platform/tls_detail_win.h"
#include "libipc/utility/utility.h"
namespace ipc {
namespace tls {
bool create(key_info * pkey, destructor_t destructor) {
assert(pkey != nullptr);
pkey->key_ = horrible_cast<key_t>(destructor);
std::atomic_thread_fence(std::memory_order_seq_cst);
return true;
}
void release(key_info const * pkey) {
assert(pkey != nullptr);
assert(tls_get_recs() != nullptr);
tls_get_recs()->erase(pkey);
}
bool set(key_info const * pkey, void * ptr) {
assert(pkey != nullptr);
assert(tls_get_recs() != nullptr);
(*tls_get_recs())[pkey] = ptr;
return true;
}
void * get(key_info const * pkey) {
assert(pkey != nullptr);
assert(tls_get_recs() != nullptr);
auto const recs = tls_get_recs();
auto it = recs->find(pkey);
if (it == recs->end()) {
return nullptr;
}
return it->second;
}
} // namespace tls
} // namespace ipc

View File

@ -1,12 +1,14 @@
#pragma once #pragma once
#include <tchar.h> #include <Windows.h>
#include <type_traits> #include <type_traits>
#include <string> #include <string>
#include <locale> #include <locale>
#include <codecvt> #include <codecvt>
#include <cstring> #include <cstring>
#include <stdexcept>
#include <cstddef>
#include "libipc/utility/concept.h" #include "libipc/utility/concept.h"
#include "libipc/platform/detail.h" #include "libipc/platform/detail.h"
@ -34,34 +36,38 @@ using IsSameChar = ipc::require<is_same_char<T, S>::value, R>;
//////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////
template <typename T = TCHAR> template <typename T = TCHAR>
constexpr auto to_tchar(ipc::string && str) -> IsSameChar<T, ipc::string, ipc::string &&> { constexpr auto to_tchar(ipc::string &&str) -> IsSameChar<T, ipc::string, ipc::string &&> {
return std::move(str); return std::move(str); // noconv
} }
/**
* codecvt_utf8_utf16/std::wstring_convert is deprecated
* @see https://codingtidbit.com/2020/02/09/c17-codecvt_utf8-is-deprecated/
* https://stackoverflow.com/questions/42946335/deprecated-header-codecvt-replacement
* https://en.cppreference.com/w/cpp/locale/codecvt/in
* https://docs.microsoft.com/en-us/windows/win32/api/stringapiset/nf-stringapiset-multibytetowidechar
*/
template <typename T = TCHAR> template <typename T = TCHAR>
constexpr auto to_tchar(ipc::string && str) -> IsSameChar<T, ipc::wstring> { auto to_tchar(ipc::string &&external) -> IsSameChar<T, ipc::wstring> {
return std::wstring_convert< if (external.empty()) {
std::codecvt_utf8_utf16<wchar_t>, return {}; // noconv
wchar_t, }
ipc::mem::allocator<wchar_t>, /**
ipc::mem::allocator<char> * CP_ACP : The system default Windows ANSI code page.
>{}.from_bytes(std::move(str)); * CP_MACCP : The current system Macintosh code page.
} * CP_OEMCP : The current system OEM code page.
* CP_SYMBOL : Symbol code page (42).
template <typename T> * CP_THREAD_ACP: The Windows ANSI code page for the current thread.
inline auto to_tchar(T* dst, char const * src, std::size_t size) -> IsSameChar<T, char, void> { * CP_UTF7 : UTF-7. Use this value only when forced by a 7-bit transport mechanism. Use of UTF-8 is preferred.
std::memcpy(dst, src, size); * CP_UTF8 : UTF-8.
} */
int size_needed = ::MultiByteToWideChar(CP_UTF8, 0, &external[0], (int)external.size(), NULL, 0);
template <typename T> if (size_needed <= 0) {
inline auto to_tchar(T* dst, char const * src, std::size_t size) -> IsSameChar<T, wchar_t, void> { return {};
auto wstr = std::wstring_convert< }
std::codecvt_utf8_utf16<wchar_t>, ipc::wstring internal(size_needed, L'\0');
wchar_t, ::MultiByteToWideChar(CP_UTF8, 0, &external[0], (int)external.size(), &internal[0], size_needed);
ipc::mem::allocator<wchar_t>, return internal;
ipc::mem::allocator<char>
>{}.from_bytes(src, src + size);
std::memcpy(dst, wstr.data(), (ipc::detail::min)(wstr.size(), size));
} }
} // namespace detail } // namespace detail

View File

@ -15,8 +15,10 @@ struct choose;
template <typename Flag> template <typename Flag>
struct choose<circ::elem_array, Flag> { struct choose<circ::elem_array, Flag> {
using flag_t = Flag;
template <std::size_t DataSize, std::size_t AlignSize> template <std::size_t DataSize, std::size_t AlignSize>
using elems_t = circ::elem_array<ipc::prod_cons_impl<Flag>, DataSize, AlignSize>; using elems_t = circ::elem_array<ipc::prod_cons_impl<flag_t>, DataSize, AlignSize>;
}; };
} // namespace policy } // namespace policy

View File

@ -58,13 +58,14 @@ struct prod_cons_impl<wr<relat::single, relat::single, trans::unicast>> {
return false; return false;
} }
template <typename W, typename F, typename E> template <typename W, typename F, typename R, typename E>
bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E* elems) { bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, R&& out, E* elems) {
auto cur_rd = circ::index_of(rd_.load(std::memory_order_relaxed)); auto cur_rd = circ::index_of(rd_.load(std::memory_order_relaxed));
if (cur_rd == circ::index_of(wt_.load(std::memory_order_acquire))) { if (cur_rd == circ::index_of(wt_.load(std::memory_order_acquire))) {
return false; // empty return false; // empty
} }
std::forward<F>(f)(&(elems[cur_rd].data_)); std::forward<F>(f)(&(elems[cur_rd].data_));
std::forward<R>(out)(true);
rd_.fetch_add(1, std::memory_order_release); rd_.fetch_add(1, std::memory_order_release);
return true; return true;
} }
@ -80,8 +81,9 @@ struct prod_cons_impl<wr<relat::single, relat::multi , trans::unicast>>
return false; return false;
} }
template <typename W, typename F, template <std::size_t, std::size_t> class E, std::size_t DS, std::size_t AS> template <typename W, typename F, typename R,
bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E<DS, AS>* elems) { template <std::size_t, std::size_t> class E, std::size_t DS, std::size_t AS>
bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, R&& out, E<DS, AS>* elems) {
byte_t buff[DS]; byte_t buff[DS];
for (unsigned k = 0;;) { for (unsigned k = 0;;) {
auto cur_rd = rd_.load(std::memory_order_relaxed); auto cur_rd = rd_.load(std::memory_order_relaxed);
@ -92,6 +94,7 @@ struct prod_cons_impl<wr<relat::single, relat::multi , trans::unicast>>
std::memcpy(buff, &(elems[circ::index_of(cur_rd)].data_), sizeof(buff)); std::memcpy(buff, &(elems[circ::index_of(cur_rd)].data_), sizeof(buff));
if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) { if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) {
std::forward<F>(f)(buff); std::forward<F>(f)(buff);
std::forward<R>(out)(true);
return true; return true;
} }
ipc::yield(k); ipc::yield(k);
@ -156,8 +159,9 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::unicast>>
return false; return false;
} }
template <typename W, typename F, template <std::size_t, std::size_t> class E, std::size_t DS, std::size_t AS> template <typename W, typename F, typename R,
bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E<DS, AS>* elems) { template <std::size_t, std::size_t> class E, std::size_t DS, std::size_t AS>
bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, R&& out, E<DS, AS>* elems) {
byte_t buff[DS]; byte_t buff[DS];
for (unsigned k = 0;;) { for (unsigned k = 0;;) {
auto cur_rd = rd_.load(std::memory_order_relaxed); auto cur_rd = rd_.load(std::memory_order_relaxed);
@ -179,6 +183,7 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::unicast>>
std::memcpy(buff, &(elems[circ::index_of(cur_rd)].data_), sizeof(buff)); std::memcpy(buff, &(elems[circ::index_of(cur_rd)].data_), sizeof(buff));
if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) { if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) {
std::forward<F>(f)(buff); std::forward<F>(f)(buff);
std::forward<R>(out)(true);
return true; return true;
} }
ipc::yield(k); ipc::yield(k);
@ -263,20 +268,20 @@ struct prod_cons_impl<wr<relat::single, relat::multi, trans::broadcast>> {
return true; return true;
} }
template <typename W, typename F, typename E> template <typename W, typename F, typename R, typename E>
bool pop(W* wrapper, circ::u2_t& cur, F&& f, E* elems) { bool pop(W* wrapper, circ::u2_t& cur, F&& f, R&& out, E* elems) {
if (cur == cursor()) return false; // acquire if (cur == cursor()) return false; // acquire
auto* el = elems + circ::index_of(cur++); auto* el = elems + circ::index_of(cur++);
std::forward<F>(f)(&(el->data_)); std::forward<F>(f)(&(el->data_));
for (unsigned k = 0;;) { for (unsigned k = 0;;) {
auto cur_rc = el->rc_.load(std::memory_order_acquire); auto cur_rc = el->rc_.load(std::memory_order_acquire);
circ::cc_t rem_cc = cur_rc & ep_mask; if ((cur_rc & ep_mask) == 0) {
if (rem_cc == 0) { std::forward<R>(out)(true);
return true; return true;
} }
if (el->rc_.compare_exchange_weak(cur_rc, auto nxt_rc = cur_rc & ~static_cast<rc_t>(wrapper->connected_id());
cur_rc & ~static_cast<rc_t>(wrapper->connected_id()), if (el->rc_.compare_exchange_weak(cur_rc, nxt_rc, std::memory_order_release)) {
std::memory_order_release)) { std::forward<R>(out)((nxt_rc & ep_mask) == 0);
return true; return true;
} }
ipc::yield(k); ipc::yield(k);
@ -395,8 +400,8 @@ struct prod_cons_impl<wr<relat::multi, relat::multi, trans::broadcast>> {
return true; return true;
} }
template <typename W, typename F, typename E, std::size_t N> template <typename W, typename F, typename R, typename E, std::size_t N>
bool pop(W* wrapper, circ::u2_t& cur, F&& f, E(& elems)[N]) { bool pop(W* wrapper, circ::u2_t& cur, F&& f, R&& out, E(& elems)[N]) {
auto* el = elems + circ::index_of(cur); auto* el = elems + circ::index_of(cur);
auto cur_fl = el->f_ct_.load(std::memory_order_acquire); auto cur_fl = el->f_ct_.load(std::memory_order_acquire);
if (cur_fl != ~static_cast<flag_t>(cur)) { if (cur_fl != ~static_cast<flag_t>(cur)) {
@ -406,17 +411,18 @@ struct prod_cons_impl<wr<relat::multi, relat::multi, trans::broadcast>> {
std::forward<F>(f)(&(el->data_)); std::forward<F>(f)(&(el->data_));
for (unsigned k = 0;;) { for (unsigned k = 0;;) {
auto cur_rc = el->rc_.load(std::memory_order_acquire); auto cur_rc = el->rc_.load(std::memory_order_acquire);
circ::cc_t rem_cc = cur_rc & rc_mask; if ((cur_rc & rc_mask) == 0) {
if (rem_cc == 0) { std::forward<R>(out)(true);
el->f_ct_.store(cur + N - 1, std::memory_order_release); el->f_ct_.store(cur + N - 1, std::memory_order_release);
return true; return true;
} }
if ((rem_cc & ~wrapper->connected_id()) == 0) { auto nxt_rc = inc_rc(cur_rc) & ~static_cast<rc_t>(wrapper->connected_id());
bool last_one = false;
if ((last_one = (nxt_rc & rc_mask) == 0)) {
el->f_ct_.store(cur + N - 1, std::memory_order_release); el->f_ct_.store(cur + N - 1, std::memory_order_release);
} }
if (el->rc_.compare_exchange_weak(cur_rc, if (el->rc_.compare_exchange_weak(cur_rc, nxt_rc, std::memory_order_release)) {
inc_rc(cur_rc) & ~static_cast<rc_t>(wrapper->connected_id()), std::forward<R>(out)(last_one);
std::memory_order_release)) {
return true; return true;
} }
ipc::yield(k); ipc::yield(k);

View File

@ -155,11 +155,11 @@ public:
return !valid() || (cursor_ == elems_->cursor()); return !valid() || (cursor_ == elems_->cursor());
} }
template <typename T, typename... P> template <typename T, typename F, typename... P>
bool push(P&&... params) { bool push(F&& prep, P&&... params) {
if (elems_ == nullptr) return false; if (elems_ == nullptr) return false;
return elems_->push(this, [&](void* p) { return elems_->push(this, [&](void* p) {
::new (p) T(std::forward<P>(params)...); if (prep(p)) ::new (p) T(std::forward<P>(params)...);
}); });
} }
@ -171,14 +171,14 @@ public:
}); });
} }
template <typename T> template <typename T, typename F>
bool pop(T& item) { bool pop(T& item, F&& out) {
if (elems_ == nullptr) { if (elems_ == nullptr) {
return false; return false;
} }
return elems_->pop(this, &(this->cursor_), [&item](void* p) { return elems_->pop(this, &(this->cursor_), [&item](void* p) {
::new (&item) T(std::move(*static_cast<T*>(p))); ::new (&item) T(std::move(*static_cast<T*>(p)));
}); }, std::forward<F>(out));
} }
}; };
@ -204,7 +204,12 @@ public:
} }
bool pop(T& item) { bool pop(T& item) {
return base_t::pop(item); return base_t::pop(item, [](bool) {});
}
template <typename F>
bool pop(T& item, F&& out) {
return base_t::pop(item, std::forward<F>(out));
} }
}; };

View File

@ -2,13 +2,15 @@
#include <type_traits> // std::aligned_storage_t #include <type_traits> // std::aligned_storage_t
#include <cstring> // std::memcmp #include <cstring> // std::memcmp
#include <cstdint>
#include "libipc/def.h" #include "libipc/def.h"
#include "libipc/platform/detail.h" #include "libipc/platform/detail.h"
namespace ipc { namespace ipc {
using storage_id_t = std::int32_t;
template <std::size_t DataSize, std::size_t AlignSize> template <std::size_t DataSize, std::size_t AlignSize>
struct id_type; struct id_type;
@ -16,7 +18,7 @@ template <std::size_t AlignSize>
struct id_type<0, AlignSize> { struct id_type<0, AlignSize> {
uint_t<8> id_; uint_t<8> id_;
id_type& operator=(std::size_t val) { id_type& operator=(storage_id_t val) {
id_ = static_cast<uint_t<8>>(val); id_ = static_cast<uint_t<8>>(val);
return (*this); return (*this);
} }
@ -57,7 +59,7 @@ public:
} }
void init() { void init() {
for (std::size_t i = 0; i < max_count;) { for (storage_id_t i = 0; i < max_count;) {
i = next_[i] = (i + 1); i = next_[i] = (i + 1);
} }
} }
@ -71,22 +73,22 @@ public:
return cursor_ == max_count; return cursor_ == max_count;
} }
std::size_t acquire() { storage_id_t acquire() {
if (empty()) return invalid_value; if (empty()) return -1;
std::size_t id = cursor_; storage_id_t id = cursor_;
cursor_ = next_[id]; // point to next cursor_ = next_[id]; // point to next
return id; return id;
} }
bool release(std::size_t id) { bool release(storage_id_t id) {
if (id == invalid_value) return false; if (id < 0) return false;
next_[id] = cursor_; next_[id] = cursor_;
cursor_ = static_cast<uint_t<8>>(id); // put it back cursor_ = static_cast<uint_t<8>>(id); // put it back
return true; return true;
} }
void * at(std::size_t id) { return &(next_[id].data_); } void * at(storage_id_t id) { return &(next_[id].data_); }
void const * at(std::size_t id) const { return &(next_[id].data_); } void const * at(storage_id_t id) const { return &(next_[id].data_); }
}; };
template <typename T> template <typename T>
@ -94,8 +96,8 @@ class obj_pool : public id_pool<sizeof(T), alignof(T)> {
using base_t = id_pool<sizeof(T), alignof(T)>; using base_t = id_pool<sizeof(T), alignof(T)>;
public: public:
T * at(std::size_t id) { return reinterpret_cast<T *>(base_t::at(id)); } T * at(storage_id_t id) { return reinterpret_cast<T *>(base_t::at(id)); }
T const * at(std::size_t id) const { return reinterpret_cast<T const *>(base_t::at(id)); } T const * at(storage_id_t id) const { return reinterpret_cast<T const *>(base_t::at(id)); }
}; };
} // namespace ipc } // namespace ipc

View File

@ -1,8 +0,0 @@
#if defined(WIN64) || defined(_WIN64) || defined(__WIN64__) || \
defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) || \
defined(WINCE) || defined(_WIN32_WCE)
#include "libipc/platform/tls_pointer_win.h"
#else /*!WIN*/
#include "libipc/platform/tls_pointer_linux.h"
#endif/*!WIN*/

View File

@ -1,6 +1,8 @@
#include <vector> #include <vector>
#include <iostream> #include <iostream>
#include <mutex>
#include <atomic>
#include <cstring> #include <cstring>
#include "libipc/ipc.h" #include "libipc/ipc.h"
@ -16,15 +18,28 @@ using namespace ipc;
namespace { namespace {
constexpr int LoopCount = 10000;
constexpr int MultiMax = 8;
struct msg_head {
int id_;
};
class rand_buf : public buffer { class rand_buf : public buffer {
public: public:
rand_buf() { rand_buf() {
int size = capo::random<>{1, 65536}(); int size = capo::random<>{sizeof(msg_head), 65536}();
*this = buffer(new char[size], size, [](void * p, std::size_t) { *this = buffer(new char[size], size, [](void * p, std::size_t) {
delete [] static_cast<char *>(p); delete [] static_cast<char *>(p);
}); });
} }
rand_buf(msg_head const &msg) {
*this = buffer(new msg_head{msg}, sizeof(msg), [](void * p, std::size_t) {
delete static_cast<msg_head *>(p);
});
}
rand_buf(rand_buf &&) = default; rand_buf(rand_buf &&) = default;
rand_buf(rand_buf const & rhs) { rand_buf(rand_buf const & rhs) {
if (rhs.empty()) return; if (rhs.empty()) return;
@ -40,11 +55,11 @@ public:
} }
void set_id(int k) noexcept { void set_id(int k) noexcept {
*get<char *>() = static_cast<char>(k); get<msg_head *>()->id_ = k;
} }
int get_id() const noexcept { int get_id() const noexcept {
return static_cast<int>(*get<char *>()); return get<msg_head *>()->id_;
} }
using buffer::operator=; using buffer::operator=;
@ -60,64 +75,79 @@ void test_basic(char const * name) {
EXPECT_FALSE(que1.try_send(test2)); EXPECT_FALSE(que1.try_send(test2));
que_t que2 { que1.name(), ipc::receiver }; que_t que2 { que1.name(), ipc::receiver };
EXPECT_TRUE(que1.send(test1)); ASSERT_TRUE(que1.send(test1));
EXPECT_TRUE(que1.try_send(test2)); ASSERT_TRUE(que1.try_send(test2));
EXPECT_EQ(que2.recv(), test1); EXPECT_EQ(que2.recv(), test1);
EXPECT_EQ(que2.recv(), test2); EXPECT_EQ(que2.recv(), test2);
} }
template <relat Rp, relat Rc, trans Ts> class data_set {
void test_sr(char const * name, int size, int s_cnt, int r_cnt) { std::vector<rand_buf> datas_;
using que_t = chan<Rp, Rc, Ts>;
public:
data_set() {
datas_.resize(LoopCount);
for (int i = 0; i < LoopCount; ++i) {
datas_[i].set_id(i);
}
}
std::vector<rand_buf> const &get() const noexcept {
return datas_;
}
} const data_set__;
template <relat Rp, relat Rc, trans Ts, typename Que = chan<Rp, Rc, Ts>>
void test_sr(char const * name, int s_cnt, int r_cnt) {
ipc_ut::sender().start(static_cast<std::size_t>(s_cnt)); ipc_ut::sender().start(static_cast<std::size_t>(s_cnt));
ipc_ut::reader().start(static_cast<std::size_t>(r_cnt)); ipc_ut::reader().start(static_cast<std::size_t>(r_cnt));
std::atomic_thread_fence(std::memory_order_seq_cst);
ipc_ut::test_stopwatch sw; ipc_ut::test_stopwatch sw;
std::vector<rand_buf> tests(size);
for (int k = 0; k < s_cnt; ++k) { for (int k = 0; k < s_cnt; ++k) {
ipc_ut::sender() << [name, &tests, &sw, r_cnt, k] { ipc_ut::sender() << [name, &sw, r_cnt, k] {
que_t que1 { name }; Que que { name, ipc::sender };
EXPECT_TRUE(que1.wait_for_recv(r_cnt)); EXPECT_TRUE(que.wait_for_recv(r_cnt));
sw.start(); sw.start();
for (auto & buf : tests) { for (int i = 0; i < (int)data_set__.get().size(); ++i) {
rand_buf data { buf }; EXPECT_TRUE(que.send(data_set__.get()[i]));
data.set_id(k);
EXPECT_TRUE(que1.send(data));
} }
}; };
} }
for (int k = 0; k < r_cnt; ++k) { for (int k = 0; k < r_cnt; ++k) {
ipc_ut::reader() << [name, &tests, s_cnt] { ipc_ut::reader() << [name] {
que_t que2 { name, ipc::receiver }; Que que { name, ipc::receiver };
std::vector<int> cursors(s_cnt);
for (;;) { for (;;) {
rand_buf got { que2.recv() }; rand_buf got { que.recv() };
ASSERT_FALSE(got.empty()); ASSERT_FALSE(got.empty());
int & cur = cursors.at(got.get_id()); int i = got.get_id();
ASSERT_TRUE((cur >= 0) && (cur < static_cast<int>(tests.size()))); if (i == -1) {
rand_buf buf { tests.at(cur++) }; return;
buf.set_id(got.get_id()); }
EXPECT_EQ(got, buf); ASSERT_TRUE((i >= 0) && (i < (int)data_set__.get().size()));
int n = 0; auto const &data_set = data_set__.get()[i];
for (; n < static_cast<int>(cursors.size()); ++n) { if (data_set != got) {
if (cursors[n] < static_cast<int>(tests.size())) break; printf("data_set__.get()[%d] != got, size = %zd/%zd\n",
i, data_set.size(), got.size());
EXPECT_TRUE(false);
} }
if (n == static_cast<int>(cursors.size())) break;
} }
}; };
} }
ipc_ut::sender().wait_for_done(); ipc_ut::sender().wait_for_done();
Que que { name };
EXPECT_TRUE(que.wait_for_recv(r_cnt));
for (int k = 0; k < r_cnt; ++k) {
que.send(rand_buf{msg_head{-1}});
}
ipc_ut::reader().wait_for_done(); ipc_ut::reader().wait_for_done();
sw.print_elapsed<std::chrono::microseconds>(s_cnt, r_cnt, size, name); sw.print_elapsed<std::chrono::microseconds>(s_cnt, r_cnt, (int)data_set__.get().size(), name);
} }
constexpr int LoopCount = 10000;
constexpr int MultiMax = 8;
} // internal-linkage } // internal-linkage
TEST(IPC, basic) { TEST(IPC, basic) {
@ -129,22 +159,26 @@ TEST(IPC, basic) {
} }
TEST(IPC, 1v1) { TEST(IPC, 1v1) {
test_sr<relat::single, relat::single, trans::unicast >("ssu", LoopCount, 1, 1); test_sr<relat::single, relat::single, trans::unicast >("ssu", 1, 1);
test_sr<relat::single, relat::multi , trans::unicast >("smu", LoopCount, 1, 1); test_sr<relat::single, relat::multi , trans::unicast >("smu", 1, 1);
test_sr<relat::multi , relat::multi , trans::unicast >("mmu", LoopCount, 1, 1); test_sr<relat::multi , relat::multi , trans::unicast >("mmu", 1, 1);
test_sr<relat::single, relat::multi , trans::broadcast>("smb", LoopCount, 1, 1); test_sr<relat::single, relat::multi , trans::broadcast>("smb", 1, 1);
test_sr<relat::multi , relat::multi , trans::broadcast>("mmb", LoopCount, 1, 1); test_sr<relat::multi , relat::multi , trans::broadcast>("mmb", 1, 1);
} }
TEST(IPC, 1vN) { TEST(IPC, 1vN) {
test_sr<relat::single, relat::multi , trans::broadcast>("smb", LoopCount, 1, MultiMax); //test_sr<relat::single, relat::multi , trans::unicast >("smu", 1, MultiMax);
test_sr<relat::multi , relat::multi , trans::broadcast>("mmb", LoopCount, 1, MultiMax); //test_sr<relat::multi , relat::multi , trans::unicast >("mmu", 1, MultiMax);
test_sr<relat::single, relat::multi , trans::broadcast>("smb", 1, MultiMax);
test_sr<relat::multi , relat::multi , trans::broadcast>("mmb", 1, MultiMax);
} }
TEST(IPC, Nv1) { TEST(IPC, Nv1) {
test_sr<relat::multi , relat::multi , trans::broadcast>("mmb", LoopCount, MultiMax, 1); //test_sr<relat::multi , relat::multi , trans::unicast >("mmu", MultiMax, 1);
test_sr<relat::multi , relat::multi , trans::broadcast>("mmb", MultiMax, 1);
} }
TEST(IPC, NvN) { TEST(IPC, NvN) {
test_sr<relat::multi , relat::multi , trans::broadcast>("mmb", LoopCount, MultiMax, MultiMax); //test_sr<relat::multi , relat::multi , trans::unicast >("mmu", MultiMax, MultiMax);
test_sr<relat::multi , relat::multi , trans::broadcast>("mmb", MultiMax, MultiMax);
} }

31
test/test_platform.cpp Normal file
View File

@ -0,0 +1,31 @@
#if defined(WIN64) || defined(_WIN64) || defined(__WIN64__) || \
defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) || \
defined(WINCE) || defined(_WIN32_WCE)
#include <locale>
#include <iostream>
//#include <fstream>
#include "test.h"
#include "libipc/platform/to_tchar.h"
TEST(Platform, to_tchar) {
char const *utf8 = "hello world, "
"\xe4\xbd\xa0\xe5\xa5\xbd\xef\xbc"
"\x8c\xe3\x81\x93\xe3\x82\x93\xe3"
"\x81\xab\xe3\x81\xa1\xe3\x81\xaf";
wchar_t const *utf16 = L"hello world, \u4f60\u597d\uff0c\u3053\u3093\u306b\u3061\u306f";
{
ipc::string str = ipc::detail::to_tchar<char>(utf8);
EXPECT_STREQ(str.c_str(), utf8);
}
{
ipc::wstring wtr = ipc::detail::to_tchar<wchar_t>(utf8);
EXPECT_STREQ(wtr.c_str(), utf16);
//std::ofstream out("out.txt", std::ios::binary|std::ios::out);
//out.write((char const *)wtr.c_str(), wtr.size() * sizeof(wchar_t));
}
}
#endif/*WIN*/

View File

@ -44,7 +44,7 @@ constexpr int ThreadMax = 8;
template <typename Que> template <typename Que>
void push(Que & que, int p, int d) { void push(Que & que, int p, int d) {
for (int n = 0; !que.push(p, d); ++n) { for (int n = 0; !que.push([](void*) { return true; }, p, d); ++n) {
ASSERT_NE(n, PushRetry); ASSERT_NE(n, PushRetry);
std::this_thread::yield(); std::this_thread::yield();
} }

View File

@ -11,7 +11,6 @@
#include "capo/type_name.hpp" #include "capo/type_name.hpp"
#include "libipc/rw_lock.h" #include "libipc/rw_lock.h"
#include "libipc/tls_pointer.h"
#include "test.h" #include "test.h"
#include "thread_pool.h" #include "thread_pool.h"
@ -89,6 +88,7 @@ void test_lock_performance(int w, int r) {
// for (int i = 2; i <= ThreadMax; ++i) test_lock_performance(i, i); // for (int i = 2; i <= ThreadMax; ++i) test_lock_performance(i, i);
//} //}
#if 0 // disable ipc::tls
TEST(Thread, tls_main_thread) { TEST(Thread, tls_main_thread) {
ipc::tls::pointer<int> p; ipc::tls::pointer<int> p;
EXPECT_FALSE(p); EXPECT_FALSE(p);
@ -200,3 +200,4 @@ TEST(Thread, tls_benchmark) {
benchmark_tls<std_tls, Str>("std_tls: Str"); benchmark_tls<std_tls, Str>("std_tls: Str");
benchmark_tls<ipc_tls, Str>("ipc_tls: Str"); benchmark_tls<ipc_tls, Str>("ipc_tls: Str");
} }
#endif

View File

@ -9,6 +9,8 @@
#include <cstddef> // std::size_t #include <cstddef> // std::size_t
#include <cassert> // assert #include <cassert> // assert
#include "capo/scope_guard.hpp"
namespace ipc_ut { namespace ipc_ut {
class thread_pool final { class thread_pool final {
@ -32,6 +34,9 @@ class thread_pool final {
if (pool->quit_) return; if (pool->quit_) return;
if (pool->jobs_.empty()) { if (pool->jobs_.empty()) {
pool->waiting_cnt_ += 1; pool->waiting_cnt_ += 1;
CAPO_SCOPE_GUARD_ = [pool] {
pool->waiting_cnt_ -= 1;
};
if (pool->waiting_cnt_ == pool->workers_.size()) { if (pool->waiting_cnt_ == pool->workers_.size()) {
pool->cv_empty_.notify_all(); pool->cv_empty_.notify_all();
@ -41,8 +46,6 @@ class thread_pool final {
pool->cv_jobs_.wait(guard); pool->cv_jobs_.wait(guard);
if (pool->quit_) return; if (pool->quit_) return;
} while (pool->jobs_.empty()); } while (pool->jobs_.empty());
pool->waiting_cnt_ -= 1;
} }
assert(!pool->jobs_.empty()); assert(!pool->jobs_.empty());
job = std::move(pool->jobs_.front()); job = std::move(pool->jobs_.front());
@ -71,6 +74,7 @@ public:
} }
void start(std::size_t n) { void start(std::size_t n) {
std::unique_lock<std::mutex> guard { lock_ };
if (n <= workers_.size()) return; if (n <= workers_.size()) return;
for (std::size_t i = workers_.size(); i < n; ++i) { for (std::size_t i = workers_.size(); i < n; ++i) {
workers_.push_back(std::thread { &thread_pool::proc, this }); workers_.push_back(std::thread { &thread_pool::proc, this });