diff --git a/.github/workflows/c-cpp.yml b/.github/workflows/c-cpp.yml index 3ca4d7e..c5542c8 100644 --- a/.github/workflows/c-cpp.yml +++ b/.github/workflows/c-cpp.yml @@ -2,7 +2,7 @@ name: C/C++ CI on: push: - branches: [ master, develop ] + branches: [ master, develop, issue-* ] pull_request: branches: [ master, develop ] diff --git a/.gitignore b/.gitignore index c83c97d..9bca061 100644 --- a/.gitignore +++ b/.gitignore @@ -44,4 +44,7 @@ CMakeLists.txt.user* # My output files build + +# vs +.vs .vscode \ No newline at end of file diff --git a/3rdparty/gtest/CMakeLists.txt b/3rdparty/gtest/CMakeLists.txt index 77f0b58..97af605 100644 --- a/3rdparty/gtest/CMakeLists.txt +++ b/3rdparty/gtest/CMakeLists.txt @@ -8,12 +8,16 @@ # ctest. You can select which tests to run using 'ctest -R regex'. # For more options, run 'ctest --help'. +if (POLICY CMP0077) + cmake_policy(SET CMP0077 NEW) +endif (POLICY CMP0077) + # When other libraries are using a shared version of runtime libraries, # Google Test also has to use one. option( gtest_force_shared_crt "Use shared (DLL) run-time lib even when Google Test is built as static lib." - ON) + OFF) option(gtest_build_tests "Build all of gtest's own tests." OFF) diff --git a/CMakeLists.txt b/CMakeLists.txt index 57f62e7..b4845b0 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,25 +1,48 @@ cmake_minimum_required(VERSION 3.10) project(cpp-ipc) -option(LIBIPC_BUILD_TESTS "Build all of libipc's own tests." OFF) -option(LIBIPC_BUILD_DEMOS "Build all of libipc's own demos." OFF) +option(LIBIPC_BUILD_TESTS "Build all of libipc's own tests." OFF) +option(LIBIPC_BUILD_DEMOS "Build all of libipc's own demos." OFF) +option(LIBIPC_BUILD_SHARED_LIBS "Build shared libraries (DLLs)." OFF) +option(LIBIPC_USE_STATIC_CRT "Set to ON to build with static CRT on Windows (/MT)." OFF) +set(CMAKE_POSITION_INDEPENDENT_CODE ON) set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -DNDEBUG") if(NOT MSVC) set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O2") endif() -include_directories(${CMAKE_SOURCE_DIR}/include) +if (MSVC AND LIBIPC_USE_STATIC_CRT) + set(CompilerFlags + CMAKE_CXX_FLAGS + CMAKE_CXX_FLAGS_DEBUG + CMAKE_CXX_FLAGS_RELEASE + CMAKE_C_FLAGS + CMAKE_C_FLAGS_DEBUG + CMAKE_C_FLAGS_RELEASE + ) + foreach(CompilerFlag ${CompilerFlags}) + string(REPLACE "/MD" "/MT" ${CompilerFlag} "${${CompilerFlag}}") + endforeach() +endif() set(LIBRARY_OUTPUT_PATH ${CMAKE_BINARY_DIR}/bin) set(EXECUTABLE_OUTPUT_PATH ${CMAKE_BINARY_DIR}/bin) set(LIBIPC_PROJECT_DIR ${PROJECT_SOURCE_DIR}) +# Unicode Support +add_definitions(-DUNICODE -D_UNICODE) + add_subdirectory(src) if (LIBIPC_BUILD_TESTS) set(GOOGLETEST_VERSION 1.10.0) + if (LIBIPC_USE_STATIC_CRT) + set(gtest_force_shared_crt OFF) + else() + set(gtest_force_shared_crt ON) + endif() add_subdirectory(3rdparty/gtest) add_subdirectory(test) endif() @@ -30,6 +53,6 @@ if (LIBIPC_BUILD_DEMOS) endif() install( - DIRECTORY "include/" - DESTINATION "include" + DIRECTORY "include/" + DESTINATION "include" ) diff --git a/demo/msg_que/main.cpp b/demo/msg_que/main.cpp index ea60f39..5635101 100644 --- a/demo/msg_que/main.cpp +++ b/demo/msg_que/main.cpp @@ -23,7 +23,7 @@ constexpr std::size_t const max_sz = 1024 * 16; std::atomic is_quit__{ false }; std::atomic size_counter__{ 0 }; -using msg_que_t = ipc::chan; +using msg_que_t = ipc::chan; msg_que_t que__{ name__ }; ipc::byte_t buff__[max_sz]; diff --git a/include/libipc/def.h b/include/libipc/def.h index d02ff43..8c1a72b 100755 --- a/include/libipc/def.h +++ b/include/libipc/def.h @@ -33,7 +33,7 @@ enum : std::uint32_t { enum : std::size_t { data_length = 64, large_msg_limit = data_length, - large_msg_align = 512, + large_msg_align = 1024, large_msg_cache = 32, }; diff --git a/include/libipc/export.h b/include/libipc/export.h index c555574..6024f2f 100755 --- a/include/libipc/export.h +++ b/include/libipc/export.h @@ -44,9 +44,11 @@ */ #ifndef IPC_EXPORT -#if defined(__IPC_LIBRARY__) +#if defined(LIBIPC_LIBRARY_SHARED_BUILDING__) # define IPC_EXPORT IPC_DECL_EXPORT -#else +#elif defined(LIBIPC_LIBRARY_SHARED_USING__) # define IPC_EXPORT IPC_DECL_IMPORT +#else +# define IPC_EXPORT #endif #endif /*IPC_EXPORT*/ diff --git a/include/libipc/pool_alloc.h b/include/libipc/pool_alloc.h index c6042c4..a889be7 100755 --- a/include/libipc/pool_alloc.h +++ b/include/libipc/pool_alloc.h @@ -94,6 +94,7 @@ inline void free(void* p, std::size_t size) { template void free(T* p) { + if (p == nullptr) return; destruct(p); pool_alloc::free(p, sizeof(T)); } diff --git a/include/libipc/tls_pointer.h b/include/libipc/tls_pointer.h deleted file mode 100755 index b6fd4e1..0000000 --- a/include/libipc/tls_pointer.h +++ /dev/null @@ -1,112 +0,0 @@ -#pragma once - -#include // std::unique_ptr -#include // std::forward -#include // std::size_t - -#include "libipc/export.h" - -namespace ipc { -namespace tls { - -using key_t = std::size_t; -using destructor_t = void (*)(void *); - -struct key_info { - key_t key_; -}; - -IPC_EXPORT bool create (key_info * pkey, destructor_t destructor = nullptr); -IPC_EXPORT void release(key_info const * pkey); - -IPC_EXPORT bool set(key_info const * pkey, void * ptr); -IPC_EXPORT void * get(key_info const * pkey); - -//////////////////////////////////////////////////////////////// -/// Thread-local pointer -//////////////////////////////////////////////////////////////// - -/** - * @remarks - * You need to set the ipc::tls::pointer's storage manually: - * ``` - * tls::pointer p; - * if (!p) p = new int(123); - * ``` - * It would be like an ordinary pointer. - * Or you could just call create_once to 'new' this pointer automatically. - * ``` - * tls::pointer p; - * p.create_once(123); - * ``` -*/ - -template -class pointer : public key_info { - - pointer(pointer const &) = delete; - pointer & operator=(pointer const &) = delete; - - void destruct() const { - delete static_cast(tls::get(this)); - } - -public: - using value_type = T; - - pointer() { - tls::create(this, [](void* p) { - delete static_cast(p); - }); - } - - ~pointer() { - destruct(); - tls::release(this); - } - - template - T* create(P&&... params) { - destruct(); - std::unique_ptr ptr { new T(std::forward

