diff --git a/CMakeLists.txt b/CMakeLists.txt index cc4660f..b6df272 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -16,3 +16,4 @@ add_subdirectory(src) add_subdirectory(3rdparty/gtest) add_subdirectory(test) add_subdirectory(demo/chat) +add_subdirectory(demo/msg_que) diff --git a/demo/chat/main.cpp b/demo/chat/main.cpp index 53e1cbc..ec9c3be 100755 --- a/demo/chat/main.cpp +++ b/demo/chat/main.cpp @@ -1,3 +1,4 @@ + #include #include #include @@ -8,51 +9,54 @@ namespace { -char name__[] = "ipc-chat"; -char quit__[] = "q"; -char id__ [] = "c"; +constexpr char const name__[] = "ipc-chat"; +constexpr char const quit__[] = "q"; +constexpr char const id__ [] = "c"; -std::size_t calc_unique_id() { +inline std::size_t calc_unique_id() { static ipc::shm::handle g_shm { "__CHAT_ACC_STORAGE__", sizeof(std::atomic) }; return static_cast*>(g_shm.get())->fetch_add(1, std::memory_order_relaxed); } +ipc::channel sender__ { name__, ipc::sender }; +ipc::channel receiver__ { name__, ipc::receiver }; + } // namespace int main() { std::string buf, id = id__ + std::to_string(calc_unique_id()); std::regex reg { "(c\\d+)> (.*)" }; - ipc::channel cc { name__, ipc::sender }; - std::thread r {[&id, ®] { - ipc::channel cc { name__, ipc::receiver }; std::cout << id << " is ready." << std::endl; while (1) { - auto buf = cc.recv(); - if (buf.empty()) continue; + ipc::buff_t buf = receiver__.recv(); + if (buf.empty()) break; // quit std::string dat { buf.get(), buf.size() - 1 }; std::smatch mid; if (std::regex_match(dat, mid, reg)) { if (mid.str(1) == id) { if (mid.str(2) == quit__) { - std::cout << "receiver quit..." << std::endl; - return; + break; } continue; } } std::cout << dat << std::endl; } + std::cout << id << " receiver is quit..." << std::endl; }}; for (/*int i = 1*/;; /*++i*/) { std::cin >> buf; + if (buf.empty() || (buf == quit__)) break; // std::cout << "[" << i << "]" << std::endl; - cc.send(id + "> " + buf); - if (buf == quit__) break; + sender__.send(id + "> " + buf); + buf.clear(); } + receiver__.disconnect(); r.join(); + std::cout << id << " sender is quit..." << std::endl; return 0; } diff --git a/demo/msg_que/CMakeLists.txt b/demo/msg_que/CMakeLists.txt new file mode 100644 index 0000000..cd9e302 --- /dev/null +++ b/demo/msg_que/CMakeLists.txt @@ -0,0 +1,11 @@ +project(msg_que) + +include_directories( + ${CMAKE_SOURCE_DIR}/3rdparty) + +file(GLOB SRC_FILES ./*.cpp) +file(GLOB HEAD_FILES ./*.h) + +add_executable(${PROJECT_NAME} ${SRC_FILES} ${HEAD_FILES}) + +target_link_libraries(${PROJECT_NAME} ipc) diff --git a/demo/msg_que/main.cpp b/demo/msg_que/main.cpp new file mode 100644 index 0000000..79b5433 --- /dev/null +++ b/demo/msg_que/main.cpp @@ -0,0 +1,124 @@ + +#include + +#include +#include +#include +#include +#include +#include + +#include "libipc/ipc.h" +#include "capo/random.hpp" + +namespace { + +constexpr char const name__ [] = "ipc-msg-que"; +constexpr char const mode_s__[] = "s"; +constexpr char const mode_r__[] = "r"; + +constexpr std::size_t const min_sz = 128; +constexpr std::size_t const max_sz = 1024 * 16; + +std::atomic is_quit__{ false }; +std::atomic size_per_1s__{ 0 }; + +using msg_que_t = ipc::chan; + +msg_que_t que__{ name__ }; +ipc::byte_t buff__[max_sz]; +capo::random<> rand__{ min_sz, max_sz }; + +inline std::string str_of_size(std::size_t sz) noexcept { + if (sz <= 1024) { + return std::to_string(sz) + " bytes"; + } + if (sz <= 1024 * 1024) { + return std::to_string(sz / 1024) + " KB"; + } + return std::to_string(sz / (1024 * 1024)) + " MB"; +} + +inline std::string speed_of(std::size_t sz) noexcept { + return str_of_size(sz) + "/s"; +} + +void do_counting() { + for (int i = 1; !is_quit__.load(std::memory_order_acquire); ++i) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 100 ms + if (i % 10) continue; + i = 0; + std::cout + << speed_of(size_per_1s__.load(std::memory_order_acquire)) + << std::endl; + size_per_1s__.store(0, std::memory_order_release); + } +} + +void do_send() { + std::cout + << __func__ << ": start [" + << str_of_size(min_sz) << " - " << str_of_size(max_sz) + << "]...\n"; + if (!que__.reconnect(ipc::sender)) { + std::cerr << __func__ << ": connect failed.\n"; + } + else { + std::thread counting{ do_counting }; + while (!is_quit__.load(std::memory_order_acquire)) { + std::size_t sz = static_cast(rand__()); + if (!que__.send(ipc::buff_t(buff__, sz))) { + std::cerr << __func__ << ": send failed.\n"; + std::cout << __func__ << ": waiting for receiver...\n"; + if (!que__.wait_for_recv(1)) { + std::cerr << __func__ << ": wait receiver failed.\n"; + is_quit__.store(true, std::memory_order_release); + break; + } + } + size_per_1s__.fetch_add(sz, std::memory_order_release); + std::this_thread::yield(); + } + counting.join(); + } + std::cout << __func__ << ": quit...\n"; +} + +void do_recv() { + std::cout + << __func__ << ": start [" + << str_of_size(min_sz) << " - " << str_of_size(max_sz) + << "]...\n"; + if (!que__.reconnect(ipc::receiver)) { + std::cerr << __func__ << ": connect failed.\n"; + } + else { + std::thread counting{ do_counting }; + while (!is_quit__.load(std::memory_order_acquire)) { + auto msg = que__.recv(); + if (msg.empty()) break; + size_per_1s__.fetch_add(msg.size(), std::memory_order_release); + } + counting.join(); + } + std::cout << __func__ << ": quit...\n"; +} + +} // namespace + +int main(int argc, char ** argv) { + if (argc < 2) return 0; + + ::signal(SIGINT, [](int) { + is_quit__.store(true, std::memory_order_release); + que__.disconnect(); + }); + + if (std::string{ argv[1] } == mode_s__) { + do_send(); + } + else if (std::string{ argv[1] } == mode_r__) { + do_recv(); + } + return 0; +} diff --git a/include/libipc/ipc.h b/include/libipc/ipc.h index 44a7135..ed50d8e 100755 --- a/include/libipc/ipc.h +++ b/include/libipc/ipc.h @@ -19,19 +19,21 @@ enum : unsigned { template struct IPC_EXPORT chan_impl { - static handle_t connect (char const * name, unsigned mode); - static void disconnect(handle_t h); + static bool connect (ipc::handle_t * ph, char const * name, unsigned mode); + static bool reconnect (ipc::handle_t * ph, unsigned mode); + static void disconnect(ipc::handle_t h); + static void destroy (ipc::handle_t h); - static char const * name(handle_t h); + static char const * name(ipc::handle_t h); - static std::size_t recv_count(handle_t h); - static bool wait_for_recv(handle_t h, std::size_t r_count, std::size_t tm); + static std::size_t recv_count(ipc::handle_t h); + static bool wait_for_recv(ipc::handle_t h, std::size_t r_count, std::size_t tm); - static bool send(handle_t h, void const * data, std::size_t size, std::size_t tm); - static buff_t recv(handle_t h, std::size_t tm); + static bool send(ipc::handle_t h, void const * data, std::size_t size, std::size_t tm); + static buff_t recv(ipc::handle_t h, std::size_t tm); - static bool try_send(handle_t h, void const * data, std::size_t size, std::size_t tm); - static buff_t try_recv(handle_t h); + static bool try_send(ipc::handle_t h, void const * data, std::size_t size, std::size_t tm); + static buff_t try_recv(ipc::handle_t h); }; template @@ -39,7 +41,7 @@ class chan_wrapper { private: using detail_t = chan_impl; - handle_t h_ = nullptr; + ipc::handle_t h_ = nullptr; unsigned mode_ = ipc::sender; public: @@ -54,7 +56,7 @@ public: } ~chan_wrapper() { - disconnect(); + detail_t::destroy(h_); } void swap(chan_wrapper& rhs) noexcept { @@ -70,7 +72,7 @@ public: return detail_t::name(h_); } - handle_t handle() const noexcept { + ipc::handle_t handle() const noexcept { return h_; } @@ -89,14 +91,18 @@ public: bool connect(char const * name, unsigned mode = ipc::sender | ipc::receiver) { if (name == nullptr || name[0] == '\0') return false; this->disconnect(); - h_ = detail_t::connect(name, mode_ = mode); - return valid(); + return detail_t::connect(&h_, name, mode_ = mode); + } + + bool reconnect(unsigned mode) { + if (!valid()) return false; + if (mode_ == mode) return true; + return detail_t::reconnect(&h_, mode_ = mode); } void disconnect() { if (!valid()) return; detail_t::disconnect(h_); - h_ = nullptr; } std::size_t recv_count() const { @@ -111,13 +117,25 @@ public: return chan_wrapper(name).wait_for_recv(r_count, tm); } - bool send (void const * data, std::size_t size, std::size_t tm = default_timeout) { return detail_t::send(h_, data, size, tm) ; } - bool send (buff_t const & buff , std::size_t tm = default_timeout) { return this->send(buff.data(), buff.size(), tm) ; } - bool send (std::string const & str , std::size_t tm = default_timeout) { return this->send(str.c_str(), str.size() + 1, tm); } + bool send(void const * data, std::size_t size, std::size_t tm = default_timeout) { + return detail_t::send(h_, data, size, tm); + } + bool send(buff_t const & buff, std::size_t tm = default_timeout) { + return this->send(buff.data(), buff.size(), tm); + } + bool send(std::string const & str, std::size_t tm = default_timeout) { + return this->send(str.c_str(), str.size() + 1, tm); + } - bool try_send(void const * data, std::size_t size, std::size_t tm = default_timeout) { return detail_t::try_send(h_, data, size, tm) ; } - bool try_send(buff_t const & buff , std::size_t tm = default_timeout) { return this->try_send(buff.data(), buff.size(), tm) ; } - bool try_send(std::string const & str , std::size_t tm = default_timeout) { return this->try_send(str.c_str(), str.size() + 1, tm); } + bool try_send(void const * data, std::size_t size, std::size_t tm = default_timeout) { + return detail_t::try_send(h_, data, size, tm); + } + bool try_send(buff_t const & buff, std::size_t tm = default_timeout) { + return this->try_send(buff.data(), buff.size(), tm); + } + bool try_send(std::string const & str, std::size_t tm = default_timeout) { + return this->try_send(str.c_str(), str.size() + 1, tm); + } buff_t recv(std::size_t tm = invalid_value) { return detail_t::recv(h_, tm); @@ -128,8 +146,8 @@ public: } }; -template -using chan = chan_wrapper; +template +using chan = chan_wrapper>; /* * class route @@ -142,7 +160,7 @@ using chan = chan_wrapper; * (one producer/writer to multi consumers/readers) */ -using route = chan>; +using route = chan; /* * class channel @@ -152,6 +170,6 @@ using route = chan>; * would receive your sent messages. */ -using channel = chan>; +using channel = chan; } // namespace ipc diff --git a/include/libipc/rw_lock.h b/include/libipc/rw_lock.h index 2a898b1..5757b23 100755 --- a/include/libipc/rw_lock.h +++ b/include/libipc/rw_lock.h @@ -77,11 +77,8 @@ inline void sleep(K& k, F&& f) { if (k < static_cast(N)) { std::this_thread::yield(); } - else if (std::forward(f)()) { - return; - } else { - std::this_thread::sleep_for(std::chrono::milliseconds(1)); + static_cast(std::forward(f)()); return; } ++k; @@ -89,7 +86,9 @@ inline void sleep(K& k, F&& f) { template inline void sleep(K& k) { - sleep(k, [] { return false; }); + sleep(k, [] { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + }); } } // namespace ipc diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 2dccff1..4dfae8e 100755 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -19,6 +19,7 @@ aux_source_directory(${CMAKE_SOURCE_DIR}/src SRC_FILES) file(GLOB HEAD_FILES ${CMAKE_SOURCE_DIR}/include/libipc/*.h + ${CMAKE_SOURCE_DIR}/src/libipc/*.h ${CMAKE_SOURCE_DIR}/src/libipc/*.inc ${CMAKE_SOURCE_DIR}/src/libipc/circ/*.h ${CMAKE_SOURCE_DIR}/src/libipc/memory/*.h diff --git a/src/ipc.cpp b/src/ipc.cpp index e24cf6a..f2ddd37 100755 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include "libipc/ipc.h" #include "libipc/def.h" @@ -30,8 +31,6 @@ namespace { -using namespace ipc; - using msg_id_t = std::uint32_t; using acc_t = std::atomic; @@ -64,54 +63,54 @@ struct msg_t : msg_t<0, AlignSize> { }; template -buff_t make_cache(T& data, std::size_t size) { - auto ptr = mem::alloc(size); +ipc::buff_t make_cache(T& data, std::size_t size) { + auto ptr = ipc::mem::alloc(size); std::memcpy(ptr, &data, (ipc::detail::min)(sizeof(data), size)); - return { ptr, size, mem::free }; + return { ptr, size, ipc::mem::free }; } struct cache_t { std::size_t fill_; - buff_t buff_; + ipc::buff_t buff_; - cache_t(std::size_t f, buff_t&& b) + cache_t(std::size_t f, ipc::buff_t && b) : fill_(f), buff_(std::move(b)) {} void append(void const * data, std::size_t size) { if (fill_ >= buff_.size() || data == nullptr || size == 0) return; auto new_fill = (ipc::detail::min)(fill_ + size, buff_.size()); - std::memcpy(static_cast(buff_.data()) + fill_, data, new_fill - fill_); + std::memcpy(static_cast(buff_.data()) + fill_, data, new_fill - fill_); fill_ = new_fill; } }; auto cc_acc() { - static shm::handle acc_h("__CA_CONN__", sizeof(acc_t)); + static ipc::shm::handle acc_h("__CA_CONN__", sizeof(acc_t)); return static_cast(acc_h.get()); } auto& cls_storages() { struct cls_t { - shm::handle id_info_; - std::array::max_count> mems_; + ipc::shm::handle id_info_; + std::array::max_count> mems_; }; static ipc::unordered_map cls_s; return cls_s; } auto& cls_lock() { - static spin_lock cls_l; + static ipc::spin_lock cls_l; return cls_l; } struct cls_info_t { - id_pool<> pool_; - spin_lock lock_; + ipc::id_pool<> pool_; + ipc::spin_lock lock_; }; constexpr std::size_t calc_cls_size(std::size_t size) noexcept { - return (((size - 1) / large_msg_limit) + 1) * large_msg_limit; + return (((size - 1) / ipc::large_msg_limit) + 1) * ipc::large_msg_limit; } auto& cls_storage(std::size_t cls_size) { @@ -135,8 +134,8 @@ cls_info_t* cls_storage_info(const char* func, T& cls_shm, std::size_t cls_size) } template -byte_t* cls_storage_mem(const char* func, T& cls_shm, std::size_t cls_size, std::size_t id) { - if (id == invalid_value) { +ipc::byte_t* cls_storage_mem(const char* func, T& cls_shm, std::size_t cls_size, std::size_t id) { + if (id == ipc::invalid_value) { return nullptr; } if (!cls_shm.mems_[id].valid() && @@ -147,7 +146,7 @@ byte_t* cls_storage_mem(const char* func, T& cls_shm, std::size_t cls_size, std: return nullptr; } - byte_t* ptr = static_cast(cls_shm.mems_[id].get()); + auto ptr = static_cast(cls_shm.mems_[id].get()); if (ptr == nullptr) { ipc::error("[%s] cls_shm.mems_[id].get failed: id = %zd, cls_size = %zd\n", func, id, cls_size); return nullptr; @@ -190,7 +189,7 @@ void* find_storage(std::size_t id, std::size_t size) { } void recycle_storage(std::size_t id, std::size_t size) { - if (id == invalid_value) { + if (id == ipc::invalid_value) { ipc::error("[recycle_storage] id is invalid: id = %zd, size = %zd\n", id, size); return; } @@ -202,7 +201,7 @@ void recycle_storage(std::size_t id, std::size_t size) { ipc::error("[recycle_storage] should find storage first: id = %zd, cls_size = %zd\n", id, cls_size); return; } - byte_t* ptr = static_cast(cls_shm.mems_[id].get()); + auto ptr = static_cast(cls_shm.mems_[id].get()); if (ptr == nullptr) { ipc::error("[recycle_storage] cls_shm.mems_[id].get failed: id = %zd, cls_size = %zd\n", id, cls_size); return; @@ -221,7 +220,7 @@ void recycle_storage(std::size_t id, std::size_t size) { } void clear_storage(std::size_t id, std::size_t size) { - if (id == invalid_value) { + if (id == ipc::invalid_value) { ipc::error("[clear_storage] id is invalid: id = %zd, size = %zd\n", id, size); return; } @@ -254,8 +253,8 @@ struct conn_info_head { ipc::string name_; msg_id_t cc_id_; // connection-info id - waiter cc_waiter_, wt_waiter_, rd_waiter_; - shm::handle acc_h_; + ipc::waiter cc_waiter_, wt_waiter_, rd_waiter_; + ipc::shm::handle acc_h_; /* * thread_local may have some bugs. @@ -268,7 +267,7 @@ struct conn_info_head { * - https://developercommunity.visualstudio.com/content/problem/124121/thread-local-variables-fail-to-be-initialized-when.html * - https://software.intel.com/en-us/forums/intel-c-compiler/topic/684827 */ - tls::pointer> recv_cache_; + ipc::tls::pointer> recv_cache_; conn_info_head(char const * name) : name_ (name) @@ -279,6 +278,12 @@ struct conn_info_head { , acc_h_ (("__AC_CONN__" + name_).c_str(), sizeof(acc_t)) { } + void quit_waiting() { + cc_waiter_.quit_waiting(); + wt_waiter_.quit_waiting(); + rd_waiter_.quit_waiting(); + } + auto acc() { return static_cast(acc_h_.get()); } @@ -295,10 +300,9 @@ bool wait_for(W& waiter, F&& pred, std::size_t tm) { bool loop = true, ret = true; ipc::sleep(k, [&k, &loop, &ret, &waiter, &pred, tm] { ret = waiter.wait_if([&loop, &pred] { - return loop = pred(); - }, tm); + return loop = pred(); + }, tm); k = 0; - return true; }); if (!ret ) return false; // timeout or fail if (!loop) break; @@ -307,7 +311,7 @@ bool wait_for(W& waiter, F&& pred, std::size_t tm) { } template struct queue_generator { @@ -341,35 +345,54 @@ constexpr static queue_t* queue_of(ipc::handle_t h) { /* API implementations */ -static ipc::handle_t connect(char const * name, bool start) { - auto h = mem::alloc(name); - auto que = queue_of(h); - if (que == nullptr) { - return nullptr; - } - if (start) { - if (que->connect()) { // wouldn't connect twice - info_of(h)->cc_waiter_.broadcast(); - } - } - return h; -} - static void disconnect(ipc::handle_t h) { auto que = queue_of(h); if (que == nullptr) { return; } - if (que->disconnect()) { - info_of(h)->cc_waiter_.broadcast(); + bool dis = que->disconnect(); + info_of(h)->quit_waiting(); + if (dis) { + info_of(h)->recv_cache().clear(); } - mem::free(info_of(h)); +} + +static bool reconnect(ipc::handle_t * ph, bool start) { + assert(ph != nullptr); + assert(*ph != nullptr); + auto que = queue_of(*ph); + if (que == nullptr) { + return false; + } + if (start) { + if (que->connect()) { // wouldn't connect twice + info_of(*ph)->cc_waiter_.broadcast(); + } + } + // start == false + else if (que->connected()) { + disconnect(*ph); + } + return true; +} + +static bool connect(ipc::handle_t * ph, char const * name, bool start) { + assert(ph != nullptr); + if (*ph == nullptr) { + *ph = ipc::mem::alloc(name); + } + return reconnect(ph, start); +} + +static void destroy(ipc::handle_t h) { + disconnect(h); + ipc::mem::free(info_of(h)); } static std::size_t recv_count(ipc::handle_t h) { auto que = queue_of(h); if (que == nullptr) { - return invalid_value; + return ipc::invalid_value; } return que->conn_count(); } @@ -411,30 +434,31 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s } auto msg_id = acc->fetch_add(1, std::memory_order_relaxed); auto try_push = std::forward(gen_push)(info_of(h), que, msg_id); - if (size > large_msg_limit) { + if (size > ipc::large_msg_limit) { auto dat = apply_storage(que->conn_count(), size); void * buf = dat.second; if (buf != nullptr) { std::memcpy(buf, data, size); return try_push(static_cast(size) - - static_cast(data_length), &(dat.first), 0); + static_cast(ipc::data_length), &(dat.first), 0); } // try using message fragment // ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd\n", msg_id, size); } // push message fragment std::int32_t offset = 0; - for (int i = 0; i < static_cast(size / data_length); ++i, offset += data_length) { - if (!try_push(static_cast(size) - offset - static_cast(data_length), - static_cast(data) + offset, data_length)) { + for (int i = 0; i < static_cast(size / ipc::data_length); ++i, offset += ipc::data_length) { + if (!try_push(static_cast(size) - offset - static_cast(ipc::data_length), + static_cast(data) + offset, ipc::data_length)) { return false; } } // if remain > 0, this is the last message fragment std::int32_t remain = static_cast(size) - offset; if (remain > 0) { - if (!try_push(remain - static_cast(data_length), - static_cast(data) + offset, static_cast(remain))) { + if (!try_push(remain - static_cast(ipc::data_length), + static_cast(data) + offset, + static_cast(remain))) { return false; } } @@ -451,8 +475,9 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size, std::size if (!que->force_push([](void* p) { auto tmp_msg = static_cast(p); if (tmp_msg->storage_) { - clear_storage(*reinterpret_cast(&tmp_msg->data_), - static_cast(data_length) + tmp_msg->remain_); + clear_storage( + *reinterpret_cast(&tmp_msg->data_), + static_cast(ipc::data_length) + tmp_msg->remain_); } return true; }, info->cc_id_, msg_id, remain, data, size)) { @@ -479,20 +504,22 @@ static bool try_send(ipc::handle_t h, void const * data, std::size_t size, std:: }, h, data, size); } -static buff_t recv(ipc::handle_t h, std::size_t tm) { +static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) { auto que = queue_of(h); if (que == nullptr) { ipc::error("fail: recv, queue_of(h) == nullptr\n"); return {}; } - if (que->connect()) { // wouldn't connect twice - info_of(h)->cc_waiter_.broadcast(); + if (!que->connected()) { + // hasn't connected yet, just return. + return {}; } auto& rc = info_of(h)->recv_cache(); while (1) { // pop a new message typename queue_t::value_t msg; if (!wait_for(info_of(h)->rd_waiter_, [que, &msg] { return !que->pop(msg); }, tm)) { + // pop failed, just return. return {}; } info_of(h)->wt_waiter_.broadcast(); @@ -500,18 +527,18 @@ static buff_t recv(ipc::handle_t h, std::size_t tm) { continue; // ignore message to self } // msg.remain_ may minus & abs(msg.remain_) < data_length - std::size_t remain = static_cast(data_length) + msg.remain_; + std::size_t remain = static_cast(ipc::data_length) + msg.remain_; // find cache with msg.id_ auto cac_it = rc.find(msg.id_); if (cac_it == rc.end()) { - if (remain <= data_length) { + if (remain <= ipc::data_length) { return make_cache(msg.data_, remain); } if (msg.storage_) { std::size_t buf_id = *reinterpret_cast(&msg.data_); void * buf = find_storage(buf_id, remain); if (buf != nullptr) { - return buff_t { buf, remain, [](void* ptr, std::size_t size) { + return ipc::buff_t { buf, remain, [](void* ptr, std::size_t size) { recycle_storage(reinterpret_cast(ptr) - 1, size); }, reinterpret_cast(buf_id + 1) }; } @@ -529,7 +556,7 @@ static buff_t recv(ipc::handle_t h, std::size_t tm) { for (auto id : need_del) rc.erase(id); } // cache the first message fragment - rc.emplace(msg.id_, cache_t { data_length, make_cache(msg.data_, remain) }); + rc.emplace(msg.id_, cache_t { ipc::data_length, make_cache(msg.data_, remain) }); } // has cached before this message else { @@ -543,27 +570,32 @@ static buff_t recv(ipc::handle_t h, std::size_t tm) { return buff; } // there are remain datas after this message - cac.append(&(msg.data_), data_length); + cac.append(&(msg.data_), ipc::data_length); } } } -static buff_t try_recv(ipc::handle_t h) { +static ipc::buff_t try_recv(ipc::handle_t h) { return recv(h, 0); } }; // detail_impl template -using policy_t = policy::choose; +using policy_t = ipc::policy::choose; } // internal-linkage namespace ipc { template -ipc::handle_t chan_impl::connect(char const * name, unsigned mode) { - return detail_impl>::connect(name, mode & receiver); +bool chan_impl::connect(ipc::handle_t * ph, char const * name, unsigned mode) { + return detail_impl>::connect(ph, name, mode & receiver); +} + +template +bool chan_impl::reconnect(ipc::handle_t * ph, unsigned mode) { + return detail_impl>::reconnect(ph, mode & receiver); } template @@ -571,6 +603,11 @@ void chan_impl::disconnect(ipc::handle_t h) { detail_impl>::disconnect(h); } +template +void chan_impl::destroy(ipc::handle_t h) { + detail_impl>::destroy(h); +} + template char const * chan_impl::name(ipc::handle_t h) { auto info = detail_impl>::info_of(h); diff --git a/src/libipc/platform/waiter_linux.h b/src/libipc/platform/waiter_linux.h index f118478..25bed4f 100755 --- a/src/libipc/platform/waiter_linux.h +++ b/src/libipc/platform/waiter_linux.h @@ -9,8 +9,11 @@ #include #include +#include +#include #include "libipc/def.h" +#include "libipc/waiter_helper.h" #include "libipc/utility/log.h" #include "libipc/platform/detail.h" @@ -176,7 +179,7 @@ public: return SEM_FAILED; } - static handle_t open(char const* name, long count) { + static handle_t open(char const * name, long count) { handle_t sem = ::sem_open(name, O_CREAT, 0666, count); if (sem == SEM_FAILED) { ipc::error("fail sem_open[%d]: %s\n", errno, name); @@ -199,13 +202,19 @@ public: IPC_SEMAPHORE_FUNC_(sem_close, h); } - static bool destroy(char const* name) { + static bool destroy(char const * name) { IPC_SEMAPHORE_FUNC_(sem_unlink, name); } - static bool post(handle_t h) { + static bool post(handle_t h, long count) { if (h == invalid()) return false; - IPC_SEMAPHORE_FUNC_(sem_post, h); + auto spost = [](handle_t h) { + IPC_SEMAPHORE_FUNC_(sem_post, h); + }; + for (long i = 0; i < count; ++i) { + if (!spost(h)) return false; + } + return true; } static bool wait(handle_t h, std::size_t tm = invalid_value) { @@ -233,19 +242,63 @@ public: #pragma pop_macro("IPC_SEMAPHORE_FUNC_") }; -class waiter_helper { - mutex lock_; - - std::atomic waiting_ { 0 }; - long counter_ = 0; - +class waiter_holder { public: - using handle_t = std::tuple; + using handle_t = std::tuple< + ipc::string, + sem_helper::handle_t /* sema */, + sem_helper::handle_t /* handshake */>; static handle_t invalid() noexcept { - return std::make_tuple(ipc::string{}, sem_helper::invalid(), sem_helper::invalid()); + return std::make_tuple( + ipc::string{}, + sem_helper::invalid(), + sem_helper::invalid()); } +private: + using wait_flags = waiter_helper::wait_flags; + using wait_counter = waiter_helper::wait_counter; + + mutex lock_; + wait_counter cnt_; + + struct contrl { + waiter_holder * me_; + wait_flags * flags_; + handle_t const & h_; + + wait_flags & flags() noexcept { + assert(flags_ != nullptr); + return *flags_; + } + + wait_counter & counter() noexcept { + return me_->cnt_; + } + + auto get_lock() { + return ipc::detail::unique_lock(me_->lock_); + } + + bool sema_wait(std::size_t tm) { + return sem_helper::wait(std::get<1>(h_), tm); + } + + bool sema_post(long count) { + return sem_helper::post(std::get<1>(h_), count); + } + + bool handshake_wait(std::size_t tm) { + return sem_helper::wait(std::get<2>(h_), tm); + } + + bool handshake_post(long count) { + return sem_helper::post(std::get<2>(h_), count); + } + }; + +public: handle_t open_h(ipc::string && name) { auto sem = sem_helper::open(("__WAITER_HELPER_SEM__" + name).c_str(), 0); if (sem == sem_helper::invalid()) { @@ -278,63 +331,45 @@ public: } template - bool wait_if(handle_t const & h, F&& pred, std::size_t tm = invalid_value) { - waiting_.fetch_add(1, std::memory_order_release); - { - IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); - if (!std::forward(pred)()) return true; - ++ counter_; - } - bool ret = sem_helper::wait(std::get<1>(h), tm); - waiting_.fetch_sub(1, std::memory_order_release); - ret = sem_helper::post(std::get<2>(h)) && ret; - return ret; + bool wait_if(handle_t const & h, wait_flags * flags, F&& pred, std::size_t tm = invalid_value) { + assert(flags != nullptr); + contrl ctrl { this, flags, h }; + + class non_mutex { + public: + void lock () noexcept {} + void unlock() noexcept {} + } nm; + + return waiter_helper::wait_if(ctrl, nm, std::forward(pred), tm); } bool notify(handle_t const & h) { - std::atomic_thread_fence(std::memory_order_acq_rel); - if (waiting_.load(std::memory_order_relaxed) == 0) { - return true; - } - bool ret = true; - IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); - if (counter_ > 0) { - ret = sem_helper::post(std::get<1>(h)); - -- counter_; - ret = ret && sem_helper::wait(std::get<2>(h), default_timeout); - } - return ret; + contrl ctrl { this, nullptr, h }; + return waiter_helper::notify(ctrl); } bool broadcast(handle_t const & h) { - std::atomic_thread_fence(std::memory_order_acq_rel); - if (waiting_.load(std::memory_order_relaxed) == 0) { - return true; - } - bool ret = true; - IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); - if (counter_ > 0) { - for (long i = 0; i < counter_; ++i) { - ret = ret && sem_helper::post(std::get<1>(h)); - } - do { - -- counter_; - ret = ret && sem_helper::wait(std::get<2>(h), default_timeout); - } while (counter_ > 0); - } - return ret; + contrl ctrl { this, nullptr, h }; + return waiter_helper::broadcast(ctrl); + } + + bool quit_waiting(handle_t const & h, wait_flags * flags) { + assert(flags != nullptr); + contrl ctrl { this, flags, h }; + return waiter_helper::quit_waiting(ctrl); } }; class waiter { - waiter_helper helper_; + waiter_holder helper_; std::atomic opened_ { 0 }; public: - using handle_t = waiter_helper::handle_t; + using handle_t = waiter_holder::handle_t; static handle_t invalid() noexcept { - return waiter_helper::invalid(); + return waiter_holder::invalid(); } handle_t open(char const * name) { @@ -357,19 +392,24 @@ public: } template - bool wait_if(handle_t h, F&& pred, std::size_t tm = invalid_value) { + bool wait_if(handle_t h, waiter_helper::wait_flags * flags, F && pred, std::size_t tm = invalid_value) { if (h == invalid()) return false; - return helper_.wait_if(h, std::forward(pred), tm); + return helper_.wait_if(h, flags, std::forward(pred), tm); } - void notify(handle_t h) { - if (h == invalid()) return; - helper_.notify(h); + bool notify(handle_t h) { + if (h == invalid()) return false; + return helper_.notify(h); } - void broadcast(handle_t h) { - if (h == invalid()) return; - helper_.broadcast(h); + bool broadcast(handle_t h) { + if (h == invalid()) return false; + return helper_.broadcast(h); + } + + bool quit_waiting(handle_t h, waiter_helper::wait_flags * flags) { + if (h == invalid()) return false; + return helper_.quit_waiting(h, flags); } }; diff --git a/src/libipc/platform/waiter_win.h b/src/libipc/platform/waiter_win.h index ca1f86e..4f3d080 100755 --- a/src/libipc/platform/waiter_win.h +++ b/src/libipc/platform/waiter_win.h @@ -3,11 +3,14 @@ #include #include -#include +#include +#include +#include #include "libipc/rw_lock.h" #include "libipc/pool_alloc.h" #include "libipc/shm.h" +#include "libipc/waiter_helper.h" #include "libipc/utility/log.h" #include "libipc/platform/to_tchar.h" @@ -42,8 +45,9 @@ public: switch ((ret = ::WaitForSingleObject(h_, ms))) { case WAIT_OBJECT_0: return true; - case WAIT_ABANDONED: case WAIT_TIMEOUT: + return false; + case WAIT_ABANDONED: default: ipc::error("fail WaitForSingleObject[%lu]: 0x%08X\n", ::GetLastError(), ret); return false; @@ -73,15 +77,51 @@ public: }; class condition { + using wait_flags = waiter_helper::wait_flags; + using wait_counter = waiter_helper::wait_counter; + mutex lock_; semaphore sema_, handshake_; + wait_counter * cnt_ = nullptr; - std::atomic * waiting_ = nullptr; - long * counter_ = nullptr; + struct contrl { + condition * me_; + wait_flags * flags_; + + wait_flags & flags() noexcept { + assert(flags_ != nullptr); + return *flags_; + } + + wait_counter & counter() noexcept { + assert(me_->cnt_ != nullptr); + return *(me_->cnt_); + } + + auto get_lock() { + return ipc::detail::unique_lock(me_->lock_); + } + + bool sema_wait(std::size_t tm) { + return me_->sema_.wait(tm); + } + + bool sema_post(long count) { + return me_->sema_.post(count); + } + + bool handshake_wait(std::size_t tm) { + return me_->handshake_.wait(tm); + } + + bool handshake_post(long count) { + return me_->handshake_.post(count); + } + }; public: friend bool operator==(condition const & c1, condition const & c2) { - return (c1.waiting_ == c2.waiting_) && (c1.counter_ == c2.counter_); + return c1.cnt_ == c2.cnt_; } friend bool operator!=(condition const & c1, condition const & c2) { @@ -94,12 +134,11 @@ public: mutex ::remove((ipc::string{ "__COND_MTX__" } + name).c_str()); } - bool open(ipc::string const & name, std::atomic * waiting, long * counter) { + bool open(ipc::string const & name, wait_counter * cnt) { if (lock_ .open("__COND_MTX__" + name) && sema_ .open("__COND_SEM__" + name) && handshake_.open("__COND_HAN__" + name)) { - waiting_ = waiting; - counter_ = counter; + cnt_ = cnt; return true; } return false; @@ -112,58 +151,31 @@ public: } template - bool wait_if(Mutex& mtx, F&& pred, std::size_t tm = invalid_value) { - waiting_->fetch_add(1, std::memory_order_release); - { - IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); - if (!std::forward(pred)()) return true; - ++ *counter_; - } - mtx.unlock(); - bool ret = sema_.wait(tm); - waiting_->fetch_sub(1, std::memory_order_release); - ret = handshake_.post() && ret; - mtx.lock(); - return ret; + bool wait_if(Mutex & mtx, wait_flags * flags, F && pred, std::size_t tm = invalid_value) { + assert(flags != nullptr); + contrl ctrl { this, flags }; + return waiter_helper::wait_if(ctrl, mtx, std::forward(pred), tm); } bool notify() { - std::atomic_thread_fence(std::memory_order_acq_rel); - if (waiting_->load(std::memory_order_relaxed) == 0) { - return true; - } - bool ret = true; - IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); - if (*counter_ > 0) { - ret = sema_.post(); - -- *counter_; - ret = ret && handshake_.wait(default_timeout); - } - return ret; + contrl ctrl { this, nullptr }; + return waiter_helper::notify(ctrl); } bool broadcast() { - std::atomic_thread_fence(std::memory_order_acq_rel); - if (waiting_->load(std::memory_order_relaxed) == 0) { - return true; - } - bool ret = true; - IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); - if (*counter_ > 0) { - ret = sema_.post(*counter_); - do { - -- *counter_; - ret = ret && handshake_.wait(default_timeout); - } while (*counter_ > 0); - } - return ret; + contrl ctrl { this, nullptr }; + return waiter_helper::broadcast(ctrl); + } + + bool quit_waiting(wait_flags * flags) { + assert(flags != nullptr); + contrl ctrl { this, flags }; + return waiter_helper::quit_waiting(ctrl); } }; class waiter { - - std::atomic waiting_ { 0 }; - long counter_ = 0; + waiter_helper::wait_counter cnt_; public: using handle_t = condition; @@ -177,7 +189,7 @@ public: return invalid(); } condition cond; - if (cond.open(name, &waiting_, &counter_)) { + if (cond.open(name, &cnt_)) { return cond; } return invalid(); @@ -189,7 +201,7 @@ public: } template - bool wait_if(handle_t& h, F&& pred, std::size_t tm = invalid_value) { + bool wait_if(handle_t& h, waiter_helper::wait_flags * flags, F&& pred, std::size_t tm = invalid_value) { if (h == invalid()) return false; class non_mutex { @@ -198,17 +210,22 @@ public: void unlock() noexcept {} } nm; - return h.wait_if(nm, std::forward(pred), tm); + return h.wait_if(nm, flags, std::forward(pred), tm); } - void notify(handle_t& h) { - if (h == invalid()) return; - h.notify(); + bool notify(handle_t& h) { + if (h == invalid()) return false; + return h.notify(); } - void broadcast(handle_t& h) { - if (h == invalid()) return; - h.broadcast(); + bool broadcast(handle_t& h) { + if (h == invalid()) return false; + return h.broadcast(); + } + + bool quit_waiting(handle_t& h, waiter_helper::wait_flags * flags) { + if (h == invalid()) return false; + return h.quit_waiting(flags); } }; diff --git a/src/libipc/platform/waiter_wrapper.h b/src/libipc/platform/waiter_wrapper.h index 4638c84..553d2e1 100755 --- a/src/libipc/platform/waiter_wrapper.h +++ b/src/libipc/platform/waiter_wrapper.h @@ -21,35 +21,39 @@ using mutex_impl = ipc::detail::mutex; using semaphore_impl = ipc::detail::semaphore; class condition_impl : public ipc::detail::condition { + using base_t = ipc::detail::condition; - ipc::shm::handle wait_h_, cnt_h_; + ipc::shm::handle cnt_h_; + waiter_helper::wait_flags flags_; public: static void remove(char const * name) { - ipc::detail::condition::remove(name); + base_t::remove(name); ipc::string n = name; ipc::shm::remove((n + "__COND_CNT__" ).c_str()); ipc::shm::remove((n + "__COND_WAIT__").c_str()); } - bool open(ipc::string const & name) { - if (wait_h_.acquire((name + "__COND_WAIT__").c_str(), sizeof(std::atomic)) && - cnt_h_ .acquire((name + "__COND_CNT__" ).c_str(), sizeof(long))) { - return ipc::detail::condition::open(name, - static_cast *>(wait_h_.get()), - static_cast(cnt_h_.get())); + bool open(char const * name) { + if (cnt_h_ .acquire( + (ipc::string { name } + "__COND_CNT__" ).c_str(), + sizeof(waiter_helper::wait_counter))) { + flags_.is_closed_.store(false, std::memory_order_release); + return base_t::open(name, + static_cast(cnt_h_.get())); } return false; } void close() { - ipc::detail::condition::close(); - cnt_h_ .release(); - wait_h_.release(); + flags_.is_closed_.store(true, std::memory_order_release); + base_t::quit_waiting(&flags_); + base_t::close(); + cnt_h_.release(); } bool wait(mutex_impl& mtx, std::size_t tm = invalid_value) { - return ipc::detail::condition::wait_if(mtx, [] { return true; }, tm); + return base_t::wait_if(mtx, &flags_, [] { return true; }, tm); } }; @@ -165,17 +169,11 @@ public: } bool wait(std::size_t tm = invalid_value) { - if (h_ == sem_helper::invalid()) return false; return sem_helper::wait(h_, tm); } bool post(long count) { - if (h_ == sem_helper::invalid()) return false; - bool ret = true; - for (long i = 0; i < count; ++i) { - ret = ret && sem_helper::post(h_); - } - return ret; + return sem_helper::post(h_, count); } }; @@ -194,6 +192,7 @@ public: private: waiter_t* w_ = nullptr; waiter_t::handle_t h_ = waiter_t::invalid(); + waiter_helper::wait_flags flags_; public: waiter_wrapper() = default; @@ -218,20 +217,27 @@ public: bool open(char const * name) { if (w_ == nullptr) return false; close(); + flags_.is_closed_.store(false, std::memory_order_release); h_ = w_->open(name); return valid(); } void close() { if (!valid()) return; + flags_.is_closed_.store(true, std::memory_order_release); + quit_waiting(); w_->close(h_); h_ = waiter_t::invalid(); } + void quit_waiting() { + w_->quit_waiting(h_, &flags_); + } + template - bool wait_if(F&& pred, std::size_t tm = invalid_value) { + bool wait_if(F && pred, std::size_t tm = invalid_value) { if (!valid()) return false; - return w_->wait_if(h_, std::forward(pred), tm); + return w_->wait_if(h_, &flags_, std::forward(pred), tm); } bool notify() { diff --git a/src/libipc/utility/log.h b/src/libipc/utility/log.h index f09b385..bb1dcea 100755 --- a/src/libipc/utility/log.h +++ b/src/libipc/utility/log.h @@ -7,8 +7,8 @@ namespace ipc { namespace detail { template -void print(O out, char const * fmt) { - std::fprintf(out, "%s", fmt); +void print(O out, char const * str) { + std::fprintf(out, "%s", str); } template @@ -27,8 +27,8 @@ void log(char const * fmt, P1&& p1, P&&... params) { ipc::detail::print(stdout, fmt, std::forward(p1), std::forward

