mirror of
https://github.com/mutouyun/cpp-ipc.git
synced 2025-12-06 16:56:45 +08:00
243 lines
6.8 KiB
C++
243 lines
6.8 KiB
C++
#include "ipc.h"
|
|
|
|
#include <type_traits>
|
|
#include <cstring>
|
|
#include <algorithm>
|
|
#include <utility>
|
|
#include <atomic>
|
|
|
|
#include "def.h"
|
|
#include "circ_queue.h"
|
|
#include "shm.h"
|
|
#include "tls_pointer.h"
|
|
|
|
#include "memory/resource.hpp"
|
|
|
|
namespace {
|
|
|
|
using namespace ipc;
|
|
using data_t = byte_t[data_length];
|
|
|
|
#pragma pack(1)
|
|
struct msg_t {
|
|
std::size_t id_;
|
|
int remain_;
|
|
data_t data_;
|
|
};
|
|
#pragma pack()
|
|
|
|
using queue_t = circ::queue<msg_t>;
|
|
using msg_id_t = decltype(msg_t::id_);
|
|
|
|
struct shm_info_t {
|
|
queue_t::array_t elems_; // the circ_elem_array in shm
|
|
};
|
|
|
|
inline auto acc_of(queue_t*) {
|
|
static shm::handle g_shm { "GLOBAL_ACC_STORAGE__", sizeof(std::atomic<msg_id_t>) };
|
|
return static_cast<std::atomic<msg_id_t>*>(g_shm.get());
|
|
}
|
|
|
|
constexpr void* head_of(queue_t* que) {
|
|
return static_cast<void*>(que->elems());
|
|
}
|
|
|
|
constexpr queue_t* queue_of(handle_t h) {
|
|
return static_cast<queue_t*>(h);
|
|
}
|
|
|
|
using cache_t = mem::vector<byte_t>;
|
|
|
|
template <std::size_t N>
|
|
cache_t make_cache(byte_t const (& data)[N]) {
|
|
return {
|
|
static_cast<buff_t::value_type const *>(data),
|
|
static_cast<buff_t::value_type const *>(data) + N
|
|
};
|
|
}
|
|
|
|
template <typename T>
|
|
using remove_cv_ref_t = std::remove_cv_t<std::remove_reference_t<T>>;
|
|
|
|
template <typename Cache>
|
|
constexpr auto to_buff(Cache&& cac) -> Requires<std::is_same<remove_cv_ref_t<Cache>, buff_t>::value, Cache&&> {
|
|
return std::forward<Cache>(cac);
|
|
}
|
|
|
|
template <typename Cache>
|
|
auto to_buff(Cache&& cac) -> Requires<!std::is_same<remove_cv_ref_t<Cache>, buff_t>::value, buff_t> {
|
|
return make_buff(cac.data(), cac.size());
|
|
}
|
|
|
|
inline auto& recv_cache() {
|
|
/*
|
|
<Remarks> thread_local may have some bugs.
|
|
See: https://sourceforge.net/p/mingw-w64/bugs/727/
|
|
https://sourceforge.net/p/mingw-w64/bugs/527/
|
|
https://github.com/Alexpux/MINGW-packages/issues/2519
|
|
https://github.com/ChaiScript/ChaiScript/issues/402
|
|
https://developercommunity.visualstudio.com/content/problem/124121/thread-local-variables-fail-to-be-initialized-when.html
|
|
https://software.intel.com/en-us/forums/intel-c-compiler/topic/684827
|
|
*/
|
|
static tls::pointer<mem::unordered_map<msg_id_t, cache_t>> rc;
|
|
return *rc.create();
|
|
}
|
|
|
|
inline auto& queues_cache() {
|
|
static tls::pointer<mem::vector<queue_t*>> qc;
|
|
return *qc.create();
|
|
}
|
|
|
|
} // internal-linkage
|
|
|
|
namespace ipc {
|
|
|
|
buff_t make_buff(void const * data, std::size_t size) {
|
|
return {
|
|
static_cast<buff_t::value_type const *>(data),
|
|
static_cast<buff_t::value_type const *>(data) + size
|
|
};
|
|
}
|
|
|
|
handle_t connect(char const * name) {
|
|
auto mem = shm::acquire(name, sizeof(shm_info_t));
|
|
if (mem == nullptr) {
|
|
return nullptr;
|
|
}
|
|
return new queue_t { &(static_cast<shm_info_t*>(mem)->elems_) };
|
|
}
|
|
|
|
void disconnect(handle_t h) {
|
|
queue_t* que = queue_of(h);
|
|
if (que == nullptr) {
|
|
return;
|
|
}
|
|
que->disconnect(); // needn't to detach, cause it will be deleted soon.
|
|
shm::release(head_of(que), sizeof(shm_info_t));
|
|
delete que;
|
|
}
|
|
|
|
std::size_t recv_count(handle_t h) {
|
|
auto que = queue_of(h);
|
|
if (que == nullptr) {
|
|
return invalid_value;
|
|
}
|
|
return que->conn_count();
|
|
}
|
|
|
|
void clear_recv(handle_t h) {
|
|
auto* head = head_of(queue_of(h));
|
|
if (head == nullptr) {
|
|
return;
|
|
}
|
|
std::memset(head, 0, sizeof(shm_info_t));
|
|
}
|
|
|
|
void clear_recv(char const * name) {
|
|
auto h = ipc::connect(name);
|
|
ipc::clear_recv(h);
|
|
ipc::disconnect(h);
|
|
}
|
|
|
|
bool send(handle_t h, void const * data, std::size_t size) {
|
|
if (data == nullptr) {
|
|
return false;
|
|
}
|
|
if (size == 0) {
|
|
return false;
|
|
}
|
|
auto que = queue_of(h);
|
|
if (que == nullptr) {
|
|
return false;
|
|
}
|
|
// calc a new message id, start with 1
|
|
auto msg_id = acc_of(que)->fetch_add(1, std::memory_order_relaxed) + 1;
|
|
// push message fragment, one fragment size is data_length
|
|
int offset = 0;
|
|
for (int i = 0; i < static_cast<int>(size / data_length); ++i, offset += data_length) {
|
|
msg_t msg {
|
|
msg_id, static_cast<int>(size) - offset - static_cast<int>(data_length), {}
|
|
};
|
|
std::memcpy(msg.data_, static_cast<byte_t const *>(data) + offset, data_length);
|
|
if (!que->push(msg)) return false;
|
|
}
|
|
// if remain > 0, this is the last message fragment
|
|
int remain = static_cast<int>(size) - offset;
|
|
if (remain > 0) {
|
|
msg_t msg {
|
|
msg_id, remain - static_cast<int>(data_length), {}
|
|
};
|
|
std::memcpy(msg.data_, static_cast<byte_t const *>(data) + offset,
|
|
static_cast<std::size_t>(remain));
|
|
if (!que->push(msg)) return false;
|
|
}
|
|
return true;
|
|
}
|
|
|
|
template <typename F>
|
|
buff_t multi_recv(F&& upd) {
|
|
auto& rc = recv_cache();
|
|
while(1) {
|
|
// pop a new message
|
|
auto msg = queue_t::pop(queue_t::multi_wait_for(upd));
|
|
if (msg.id_ == 0) return {};
|
|
// msg.remain_ may minus & abs(msg.remain_) < data_length
|
|
std::size_t remain = static_cast<std::size_t>(
|
|
static_cast<int>(data_length) + msg.remain_);
|
|
// find cache with msg.id_
|
|
auto cache_it = rc.find(msg.id_);
|
|
if (cache_it == rc.end()) {
|
|
if (remain <= data_length) {
|
|
return make_buff(msg.data_, remain);
|
|
}
|
|
// cache the first message fragment
|
|
else rc.emplace(msg.id_, make_cache(msg.data_));
|
|
}
|
|
// has cached before this message
|
|
else {
|
|
auto& cache = cache_it->second;
|
|
// this is the last message fragment
|
|
if (msg.remain_ <= 0) {
|
|
cache.insert(cache.end(), msg.data_, msg.data_ + remain);
|
|
// finish this message, erase it from cache
|
|
auto cac = std::move(cache);
|
|
rc.erase(cache_it);
|
|
return to_buff(std::move(cac));
|
|
}
|
|
// there are remain datas after this message
|
|
cache.insert(cache.end(), msg.data_, msg.data_ + data_length);
|
|
}
|
|
}
|
|
}
|
|
|
|
buff_t recv(handle_t const * hs, std::size_t size) {
|
|
auto& q_arr = queues_cache();
|
|
q_arr.clear(); // make the size to 0
|
|
for (size_t i = 0; i < size; ++i) {
|
|
auto que = queue_of(hs[i]);
|
|
if (que == nullptr) continue;
|
|
que->connect(); // wouldn't connect twice
|
|
q_arr.push_back(que);
|
|
}
|
|
if (q_arr.empty()) {
|
|
return {};
|
|
}
|
|
return multi_recv([&] {
|
|
return std::make_tuple(q_arr.data(), q_arr.size());
|
|
});
|
|
}
|
|
|
|
buff_t recv(handle_t h) {
|
|
auto que = queue_of(h);
|
|
if (que == nullptr) return {};
|
|
que->connect(); // wouldn't connect twice
|
|
return multi_recv([&que] {
|
|
return std::make_tuple(&que, 1);
|
|
});
|
|
}
|
|
|
|
} // namespace ipc
|
|
|
|
#include "route.inc"
|
|
#include "channel.inc"
|