#include "ipc.h"

// NOTE(review): the reviewed text had lost everything written between angle
// brackets (system header names, template parameter/argument lists, cast
// target types). They are restored below from how each name is used; the
// places where usage alone does not pin the spelling down carry their own
// NOTE(review) and must be confirmed against the project headers.
#include <type_traits>
#include <cstring>      // std::memcpy
#include <algorithm>    // std::min
#include <utility>      // std::move
#include <atomic>       // std::atomic
#include <limits>       // std::numeric_limits (size guard in send)

#include "def.h"
#include "shm.h"
#include "tls_pointer.h"
#include "pool_alloc.h"
#include "queue.h"
#include "policy.h"

#include "memory/resource.h"
#include "platform/detail.h"

namespace {

using namespace ipc;
using msg_id_t = std::size_t;

/*
 * One queue element: a small header followed by a fixed-size payload.
 * AlignSize defaults to the smaller of DataSize and alignof(std::size_t).
 */
template <std::size_t DataSize,
#if __cplusplus >= 201703L
          std::size_t AlignSize = (std::min)(DataSize, alignof(std::size_t))>
#else /*__cplusplus < 201703L*/
          std::size_t AlignSize = (alignof(std::size_t) < DataSize) ? alignof(std::size_t) : DataSize>
#endif/*__cplusplus < 201703L*/
struct msg_t;

/* Header-only specialization (no payload). */
template <std::size_t AlignSize>
struct msg_t<0, AlignSize> {
    void*    que_;    // queue pointer stored by send(); recv() skips fragments whose que_ equals its own queue
    msg_id_t id_;     // id shared by every fragment of one message
    int      remain_; // bytes of the message still to come after this fragment; <= 0 on the last fragment
};

/* Header plus DataSize bytes of payload. */
template <std::size_t DataSize, std::size_t AlignSize>
struct msg_t {
    msg_t<0, AlignSize> head_;
    alignas(AlignSize) byte_t data_[DataSize];
};

/*
 * Channel operations for one queue policy. All members are static; the
 * ipc::handle_t passed around is a type-erased queue_t*.
 */
template <typename Policy>
struct detail_impl {

using queue_t = ipc::queue<msg_t<data_length>, Policy>;

/* Returns the queue's element storage as an untyped pointer. */
constexpr static void* head_of(queue_t* que) {
    return static_cast<void*>(que->elems());
}

/* Recovers the typed queue pointer from an opaque handle. */
constexpr static queue_t* queue_of(ipc::handle_t h) {
    return static_cast<queue_t*>(h);
}

/*
 * Allocates a buffer of `size` bytes and copies the first `copy_size` bytes
 * of `data` into it (never more than `size`). The returned buff_t is given
 * mem::free as its release function.
 *
 * `size` and `copy_size` are separate because the first fragment of a long
 * message needs a buffer sized for the whole message while only one
 * fragment's worth of bytes is actually readable at `data`.
 */
static buff_t make_cache(void const * data, std::size_t size, std::size_t copy_size) {
    auto ptr = mem::alloc(size);
    std::memcpy(ptr, data, (std::min)(copy_size, size));
    return { ptr, size, mem::free };
}

/* Allocates a buffer of `size` bytes holding a copy of data[0, size). */
static buff_t make_cache(void const * data, std::size_t size) {
    return make_cache(data, size, size);
}

/* Reassembly state of one partially received message. */
struct cache_t {
    std::size_t fill_; // number of bytes already written into buff_
    buff_t      buff_; // destination buffer for the whole message

    cache_t(std::size_t f, buff_t&& b)
        : fill_(f), buff_(std::move(b))
    {}

