diff --git a/build/src.pro b/build/src.pro index 17c2765..f5a2566 100644 --- a/build/src.pro +++ b/build/src.pro @@ -21,7 +21,8 @@ HEADERS += \ ../include/circ_queue.h \ ../include/ipc.h \ ../include/def.h \ - ../include/rw_lock.h + ../include/rw_lock.h \ + ../src/thread_local_ptr.h SOURCES += \ ../src/shm.cpp \ @@ -42,7 +43,8 @@ INSTALLS += target else:win32 { SOURCES += \ - ../src/platform/shm_win.cpp + ../src/platform/shm_win.cpp \ + ../src/platform/thread_local_ptr_win.cpp LIBS += -lKernel32 diff --git a/include/def.h b/include/def.h index 4655e86..4edbaa7 100644 --- a/include/def.h +++ b/include/def.h @@ -5,6 +5,7 @@ #include #include #include +#include namespace ipc { diff --git a/src/ipc.cpp b/src/ipc.cpp index 55eab34..7e79d38 100644 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -13,6 +13,7 @@ #include "def.h" #include "circ_queue.h" #include "rw_lock.h" +#include "thread_local_ptr.h" namespace { @@ -29,6 +30,14 @@ struct msg_t { using queue_t = circ::queue; using guard_t = std::unique_ptr, void(*)(handle_t)>; +/* + * thread_local stl object's destructor causing crash + * See: https://sourceforge.net/p/mingw-w64/bugs/527/ + * https://sourceforge.net/p/mingw-w64/bugs/727/ +*/ +/*thread_local*/ +thread_local_ptr>> recv_caches__; + std::unordered_map h2q__; rw_lock h2q_lc__; @@ -128,27 +137,34 @@ std::vector recv(handle_t h) { if (!queue->connected()) { queue->connect(); } - static thread_local std::unordered_map> all; + auto rcs = recv_caches__.create(); do { + // pop a new message auto msg = queue->pop(); - // here comes a new message - auto& cache = all[msg.id_]; // find the cache using message id - auto last_size = cache.size(); - if (msg.remain_ > 0) { - cache.resize(last_size + data_length); - std::memcpy(cache.data() + last_size, msg.data_, data_length); + // remain_ may minus & abs(remain_) < data_length + std::size_t remain = static_cast( + static_cast(data_length) + msg.remain_); + auto cache_it = rcs->find(msg.id_); + if (cache_it == rcs->end()) { + std::vector buf(remain); + std::memcpy(buf.data(), msg.data_, remain); + return buf; } - else { - // remain_ is minus & abs(remain_) < data_length - std::size_t remain = static_cast( - static_cast(data_length) + msg.remain_); + // has cache before this message + auto& cache = cache_it->second; + auto last_size = cache.size(); + // this is the last message fragment + if (msg.remain_ <= 0) { cache.resize(last_size + remain); std::memcpy(cache.data() + last_size, msg.data_, remain); // finish this message, erase it from cache - auto ret = std::move(cache); - all.erase(msg.id_); - return ret; + auto buf = std::move(cache); + rcs->erase(cache_it); + return buf; } + // there are remain datas after this message + cache.resize(last_size + data_length); + std::memcpy(cache.data() + last_size, msg.data_, data_length); } while(1); } diff --git a/src/platform/thread_local_ptr_win.cpp b/src/platform/thread_local_ptr_win.cpp new file mode 100644 index 0000000..33e5be3 --- /dev/null +++ b/src/platform/thread_local_ptr_win.cpp @@ -0,0 +1,166 @@ +#include "thread_local_ptr.h" + +#include // ::Tls... +#include // std::unordered_map + +namespace ipc { + +/* + + + Windows doesn't support a per-thread destructor with its TLS primitives. + So, here will build it manually by inserting a function to be called on each thread's exit. + See: https://www.codeproject.com/Articles/8113/Thread-Local-Storage-The-C-Way + https://src.chromium.org/viewvc/chrome/trunk/src/base/threading/thread_local_storage_win.cc + https://github.com/mirror/mingw-org-wsl/blob/master/src/libcrt/crt/tlssup.c + https://github.com/Alexpux/mingw-w64/blob/master/mingw-w64-crt/crt/tlssup.c + http://svn.boost.org/svn/boost/trunk/libs/thread/src/win32/tss_pe.cpp +*/ + +namespace { + struct tls_data { + using destructor_t = void(*)(void*); + using map_t = std::unordered_map; + + static DWORD& key() { + static IPC_THREAD_LOCAL_KEY_ rec_key = ::TlsAlloc(); + return rec_key; + } + + static map_t* records(map_t* rec) { + IPC_THREAD_LOCAL_SET(key(), rec); + return rec; + } + + static map_t* records() { + return static_cast(IPC_THREAD_LOCAL_GET(key())); + } + + IPC_THREAD_LOCAL_KEY_ key_ = 0; + destructor_t destructor_ = nullptr; + + tls_data() = default; + + tls_data(IPC_THREAD_LOCAL_KEY_ key, destructor_t destructor) + : key_ (key) + , destructor_(destructor) + {} + + tls_data(tls_data&& rhs) : tls_data() { + (*this) = std::move(rhs); + } + + tls_data& operator=(tls_data&& rhs) { + key_ = rhs.key_; + destructor_ = rhs.destructor_; + rhs.key_ = 0; + rhs.destructor_ = nullptr; + return *this; + } + + ~tls_data() { + if (destructor_) destructor_(IPC_THREAD_LOCAL_GET(key_)); + } + }; +} + +void thread_local_create(IPC_THREAD_LOCAL_KEY_& key, void (*destructor)(void*)) { + key = ::TlsAlloc(); + if (key == TLS_OUT_OF_INDEXES) return; + auto rec = tls_data::records(); + if (!rec) rec = tls_data::records(new tls_data::map_t); + if (!rec) return; + rec->emplace(key, tls_data{ key, destructor }); +} + +void thread_local_delete(IPC_THREAD_LOCAL_KEY_ key) { + auto rec = tls_data::records(); + if (!rec) return; + rec->erase(key); + ::TlsFree(key); +} + +//////////////////////////////////////////////////////////////// +/// Call destructors on thread exit +//////////////////////////////////////////////////////////////// + +void OnThreadExit() { + auto rec = tls_data::records(); + if (rec == nullptr) return; + delete rec; + tls_data::records(nullptr); +} + +void NTAPI OnTlsCallback(PVOID, DWORD dwReason, PVOID) { + if (dwReason == DLL_THREAD_DETACH) OnThreadExit(); +} + +#if defined(_MSC_VER) + +#if defined(IPC_OS_WIN64_) + +#pragma comment(linker, "/INCLUDE:_tls_used") +#pragma comment(linker, "/INCLUDE:_tls_xl_b__") + +extern "C" +{ +# pragma const_seg(".CRT$XLB") + extern const PIMAGE_TLS_CALLBACK _tls_xl_b__; + const PIMAGE_TLS_CALLBACK _tls_xl_b__ = OnTlsCallback; +# pragma const_seg() +} + +#else /*!IPC_OS_WIN64_*/ + +#pragma comment(linker, "/INCLUDE:__tls_used") +#pragma comment(linker, "/INCLUDE:__tls_xl_b__") + +extern "C" +{ +# pragma data_seg(".CRT$XLB") + PIMAGE_TLS_CALLBACK _tls_xl_b__ = OnTlsCallback; +# pragma data_seg() +} + +#endif/*!IPC_OS_WIN64_*/ + +#elif defined(__GNUC__) + +#define IPC_CRTALLOC__(x) __attribute__ ((section (x) )) + +#if defined(__MINGW64__) || (__MINGW64_VERSION_MAJOR) || \ + (__MINGW32_MAJOR_VERSION > 3) || ((__MINGW32_MAJOR_VERSION == 3) && (__MINGW32_MINOR_VERSION >= 18)) + +extern "C" +{ + IPC_CRTALLOC__(".CRT$XLB") PIMAGE_TLS_CALLBACK _tls_xl_b__ = OnTlsCallback; +} + +#else /*!__MINGW*/ + +extern "C" +{ + ULONG _tls_index__ = 0; + + IPC_CRTALLOC__(".tls$AAA") char _tls_start__ = 0; + IPC_CRTALLOC__(".tls$ZZZ") char _tls_end__ = 0; + + IPC_CRTALLOC__(".CRT$XLA") PIMAGE_TLS_CALLBACK _tls_xl_a__ = 0; + IPC_CRTALLOC__(".CRT$XLB") PIMAGE_TLS_CALLBACK _tls_xl_b__ = OnTlsCallback; + IPC_CRTALLOC__(".CRT$XLZ") PIMAGE_TLS_CALLBACK _tls_xl_z__ = 0; +} + +extern "C" NX_CRTALLOC_(".tls") const IMAGE_TLS_DIRECTORY _tls_used = +{ + (ULONG_PTR)(&_tls_start__ + 1), + (ULONG_PTR) &_tls_end__, + (ULONG_PTR) &_tls_index__, + (ULONG_PTR) &_tls_xl_b__, + (ULONG)0, (ULONG)0 +} + +#endif/*!__MINGW*/ + +#endif/*_MSC_VER, __GNUC__*/ + +} // namespace ipc diff --git a/src/thread_local_ptr.h b/src/thread_local_ptr.h new file mode 100644 index 0000000..4d3675b --- /dev/null +++ b/src/thread_local_ptr.h @@ -0,0 +1,100 @@ +#pragma once + +#include // std::forward + +#if defined(WINCE) || defined(_WIN32_WCE) || \ + defined(WIN64) || defined(_WIN64) || defined(__WIN64__) || \ + defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) +#include // DWORD, ::Tls... +#define IPC_OS_WIN_ +#else +#include // pthread_... +#endif + +namespace ipc { + +#if defined(IPC_OS_WIN_) + +#define IPC_THREAD_LOCAL_KEY_ DWORD +#define IPC_THREAD_LOCAL_SET(KEY, PTR) (::TlsSetValue(KEY, (LPVOID)PTR) == TRUE) +#define IPC_THREAD_LOCAL_GET(KEY) (::TlsGetValue(KEY)) + +void thread_local_create(IPC_THREAD_LOCAL_KEY_& key, void (*destructor)(void*)); +void thread_local_delete(IPC_THREAD_LOCAL_KEY_ key); + +#define IPC_THREAD_LOCAL_CREATE(KEY, DESTRUCTOR) thread_local_create(KEY, DESTRUCTOR) +#define IPC_THREAD_LOCAL_DELETE(KEY) thread_local_delete(KEY) + +#else /*!IPC_OS_WIN_*/ + +#define IPC_THREAD_LOCAL_KEY_ pthread_key_t +#define IPC_THREAD_LOCAL_CREATE(KEY, DESTRUCTOR) pthread_key_create(&KEY, DESTRUCTOR) +#define IPC_THREAD_LOCAL_DELETE(KEY) pthread_key_delete(KEY) +#define IPC_THREAD_LOCAL_SET(KEY, PTR) (pthread_setspecific(KEY, (void*)PTR) == 0) +#define IPC_THREAD_LOCAL_GET(KEY) pthread_getspecific(KEY) + +#endif/*!IPC_OS_WIN_*/ + +//////////////////////////////////////////////////////////////// +/// Thread-local pointer +//////////////////////////////////////////////////////////////// + +/* + + + 1. In Windows, if you do not compile thread_local_ptr.cpp, + use thread_local_ptr will cause memory leaks. + + 2. You need to set the thread_local_ptr's storage manually: + ``` + thread_local_ptr p; + if (!p) p = new int(123); + ``` + Just like an ordinary pointer. Or you could just call create: + ``` + thread_local_ptr p; + p.create(123); + ``` +*/ + +template +class thread_local_ptr { + IPC_THREAD_LOCAL_KEY_ key_; + +public: + using value_type = T; + + thread_local_ptr() { + IPC_THREAD_LOCAL_CREATE(key_, [](void* p) { + delete static_cast(p); + }); + } + + ~thread_local_ptr() { + IPC_THREAD_LOCAL_DELETE(key_); + } + + template + T* create(P&&... params) { + auto ptr = static_cast(*this); + if (ptr == nullptr) { + return (*this) = new T(std::forward

(params)...); + } + return ptr; + } + + T* operator=(T* ptr) { + IPC_THREAD_LOCAL_SET(key_, ptr); + return ptr; + } + + operator T*() const { return static_cast(IPC_THREAD_LOCAL_GET(key_)); } + + T& operator* () { return *static_cast(*this); } + const T& operator* () const { return *static_cast(*this); } + + T* operator->() { return static_cast(*this); } + const T* operator->() const { return static_cast(*this); } +}; + +} // namespace ipc