mirror of
https://github.com/mutouyun/cpp-ipc.git
synced 2025-12-07 01:06:45 +08:00
fix bugs; modify shm interfaces
This commit is contained in:
parent
79985df800
commit
d223e3abb8
@ -28,7 +28,8 @@ using uint_t = typename uint<N>::type;
|
|||||||
|
|
||||||
enum : std::size_t {
|
enum : std::size_t {
|
||||||
invalid_value = (std::numeric_limits<std::size_t>::max)(),
|
invalid_value = (std::numeric_limits<std::size_t>::max)(),
|
||||||
data_length = 64
|
data_length = 64,
|
||||||
|
name_length = 64
|
||||||
};
|
};
|
||||||
|
|
||||||
enum class relat { // multiplicity of the relationship
|
enum class relat { // multiplicity of the relationship
|
||||||
|
|||||||
@ -7,8 +7,11 @@
|
|||||||
namespace ipc {
|
namespace ipc {
|
||||||
namespace shm {
|
namespace shm {
|
||||||
|
|
||||||
IPC_EXPORT void* acquire(char const * name, std::size_t size);
|
using id_t = void*;
|
||||||
IPC_EXPORT void release(void* mem, std::size_t size);
|
|
||||||
|
IPC_EXPORT id_t acquire(char const * name, std::size_t size);
|
||||||
|
IPC_EXPORT void * to_mem (id_t id);
|
||||||
|
IPC_EXPORT void release(id_t id, void * mem, std::size_t size);
|
||||||
|
|
||||||
class IPC_EXPORT handle {
|
class IPC_EXPORT handle {
|
||||||
public:
|
public:
|
||||||
|
|||||||
@ -13,12 +13,12 @@ namespace ipc {
|
|||||||
namespace circ {
|
namespace circ {
|
||||||
namespace detail {
|
namespace detail {
|
||||||
|
|
||||||
template <typename Policy, std::size_t DataSize>
|
template <typename Policy, std::size_t DataSize, std::size_t AlignSize>
|
||||||
class elem_array {
|
class elem_array {
|
||||||
public:
|
public:
|
||||||
using policy_t = Policy;
|
using policy_t = Policy;
|
||||||
using cursor_t = decltype(std::declval<policy_t>().cursor());
|
using cursor_t = decltype(std::declval<policy_t>().cursor());
|
||||||
using elem_t = typename policy_t::template elem_t<DataSize>;
|
using elem_t = typename policy_t::template elem_t<DataSize, AlignSize>;
|
||||||
|
|
||||||
enum : std::size_t {
|
enum : std::size_t {
|
||||||
data_size = DataSize,
|
data_size = DataSize,
|
||||||
@ -50,11 +50,11 @@ public:
|
|||||||
|
|
||||||
} // namespace detail
|
} // namespace detail
|
||||||
|
|
||||||
template <typename Policy, std::size_t DataSize>
|
template <typename Policy, std::size_t DataSize, std::size_t AlignSize>
|
||||||
class elem_array : public ipc::circ::conn_head {
|
class elem_array : public ipc::circ::conn_head {
|
||||||
public:
|
public:
|
||||||
using base_t = ipc::circ::conn_head;
|
using base_t = ipc::circ::conn_head;
|
||||||
using array_t = detail::elem_array<Policy, DataSize>;
|
using array_t = detail::elem_array<Policy, DataSize, AlignSize>;
|
||||||
using policy_t = typename array_t::policy_t;
|
using policy_t = typename array_t::policy_t;
|
||||||
using cursor_t = typename array_t::cursor_t;
|
using cursor_t = typename array_t::cursor_t;
|
||||||
using elem_t = typename array_t::elem_t;
|
using elem_t = typename array_t::elem_t;
|
||||||
|
|||||||
68
src/ipc.cpp
68
src/ipc.cpp
@ -5,6 +5,7 @@
|
|||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
#include <utility>
|
#include <utility>
|
||||||
#include <atomic>
|
#include <atomic>
|
||||||
|
#include <type_traits>
|
||||||
|
|
||||||
#include "def.h"
|
#include "def.h"
|
||||||
#include "shm.h"
|
#include "shm.h"
|
||||||
@ -38,8 +39,16 @@ struct msg_t<0, AlignSize> {
|
|||||||
|
|
||||||
template <std::size_t DataSize, std::size_t AlignSize>
|
template <std::size_t DataSize, std::size_t AlignSize>
|
||||||
struct msg_t {
|
struct msg_t {
|
||||||
msg_t<0, AlignSize> head_;
|
msg_t<0, AlignSize> head_ {};
|
||||||
alignas(AlignSize) byte_t data_[DataSize];
|
std::aligned_storage_t<DataSize, AlignSize> data_ {};
|
||||||
|
|
||||||
|
msg_t() = default;
|
||||||
|
msg_t(void* q, msg_id_t i, int r, void const * d, std::size_t s) {
|
||||||
|
head_.que_ = q;
|
||||||
|
head_.id_ = i;
|
||||||
|
head_.remain_ = r;
|
||||||
|
std::memcpy(&data_, d, s);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
buff_t make_cache(void const * data, std::size_t size) {
|
buff_t make_cache(void const * data, std::size_t size) {
|
||||||
@ -62,20 +71,6 @@ struct cache_t {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
auto& recv_cache() {
|
|
||||||
/*
|
|
||||||
<Remarks> thread_local may have some bugs.
|
|
||||||
See: https://sourceforge.net/p/mingw-w64/bugs/727/
|
|
||||||
https://sourceforge.net/p/mingw-w64/bugs/527/
|
|
||||||
https://github.com/Alexpux/MINGW-packages/issues/2519
|
|
||||||
https://github.com/ChaiScript/ChaiScript/issues/402
|
|
||||||
https://developercommunity.visualstudio.com/content/problem/124121/thread-local-variables-fail-to-be-initialized-when.html
|
|
||||||
https://software.intel.com/en-us/forums/intel-c-compiler/topic/684827
|
|
||||||
*/
|
|
||||||
static tls::pointer<mem::unordered_map<msg_id_t, cache_t>> rc;
|
|
||||||
return *rc.create();
|
|
||||||
}
|
|
||||||
|
|
||||||
template <typename Policy>
|
template <typename Policy>
|
||||||
struct detail_impl {
|
struct detail_impl {
|
||||||
|
|
||||||
@ -94,6 +89,20 @@ static auto& queues_cache() {
|
|||||||
return *qc.create();
|
return *qc.create();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static auto& recv_cache() {
|
||||||
|
/*
|
||||||
|
<Remarks> thread_local may have some bugs.
|
||||||
|
See: https://sourceforge.net/p/mingw-w64/bugs/727/
|
||||||
|
https://sourceforge.net/p/mingw-w64/bugs/527/
|
||||||
|
https://github.com/Alexpux/MINGW-packages/issues/2519
|
||||||
|
https://github.com/ChaiScript/ChaiScript/issues/402
|
||||||
|
https://developercommunity.visualstudio.com/content/problem/124121/thread-local-variables-fail-to-be-initialized-when.html
|
||||||
|
https://software.intel.com/en-us/forums/intel-c-compiler/topic/684827
|
||||||
|
*/
|
||||||
|
static tls::pointer<mem::unordered_map<msg_id_t, cache_t>> rc;
|
||||||
|
return *rc.create();
|
||||||
|
}
|
||||||
|
|
||||||
/* API implementations */
|
/* API implementations */
|
||||||
|
|
||||||
static ipc::handle_t connect(char const * name) {
|
static ipc::handle_t connect(char const * name) {
|
||||||
@ -141,21 +150,18 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size) {
|
|||||||
// push message fragment
|
// push message fragment
|
||||||
int offset = 0;
|
int offset = 0;
|
||||||
for (int i = 0; i < static_cast<int>(size / data_length); ++i, offset += data_length) {
|
for (int i = 0; i < static_cast<int>(size / data_length); ++i, offset += data_length) {
|
||||||
msg_t<data_length> msg {
|
while (!que->push(que, msg_id, static_cast<int>(size) - offset - static_cast<int>(data_length),
|
||||||
{ que, msg_id, static_cast<int>(size) - offset - static_cast<int>(data_length) }, {}
|
static_cast<byte_t const *>(data) + offset, data_length)) {
|
||||||
};
|
std::this_thread::yield();
|
||||||
std::memcpy(msg.data_, static_cast<byte_t const *>(data) + offset, data_length);
|
}
|
||||||
while (!que->push(msg)) std::this_thread::yield();
|
|
||||||
}
|
}
|
||||||
// if remain > 0, this is the last message fragment
|
// if remain > 0, this is the last message fragment
|
||||||
int remain = static_cast<int>(size) - offset;
|
int remain = static_cast<int>(size) - offset;
|
||||||
if (remain > 0) {
|
if (remain > 0) {
|
||||||
msg_t<data_length> msg {
|
while (!que->push(que, msg_id, remain - static_cast<int>(data_length),
|
||||||
{ que, msg_id, remain - static_cast<int>(data_length) }, {}
|
static_cast<byte_t const *>(data) + offset, static_cast<std::size_t>(remain))) {
|
||||||
};
|
std::this_thread::yield();
|
||||||
std::memcpy(msg.data_, static_cast<byte_t const *>(data) + offset,
|
}
|
||||||
static_cast<std::size_t>(remain));
|
|
||||||
while (!que->push(msg)) std::this_thread::yield();
|
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -177,24 +183,24 @@ static buff_t recv(ipc::handle_t h) {
|
|||||||
auto cac_it = rc.find(msg.head_.id_);
|
auto cac_it = rc.find(msg.head_.id_);
|
||||||
if (cac_it == rc.end()) {
|
if (cac_it == rc.end()) {
|
||||||
if (remain <= data_length) {
|
if (remain <= data_length) {
|
||||||
return make_cache(msg.data_, remain);
|
return make_cache(&(msg.data_), remain);
|
||||||
}
|
}
|
||||||
// cache the first message fragment
|
// cache the first message fragment
|
||||||
else rc.emplace(msg.head_.id_, cache_t { data_length, make_cache(msg.data_, remain) });
|
else rc.emplace(msg.head_.id_, cache_t { data_length, make_cache(&(msg.data_), remain) });
|
||||||
}
|
}
|
||||||
// has cached before this message
|
// has cached before this message
|
||||||
else {
|
else {
|
||||||
auto& cac = cac_it->second;
|
auto& cac = cac_it->second;
|
||||||
// this is the last message fragment
|
// this is the last message fragment
|
||||||
if (msg.head_.remain_ <= 0) {
|
if (msg.head_.remain_ <= 0) {
|
||||||
cac.append(msg.data_, remain);
|
cac.append(&(msg.data_), remain);
|
||||||
// finish this message, erase it from cache
|
// finish this message, erase it from cache
|
||||||
auto buff = std::move(cac.buff_);
|
auto buff = std::move(cac.buff_);
|
||||||
rc.erase(cac_it);
|
rc.erase(cac_it);
|
||||||
return buff;
|
return buff;
|
||||||
}
|
}
|
||||||
// there are remain datas after this message
|
// there are remain datas after this message
|
||||||
cac.append(msg.data_, data_length);
|
cac.append(&(msg.data_), data_length);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -10,7 +10,7 @@
|
|||||||
#include <atomic>
|
#include <atomic>
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <utility>
|
#include <utility>
|
||||||
#include <mutex>
|
#include <cstring>
|
||||||
|
|
||||||
#include "def.h"
|
#include "def.h"
|
||||||
#include "log.h"
|
#include "log.h"
|
||||||
@ -18,22 +18,21 @@
|
|||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
using acc_t = std::atomic_size_t;
|
struct info_t {
|
||||||
|
std::atomic_size_t acc_;
|
||||||
|
char name_[ipc::name_length];
|
||||||
|
};
|
||||||
|
|
||||||
constexpr acc_t* acc_of(void* mem) {
|
constexpr std::size_t calc_size(std::size_t size) {
|
||||||
return static_cast<acc_t*>(mem);
|
return ((((size - 1) / alignof(info_t)) + 1) * alignof(info_t)) + sizeof(info_t);
|
||||||
}
|
}
|
||||||
|
|
||||||
constexpr void* mem_of(void* mem) {
|
inline auto& acc_of(void* mem, std::size_t size) {
|
||||||
return static_cast<acc_t*>(mem) - 1;
|
return reinterpret_cast<info_t*>(static_cast<ipc::byte_t*>(mem) + size - sizeof(info_t))->acc_;
|
||||||
}
|
}
|
||||||
|
|
||||||
inline auto* m2h() {
|
inline auto& str_of(void* mem, std::size_t size) {
|
||||||
static struct {
|
return reinterpret_cast<info_t*>(static_cast<ipc::byte_t*>(mem) + size - sizeof(info_t))->name_;
|
||||||
std::mutex lc_;
|
|
||||||
ipc::mem::unordered_map<void*, std::string> cache_;
|
|
||||||
} m2h_;
|
|
||||||
return &m2h_;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
} // internal-linkage
|
} // internal-linkage
|
||||||
@ -41,11 +40,15 @@ inline auto* m2h() {
|
|||||||
namespace ipc {
|
namespace ipc {
|
||||||
namespace shm {
|
namespace shm {
|
||||||
|
|
||||||
void* acquire(char const * name, std::size_t size) {
|
id_t acquire(char const * name, std::size_t size) {
|
||||||
if (name == nullptr || name[0] == '\0' || size == 0) {
|
if (name == nullptr || name[0] == '\0' || size == 0) {
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
std::string op_name = std::string{"__IPC_SHM__"} + name;
|
std::string op_name = std::string{"__IPC_SHM__"} + name;
|
||||||
|
if (op_name.size() >= ipc::name_length) {
|
||||||
|
ipc::error("name is too long!: [%d]%s\n", static_cast<int>(op_name.size()), op_name.c_str());
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
int fd = ::shm_open(op_name.c_str(), O_CREAT | O_RDWR,
|
int fd = ::shm_open(op_name.c_str(), O_CREAT | O_RDWR,
|
||||||
S_IRUSR | S_IWUSR |
|
S_IRUSR | S_IWUSR |
|
||||||
S_IRGRP | S_IWGRP |
|
S_IRGRP | S_IWGRP |
|
||||||
@ -54,7 +57,7 @@ void* acquire(char const * name, std::size_t size) {
|
|||||||
ipc::error("fail shm_open[%d]: %s\n", errno, name);
|
ipc::error("fail shm_open[%d]: %s\n", errno, name);
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
size += sizeof(acc_t);
|
size = calc_size(size);
|
||||||
if (::ftruncate(fd, static_cast<off_t>(size)) != 0) {
|
if (::ftruncate(fd, static_cast<off_t>(size)) != 0) {
|
||||||
ipc::error("fail ftruncate[%d]: %s\n", errno, name);
|
ipc::error("fail ftruncate[%d]: %s\n", errno, name);
|
||||||
::close(fd);
|
::close(fd);
|
||||||
@ -68,33 +71,31 @@ void* acquire(char const * name, std::size_t size) {
|
|||||||
::shm_unlink(op_name.c_str());
|
::shm_unlink(op_name.c_str());
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
auto acc = acc_of(mem);
|
if (acc_of(mem, size).fetch_add(1, std::memory_order_release) == 0) {
|
||||||
acc->fetch_add(1, std::memory_order_release);
|
std::memcpy(&str_of(mem, size), op_name.c_str(), op_name.size());
|
||||||
{
|
|
||||||
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(m2h()->lc_);
|
|
||||||
m2h()->cache_.emplace(++acc, std::move(op_name));
|
|
||||||
}
|
}
|
||||||
return acc;
|
return static_cast<id_t>(mem);
|
||||||
}
|
}
|
||||||
|
|
||||||
void release(void* mem, std::size_t size) {
|
void * to_mem(id_t id) {
|
||||||
|
return static_cast<void *>(id);
|
||||||
|
}
|
||||||
|
|
||||||
|
void release(id_t id, void * mem, std::size_t size) {
|
||||||
if (mem == nullptr) {
|
if (mem == nullptr) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(m2h()->lc_);
|
if (mem != to_mem(id)) {
|
||||||
auto& cc = m2h()->cache_;
|
|
||||||
auto it = cc.find(mem);
|
|
||||||
if (it == cc.end()) {
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
mem = mem_of(mem);
|
size = calc_size(size);
|
||||||
size += sizeof(acc_t);
|
if (acc_of(mem, size).fetch_sub(1, std::memory_order_acquire) == 1) {
|
||||||
if (acc_of(mem)->fetch_sub(1, std::memory_order_acquire) == 1) {
|
char name[ipc::name_length] = {};
|
||||||
|
std::memcpy(name, &str_of(mem, size), sizeof(name));
|
||||||
::munmap(mem, size);
|
::munmap(mem, size);
|
||||||
::shm_unlink(it->second.c_str());
|
::shm_unlink(name);
|
||||||
}
|
}
|
||||||
else ::munmap(mem, size);
|
else ::munmap(mem, size);
|
||||||
cc.erase(it);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace shm
|
} // namespace shm
|
||||||
|
|||||||
@ -4,29 +4,16 @@
|
|||||||
|
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <utility>
|
#include <utility>
|
||||||
#include <mutex>
|
|
||||||
|
|
||||||
#include "def.h"
|
#include "def.h"
|
||||||
#include "log.h"
|
#include "log.h"
|
||||||
#include "memory/resource.h"
|
#include "memory/resource.h"
|
||||||
#include "platform/to_tchar.h"
|
#include "platform/to_tchar.h"
|
||||||
|
|
||||||
namespace {
|
|
||||||
|
|
||||||
inline auto* m2h() {
|
|
||||||
static struct {
|
|
||||||
std::mutex lc_;
|
|
||||||
ipc::mem::unordered_map<void*, HANDLE> cache_;
|
|
||||||
} m2h_;
|
|
||||||
return &m2h_;
|
|
||||||
}
|
|
||||||
|
|
||||||
} // internal-linkage
|
|
||||||
|
|
||||||
namespace ipc {
|
namespace ipc {
|
||||||
namespace shm {
|
namespace shm {
|
||||||
|
|
||||||
void* acquire(char const * name, std::size_t size) {
|
id_t acquire(char const * name, std::size_t size) {
|
||||||
if (name == nullptr || name[0] == '\0' || size == 0) {
|
if (name == nullptr || name[0] == '\0' || size == 0) {
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
@ -38,32 +25,25 @@ void* acquire(char const * name, std::size_t size) {
|
|||||||
ipc::error("fail CreateFileMapping[%d]: %s\n", static_cast<int>(::GetLastError()), name);
|
ipc::error("fail CreateFileMapping[%d]: %s\n", static_cast<int>(::GetLastError()), name);
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
|
return static_cast<id_t>(h);
|
||||||
|
}
|
||||||
|
|
||||||
|
void * to_mem(id_t id) {
|
||||||
|
if (id == nullptr) return nullptr;
|
||||||
|
HANDLE h = static_cast<HANDLE>(id);
|
||||||
LPVOID mem = ::MapViewOfFile(h, FILE_MAP_ALL_ACCESS, 0, 0, 0);
|
LPVOID mem = ::MapViewOfFile(h, FILE_MAP_ALL_ACCESS, 0, 0, 0);
|
||||||
if (mem == NULL) {
|
if (mem == NULL) {
|
||||||
ipc::error("fail MapViewOfFile[%d]: %s\n", static_cast<int>(::GetLastError()), name);
|
ipc::error("fail MapViewOfFile[%d]: %s\n", static_cast<int>(::GetLastError()), name);
|
||||||
::CloseHandle(h);
|
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
{
|
return static_cast<void *>(mem);
|
||||||
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(m2h()->lc_);
|
|
||||||
m2h()->cache_.emplace(mem, h);
|
|
||||||
}
|
|
||||||
return mem;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void release(void* mem, std::size_t /*size*/) {
|
void release(id_t id, void * mem, std::size_t /*size*/) {
|
||||||
if (mem == nullptr) {
|
if (id == nullptr) return;
|
||||||
return;
|
if (mem == nullptr) return;
|
||||||
}
|
::UnmapViewOfFile(static_cast<LPVOID>(mem));
|
||||||
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(m2h()->lc_);
|
::CloseHandle(static_cast<HANDLE>(id));
|
||||||
auto& cc = m2h()->cache_;
|
|
||||||
auto it = cc.find(mem);
|
|
||||||
if (it == cc.end()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
::UnmapViewOfFile(mem);
|
|
||||||
::CloseHandle(it->second);
|
|
||||||
cc.erase(it);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace shm
|
} // namespace shm
|
||||||
|
|||||||
@ -10,14 +10,13 @@
|
|||||||
namespace ipc {
|
namespace ipc {
|
||||||
namespace policy {
|
namespace policy {
|
||||||
|
|
||||||
template <template <typename, std::size_t...> class Elems,
|
template <template <typename, std::size_t...> class Elems, typename Flag>
|
||||||
typename Flag>
|
|
||||||
struct choose;
|
struct choose;
|
||||||
|
|
||||||
template <typename Flag>
|
template <typename Flag>
|
||||||
struct choose<circ::elem_array, Flag> {
|
struct choose<circ::elem_array, Flag> {
|
||||||
template <std::size_t DataSize>
|
template <std::size_t DataSize, std::size_t AlignSize>
|
||||||
using elems_t = circ::elem_array<ipc::prod_cons_impl<Flag>, DataSize>;
|
using elems_t = circ::elem_array<ipc::prod_cons_impl<Flag>, DataSize, AlignSize>;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace policy
|
} // namespace policy
|
||||||
|
|||||||
@ -3,6 +3,7 @@
|
|||||||
#include <atomic>
|
#include <atomic>
|
||||||
#include <utility>
|
#include <utility>
|
||||||
#include <cstring>
|
#include <cstring>
|
||||||
|
#include <type_traits>
|
||||||
|
|
||||||
#include "def.h"
|
#include "def.h"
|
||||||
|
|
||||||
@ -20,9 +21,9 @@ struct prod_cons_impl;
|
|||||||
template <>
|
template <>
|
||||||
struct prod_cons_impl<wr<relat::single, relat::single, trans::unicast>> {
|
struct prod_cons_impl<wr<relat::single, relat::single, trans::unicast>> {
|
||||||
|
|
||||||
template <std::size_t DataSize>
|
template <std::size_t DataSize, std::size_t AlignSize>
|
||||||
struct elem_t {
|
struct elem_t {
|
||||||
byte_t data_[DataSize] {};
|
std::aligned_storage_t<DataSize, AlignSize> data_ {};
|
||||||
};
|
};
|
||||||
|
|
||||||
alignas(circ::cache_line_size) std::atomic<circ::u2_t> rd_; // read index
|
alignas(circ::cache_line_size) std::atomic<circ::u2_t> rd_; // read index
|
||||||
@ -32,8 +33,8 @@ struct prod_cons_impl<wr<relat::single, relat::single, trans::unicast>> {
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename W, typename F, template <std::size_t> class E, std::size_t DataSize>
|
template <typename W, typename F, typename E>
|
||||||
bool push(W* /*wrapper*/, F&& f, E<DataSize>* elems) {
|
bool push(W* /*wrapper*/, F&& f, E* elems) {
|
||||||
auto cur_wt = circ::index_of(wt_.load(std::memory_order_relaxed));
|
auto cur_wt = circ::index_of(wt_.load(std::memory_order_relaxed));
|
||||||
if (cur_wt == circ::index_of(rd_.load(std::memory_order_acquire) - 1)) {
|
if (cur_wt == circ::index_of(rd_.load(std::memory_order_acquire) - 1)) {
|
||||||
return false; // full
|
return false; // full
|
||||||
@ -43,8 +44,8 @@ struct prod_cons_impl<wr<relat::single, relat::single, trans::unicast>> {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename W, typename F, template <std::size_t> class E, std::size_t DataSize>
|
template <typename W, typename F, typename E>
|
||||||
bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E<DataSize>* elems) {
|
bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E* elems) {
|
||||||
auto cur_rd = circ::index_of(rd_.load(std::memory_order_relaxed));
|
auto cur_rd = circ::index_of(rd_.load(std::memory_order_relaxed));
|
||||||
if (cur_rd == circ::index_of(wt_.load(std::memory_order_acquire))) {
|
if (cur_rd == circ::index_of(wt_.load(std::memory_order_acquire))) {
|
||||||
return false; // empty
|
return false; // empty
|
||||||
@ -59,9 +60,9 @@ template <>
|
|||||||
struct prod_cons_impl<wr<relat::single, relat::multi , trans::unicast>>
|
struct prod_cons_impl<wr<relat::single, relat::multi , trans::unicast>>
|
||||||
: prod_cons_impl<wr<relat::single, relat::single, trans::unicast>> {
|
: prod_cons_impl<wr<relat::single, relat::single, trans::unicast>> {
|
||||||
|
|
||||||
template <typename W, typename F, template <std::size_t> class E, std::size_t DataSize>
|
template <typename W, typename F, template <std::size_t, std::size_t> class E, std::size_t DS, std::size_t AS>
|
||||||
bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E<DataSize>* elems) {
|
bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E<DS, AS>* elems) {
|
||||||
byte_t buff[DataSize];
|
byte_t buff[DS];
|
||||||
for (unsigned k = 0;;) {
|
for (unsigned k = 0;;) {
|
||||||
auto cur_rd = rd_.load(std::memory_order_relaxed);
|
auto cur_rd = rd_.load(std::memory_order_relaxed);
|
||||||
if (circ::index_of(cur_rd) ==
|
if (circ::index_of(cur_rd) ==
|
||||||
@ -84,16 +85,16 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::unicast>>
|
|||||||
|
|
||||||
using flag_t = std::uint64_t;
|
using flag_t = std::uint64_t;
|
||||||
|
|
||||||
template <std::size_t DataSize>
|
template <std::size_t DataSize, std::size_t AlignSize>
|
||||||
struct elem_t {
|
struct elem_t {
|
||||||
byte_t data_[DataSize] {};
|
std::aligned_storage_t<DataSize, AlignSize> data_ {};
|
||||||
std::atomic<flag_t> f_ct_ { 0 }; // commit flag
|
std::atomic<flag_t> f_ct_ { 0 }; // commit flag
|
||||||
};
|
};
|
||||||
|
|
||||||
alignas(circ::cache_line_size) std::atomic<circ::u2_t> ct_; // commit index
|
alignas(circ::cache_line_size) std::atomic<circ::u2_t> ct_; // commit index
|
||||||
|
|
||||||
template <typename W, typename F, template <std::size_t> class E, std::size_t DataSize>
|
template <typename W, typename F, typename E>
|
||||||
bool push(W* /*wrapper*/, F&& f, E<DataSize>* elems) {
|
bool push(W* /*wrapper*/, F&& f, E* elems) {
|
||||||
circ::u2_t cur_ct, nxt_ct;
|
circ::u2_t cur_ct, nxt_ct;
|
||||||
for (unsigned k = 0;;) {
|
for (unsigned k = 0;;) {
|
||||||
cur_ct = ct_.load(std::memory_order_relaxed);
|
cur_ct = ct_.load(std::memory_order_relaxed);
|
||||||
@ -129,9 +130,9 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::unicast>>
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename W, typename F, template <std::size_t> class E, std::size_t DataSize>
|
template <typename W, typename F, template <std::size_t, std::size_t> class E, std::size_t DS, std::size_t AS>
|
||||||
bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E<DataSize>* elems) {
|
bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E<DS, AS>* elems) {
|
||||||
byte_t buff[DataSize];
|
byte_t buff[DS];
|
||||||
for (unsigned k = 0;;) {
|
for (unsigned k = 0;;) {
|
||||||
auto cur_rd = rd_.load(std::memory_order_relaxed);
|
auto cur_rd = rd_.load(std::memory_order_relaxed);
|
||||||
auto cur_wt = wt_.load(std::memory_order_acquire);
|
auto cur_wt = wt_.load(std::memory_order_acquire);
|
||||||
@ -165,9 +166,9 @@ struct prod_cons_impl<wr<relat::single, relat::multi, trans::broadcast>> {
|
|||||||
|
|
||||||
using rc_t = std::size_t;
|
using rc_t = std::size_t;
|
||||||
|
|
||||||
template <std::size_t DataSize>
|
template <std::size_t DataSize, std::size_t AlignSize>
|
||||||
struct elem_t {
|
struct elem_t {
|
||||||
byte_t data_[DataSize] {};
|
std::aligned_storage_t<DataSize, AlignSize> data_ {};
|
||||||
std::atomic<rc_t> rc_ { 0 }; // read-counter
|
std::atomic<rc_t> rc_ { 0 }; // read-counter
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -177,8 +178,8 @@ struct prod_cons_impl<wr<relat::single, relat::multi, trans::broadcast>> {
|
|||||||
return wt_.load(std::memory_order_acquire);
|
return wt_.load(std::memory_order_acquire);
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename W, typename F, template <std::size_t> class E, std::size_t DataSize>
|
template <typename W, typename F, typename E>
|
||||||
bool push(W* wrapper, F&& f, E<DataSize>* elems) {
|
bool push(W* wrapper, F&& f, E* elems) {
|
||||||
auto conn_cnt = wrapper->conn_count(std::memory_order_relaxed);
|
auto conn_cnt = wrapper->conn_count(std::memory_order_relaxed);
|
||||||
if (conn_cnt == 0) return false;
|
if (conn_cnt == 0) return false;
|
||||||
auto* el = elems + circ::index_of(wt_.load(std::memory_order_acquire));
|
auto* el = elems + circ::index_of(wt_.load(std::memory_order_acquire));
|
||||||
@ -193,8 +194,8 @@ struct prod_cons_impl<wr<relat::single, relat::multi, trans::broadcast>> {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename W, typename F, template <std::size_t> class E, std::size_t DataSize>
|
template <typename W, typename F, typename E>
|
||||||
bool pop(W* /*wrapper*/, circ::u2_t& cur, F&& f, E<DataSize>* elems) {
|
bool pop(W* /*wrapper*/, circ::u2_t& cur, F&& f, E* elems) {
|
||||||
if (cur == cursor()) return false; // acquire
|
if (cur == cursor()) return false; // acquire
|
||||||
auto* el = elems + circ::index_of(cur++);
|
auto* el = elems + circ::index_of(cur++);
|
||||||
std::forward<F>(f)(&(el->data_));
|
std::forward<F>(f)(&(el->data_));
|
||||||
@ -223,9 +224,9 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::broadcast>> {
|
|||||||
rc_incr = 0x0000000100000000ull
|
rc_incr = 0x0000000100000000ull
|
||||||
};
|
};
|
||||||
|
|
||||||
template <std::size_t DataSize>
|
template <std::size_t DataSize, std::size_t AlignSize>
|
||||||
struct elem_t {
|
struct elem_t {
|
||||||
byte_t data_[DataSize] {};
|
std::aligned_storage_t<DataSize, AlignSize> data_ {};
|
||||||
std::atomic<rc_t > rc_ { 0 }; // read-counter
|
std::atomic<rc_t > rc_ { 0 }; // read-counter
|
||||||
std::atomic<flag_t> f_ct_ { 0 }; // commit flag
|
std::atomic<flag_t> f_ct_ { 0 }; // commit flag
|
||||||
};
|
};
|
||||||
@ -236,9 +237,9 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::broadcast>> {
|
|||||||
return ct_.load(std::memory_order_acquire);
|
return ct_.load(std::memory_order_acquire);
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename W, typename F, template <std::size_t> class E, std::size_t DataSize>
|
template <typename W, typename F, typename E>
|
||||||
bool push(W* wrapper, F&& f, E<DataSize>* elems) {
|
bool push(W* wrapper, F&& f, E* elems) {
|
||||||
E<DataSize>* el;
|
E* el;
|
||||||
circ::u2_t cur_ct, nxt_ct;
|
circ::u2_t cur_ct, nxt_ct;
|
||||||
for (unsigned k = 0;;) {
|
for (unsigned k = 0;;) {
|
||||||
auto cc = wrapper->conn_count(std::memory_order_relaxed);
|
auto cc = wrapper->conn_count(std::memory_order_relaxed);
|
||||||
@ -269,8 +270,8 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::broadcast>> {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename W, typename F, template <std::size_t> class E, std::size_t DataSize, std::size_t N>
|
template <typename W, typename F, typename E, std::size_t N>
|
||||||
bool pop(W* /*wrapper*/, circ::u2_t& cur, F&& f, E<DataSize>(& elems)[N]) {
|
bool pop(W* /*wrapper*/, circ::u2_t& cur, F&& f, E(& elems)[N]) {
|
||||||
auto* el = elems + circ::index_of(cur);
|
auto* el = elems + circ::index_of(cur);
|
||||||
auto cur_fl = el->f_ct_.load(std::memory_order_acquire);
|
auto cur_fl = el->f_ct_.load(std::memory_order_acquire);
|
||||||
if (cur_fl != ~static_cast<flag_t>(cur)) {
|
if (cur_fl != ~static_cast<flag_t>(cur)) {
|
||||||
|
|||||||
21
src/queue.h
21
src/queue.h
@ -27,16 +27,18 @@ protected:
|
|||||||
ipc::detail::waiter_wrapper cc_waiter_;
|
ipc::detail::waiter_wrapper cc_waiter_;
|
||||||
|
|
||||||
bool connected_ = false;
|
bool connected_ = false;
|
||||||
bool dismiss_ = true;
|
shm::handle elems_h_;
|
||||||
|
|
||||||
template <typename Elems>
|
template <typename Elems>
|
||||||
Elems* open(char const * name) {
|
Elems* open(char const * name) {
|
||||||
auto elems = static_cast<Elems*>(shm::acquire(name, sizeof(Elems)));
|
if (!elems_h_.acquire(name, sizeof(Elems))) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
auto elems = static_cast<Elems*>(elems_h_.get());
|
||||||
if (elems == nullptr) {
|
if (elems == nullptr) {
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
elems->init();
|
elems->init();
|
||||||
dismiss_ = false;
|
|
||||||
return elems;
|
return elems;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -63,12 +65,9 @@ protected:
|
|||||||
}
|
}
|
||||||
|
|
||||||
template <typename Elems>
|
template <typename Elems>
|
||||||
void close(Elems* elems) {
|
void close(Elems* /*elems*/) {
|
||||||
close();
|
close();
|
||||||
if (!dismiss_ && (elems != nullptr)) {
|
elems_h_.release();
|
||||||
shm::release(elems, sizeof(Elems));
|
|
||||||
}
|
|
||||||
dismiss_ = true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public:
|
public:
|
||||||
@ -131,7 +130,7 @@ public:
|
|||||||
using policy_t = typename elems_t::policy_t;
|
using policy_t = typename elems_t::policy_t;
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
elems_t* elems_ = nullptr;
|
elems_t * elems_ = nullptr;
|
||||||
decltype(std::declval<elems_t>().cursor()) cursor_ = 0;
|
decltype(std::declval<elems_t>().cursor()) cursor_ = 0;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
@ -243,8 +242,8 @@ public:
|
|||||||
} // namespace detail
|
} // namespace detail
|
||||||
|
|
||||||
template <typename T, typename Policy>
|
template <typename T, typename Policy>
|
||||||
class queue : public detail::queue_base<typename Policy::template elems_t<sizeof(T)>> {
|
class queue : public detail::queue_base<typename Policy::template elems_t<sizeof(T), alignof(T)>> {
|
||||||
using base_t = detail::queue_base<typename Policy::template elems_t<sizeof(T)>>;
|
using base_t = detail::queue_base<typename Policy::template elems_t<sizeof(T), alignof(T)>>;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
using base_t::base_t;
|
using base_t::base_t;
|
||||||
|
|||||||
@ -10,6 +10,7 @@ namespace shm {
|
|||||||
|
|
||||||
class handle::handle_ : public pimpl<handle_> {
|
class handle::handle_ : public pimpl<handle_> {
|
||||||
public:
|
public:
|
||||||
|
shm::id_t id_ = nullptr;
|
||||||
void* m_ = nullptr;
|
void* m_ = nullptr;
|
||||||
|
|
||||||
std::string n_;
|
std::string n_;
|
||||||
@ -58,14 +59,16 @@ char const * handle::name() const {
|
|||||||
|
|
||||||
bool handle::acquire(char const * name, std::size_t size) {
|
bool handle::acquire(char const * name, std::size_t size) {
|
||||||
release();
|
release();
|
||||||
impl(p_)->m_ = shm::acquire((impl(p_)->n_ = name).c_str(),
|
impl(p_)->id_ = shm::acquire((impl(p_)->n_ = name).c_str(),
|
||||||
impl(p_)->s_ = size);
|
impl(p_)->s_ = size);
|
||||||
|
impl(p_)->m_ = shm::to_mem (impl(p_)->id_);
|
||||||
return valid();
|
return valid();
|
||||||
}
|
}
|
||||||
|
|
||||||
void handle::release() {
|
void handle::release() {
|
||||||
if (!valid()) return;
|
if (!valid()) return;
|
||||||
shm::release(impl(p_)->m_, impl(p_)->s_);
|
shm::release(impl(p_)->id_, impl(p_)->m_, impl(p_)->s_);
|
||||||
|
impl(p_)->id_ = nullptr;
|
||||||
impl(p_)->m_ = nullptr;
|
impl(p_)->m_ = nullptr;
|
||||||
impl(p_)->s_ = 0;
|
impl(p_)->s_ = 0;
|
||||||
impl(p_)->n_.clear();
|
impl(p_)->n_.clear();
|
||||||
|
|||||||
@ -24,8 +24,8 @@ template <ipc::relat Rp, ipc::relat Rc, ipc::trans Ts>
|
|||||||
using pc_t = ipc::prod_cons_impl<ipc::wr<Rp, Rc, Ts>>;
|
using pc_t = ipc::prod_cons_impl<ipc::wr<Rp, Rc, Ts>>;
|
||||||
|
|
||||||
template <std::size_t DataSize, typename Policy>
|
template <std::size_t DataSize, typename Policy>
|
||||||
struct ea_t : public ipc::circ::elem_array<Policy, DataSize> {
|
struct ea_t : public ipc::circ::elem_array<Policy, DataSize, 1> {
|
||||||
ea_t() { std::memset(this, 0, sizeof(ipc::circ::elem_array<Policy, DataSize>)); }
|
ea_t() { std::memset(this, 0, sizeof(ipc::circ::elem_array<Policy, DataSize, 1>)); }
|
||||||
};
|
};
|
||||||
|
|
||||||
using cq_t = ea_t<
|
using cq_t = ea_t<
|
||||||
|
|||||||
@ -83,6 +83,8 @@ void Unit::test_hello() {
|
|||||||
QVERIFY(shm_hd__.get() == nullptr);
|
QVERIFY(shm_hd__.get() == nullptr);
|
||||||
QVERIFY(shm_hd__.acquire("my-test", 1024));
|
QVERIFY(shm_hd__.acquire("my-test", 1024));
|
||||||
|
|
||||||
|
mem = shm_hd__.get();
|
||||||
|
QVERIFY(mem != nullptr);
|
||||||
std::uint8_t buf[1024] = {};
|
std::uint8_t buf[1024] = {};
|
||||||
QVERIFY(memcmp(mem, buf, sizeof(buf)) == 0);
|
QVERIFY(memcmp(mem, buf, sizeof(buf)) == 0);
|
||||||
|
|
||||||
|
|||||||
@ -28,6 +28,7 @@ void Unit::test_broadcast() {
|
|||||||
ipc::detail::waiter_wrapper wp { &w };
|
ipc::detail::waiter_wrapper wp { &w };
|
||||||
QVERIFY(wp.open("test-ipc-waiter"));
|
QVERIFY(wp.open("test-ipc-waiter"));
|
||||||
QVERIFY(wp.wait_if([] { return true; }));
|
QVERIFY(wp.wait_if([] { return true; }));
|
||||||
|
wp.close();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -39,6 +40,7 @@ void Unit::test_broadcast() {
|
|||||||
QVERIFY(wp.broadcast());
|
QVERIFY(wp.broadcast());
|
||||||
|
|
||||||
for (auto& t : ts) t.join();
|
for (auto& t : ts) t.join();
|
||||||
|
wp.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
} // internal-linkage
|
} // internal-linkage
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user