    /* Copies `size` bytes to the current fill position and advances it. */
    void append(void const * data, std::size_t size) {
        std::memcpy(static_cast<byte_t*>(buff_.data()) + fill_, data, size);
        fill_ += size;
    }
};

/* Per-thread map from message id to its reassembly cache. */
static auto& recv_cache() {
    /*
        thread_local may have some bugs.
        See: https://sourceforge.net/p/mingw-w64/bugs/727/
             https://sourceforge.net/p/mingw-w64/bugs/527/
             https://github.com/Alexpux/MINGW-packages/issues/2519
             https://github.com/ChaiScript/ChaiScript/issues/402
             https://developercommunity.visualstudio.com/content/problem/124121/thread-local-variables-fail-to-be-initialized-when.html
             https://software.intel.com/en-us/forums/intel-c-compiler/topic/684827
    */
    // NOTE(review): the key/value types follow from the find/emplace/erase
    // calls in recv(); the container name mem::unordered_map is assumed.
    static tls::pointer<mem::unordered_map<msg_id_t, cache_t>> rc;
    return *rc.create();
}

/* Per-thread queue list; not referenced elsewhere in this file. */
static auto& queues_cache() {
    // NOTE(review): the element type could not be recovered from usage;
    // mem::vector<queue_t*> is assumed.
    static tls::pointer<mem::vector<queue_t*>> qc;
    return *qc.create();
}

/*
    API implementations
*/

/* Allocates a queue for `name` and returns it as an opaque handle. */
static ipc::handle_t connect(char const * name) {
    return mem::alloc<queue_t>(name);
}

/* Disconnects and frees the queue behind `h`; a null handle is ignored. */
static void disconnect(ipc::handle_t h) {
    queue_t* que = queue_of(h);
    if (que == nullptr) {
        return;
    }
    que->disconnect(); // no explicit detach: the queue object is freed right below
    mem::free(que);
}

/* Returns the queue's connection count, or invalid_value for a null handle. */
static std::size_t recv_count(ipc::handle_t h) {
    auto que = queue_of(h);
    if (que == nullptr) {
        return invalid_value;
    }
    return que->conn_count();
}

/* Forwards to queue_t::wait_for_connect(r_count); false for a null handle. */
static bool wait_for_recv(ipc::handle_t h, std::size_t r_count) {
    auto que = queue_of(h);
    if (que == nullptr) {
        return false;
    }
    return que->wait_for_connect(r_count);
}

/*
 * Splits [data, data + size) into data_length-sized fragments and pushes
 * them in order under one freshly generated message id.
 *
 * Returns false when data is null, size is 0, size does not fit in an int
 * (the header's remain_ field is an int), the handle is null, or any push
 * fails. On a failed push, fragments already pushed are not withdrawn.
 */
static bool send(ipc::handle_t h, void const * data, std::size_t size) {
    if (data == nullptr) {
        return false;
    }
    if (size == 0) {
        return false;
    }
    // remain_ is an int; a larger size would truncate/overflow below.
    if (size > static_cast<std::size_t>((std::numeric_limits<int>::max)())) {
        return false;
    }
    auto que = queue_of(h);
    if (que == nullptr) {
        return false;
    }
    constexpr std::size_t frag_len = static_cast<std::size_t>(data_length);
    auto const * bytes = static_cast<byte_t const *>(data);
    // calc a new message id
    auto msg_id = ipc::detail::calc_unique_id();
    // push every full-sized fragment
    std::size_t offset = 0;
    for (; size - offset >= frag_len; offset += frag_len) {
        msg_t<data_length> msg {
            { que, msg_id, static_cast<int>(size - offset) - static_cast<int>(frag_len) }, {}
        };
        std::memcpy(msg.data_, bytes + offset, frag_len);
        if (!que->push(msg)) return false;
    }
    // a non-empty tail is the last (short) fragment; its remain_ is negative
    std::size_t const tail = size - offset;
    if (tail > 0) {
        msg_t<data_length> msg {
            { que, msg_id, static_cast<int>(tail) - static_cast<int>(frag_len) }, {}
        };
        std::memcpy(msg.data_, bytes + offset, tail);
        if (!que->push(msg)) return false;
    }
    return true;
}

/*
 * Pops fragments until one complete message is available and returns it.
 * Fragments of other, still incomplete messages are kept in this thread's
 * recv_cache(). Returns an empty buff_t for a null handle or when a popped
 * element has a null que_.
 */
static buff_t recv(ipc::handle_t h) {
    auto que = queue_of(h);
    if (que == nullptr) return {};
    que->connect(); // original author's note: wouldn't connect twice
    auto& rc = recv_cache();
    while (1) {
        // pop a new message
        auto msg = que->pop();
        if (msg.head_.que_ == nullptr) return {};
        if (msg.head_.que_ == que) continue; // pushed through this very queue object: pop next
        // remain_ may be negative, with abs(remain_) < data_length, so this
        // is the number of message bytes from this fragment to the end.
        std::size_t remain = static_cast<std::size_t>(
                             static_cast<int>(data_length) + msg.head_.remain_);
        // find cache with msg.head_.id_
        auto cac_it = rc.find(msg.head_.id_);
        if (cac_it == rc.end()) {
            // the whole (rest of the) message fits in this fragment
            if (remain <= data_length) {
                return make_cache(msg.data_, remain);
            }
            // cache the first message fragment: the buffer is sized for all
            // `remain` bytes, but only data_length bytes exist in msg.data_,
            // so only that many are copied.
            rc.emplace(msg.head_.id_,
                       cache_t { data_length, make_cache(msg.data_, remain, data_length) });
        }
        // has cached before this message
        else {
            auto& cac = cac_it->second;
            // this is the last message fragment
            if (msg.head_.remain_ <= 0) {
                cac.append(msg.data_, remain);
                // finish this message, erase it from cache
                auto buff = std::move(cac.buff_);
                rc.erase(cac_it);
                return buff;
            }
            // more data follows after this fragment
            cac.append(msg.data_, data_length);
        }
    }
}

}; // detail_impl

// NOTE(review): the argument list of policy::choose was lost; only the
// trailing Flag parameter is implied by the uses below. circ::elem_array is
// assumed for the first argument.
template <typename Flag>
using policy_t = policy::choose<circ::elem_array, Flag>;

} // internal-linkage