(params)...) }; - if (!tls::set(this, ptr.get())) { - return nullptr; - } - return ptr.release(); - } - - template - T* create_once(P&&... params) { - auto p = static_cast(tls::get(this)); - if (p == nullptr) { - std::unique_ptr ptr { new T(std::forward

(params)...) }; - if (!tls::set(this, ptr.get())) { - return nullptr; - } - p = ptr.release(); - } - return p; - } - - T* operator=(T* p) { - set(this, p); - return p; - } - - explicit operator T *() const { - return static_cast(tls::get(this)); - } - - explicit operator bool() const { - return tls::get(this) != nullptr; - } - - T & operator* () { return *static_cast(*this); } - const T & operator* () const { return *static_cast(*this); } - - T * operator->() { return static_cast(*this); } - const T * operator->() const { return static_cast(*this); } -}; - -} // namespace tls -} // namespace ipc diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 08faf05..ad60120 100755 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,15 +1,5 @@ project(ipc) -add_compile_options(-D__IPC_LIBRARY__) - -if(NOT MSVC) - add_compile_options(-fPIC) -endif() - -include_directories( - ${LIBIPC_PROJECT_DIR}/include - ${LIBIPC_PROJECT_DIR}/src) - if(UNIX) file(GLOB SRC_FILES ${LIBIPC_PROJECT_DIR}/src/libipc/platform/*_linux.cpp) else() @@ -26,14 +16,44 @@ file(GLOB HEAD_FILES ${LIBIPC_PROJECT_DIR}/src/libipc/platform/*.h ${LIBIPC_PROJECT_DIR}/src/libipc/utility/*.h) -add_library(${PROJECT_NAME} SHARED ${SRC_FILES} ${HEAD_FILES}) +if (LIBIPC_BUILD_SHARED_LIBS) + add_library(${PROJECT_NAME} SHARED ${SRC_FILES} ${HEAD_FILES}) + target_compile_definitions(${PROJECT_NAME} + INTERFACE + LIBIPC_LIBRARY_SHARED_USING__ + PRIVATE + LIBIPC_LIBRARY_SHARED_BUILDING__) +else() + add_library(${PROJECT_NAME} STATIC ${SRC_FILES} ${HEAD_FILES}) +endif() + +# set output directory +set_target_properties(${PROJECT_NAME} + PROPERTIES + ARCHIVE_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/lib" + LIBRARY_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/lib" + RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/bin" ) + +# set version +set_target_properties(${PROJECT_NAME} + PROPERTIES + VERSION 1.0.0 + SOVERSION 1) + +target_include_directories(${PROJECT_NAME} + PUBLIC ${LIBIPC_PROJECT_DIR}/include + PRIVATE ${LIBIPC_PROJECT_DIR}/src +) + if(NOT MSVC) target_link_libraries(${PROJECT_NAME} PUBLIC - pthread - $<$>:rt>) + pthread + $<$>:rt>) endif() install( TARGETS ${PROJECT_NAME} - DESTINATION "lib" + RUNTIME DESTINATION bin + LIBRARY DESTINATION lib + ARCHIVE DESTINATION lib ) diff --git a/src/ipc.cpp b/src/ipc.cpp index bfa1376..418f61d 100755 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -13,7 +13,6 @@ #include "libipc/ipc.h" #include "libipc/def.h" #include "libipc/shm.h" -#include "libipc/tls_pointer.h" #include "libipc/pool_alloc.h" #include "libipc/queue.h" #include "libipc/policy.h" @@ -21,6 +20,7 @@ #include "libipc/utility/log.h" #include "libipc/utility/id_pool.h" +#include "libipc/utility/scope_guard.h" #include "libipc/utility/utility.h" #include "libipc/memory/resource.h" @@ -40,7 +40,7 @@ struct msg_t; template struct msg_t<0, AlignSize> { - msg_id_t conn_; + msg_id_t cc_id_; msg_id_t id_; std::int32_t remain_; bool storage_; @@ -51,15 +51,16 @@ struct msg_t : msg_t<0, AlignSize> { std::aligned_storage_t data_ {}; msg_t() = default; - msg_t(msg_id_t c, msg_id_t i, std::int32_t r, void const * d, std::size_t s) - : msg_t<0, AlignSize> { c, i, r, (d == nullptr) || (s == 0) } { + msg_t(msg_id_t cc_id, msg_id_t id, std::int32_t remain, void const * data, std::size_t size) + : msg_t<0, AlignSize> {cc_id, id, remain, (data == nullptr) || (size == 0)} { if (this->storage_) { - if (d != nullptr) { + if (data != nullptr) { // copy storage-id - *reinterpret_cast(&data_) = *static_cast(d); + *reinterpret_cast(&data_) = + *static_cast(data); } } - else std::memcpy(&data_, d, s); + else std::memcpy(&data_, data, size); } }; @@ -91,26 +92,46 @@ auto cc_acc() { return static_cast(acc_h.get()); } +IPC_CONSTEXPR_ std::size_t align_chunk_size(std::size_t size) noexcept { + return (((size - 1) / ipc::large_msg_align) + 1) * ipc::large_msg_align; +} + +IPC_CONSTEXPR_ std::size_t calc_chunk_size(std::size_t size) noexcept { + return ipc::make_align(alignof(std::max_align_t), align_chunk_size( + ipc::make_align(alignof(std::max_align_t), sizeof(std::atomic)) + size)); +} + +struct chunk_t { + std::atomic &conns() noexcept { + return *reinterpret_cast *>(this); + } + + void *data() noexcept { + return reinterpret_cast(this) + + ipc::make_align(alignof(std::max_align_t), sizeof(std::atomic)); + } +}; + struct chunk_info_t { ipc::id_pool<> pool_; ipc::spin_lock lock_; - IPC_CONSTEXPR_ static std::size_t chunks_elem_size(std::size_t chunk_size) noexcept { - return ipc::make_align(alignof(std::max_align_t), chunk_size + sizeof(acc_t)); - } - IPC_CONSTEXPR_ static std::size_t chunks_mem_size(std::size_t chunk_size) noexcept { - return ipc::id_pool<>::max_count * chunks_elem_size(chunk_size); + return ipc::id_pool<>::max_count * chunk_size; } - ipc::byte_t *at(std::size_t chunk_size, std::size_t id) noexcept { - if (id == ipc::invalid_value) return nullptr; - return reinterpret_cast(this + 1) + (chunks_elem_size(chunk_size) * id); + ipc::byte_t *chunks_mem() noexcept { + return reinterpret_cast(this + 1); + } + + chunk_t *at(std::size_t chunk_size, ipc::storage_id_t id) noexcept { + if (id < 0) return nullptr; + return reinterpret_cast(chunks_mem() + (chunk_size * id)); } }; auto& chunk_storages() { - class chunk_t { + class chunk_handle_t { ipc::shm::handle handle_; public: @@ -129,31 +150,29 @@ auto& chunk_storages() { return info; } }; - static ipc::unordered_map chunk_s; - return chunk_s; + static ipc::map chunk_hs; + return chunk_hs; } -auto& chunk_lock() { - static ipc::spin_lock chunk_l; - return chunk_l; +chunk_info_t *chunk_storage_info(std::size_t chunk_size) { + auto &storages = chunk_storages(); + std::decay_t::iterator it; + { + static ipc::rw_lock lock; + IPC_UNUSED_ std::shared_lock guard {lock}; + if ((it = storages.find(chunk_size)) == storages.end()) { + using chunk_handle_t = std::decay_t::value_type::second_type; + guard.unlock(); + IPC_UNUSED_ std::lock_guard guard {lock}; + it = storages.emplace(chunk_size, chunk_handle_t{}).first; + } + } + return it->second.get_info(chunk_size); } -constexpr std::size_t calc_chunk_size(std::size_t size) noexcept { - return ( ((size - 1) / ipc::large_msg_align) + 1 ) * ipc::large_msg_align; -} - -auto& chunk_storage(std::size_t chunk_size) { - IPC_UNUSED_ auto guard = ipc::detail::unique_lock(chunk_lock()); - return chunk_storages()[chunk_size]; -} - -std::pair apply_storage(std::size_t conn_count, std::size_t size) { - if (conn_count == 0) return {}; - +std::pair acquire_storage(std::size_t size, ipc::circ::cc_t conns) { std::size_t chunk_size = calc_chunk_size(size); - auto & chunk_shm = chunk_storage(chunk_size); - - auto info = chunk_shm.get_info(chunk_size); + auto info = chunk_storage_info(chunk_size); if (info == nullptr) return {}; info->lock_.lock(); @@ -162,90 +181,92 @@ std::pair apply_storage(std::size_t conn_count, std::size_t auto id = info->pool_.acquire(); info->lock_.unlock(); - auto ptr = info->at(chunk_size, id); - if (ptr == nullptr) return {}; - reinterpret_cast(ptr + chunk_size)->store(static_cast(conn_count), std::memory_order_release); - return { id, ptr }; + auto chunk = info->at(chunk_size, id); + if (chunk == nullptr) return {}; + chunk->conns().store(conns, std::memory_order_relaxed); + return { id, chunk->data() }; } -void *find_storage(std::size_t id, std::size_t size) { - if (id == ipc::invalid_value) { - ipc::error("[find_storage] id is invalid: id = %zd, size = %zd\n", id, size); +void *find_storage(ipc::storage_id_t id, std::size_t size) { + if (id < 0) { + ipc::error("[find_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size); return nullptr; } - std::size_t chunk_size = calc_chunk_size(size); - auto & chunk_shm = chunk_storage(chunk_size); - - auto info = chunk_shm.get_info(chunk_size); + auto info = chunk_storage_info(chunk_size); if (info == nullptr) return nullptr; - - auto ptr = info->at(chunk_size, id); - if (ptr == nullptr) return nullptr; - if (reinterpret_cast(ptr + chunk_size)->load(std::memory_order_acquire) == 0) { - ipc::error("[find_storage] cc test failed: id = %zd, chunk_size = %zd\n", id, chunk_size); - return nullptr; - } - return ptr; + return info->at(chunk_size, id)->data(); } -void recycle_storage(std::size_t id, std::size_t size) { - if (id == ipc::invalid_value) { - ipc::error("[recycle_storage] id is invalid: id = %zd, size = %zd\n", id, size); +void release_storage(ipc::storage_id_t id, std::size_t size) { + if (id < 0) { + ipc::error("[release_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size); return; } - std::size_t chunk_size = calc_chunk_size(size); - auto & chunk_shm = chunk_storage(chunk_size); - - auto info = chunk_shm.get_info(chunk_size); + auto info = chunk_storage_info(chunk_size); if (info == nullptr) return; - - auto ptr = info->at(chunk_size, id); - if (ptr == nullptr) { - ipc::error("[recycle_storage] chunk_shm.mems[%zd] failed: chunk_size = %zd\n", id, chunk_size); - return; - } - if (reinterpret_cast(ptr + chunk_size)->fetch_sub(1, std::memory_order_acq_rel) > 1) { - // not the last receiver, just return - return; - } - info->lock_.lock(); info->pool_.release(id); info->lock_.unlock(); } -void clear_storage(std::size_t id, std::size_t size) { - if (id == ipc::invalid_value) { - ipc::error("[clear_storage] id is invalid: id = %zd, size = %zd\n", id, size); - return; - } +template +bool sub_rc(ipc::wr, + std::atomic &/*conns*/, ipc::circ::cc_t /*curr_conns*/, ipc::circ::cc_t /*conn_id*/) noexcept { + return true; +} - std::size_t chunk_size = calc_chunk_size(size); - auto & chunk_shm = chunk_storage(chunk_size); - - auto info = chunk_shm.get_info(chunk_size); - if (info == nullptr) return; - - auto ptr = info->at(chunk_size, id); - if (ptr == nullptr) return; - - auto cc_flag = reinterpret_cast(ptr + chunk_size); +template +bool sub_rc(ipc::wr, + std::atomic &conns, ipc::circ::cc_t curr_conns, ipc::circ::cc_t conn_id) noexcept { + auto last_conns = curr_conns & ~conn_id; for (unsigned k = 0;;) { - auto cc_curr = cc_flag->load(std::memory_order_acquire); - if (cc_curr == 0) return; // means this id has been cleared - if (cc_flag->compare_exchange_weak(cc_curr, 0, std::memory_order_release)) { - break; + auto chunk_conns = conns.load(std::memory_order_acquire); + if (conns.compare_exchange_weak(chunk_conns, chunk_conns & last_conns, std::memory_order_release)) { + return (chunk_conns & last_conns) == 0; } ipc::yield(k); } +} +template +void recycle_storage(ipc::storage_id_t id, std::size_t size, ipc::circ::cc_t curr_conns, ipc::circ::cc_t conn_id) { + if (id < 0) { + ipc::error("[recycle_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size); + return; + } + std::size_t chunk_size = calc_chunk_size(size); + auto info = chunk_storage_info(chunk_size); + if (info == nullptr) return; + + auto chunk = info->at(chunk_size, id); + if (chunk == nullptr) return; + + if (!sub_rc(Flag{}, chunk->conns(), curr_conns, conn_id)) { + return; + } info->lock_.lock(); info->pool_.release(id); info->lock_.unlock(); } +template +bool clear_message(void* p) { + auto msg = static_cast(p); + if (msg->storage_) { + std::int32_t r_size = static_cast(ipc::data_length) + msg->remain_; + if (r_size <= 0) { + ipc::error("[clear_message] invalid msg size: %d\n", (int)r_size); + return true; + } + release_storage( + *reinterpret_cast(&msg->data_), + static_cast(r_size)); + } + return true; +} + struct conn_info_head { ipc::string name_; @@ -253,19 +274,6 @@ struct conn_info_head { ipc::waiter cc_waiter_, wt_waiter_, rd_waiter_; ipc::shm::handle acc_h_; - /* - * thread_local may have some bugs. - * - * - * - https://sourceforge.net/p/mingw-w64/bugs/727/ - * - https://sourceforge.net/p/mingw-w64/bugs/527/ - * - https://github.com/Alexpux/MINGW-packages/issues/2519 - * - https://github.com/ChaiScript/ChaiScript/issues/402 - * - https://developercommunity.visualstudio.com/content/problem/124121/thread-local-variables-fail-to-be-initialized-when.html - * - https://software.intel.com/en-us/forums/intel-c-compiler/topic/684827 - */ - ipc::tls::pointer> recv_cache_; - conn_info_head(char const * name) : name_ {name} , cc_id_ {(cc_acc() == nullptr) ? 0 : cc_acc()->fetch_add(1, std::memory_order_relaxed)} @@ -286,7 +294,8 @@ struct conn_info_head { } auto& recv_cache() { - return *recv_cache_.create_once(); + thread_local ipc::unordered_map tls; + return tls; } }; @@ -337,8 +346,10 @@ struct queue_generator { template struct detail_impl { -using queue_t = typename queue_generator::queue_t; -using conn_info_t = typename queue_generator::conn_info_t; +using policy_t = Policy; +using flag_t = typename policy_t::flag_t; +using queue_t = typename queue_generator::queue_t; +using conn_info_t = typename queue_generator::conn_info_t; constexpr static conn_info_t* info_of(ipc::handle_t h) noexcept { return static_cast(h); @@ -432,7 +443,8 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s ipc::error("fail: send, que->ready_sending() == false\n"); return false; } - if (que->elems()->connections(std::memory_order_relaxed) == 0) { + ipc::circ::cc_t conns = que->elems()->connections(std::memory_order_relaxed); + if (conns == 0) { ipc::error("fail: send, there is no receiver on this connection.\n"); return false; } @@ -445,7 +457,7 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s auto msg_id = acc->fetch_add(1, std::memory_order_relaxed); auto try_push = std::forward(gen_push)(info_of(h), que, msg_id); if (size > ipc::large_msg_limit) { - auto dat = apply_storage(que->conn_count(), size); + auto dat = acquire_storage(size, conns); void * buf = dat.second; if (buf != nullptr) { std::memcpy(buf, data, size); @@ -453,11 +465,11 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s static_cast(ipc::data_length), &(dat.first), 0); } // try using message fragment - // ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd\n", msg_id, size); + //ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd\n", msg_id, size); } // push message fragment std::int32_t offset = 0; - for (int i = 0; i < static_cast(size / ipc::data_length); ++i, offset += ipc::data_length) { + for (std::int32_t i = 0; i < static_cast(size / ipc::data_length); ++i, offset += ipc::data_length) { if (!try_push(static_cast(size) - offset - static_cast(ipc::data_length), static_cast(data) + offset, ipc::data_length)) { return false; @@ -479,18 +491,14 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size, std::uint return send([tm](auto info, auto que, auto msg_id) { return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) { if (!wait_for(info->wt_waiter_, [&] { - return !que->push(info->cc_id_, msg_id, remain, data, size); + return !que->push( + [](void*) { return true; }, + info->cc_id_, msg_id, remain, data, size); }, tm)) { ipc::log("force_push: msg_id = %zd, remain = %d, size = %zd\n", msg_id, remain, size); - if (!que->force_push([](void* p) { - auto tmp_msg = static_cast(p); - if (tmp_msg->storage_) { - clear_storage( - *reinterpret_cast(&tmp_msg->data_), - static_cast(ipc::data_length) + tmp_msg->remain_); - } - return true; - }, info->cc_id_, msg_id, remain, data, size)) { + if (!que->force_push( + clear_message, + info->cc_id_, msg_id, remain, data, size)) { return false; } } @@ -504,7 +512,9 @@ static bool try_send(ipc::handle_t h, void const * data, std::size_t size, std:: return send([tm](auto info, auto que, auto msg_id) { return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) { if (!wait_for(info->wt_waiter_, [&] { - return !que->push(info->cc_id_, msg_id, remain, data, size); + return !que->push( + [](void*) { return true; }, + info->cc_id_, msg_id, remain, data, size); }, tm)) { return false; } @@ -525,34 +535,60 @@ static ipc::buff_t recv(ipc::handle_t h, std::uint64_t tm) { return {}; } auto& rc = info_of(h)->recv_cache(); - while (1) { + for (;;) { // pop a new message typename queue_t::value_t msg; - if (!wait_for(info_of(h)->rd_waiter_, [que, &msg] { return !que->pop(msg); }, tm)) { + if (!wait_for(info_of(h)->rd_waiter_, [que, &msg] { + return !que->pop(msg); + }, tm)) { // pop failed, just return. return {}; } info_of(h)->wt_waiter_.broadcast(); - if ((info_of(h)->acc() != nullptr) && (msg.conn_ == info_of(h)->cc_id_)) { + if ((info_of(h)->acc() != nullptr) && (msg.cc_id_ == info_of(h)->cc_id_)) { continue; // ignore message to self } // msg.remain_ may minus & abs(msg.remain_) < data_length - std::size_t remain = static_cast(ipc::data_length) + msg.remain_; + std::int32_t r_size = static_cast(ipc::data_length) + msg.remain_; + if (r_size <= 0) { + ipc::error("fail: recv, r_size = %d\n", (int)r_size); + return {}; + } + std::size_t msg_size = static_cast(r_size); + // large message + if (msg.storage_) { + ipc::storage_id_t buf_id = *reinterpret_cast(&msg.data_); + void* buf = find_storage(buf_id, msg_size); + if (buf != nullptr) { + struct recycle_t { + ipc::storage_id_t storage_id; + ipc::circ::cc_t curr_conns; + ipc::circ::cc_t conn_id; + } *r_info = ipc::mem::alloc(recycle_t{ + buf_id, que->elems()->connections(std::memory_order_relaxed), que->connected_id() + }); + if (r_info == nullptr) { + ipc::log("fail: ipc::mem::alloc.\n"); + return ipc::buff_t{buf, msg_size}; // no recycle + } else { + return ipc::buff_t{buf, msg_size, [](void* p_info, std::size_t size) { + auto r_info = static_cast(p_info); + IPC_UNUSED_ auto finally = ipc::guard([r_info] { + ipc::mem::free(r_info); + }); + recycle_storage(r_info->storage_id, size, r_info->curr_conns, r_info->conn_id); + }, r_info}; + } + } else { + ipc::log("fail: shm::handle for large message. msg_id: %zd, buf_id: %zd, size: %zd\n", msg.id_, buf_id, msg_size); + continue; + } + } // find cache with msg.id_ auto cac_it = rc.find(msg.id_); if (cac_it == rc.end()) { - if (remain <= ipc::data_length) { - return make_cache(msg.data_, remain); - } - if (msg.storage_) { - std::size_t buf_id = *reinterpret_cast(&msg.data_); - void * buf = find_storage(buf_id, remain); - if (buf != nullptr) { - return ipc::buff_t { buf, remain, [](void* ptr, std::size_t size) { - recycle_storage(reinterpret_cast(ptr) - 1, size); - }, reinterpret_cast(buf_id + 1) }; - } - else ipc::log("fail: shm::handle for big message. msg_id: %zd, buf_id: %zd, size: %zd\n", msg.id_, buf_id, remain); + if (msg_size <= ipc::data_length) { + return make_cache(msg.data_, msg_size); } // gc if (rc.size() > 1024) { @@ -566,14 +602,14 @@ static ipc::buff_t recv(ipc::handle_t h, std::uint64_t tm) { for (auto id : need_del) rc.erase(id); } // cache the first message fragment - rc.emplace(msg.id_, cache_t { ipc::data_length, make_cache(msg.data_, remain) }); + rc.emplace(msg.id_, cache_t { ipc::data_length, make_cache(msg.data_, msg_size) }); } // has cached before this message else { auto& cac = cac_it->second; // this is the last message fragment if (msg.remain_ <= 0) { - cac.append(&(msg.data_), remain); + cac.append(&(msg.data_), msg_size); // finish this message, erase it from cache auto buff = std::move(cac.buff_); rc.erase(cac_it); diff --git a/src/libipc/circ/elem_array.h b/src/libipc/circ/elem_array.h index 74f031a..0b21f48 100755 --- a/src/libipc/circ/elem_array.h +++ b/src/libipc/circ/elem_array.h @@ -130,10 +130,10 @@ public: return head_.force_push(que, std::forward(f), block_); } - template - bool pop(Q* que, cursor_t* cur, F&& f) { + template + bool pop(Q* que, cursor_t* cur, F&& f, R&& out) { if (cur == nullptr) return false; - return head_.pop(que, *cur, std::forward(f), block_); + return head_.pop(que, *cur, std::forward(f), std::forward(out), block_); } }; diff --git a/src/libipc/circ/elem_def.h b/src/libipc/circ/elem_def.h index f2379e2..4003948 100755 --- a/src/libipc/circ/elem_def.h +++ b/src/libipc/circ/elem_def.h @@ -16,7 +16,7 @@ namespace circ { using u1_t = ipc::uint_t<8>; using u2_t = ipc::uint_t<32>; -/** only supports max 32 connections */ +/** only supports max 32 connections in broadcast mode */ using cc_t = u2_t; constexpr u1_t index_of(u2_t c) noexcept { diff --git a/src/libipc/memory/resource.h b/src/libipc/memory/resource.h index 1084e64..063e8dc 100755 --- a/src/libipc/memory/resource.h +++ b/src/libipc/memory/resource.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -49,6 +50,11 @@ using unordered_map = std::unordered_map< Key, T, std::hash, std::equal_to, ipc::mem::allocator> >; +template +using map = std::map< + Key, T, std::less, ipc::mem::allocator> +>; + template using basic_string = std::basic_string< Char, std::char_traits, ipc::mem::allocator diff --git a/src/libipc/memory/wrapper.h b/src/libipc/memory/wrapper.h index 5e76f88..b0b7c58 100755 --- a/src/libipc/memory/wrapper.h +++ b/src/libipc/memory/wrapper.h @@ -11,7 +11,6 @@ #include "libipc/def.h" #include "libipc/rw_lock.h" -#include "libipc/tls_pointer.h" #include "libipc/pool_alloc.h" #include "libipc/utility/concept.h" @@ -155,18 +154,16 @@ private: }; friend class alloc_proxy; - using ref_t = alloc_proxy&; - using tls_t = tls::pointer; - tls_t tls_; std::function get_alloc_; public: template async_wrapper(P ... pars) { get_alloc_ = [this, pars ...]()->ref_t { - return *tls_.create_once(this, pars ...); + thread_local alloc_proxy tls(pars ...); + return tls; }; } diff --git a/src/libipc/platform/detail.h b/src/libipc/platform/detail.h index 97bbd12..8f4c4f5 100755 --- a/src/libipc/platform/detail.h +++ b/src/libipc/platform/detail.h @@ -123,5 +123,17 @@ constexpr const T& (min)(const T& a, const T& b) { #endif/*__cplusplus < 201703L*/ +template +auto horrible_cast(U rhs) noexcept + -> typename std::enable_if::value + && std::is_trivially_copyable::value, T>::type { + union { + T t; + U u; + } r = {}; + r.u = rhs; + return r.t; +} + } // namespace detail } // namespace ipc diff --git a/src/libipc/platform/tls_detail_win.h b/src/libipc/platform/tls_detail_win.h deleted file mode 100755 index 27b9f14..0000000 --- a/src/libipc/platform/tls_detail_win.h +++ /dev/null @@ -1,41 +0,0 @@ -#pragma once - -#include // std::unordered_map -#include // assert - -#include "libipc/tls_pointer.h" - -#include "libipc/utility/utility.h" - -namespace ipc { -namespace tls { - -inline void tls_destruct(key_info const * pkey, void * p) { - assert(pkey != nullptr); - auto destructor = horrible_cast(pkey->key_); - if (destructor != nullptr) destructor(p); -} - -struct tls_recs : public std::unordered_map { - ~tls_recs() { - for (auto & pair : *this) { - tls_destruct(pair.first, pair.second); - } - } -}; - -inline tls_recs * tls_get_recs() { - thread_local tls_recs * recs_ptr = nullptr; - if (recs_ptr == nullptr) { - recs_ptr = new tls_recs; - } - assert(recs_ptr != nullptr); - return recs_ptr; -} - -inline void at_thread_exit() { - delete tls_get_recs(); -} - -} // namespace tls -} // namespace ipc diff --git a/src/libipc/platform/tls_pointer_linux.h b/src/libipc/platform/tls_pointer_linux.h deleted file mode 100755 index 21ab44b..0000000 --- a/src/libipc/platform/tls_pointer_linux.h +++ /dev/null @@ -1,83 +0,0 @@ - -#include // pthread_... - -#include // std::atomic_thread_fence -#include // assert - -#include "libipc/tls_pointer.h" - -#include "libipc/utility/log.h" -#include "libipc/utility/utility.h" - -namespace ipc { -namespace tls { - -namespace { -namespace native { - -using key_t = pthread_key_t; - -bool create(key_t * pkey, void (*destructor)(void*)) { - assert(pkey != nullptr); - int err = ::pthread_key_create(pkey, destructor); - if (err != 0) { - ipc::error("[native::create] pthread_key_create failed [%d].\n", err); - return false; - } - return true; -} - -bool release(key_t key) { - int err = ::pthread_key_delete(key); - if (err != 0) { - ipc::error("[native::release] pthread_key_delete failed [%d].\n", err); - return false; - } - return true; -} - -bool set(key_t key, void * ptr) { - int err = ::pthread_setspecific(key, ptr); - if (err != 0) { - ipc::error("[native::set] pthread_setspecific failed [%d].\n", err); - return false; - } - return true; -} - -void * get(key_t key) { - return ::pthread_getspecific(key); -} - -} // namespace native -} // internal-linkage - -bool create(key_info * pkey, destructor_t destructor) { - assert(pkey != nullptr); - native::key_t k; - if (!native::create(&k, destructor)) { - return false; - } - pkey->key_ = horrible_cast(k); - std::atomic_thread_fence(std::memory_order_seq_cst); - return true; -} - -void release(key_info const * pkey) { - assert(pkey != nullptr); - static_cast( - native::release(horrible_cast(pkey->key_))); -} - -bool set(key_info const * pkey, void* ptr) { - assert(pkey != nullptr); - return native::set(horrible_cast(pkey->key_), ptr); -} - -void * get(key_info const * pkey) { - assert(pkey != nullptr); - return native::get(horrible_cast(pkey->key_)); -} - -} // namespace tls -} // namespace ipc diff --git a/src/libipc/platform/tls_pointer_win.cpp b/src/libipc/platform/tls_pointer_win.cpp deleted file mode 100755 index cdbe208..0000000 --- a/src/libipc/platform/tls_pointer_win.cpp +++ /dev/null @@ -1,100 +0,0 @@ - -#include - -#include "libipc/platform/tls_detail_win.h" - -/** - * @remarks - * Windows doesn't support a per-thread destructor with its TLS primitives. - * So, here will build it manually by inserting a function to be called on each thread's exit. - * - * @see - * - https://www.codeproject.com/Articles/8113/Thread-Local-Storage-The-C-Way - * - https://src.chromium.org/viewvc/chrome/trunk/src/base/threading/thread_local_storage_win.cc - * - https://github.com/mirror/mingw-org-wsl/blob/master/src/libcrt/crt/tlssup.c - * - https://github.com/Alexpux/mingw-w64/blob/master/mingw-w64-crt/crt/tlssup.c - * - http://svn.boost.org/svn/boost/trunk/libs/thread/src/win32/tss_pe.cpp -*/ - -namespace ipc { -namespace tls { - -namespace { - -void NTAPI OnTlsCallback(PVOID, DWORD dwReason, PVOID) { - if (dwReason == DLL_THREAD_DETACH) { - ipc::tls::at_thread_exit(); - } -} - -} // internal-linkage - -//////////////////////////////////////////////////////////////// -/// Call destructors on thread exit -//////////////////////////////////////////////////////////////// - -#if defined(_MSC_VER) - -#if defined(WIN64) || defined(_WIN64) || defined(__WIN64__) - -#pragma comment(linker, "/INCLUDE:_tls_used") -#pragma comment(linker, "/INCLUDE:_tls_xl_b__") - -extern "C" { -# pragma const_seg(".CRT$XLB") - extern const PIMAGE_TLS_CALLBACK _tls_xl_b__; - const PIMAGE_TLS_CALLBACK _tls_xl_b__ = OnTlsCallback; -# pragma const_seg() -} - -#else /*!WIN64*/ - -#pragma comment(linker, "/INCLUDE:__tls_used") -#pragma comment(linker, "/INCLUDE:__tls_xl_b__") - -extern "C" { -# pragma data_seg(".CRT$XLB") - PIMAGE_TLS_CALLBACK _tls_xl_b__ = OnTlsCallback; -# pragma data_seg() -} - -#endif/*!WIN64*/ - -#elif defined(__GNUC__) - -#define IPC_CRTALLOC__(x) __attribute__ ((section (x) )) - -#if defined(__MINGW64__) || (__MINGW64_VERSION_MAJOR) || \ - (__MINGW32_MAJOR_VERSION > 3) || ((__MINGW32_MAJOR_VERSION == 3) && (__MINGW32_MINOR_VERSION >= 18)) - -extern "C" { - IPC_CRTALLOC__(".CRT$XLB") PIMAGE_TLS_CALLBACK _tls_xl_b__ = OnTlsCallback; -} - -#else /*!MINGW*/ - -extern "C" { - ULONG _tls_index__ = 0; - - IPC_CRTALLOC__(".tls$AAA") char _tls_start__ = 0; - IPC_CRTALLOC__(".tls$ZZZ") char _tls_end__ = 0; - - IPC_CRTALLOC__(".CRT$XLA") PIMAGE_TLS_CALLBACK _tls_xl_a__ = 0; - IPC_CRTALLOC__(".CRT$XLB") PIMAGE_TLS_CALLBACK _tls_xl_b__ = OnTlsCallback; - IPC_CRTALLOC__(".CRT$XLZ") PIMAGE_TLS_CALLBACK _tls_xl_z__ = 0; -} - -extern "C" NX_CRTALLOC_(".tls") const IMAGE_TLS_DIRECTORY _tls_used = { - (ULONG_PTR)(&_tls_start__ + 1), - (ULONG_PTR) &_tls_end__, - (ULONG_PTR) &_tls_index__, - (ULONG_PTR) &_tls_xl_b__, - (ULONG)0, (ULONG)0 -} - -#endif/*!MINGW*/ - -#endif/*_MSC_VER, __GNUC__*/ - -} // namespace tls -} // namespace ipc diff --git a/src/libipc/platform/tls_pointer_win.h b/src/libipc/platform/tls_pointer_win.h deleted file mode 100755 index 2e9e5a7..0000000 --- a/src/libipc/platform/tls_pointer_win.h +++ /dev/null @@ -1,46 +0,0 @@ -#pragma once - -#include // std::atomic_thread_fence -#include // assert - -#include "libipc/tls_pointer.h" - -#include "libipc/platform/tls_detail_win.h" -#include "libipc/utility/utility.h" - -namespace ipc { -namespace tls { - -bool create(key_info * pkey, destructor_t destructor) { - assert(pkey != nullptr); - pkey->key_ = horrible_cast(destructor); - std::atomic_thread_fence(std::memory_order_seq_cst); - return true; -} - -void release(key_info const * pkey) { - assert(pkey != nullptr); - assert(tls_get_recs() != nullptr); - tls_get_recs()->erase(pkey); -} - -bool set(key_info const * pkey, void * ptr) { - assert(pkey != nullptr); - assert(tls_get_recs() != nullptr); - (*tls_get_recs())[pkey] = ptr; - return true; -} - -void * get(key_info const * pkey) { - assert(pkey != nullptr); - assert(tls_get_recs() != nullptr); - auto const recs = tls_get_recs(); - auto it = recs->find(pkey); - if (it == recs->end()) { - return nullptr; - } - return it->second; -} - -} // namespace tls -} // namespace ipc diff --git a/src/libipc/platform/to_tchar.h b/src/libipc/platform/to_tchar.h index df67b2d..3cc2840 100755 --- a/src/libipc/platform/to_tchar.h +++ b/src/libipc/platform/to_tchar.h @@ -1,12 +1,14 @@ #pragma once -#include +#include #include #include #include #include #include +#include +#include #include "libipc/utility/concept.h" #include "libipc/platform/detail.h" @@ -34,34 +36,38 @@ using IsSameChar = ipc::require::value, R>; //////////////////////////////////////////////////////////////// template -constexpr auto to_tchar(ipc::string && str) -> IsSameChar { - return std::move(str); +constexpr auto to_tchar(ipc::string &&str) -> IsSameChar { + return std::move(str); // noconv } +/** + * codecvt_utf8_utf16/std::wstring_convert is deprecated + * @see https://codingtidbit.com/2020/02/09/c17-codecvt_utf8-is-deprecated/ + * https://stackoverflow.com/questions/42946335/deprecated-header-codecvt-replacement + * https://en.cppreference.com/w/cpp/locale/codecvt/in + * https://docs.microsoft.com/en-us/windows/win32/api/stringapiset/nf-stringapiset-multibytetowidechar +*/ template -constexpr auto to_tchar(ipc::string && str) -> IsSameChar { - return std::wstring_convert< - std::codecvt_utf8_utf16, - wchar_t, - ipc::mem::allocator, - ipc::mem::allocator - >{}.from_bytes(std::move(str)); -} - -template -inline auto to_tchar(T* dst, char const * src, std::size_t size) -> IsSameChar { - std::memcpy(dst, src, size); -} - -template -inline auto to_tchar(T* dst, char const * src, std::size_t size) -> IsSameChar { - auto wstr = std::wstring_convert< - std::codecvt_utf8_utf16, - wchar_t, - ipc::mem::allocator, - ipc::mem::allocator - >{}.from_bytes(src, src + size); - std::memcpy(dst, wstr.data(), (ipc::detail::min)(wstr.size(), size)); +auto to_tchar(ipc::string &&external) -> IsSameChar { + if (external.empty()) { + return {}; // noconv + } + /** + * CP_ACP : The system default Windows ANSI code page. + * CP_MACCP : The current system Macintosh code page. + * CP_OEMCP : The current system OEM code page. + * CP_SYMBOL : Symbol code page (42). + * CP_THREAD_ACP: The Windows ANSI code page for the current thread. + * CP_UTF7 : UTF-7. Use this value only when forced by a 7-bit transport mechanism. Use of UTF-8 is preferred. + * CP_UTF8 : UTF-8. + */ + int size_needed = ::MultiByteToWideChar(CP_UTF8, 0, &external[0], (int)external.size(), NULL, 0); + if (size_needed <= 0) { + return {}; + } + ipc::wstring internal(size_needed, L'\0'); + ::MultiByteToWideChar(CP_UTF8, 0, &external[0], (int)external.size(), &internal[0], size_needed); + return internal; } } // namespace detail diff --git a/src/libipc/policy.h b/src/libipc/policy.h index d2ad313..8959607 100755 --- a/src/libipc/policy.h +++ b/src/libipc/policy.h @@ -15,8 +15,10 @@ struct choose; template struct choose { + using flag_t = Flag; + template - using elems_t = circ::elem_array, DataSize, AlignSize>; + using elems_t = circ::elem_array, DataSize, AlignSize>; }; } // namespace policy diff --git a/src/libipc/prod_cons.h b/src/libipc/prod_cons.h index d5684fa..28d99bd 100755 --- a/src/libipc/prod_cons.h +++ b/src/libipc/prod_cons.h @@ -58,13 +58,14 @@ struct prod_cons_impl> { return false; } - template - bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E* elems) { + template + bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, R&& out, E* elems) { auto cur_rd = circ::index_of(rd_.load(std::memory_order_relaxed)); if (cur_rd == circ::index_of(wt_.load(std::memory_order_acquire))) { return false; // empty } std::forward(f)(&(elems[cur_rd].data_)); + std::forward(out)(true); rd_.fetch_add(1, std::memory_order_release); return true; } @@ -80,8 +81,9 @@ struct prod_cons_impl> return false; } - template class E, std::size_t DS, std::size_t AS> - bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E* elems) { + template class E, std::size_t DS, std::size_t AS> + bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, R&& out, E* elems) { byte_t buff[DS]; for (unsigned k = 0;;) { auto cur_rd = rd_.load(std::memory_order_relaxed); @@ -92,6 +94,7 @@ struct prod_cons_impl> std::memcpy(buff, &(elems[circ::index_of(cur_rd)].data_), sizeof(buff)); if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) { std::forward(f)(buff); + std::forward(out)(true); return true; } ipc::yield(k); @@ -156,8 +159,9 @@ struct prod_cons_impl> return false; } - template class E, std::size_t DS, std::size_t AS> - bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E* elems) { + template class E, std::size_t DS, std::size_t AS> + bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, R&& out, E* elems) { byte_t buff[DS]; for (unsigned k = 0;;) { auto cur_rd = rd_.load(std::memory_order_relaxed); @@ -179,6 +183,7 @@ struct prod_cons_impl> std::memcpy(buff, &(elems[circ::index_of(cur_rd)].data_), sizeof(buff)); if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) { std::forward(f)(buff); + std::forward(out)(true); return true; } ipc::yield(k); @@ -263,20 +268,20 @@ struct prod_cons_impl> { return true; } - template - bool pop(W* wrapper, circ::u2_t& cur, F&& f, E* elems) { + template + bool pop(W* wrapper, circ::u2_t& cur, F&& f, R&& out, E* elems) { if (cur == cursor()) return false; // acquire auto* el = elems + circ::index_of(cur++); std::forward(f)(&(el->data_)); for (unsigned k = 0;;) { auto cur_rc = el->rc_.load(std::memory_order_acquire); - circ::cc_t rem_cc = cur_rc & ep_mask; - if (rem_cc == 0) { + if ((cur_rc & ep_mask) == 0) { + std::forward(out)(true); return true; } - if (el->rc_.compare_exchange_weak(cur_rc, - cur_rc & ~static_cast(wrapper->connected_id()), - std::memory_order_release)) { + auto nxt_rc = cur_rc & ~static_cast(wrapper->connected_id()); + if (el->rc_.compare_exchange_weak(cur_rc, nxt_rc, std::memory_order_release)) { + std::forward(out)((nxt_rc & ep_mask) == 0); return true; } ipc::yield(k); @@ -395,8 +400,8 @@ struct prod_cons_impl> { return true; } - template - bool pop(W* wrapper, circ::u2_t& cur, F&& f, E(& elems)[N]) { + template + bool pop(W* wrapper, circ::u2_t& cur, F&& f, R&& out, E(& elems)[N]) { auto* el = elems + circ::index_of(cur); auto cur_fl = el->f_ct_.load(std::memory_order_acquire); if (cur_fl != ~static_cast(cur)) { @@ -406,17 +411,18 @@ struct prod_cons_impl> { std::forward(f)(&(el->data_)); for (unsigned k = 0;;) { auto cur_rc = el->rc_.load(std::memory_order_acquire); - circ::cc_t rem_cc = cur_rc & rc_mask; - if (rem_cc == 0) { + if ((cur_rc & rc_mask) == 0) { + std::forward(out)(true); el->f_ct_.store(cur + N - 1, std::memory_order_release); return true; } - if ((rem_cc & ~wrapper->connected_id()) == 0) { + auto nxt_rc = inc_rc(cur_rc) & ~static_cast(wrapper->connected_id()); + bool last_one = false; + if ((last_one = (nxt_rc & rc_mask) == 0)) { el->f_ct_.store(cur + N - 1, std::memory_order_release); } - if (el->rc_.compare_exchange_weak(cur_rc, - inc_rc(cur_rc) & ~static_cast(wrapper->connected_id()), - std::memory_order_release)) { + if (el->rc_.compare_exchange_weak(cur_rc, nxt_rc, std::memory_order_release)) { + std::forward(out)(last_one); return true; } ipc::yield(k); diff --git a/src/libipc/queue.h b/src/libipc/queue.h index f04febf..1a782a9 100755 --- a/src/libipc/queue.h +++ b/src/libipc/queue.h @@ -155,11 +155,11 @@ public: return !valid() || (cursor_ == elems_->cursor()); } - template - bool push(P&&... params) { + template + bool push(F&& prep, P&&... params) { if (elems_ == nullptr) return false; return elems_->push(this, [&](void* p) { - ::new (p) T(std::forward