(params)...); } -inline void error(char const * fmt) { - ipc::detail::print(stderr, fmt); +inline void error(char const * str) { + ipc::detail::print(stderr, str); } template diff --git a/src/libipc/utility/scope_guard.h b/src/libipc/utility/scope_guard.h new file mode 100644 index 0000000..9f42149 --- /dev/null +++ b/src/libipc/utility/scope_guard.h @@ -0,0 +1,64 @@ +#pragma once + +#include // std::forward, std::move +#include // std::swap +#include // std::decay + +namespace ipc { + +//////////////////////////////////////////////////////////////// +/// Execute guard function when the enclosing scope exits +//////////////////////////////////////////////////////////////// + +template +class scope_guard { + F destructor_; + mutable bool dismiss_; + +public: + template + scope_guard(D && destructor) + : destructor_(std::forward(destructor)) + , dismiss_(false) { + } + + scope_guard(scope_guard&& rhs) + : destructor_(std::move(rhs.destructor_)) + , dismiss_(true) /* dismiss rhs */ { + std::swap(dismiss_, rhs.dismiss_); + } + + ~scope_guard() { + try { do_exit(); } + /** + * In the realm of exceptions, it is fundamental that you can do nothing + * if your "undo/recover" action fails. + */ + catch (...) { /* Do nothing */ } + } + + void swap(scope_guard & rhs) { + std::swap(destructor_, rhs.destructor_); + std::swap(dismiss_ , rhs.dismiss_); + } + + void dismiss() const noexcept { + dismiss_ = true; + } + + void do_exit() { + if (!dismiss_) { + dismiss_ = true; + destructor_(); + } + } +}; + +template +constexpr auto guard(D && destructor) noexcept { + return scope_guard> { + std::forward(destructor) + }; +} + +} // namespace ipc \ No newline at end of file diff --git a/src/libipc/waiter_helper.h b/src/libipc/waiter_helper.h new file mode 100644 index 0000000..46b03a3 --- /dev/null +++ b/src/libipc/waiter_helper.h @@ -0,0 +1,129 @@ +#pragma once + +#include +#include +#include + +#include "libipc/def.h" +#include "libipc/utility/scope_guard.h" + +namespace ipc { +namespace detail { + +class waiter_helper { + + enum : unsigned { + destruct_mask = (std::numeric_limits::max)() >> 1, + destruct_flag = ~destruct_mask + }; + +public: + struct wait_counter { + std::atomic waiting_ { 0 }; + long counter_ = 0; + }; + + struct wait_flags { + std::atomic is_waiting_ { false }; + std::atomic is_closed_ { true }; + }; + + template + static bool wait_if(Ctrl & ctrl, Mutex & mtx, F && pred, std::size_t tm) { + auto & flags = ctrl.flags(); + if (flags.is_closed_.load(std::memory_order_acquire)) { + return false; + } + + auto & counter = ctrl.counter(); + counter.waiting_.fetch_add(1, std::memory_order_release); + flags.is_waiting_.store(true, std::memory_order_relaxed); + auto finally = ipc::guard([&counter, &flags] { + counter.waiting_.fetch_sub(1, std::memory_order_release); + flags.is_waiting_.store(false, std::memory_order_relaxed); + }); + { + IPC_UNUSED_ auto guard = ctrl.get_lock(); + if (!std::forward(pred)()) return true; + counter.counter_ += 1; + } + mtx.unlock(); + + bool ret = false; + do { + bool is_waiting = flags.is_waiting_.load(std::memory_order_relaxed); + bool is_closed = flags.is_closed_ .load(std::memory_order_acquire); + if (!is_waiting || is_closed) { + ret = false; + break; + } + ret = ctrl.sema_wait(tm); + } while (counter.waiting_.load(std::memory_order_acquire) & destruct_flag); + finally.do_exit(); + ret = ctrl.handshake_post(1) && ret; + + mtx.lock(); + return ret; + } + + template + static bool notify(Ctrl & ctrl) { + auto & counter = ctrl.counter(); + if ((counter.waiting_.load(std::memory_order_acquire) & destruct_mask) == 0) { + return true; + } + bool ret = true; + IPC_UNUSED_ auto guard = ctrl.get_lock(); + if (counter.counter_ > 0) { + ret = ctrl.sema_post(1); + counter.counter_ -= 1; + ret = ret && ctrl.handshake_wait(default_timeout); + } + return ret; + } + + template + static bool broadcast(Ctrl & ctrl) { + auto & counter = ctrl.counter(); + if ((counter.waiting_.load(std::memory_order_acquire) & destruct_mask) == 0) { + return true; + } + bool ret = true; + IPC_UNUSED_ auto guard = ctrl.get_lock(); + if (counter.counter_ > 0) { + ret = ctrl.sema_post(counter.counter_); + do { + counter.counter_ -= 1; + ret = ret && ctrl.handshake_wait(default_timeout); + } while (counter.counter_ > 0); + } + return ret; + } + + template + static bool quit_waiting(Ctrl & ctrl) { + auto & flags = ctrl.flags(); + if (!flags.is_waiting_.exchange(false, std::memory_order_release)) { + return true; + } + auto & counter = ctrl.counter(); + if ((counter.waiting_.load(std::memory_order_acquire) & destruct_mask) == 0) { + return true; + } + bool ret = true; + IPC_UNUSED_ auto guard = ctrl.get_lock(); + counter.waiting_.fetch_or(destruct_flag, std::memory_order_relaxed); + IPC_UNUSED_ auto finally = ipc::guard([&counter] { + counter.waiting_.fetch_and(destruct_mask, std::memory_order_relaxed); + }); + if (counter.counter_ > 0) { + ret = ctrl.sema_post(counter.counter_); + counter.counter_ -= 1; + ret = ret && ctrl.handshake_wait(default_timeout); + } + return ret; + } +}; + +} // namespace detail +} // namespace ipc diff --git a/test/test_ipc.cpp b/test/test_ipc.cpp index 6221802..faae13c 100755 --- a/test/test_ipc.cpp +++ b/test/test_ipc.cpp @@ -52,7 +52,7 @@ public: template void test_basic(char const * name) { - using que_t = chan>; + using que_t = chan; rand_buf test1, test2; que_t que1 { name }; @@ -69,7 +69,7 @@ void test_basic(char const * name) { template void test_sr(char const * name, int size, int s_cnt, int r_cnt) { - using que_t = chan>; + using que_t = chan; ipc_ut::sender().start(static_cast(s_cnt)); ipc_ut::reader().start(static_cast(r_cnt));