namespace ipc {
namespace detail {

/*
 * Returns the previous value of an atomic counter stored in the shared
 * memory segment "__GLOBAL_ACC_STORAGE__" and increments it by one.
 */
std::size_t calc_unique_id() {
    static shm::handle g_shm { "__GLOBAL_ACC_STORAGE__", sizeof(std::atomic<std::size_t>) };
    return static_cast<std::atomic<std::size_t>*>(g_shm.get())->fetch_add(1, std::memory_order_relaxed);
}

} // namespace detail

/* chan_impl<Flag> members forward to detail_impl<policy_t<Flag>>. */

template <typename Flag>
ipc::handle_t chan_impl<Flag>::connect(char const * name) {
    return detail_impl<policy_t<Flag>>::connect(name);
}

template <typename Flag>
void chan_impl<Flag>::disconnect(ipc::handle_t h) {
    detail_impl<policy_t<Flag>>::disconnect(h);
}

template <typename Flag>
std::size_t chan_impl<Flag>::recv_count(ipc::handle_t h) {
    return detail_impl<policy_t<Flag>>::recv_count(h);
}

template <typename Flag>
bool chan_impl<Flag>::wait_for_recv(ipc::handle_t h, std::size_t r_count) {
    return detail_impl<policy_t<Flag>>::wait_for_recv(h, r_count);
}

template <typename Flag>
bool chan_impl<Flag>::send(ipc::handle_t h, void const * data, std::size_t size) {
    return detail_impl<policy_t<Flag>>::send(h, data, size);
}

template <typename Flag>
buff_t chan_impl<Flag>::recv(ipc::handle_t h) {
    return detail_impl<policy_t<Flag>>::recv(h);
}

// NOTE(review): five explicit instantiations were present but their argument
// lists were lost entirely. The combinations below are an assumption and must
// be checked against the flag types declared in ipc.h / def.h.
template struct chan_impl<ipc::wr<relat::single, relat::single, trans::unicast  >>;
template struct chan_impl<ipc::wr<relat::single, relat::multi , trans::unicast  >>;
template struct chan_impl<ipc::wr<relat::multi , relat::multi , trans::unicast  >>;
template struct chan_impl<ipc::wr<relat::single, relat::multi , trans::broadcast>>;
template struct chan_impl<ipc::wr<relat::multi , relat::multi , trans::broadcast>>;

} // namespace ipc