(params)...); + if (prep(p)) ::new (p) T(std::forward

(params)...); }); } @@ -171,14 +171,14 @@ public: }); } - template - bool pop(T& item) { + template + bool pop(T& item, F&& out) { if (elems_ == nullptr) { return false; } return elems_->pop(this, &(this->cursor_), [&item](void* p) { ::new (&item) T(std::move(*static_cast(p))); - }); + }, std::forward(out)); } }; @@ -204,7 +204,12 @@ public: } bool pop(T& item) { - return base_t::pop(item); + return base_t::pop(item, [](bool) {}); + } + + template + bool pop(T& item, F&& out) { + return base_t::pop(item, std::forward(out)); } }; diff --git a/src/libipc/utility/id_pool.h b/src/libipc/utility/id_pool.h index 7207aa9..24d32e4 100755 --- a/src/libipc/utility/id_pool.h +++ b/src/libipc/utility/id_pool.h @@ -2,13 +2,15 @@ #include // std::aligned_storage_t #include // std::memcmp +#include #include "libipc/def.h" - #include "libipc/platform/detail.h" namespace ipc { +using storage_id_t = std::int32_t; + template struct id_type; @@ -16,7 +18,7 @@ template struct id_type<0, AlignSize> { uint_t<8> id_; - id_type& operator=(std::size_t val) { + id_type& operator=(storage_id_t val) { id_ = static_cast>(val); return (*this); } @@ -57,7 +59,7 @@ public: } void init() { - for (std::size_t i = 0; i < max_count;) { + for (storage_id_t i = 0; i < max_count;) { i = next_[i] = (i + 1); } } @@ -71,22 +73,22 @@ public: return cursor_ == max_count; } - std::size_t acquire() { - if (empty()) return invalid_value; - std::size_t id = cursor_; + storage_id_t acquire() { + if (empty()) return -1; + storage_id_t id = cursor_; cursor_ = next_[id]; // point to next return id; } - bool release(std::size_t id) { - if (id == invalid_value) return false; + bool release(storage_id_t id) { + if (id < 0) return false; next_[id] = cursor_; cursor_ = static_cast>(id); // put it back return true; } - void * at(std::size_t id) { return &(next_[id].data_); } - void const * at(std::size_t id) const { return &(next_[id].data_); } + void * at(storage_id_t id) { return &(next_[id].data_); } + void const * at(storage_id_t id) const { return &(next_[id].data_); } }; template @@ -94,8 +96,8 @@ class obj_pool : public id_pool { using base_t = id_pool; public: - T * at(std::size_t id) { return reinterpret_cast(base_t::at(id)); } - T const * at(std::size_t id) const { return reinterpret_cast(base_t::at(id)); } + T * at(storage_id_t id) { return reinterpret_cast(base_t::at(id)); } + T const * at(storage_id_t id) const { return reinterpret_cast(base_t::at(id)); } }; } // namespace ipc diff --git a/src/tls_pointer.cpp b/src/tls_pointer.cpp deleted file mode 100755 index dcfba88..0000000 --- a/src/tls_pointer.cpp +++ /dev/null @@ -1,8 +0,0 @@ - -#if defined(WIN64) || defined(_WIN64) || defined(__WIN64__) || \ - defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) || \ - defined(WINCE) || defined(_WIN32_WCE) -#include "libipc/platform/tls_pointer_win.h" -#else /*!WIN*/ -#include "libipc/platform/tls_pointer_linux.h" -#endif/*!WIN*/ diff --git a/test/test_ipc.cpp b/test/test_ipc.cpp index faae13c..16c076c 100755 --- a/test/test_ipc.cpp +++ b/test/test_ipc.cpp @@ -1,6 +1,8 @@ #include #include +#include +#include #include #include "libipc/ipc.h" @@ -16,15 +18,28 @@ using namespace ipc; namespace { +constexpr int LoopCount = 10000; +constexpr int MultiMax = 8; + +struct msg_head { + int id_; +}; + class rand_buf : public buffer { public: rand_buf() { - int size = capo::random<>{1, 65536}(); + int size = capo::random<>{sizeof(msg_head), 65536}(); *this = buffer(new char[size], size, [](void * p, std::size_t) { delete [] static_cast(p); }); } + rand_buf(msg_head const &msg) { + *this = buffer(new msg_head{msg}, sizeof(msg), [](void * p, std::size_t) { + delete static_cast(p); + }); + } + rand_buf(rand_buf &&) = default; rand_buf(rand_buf const & rhs) { if (rhs.empty()) return; @@ -40,11 +55,11 @@ public: } void set_id(int k) noexcept { - *get() = static_cast(k); + get()->id_ = k; } int get_id() const noexcept { - return static_cast(*get()); + return get()->id_; } using buffer::operator=; @@ -60,64 +75,79 @@ void test_basic(char const * name) { EXPECT_FALSE(que1.try_send(test2)); que_t que2 { que1.name(), ipc::receiver }; - EXPECT_TRUE(que1.send(test1)); - EXPECT_TRUE(que1.try_send(test2)); + ASSERT_TRUE(que1.send(test1)); + ASSERT_TRUE(que1.try_send(test2)); EXPECT_EQ(que2.recv(), test1); EXPECT_EQ(que2.recv(), test2); } -template -void test_sr(char const * name, int size, int s_cnt, int r_cnt) { - using que_t = chan; +class data_set { + std::vector datas_; +public: + data_set() { + datas_.resize(LoopCount); + for (int i = 0; i < LoopCount; ++i) { + datas_[i].set_id(i); + } + } + + std::vector const &get() const noexcept { + return datas_; + } +} const data_set__; + +template > +void test_sr(char const * name, int s_cnt, int r_cnt) { ipc_ut::sender().start(static_cast(s_cnt)); ipc_ut::reader().start(static_cast(r_cnt)); + + std::atomic_thread_fence(std::memory_order_seq_cst); ipc_ut::test_stopwatch sw; - std::vector tests(size); for (int k = 0; k < s_cnt; ++k) { - ipc_ut::sender() << [name, &tests, &sw, r_cnt, k] { - que_t que1 { name }; - EXPECT_TRUE(que1.wait_for_recv(r_cnt)); + ipc_ut::sender() << [name, &sw, r_cnt, k] { + Que que { name, ipc::sender }; + EXPECT_TRUE(que.wait_for_recv(r_cnt)); sw.start(); - for (auto & buf : tests) { - rand_buf data { buf }; - data.set_id(k); - EXPECT_TRUE(que1.send(data)); + for (int i = 0; i < (int)data_set__.get().size(); ++i) { + EXPECT_TRUE(que.send(data_set__.get()[i])); } }; } for (int k = 0; k < r_cnt; ++k) { - ipc_ut::reader() << [name, &tests, s_cnt] { - que_t que2 { name, ipc::receiver }; - std::vector cursors(s_cnt); + ipc_ut::reader() << [name] { + Que que { name, ipc::receiver }; for (;;) { - rand_buf got { que2.recv() }; + rand_buf got { que.recv() }; ASSERT_FALSE(got.empty()); - int & cur = cursors.at(got.get_id()); - ASSERT_TRUE((cur >= 0) && (cur < static_cast(tests.size()))); - rand_buf buf { tests.at(cur++) }; - buf.set_id(got.get_id()); - EXPECT_EQ(got, buf); - int n = 0; - for (; n < static_cast(cursors.size()); ++n) { - if (cursors[n] < static_cast(tests.size())) break; + int i = got.get_id(); + if (i == -1) { + return; + } + ASSERT_TRUE((i >= 0) && (i < (int)data_set__.get().size())); + auto const &data_set = data_set__.get()[i]; + if (data_set != got) { + printf("data_set__.get()[%d] != got, size = %zd/%zd\n", + i, data_set.size(), got.size()); + EXPECT_TRUE(false); } - if (n == static_cast(cursors.size())) break; } }; } ipc_ut::sender().wait_for_done(); + Que que { name }; + EXPECT_TRUE(que.wait_for_recv(r_cnt)); + for (int k = 0; k < r_cnt; ++k) { + que.send(rand_buf{msg_head{-1}}); + } ipc_ut::reader().wait_for_done(); - sw.print_elapsed(s_cnt, r_cnt, size, name); + sw.print_elapsed(s_cnt, r_cnt, (int)data_set__.get().size(), name); } -constexpr int LoopCount = 10000; -constexpr int MultiMax = 8; - } // internal-linkage TEST(IPC, basic) { @@ -129,22 +159,26 @@ TEST(IPC, basic) { } TEST(IPC, 1v1) { - test_sr("ssu", LoopCount, 1, 1); - test_sr("smu", LoopCount, 1, 1); - test_sr("mmu", LoopCount, 1, 1); - test_sr("smb", LoopCount, 1, 1); - test_sr("mmb", LoopCount, 1, 1); + test_sr("ssu", 1, 1); + test_sr("smu", 1, 1); + test_sr("mmu", 1, 1); + test_sr("smb", 1, 1); + test_sr("mmb", 1, 1); } TEST(IPC, 1vN) { - test_sr("smb", LoopCount, 1, MultiMax); - test_sr("mmb", LoopCount, 1, MultiMax); + //test_sr("smu", 1, MultiMax); + //test_sr("mmu", 1, MultiMax); + test_sr("smb", 1, MultiMax); + test_sr("mmb", 1, MultiMax); } TEST(IPC, Nv1) { - test_sr("mmb", LoopCount, MultiMax, 1); + //test_sr("mmu", MultiMax, 1); + test_sr("mmb", MultiMax, 1); } TEST(IPC, NvN) { - test_sr("mmb", LoopCount, MultiMax, MultiMax); + //test_sr("mmu", MultiMax, MultiMax); + test_sr("mmb", MultiMax, MultiMax); } diff --git a/test/test_platform.cpp b/test/test_platform.cpp new file mode 100644 index 0000000..e7a6059 --- /dev/null +++ b/test/test_platform.cpp @@ -0,0 +1,31 @@ +#if defined(WIN64) || defined(_WIN64) || defined(__WIN64__) || \ + defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) || \ + defined(WINCE) || defined(_WIN32_WCE) + +#include +#include +//#include + +#include "test.h" + +#include "libipc/platform/to_tchar.h" + +TEST(Platform, to_tchar) { + char const *utf8 = "hello world, " + "\xe4\xbd\xa0\xe5\xa5\xbd\xef\xbc" + "\x8c\xe3\x81\x93\xe3\x82\x93\xe3" + "\x81\xab\xe3\x81\xa1\xe3\x81\xaf"; + wchar_t const *utf16 = L"hello world, \u4f60\u597d\uff0c\u3053\u3093\u306b\u3061\u306f"; + { + ipc::string str = ipc::detail::to_tchar(utf8); + EXPECT_STREQ(str.c_str(), utf8); + } + { + ipc::wstring wtr = ipc::detail::to_tchar(utf8); + EXPECT_STREQ(wtr.c_str(), utf16); + //std::ofstream out("out.txt", std::ios::binary|std::ios::out); + //out.write((char const *)wtr.c_str(), wtr.size() * sizeof(wchar_t)); + } +} + +#endif/*WIN*/ \ No newline at end of file diff --git a/test/test_queue.cpp b/test/test_queue.cpp index c1114ef..a59b3a7 100755 --- a/test/test_queue.cpp +++ b/test/test_queue.cpp @@ -44,7 +44,7 @@ constexpr int ThreadMax = 8; template void push(Que & que, int p, int d) { - for (int n = 0; !que.push(p, d); ++n) { + for (int n = 0; !que.push([](void*) { return true; }, p, d); ++n) { ASSERT_NE(n, PushRetry); std::this_thread::yield(); } diff --git a/test/test_thread_utility.cpp b/test/test_thread_utility.cpp index e0b0512..02dd0c2 100755 --- a/test/test_thread_utility.cpp +++ b/test/test_thread_utility.cpp @@ -11,7 +11,6 @@ #include "capo/type_name.hpp" #include "libipc/rw_lock.h" -#include "libipc/tls_pointer.h" #include "test.h" #include "thread_pool.h" @@ -89,6 +88,7 @@ void test_lock_performance(int w, int r) { // for (int i = 2; i <= ThreadMax; ++i) test_lock_performance(i, i); //} +#if 0 // disable ipc::tls TEST(Thread, tls_main_thread) { ipc::tls::pointer p; EXPECT_FALSE(p); @@ -199,4 +199,5 @@ TEST(Thread, tls_benchmark) { benchmark_tls("ipc_tls: int"); benchmark_tls("std_tls: Str"); benchmark_tls("ipc_tls: Str"); -} \ No newline at end of file +} +#endif \ No newline at end of file diff --git a/test/thread_pool.h b/test/thread_pool.h index a7d2b28..03b7e8f 100755 --- a/test/thread_pool.h +++ b/test/thread_pool.h @@ -9,6 +9,8 @@ #include // std::size_t #include // assert +#include "capo/scope_guard.hpp" + namespace ipc_ut { class thread_pool final { @@ -32,6 +34,9 @@ class thread_pool final { if (pool->quit_) return; if (pool->jobs_.empty()) { pool->waiting_cnt_ += 1; + CAPO_SCOPE_GUARD_ = [pool] { + pool->waiting_cnt_ -= 1; + }; if (pool->waiting_cnt_ == pool->workers_.size()) { pool->cv_empty_.notify_all(); @@ -41,8 +46,6 @@ class thread_pool final { pool->cv_jobs_.wait(guard); if (pool->quit_) return; } while (pool->jobs_.empty()); - - pool->waiting_cnt_ -= 1; } assert(!pool->jobs_.empty()); job = std::move(pool->jobs_.front()); @@ -71,6 +74,7 @@ public: } void start(std::size_t n) { + std::unique_lock guard { lock_ }; if (n <= workers_.size()) return; for (std::size_t i = workers_.size(); i < n; ++i) { workers_.push_back(std::thread { &thread_pool::proc, this });