mirror of
https://github.com/mutouyun/cpp-ipc.git
synced 2025-12-06 08:46:45 +08:00
commit
c2264a901a
@ -16,3 +16,4 @@ add_subdirectory(src)
|
||||
add_subdirectory(3rdparty/gtest)
|
||||
add_subdirectory(test)
|
||||
add_subdirectory(demo/chat)
|
||||
add_subdirectory(demo/msg_que)
|
||||
|
||||
@ -1,3 +1,4 @@
|
||||
|
||||
#include <iostream>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
@ -8,51 +9,54 @@
|
||||
|
||||
namespace {
|
||||
|
||||
char name__[] = "ipc-chat";
|
||||
char quit__[] = "q";
|
||||
char id__ [] = "c";
|
||||
constexpr char const name__[] = "ipc-chat";
|
||||
constexpr char const quit__[] = "q";
|
||||
constexpr char const id__ [] = "c";
|
||||
|
||||
std::size_t calc_unique_id() {
|
||||
inline std::size_t calc_unique_id() {
|
||||
static ipc::shm::handle g_shm { "__CHAT_ACC_STORAGE__", sizeof(std::atomic<std::size_t>) };
|
||||
return static_cast<std::atomic<std::size_t>*>(g_shm.get())->fetch_add(1, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
ipc::channel sender__ { name__, ipc::sender };
|
||||
ipc::channel receiver__ { name__, ipc::receiver };
|
||||
|
||||
} // namespace
|
||||
|
||||
int main() {
|
||||
std::string buf, id = id__ + std::to_string(calc_unique_id());
|
||||
std::regex reg { "(c\\d+)> (.*)" };
|
||||
|
||||
ipc::channel cc { name__, ipc::sender };
|
||||
|
||||
std::thread r {[&id, ®] {
|
||||
ipc::channel cc { name__, ipc::receiver };
|
||||
std::cout << id << " is ready." << std::endl;
|
||||
while (1) {
|
||||
auto buf = cc.recv();
|
||||
if (buf.empty()) continue;
|
||||
ipc::buff_t buf = receiver__.recv();
|
||||
if (buf.empty()) break; // quit
|
||||
std::string dat { buf.get<char const *>(), buf.size() - 1 };
|
||||
std::smatch mid;
|
||||
if (std::regex_match(dat, mid, reg)) {
|
||||
if (mid.str(1) == id) {
|
||||
if (mid.str(2) == quit__) {
|
||||
std::cout << "receiver quit..." << std::endl;
|
||||
return;
|
||||
break;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
std::cout << dat << std::endl;
|
||||
}
|
||||
std::cout << id << " receiver is quit..." << std::endl;
|
||||
}};
|
||||
|
||||
for (/*int i = 1*/;; /*++i*/) {
|
||||
std::cin >> buf;
|
||||
if (buf.empty() || (buf == quit__)) break;
|
||||
// std::cout << "[" << i << "]" << std::endl;
|
||||
cc.send(id + "> " + buf);
|
||||
if (buf == quit__) break;
|
||||
sender__.send(id + "> " + buf);
|
||||
buf.clear();
|
||||
}
|
||||
|
||||
receiver__.disconnect();
|
||||
r.join();
|
||||
std::cout << id << " sender is quit..." << std::endl;
|
||||
return 0;
|
||||
}
|
||||
|
||||
11
demo/msg_que/CMakeLists.txt
Normal file
11
demo/msg_que/CMakeLists.txt
Normal file
@ -0,0 +1,11 @@
|
||||
project(msg_que)
|
||||
|
||||
include_directories(
|
||||
${CMAKE_SOURCE_DIR}/3rdparty)
|
||||
|
||||
file(GLOB SRC_FILES ./*.cpp)
|
||||
file(GLOB HEAD_FILES ./*.h)
|
||||
|
||||
add_executable(${PROJECT_NAME} ${SRC_FILES} ${HEAD_FILES})
|
||||
|
||||
target_link_libraries(${PROJECT_NAME} ipc)
|
||||
124
demo/msg_que/main.cpp
Normal file
124
demo/msg_que/main.cpp
Normal file
@ -0,0 +1,124 @@
|
||||
|
||||
#include <signal.h>
|
||||
|
||||
#include <iostream>
|
||||
#include <string>
|
||||
#include <atomic>
|
||||
#include <thread>
|
||||
#include <chrono>
|
||||
#include <cstddef>
|
||||
|
||||
#include "libipc/ipc.h"
|
||||
#include "capo/random.hpp"
|
||||
|
||||
namespace {
|
||||
|
||||
constexpr char const name__ [] = "ipc-msg-que";
|
||||
constexpr char const mode_s__[] = "s";
|
||||
constexpr char const mode_r__[] = "r";
|
||||
|
||||
constexpr std::size_t const min_sz = 128;
|
||||
constexpr std::size_t const max_sz = 1024 * 16;
|
||||
|
||||
std::atomic<bool> is_quit__{ false };
|
||||
std::atomic<std::size_t> size_per_1s__{ 0 };
|
||||
|
||||
using msg_que_t = ipc::chan<ipc::relat::single, ipc::relat::single, ipc::trans::unicast>;
|
||||
|
||||
msg_que_t que__{ name__ };
|
||||
ipc::byte_t buff__[max_sz];
|
||||
capo::random<> rand__{ min_sz, max_sz };
|
||||
|
||||
inline std::string str_of_size(std::size_t sz) noexcept {
|
||||
if (sz <= 1024) {
|
||||
return std::to_string(sz) + " bytes";
|
||||
}
|
||||
if (sz <= 1024 * 1024) {
|
||||
return std::to_string(sz / 1024) + " KB";
|
||||
}
|
||||
return std::to_string(sz / (1024 * 1024)) + " MB";
|
||||
}
|
||||
|
||||
inline std::string speed_of(std::size_t sz) noexcept {
|
||||
return str_of_size(sz) + "/s";
|
||||
}
|
||||
|
||||
void do_counting() {
|
||||
for (int i = 1; !is_quit__.load(std::memory_order_acquire); ++i) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 100 ms
|
||||
if (i % 10) continue;
|
||||
i = 0;
|
||||
std::cout
|
||||
<< speed_of(size_per_1s__.load(std::memory_order_acquire))
|
||||
<< std::endl;
|
||||
size_per_1s__.store(0, std::memory_order_release);
|
||||
}
|
||||
}
|
||||
|
||||
void do_send() {
|
||||
std::cout
|
||||
<< __func__ << ": start ["
|
||||
<< str_of_size(min_sz) << " - " << str_of_size(max_sz)
|
||||
<< "]...\n";
|
||||
if (!que__.reconnect(ipc::sender)) {
|
||||
std::cerr << __func__ << ": connect failed.\n";
|
||||
}
|
||||
else {
|
||||
std::thread counting{ do_counting };
|
||||
while (!is_quit__.load(std::memory_order_acquire)) {
|
||||
std::size_t sz = static_cast<std::size_t>(rand__());
|
||||
if (!que__.send(ipc::buff_t(buff__, sz))) {
|
||||
std::cerr << __func__ << ": send failed.\n";
|
||||
std::cout << __func__ << ": waiting for receiver...\n";
|
||||
if (!que__.wait_for_recv(1)) {
|
||||
std::cerr << __func__ << ": wait receiver failed.\n";
|
||||
is_quit__.store(true, std::memory_order_release);
|
||||
break;
|
||||
}
|
||||
}
|
||||
size_per_1s__.fetch_add(sz, std::memory_order_release);
|
||||
std::this_thread::yield();
|
||||
}
|
||||
counting.join();
|
||||
}
|
||||
std::cout << __func__ << ": quit...\n";
|
||||
}
|
||||
|
||||
void do_recv() {
|
||||
std::cout
|
||||
<< __func__ << ": start ["
|
||||
<< str_of_size(min_sz) << " - " << str_of_size(max_sz)
|
||||
<< "]...\n";
|
||||
if (!que__.reconnect(ipc::receiver)) {
|
||||
std::cerr << __func__ << ": connect failed.\n";
|
||||
}
|
||||
else {
|
||||
std::thread counting{ do_counting };
|
||||
while (!is_quit__.load(std::memory_order_acquire)) {
|
||||
auto msg = que__.recv();
|
||||
if (msg.empty()) break;
|
||||
size_per_1s__.fetch_add(msg.size(), std::memory_order_release);
|
||||
}
|
||||
counting.join();
|
||||
}
|
||||
std::cout << __func__ << ": quit...\n";
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
int main(int argc, char ** argv) {
|
||||
if (argc < 2) return 0;
|
||||
|
||||
::signal(SIGINT, [](int) {
|
||||
is_quit__.store(true, std::memory_order_release);
|
||||
que__.disconnect();
|
||||
});
|
||||
|
||||
if (std::string{ argv[1] } == mode_s__) {
|
||||
do_send();
|
||||
}
|
||||
else if (std::string{ argv[1] } == mode_r__) {
|
||||
do_recv();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
@ -19,19 +19,21 @@ enum : unsigned {
|
||||
|
||||
template <typename Flag>
|
||||
struct IPC_EXPORT chan_impl {
|
||||
static handle_t connect (char const * name, unsigned mode);
|
||||
static void disconnect(handle_t h);
|
||||
static bool connect (ipc::handle_t * ph, char const * name, unsigned mode);
|
||||
static bool reconnect (ipc::handle_t * ph, unsigned mode);
|
||||
static void disconnect(ipc::handle_t h);
|
||||
static void destroy (ipc::handle_t h);
|
||||
|
||||
static char const * name(handle_t h);
|
||||
static char const * name(ipc::handle_t h);
|
||||
|
||||
static std::size_t recv_count(handle_t h);
|
||||
static bool wait_for_recv(handle_t h, std::size_t r_count, std::size_t tm);
|
||||
static std::size_t recv_count(ipc::handle_t h);
|
||||
static bool wait_for_recv(ipc::handle_t h, std::size_t r_count, std::size_t tm);
|
||||
|
||||
static bool send(handle_t h, void const * data, std::size_t size, std::size_t tm);
|
||||
static buff_t recv(handle_t h, std::size_t tm);
|
||||
static bool send(ipc::handle_t h, void const * data, std::size_t size, std::size_t tm);
|
||||
static buff_t recv(ipc::handle_t h, std::size_t tm);
|
||||
|
||||
static bool try_send(handle_t h, void const * data, std::size_t size, std::size_t tm);
|
||||
static buff_t try_recv(handle_t h);
|
||||
static bool try_send(ipc::handle_t h, void const * data, std::size_t size, std::size_t tm);
|
||||
static buff_t try_recv(ipc::handle_t h);
|
||||
};
|
||||
|
||||
template <typename Flag>
|
||||
@ -39,7 +41,7 @@ class chan_wrapper {
|
||||
private:
|
||||
using detail_t = chan_impl<Flag>;
|
||||
|
||||
handle_t h_ = nullptr;
|
||||
ipc::handle_t h_ = nullptr;
|
||||
unsigned mode_ = ipc::sender;
|
||||
|
||||
public:
|
||||
@ -54,7 +56,7 @@ public:
|
||||
}
|
||||
|
||||
~chan_wrapper() {
|
||||
disconnect();
|
||||
detail_t::destroy(h_);
|
||||
}
|
||||
|
||||
void swap(chan_wrapper& rhs) noexcept {
|
||||
@ -70,7 +72,7 @@ public:
|
||||
return detail_t::name(h_);
|
||||
}
|
||||
|
||||
handle_t handle() const noexcept {
|
||||
ipc::handle_t handle() const noexcept {
|
||||
return h_;
|
||||
}
|
||||
|
||||
@ -89,14 +91,18 @@ public:
|
||||
bool connect(char const * name, unsigned mode = ipc::sender | ipc::receiver) {
|
||||
if (name == nullptr || name[0] == '\0') return false;
|
||||
this->disconnect();
|
||||
h_ = detail_t::connect(name, mode_ = mode);
|
||||
return valid();
|
||||
return detail_t::connect(&h_, name, mode_ = mode);
|
||||
}
|
||||
|
||||
bool reconnect(unsigned mode) {
|
||||
if (!valid()) return false;
|
||||
if (mode_ == mode) return true;
|
||||
return detail_t::reconnect(&h_, mode_ = mode);
|
||||
}
|
||||
|
||||
void disconnect() {
|
||||
if (!valid()) return;
|
||||
detail_t::disconnect(h_);
|
||||
h_ = nullptr;
|
||||
}
|
||||
|
||||
std::size_t recv_count() const {
|
||||
@ -111,13 +117,25 @@ public:
|
||||
return chan_wrapper(name).wait_for_recv(r_count, tm);
|
||||
}
|
||||
|
||||
bool send (void const * data, std::size_t size, std::size_t tm = default_timeout) { return detail_t::send(h_, data, size, tm) ; }
|
||||
bool send (buff_t const & buff , std::size_t tm = default_timeout) { return this->send(buff.data(), buff.size(), tm) ; }
|
||||
bool send (std::string const & str , std::size_t tm = default_timeout) { return this->send(str.c_str(), str.size() + 1, tm); }
|
||||
bool send(void const * data, std::size_t size, std::size_t tm = default_timeout) {
|
||||
return detail_t::send(h_, data, size, tm);
|
||||
}
|
||||
bool send(buff_t const & buff, std::size_t tm = default_timeout) {
|
||||
return this->send(buff.data(), buff.size(), tm);
|
||||
}
|
||||
bool send(std::string const & str, std::size_t tm = default_timeout) {
|
||||
return this->send(str.c_str(), str.size() + 1, tm);
|
||||
}
|
||||
|
||||
bool try_send(void const * data, std::size_t size, std::size_t tm = default_timeout) { return detail_t::try_send(h_, data, size, tm) ; }
|
||||
bool try_send(buff_t const & buff , std::size_t tm = default_timeout) { return this->try_send(buff.data(), buff.size(), tm) ; }
|
||||
bool try_send(std::string const & str , std::size_t tm = default_timeout) { return this->try_send(str.c_str(), str.size() + 1, tm); }
|
||||
bool try_send(void const * data, std::size_t size, std::size_t tm = default_timeout) {
|
||||
return detail_t::try_send(h_, data, size, tm);
|
||||
}
|
||||
bool try_send(buff_t const & buff, std::size_t tm = default_timeout) {
|
||||
return this->try_send(buff.data(), buff.size(), tm);
|
||||
}
|
||||
bool try_send(std::string const & str, std::size_t tm = default_timeout) {
|
||||
return this->try_send(str.c_str(), str.size() + 1, tm);
|
||||
}
|
||||
|
||||
buff_t recv(std::size_t tm = invalid_value) {
|
||||
return detail_t::recv(h_, tm);
|
||||
@ -128,8 +146,8 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
template <typename Flag>
|
||||
using chan = chan_wrapper<Flag>;
|
||||
template <relat Rp, relat Rc, trans Ts>
|
||||
using chan = chan_wrapper<ipc::wr<Rp, Rc, Ts>>;
|
||||
|
||||
/*
|
||||
* class route
|
||||
@ -142,7 +160,7 @@ using chan = chan_wrapper<Flag>;
|
||||
* (one producer/writer to multi consumers/readers)
|
||||
*/
|
||||
|
||||
using route = chan<ipc::wr<relat::single, relat::multi, trans::broadcast>>;
|
||||
using route = chan<relat::single, relat::multi, trans::broadcast>;
|
||||
|
||||
/*
|
||||
* class channel
|
||||
@ -152,6 +170,6 @@ using route = chan<ipc::wr<relat::single, relat::multi, trans::broadcast>>;
|
||||
* would receive your sent messages.
|
||||
*/
|
||||
|
||||
using channel = chan<ipc::wr<relat::multi, relat::multi, trans::broadcast>>;
|
||||
using channel = chan<relat::multi, relat::multi, trans::broadcast>;
|
||||
|
||||
} // namespace ipc
|
||||
|
||||
@ -77,11 +77,8 @@ inline void sleep(K& k, F&& f) {
|
||||
if (k < static_cast<K>(N)) {
|
||||
std::this_thread::yield();
|
||||
}
|
||||
else if (std::forward<F>(f)()) {
|
||||
return;
|
||||
}
|
||||
else {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(1));
|
||||
static_cast<void>(std::forward<F>(f)());
|
||||
return;
|
||||
}
|
||||
++k;
|
||||
@ -89,7 +86,9 @@ inline void sleep(K& k, F&& f) {
|
||||
|
||||
template <std::size_t N = 4096, typename K>
|
||||
inline void sleep(K& k) {
|
||||
sleep<N>(k, [] { return false; });
|
||||
sleep<N>(k, [] {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(1));
|
||||
});
|
||||
}
|
||||
|
||||
} // namespace ipc
|
||||
|
||||
@ -19,6 +19,7 @@ aux_source_directory(${CMAKE_SOURCE_DIR}/src SRC_FILES)
|
||||
|
||||
file(GLOB HEAD_FILES
|
||||
${CMAKE_SOURCE_DIR}/include/libipc/*.h
|
||||
${CMAKE_SOURCE_DIR}/src/libipc/*.h
|
||||
${CMAKE_SOURCE_DIR}/src/libipc/*.inc
|
||||
${CMAKE_SOURCE_DIR}/src/libipc/circ/*.h
|
||||
${CMAKE_SOURCE_DIR}/src/libipc/memory/*.h
|
||||
|
||||
171
src/ipc.cpp
171
src/ipc.cpp
@ -8,6 +8,7 @@
|
||||
#include <string>
|
||||
#include <vector>
|
||||
#include <array>
|
||||
#include <cassert>
|
||||
|
||||
#include "libipc/ipc.h"
|
||||
#include "libipc/def.h"
|
||||
@ -30,8 +31,6 @@
|
||||
|
||||
namespace {
|
||||
|
||||
using namespace ipc;
|
||||
|
||||
using msg_id_t = std::uint32_t;
|
||||
using acc_t = std::atomic<msg_id_t>;
|
||||
|
||||
@ -64,54 +63,54 @@ struct msg_t : msg_t<0, AlignSize> {
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
buff_t make_cache(T& data, std::size_t size) {
|
||||
auto ptr = mem::alloc(size);
|
||||
ipc::buff_t make_cache(T& data, std::size_t size) {
|
||||
auto ptr = ipc::mem::alloc(size);
|
||||
std::memcpy(ptr, &data, (ipc::detail::min)(sizeof(data), size));
|
||||
return { ptr, size, mem::free };
|
||||
return { ptr, size, ipc::mem::free };
|
||||
}
|
||||
|
||||
struct cache_t {
|
||||
std::size_t fill_;
|
||||
buff_t buff_;
|
||||
ipc::buff_t buff_;
|
||||
|
||||
cache_t(std::size_t f, buff_t&& b)
|
||||
cache_t(std::size_t f, ipc::buff_t && b)
|
||||
: fill_(f), buff_(std::move(b))
|
||||
{}
|
||||
|
||||
void append(void const * data, std::size_t size) {
|
||||
if (fill_ >= buff_.size() || data == nullptr || size == 0) return;
|
||||
auto new_fill = (ipc::detail::min)(fill_ + size, buff_.size());
|
||||
std::memcpy(static_cast<byte_t*>(buff_.data()) + fill_, data, new_fill - fill_);
|
||||
std::memcpy(static_cast<ipc::byte_t*>(buff_.data()) + fill_, data, new_fill - fill_);
|
||||
fill_ = new_fill;
|
||||
}
|
||||
};
|
||||
|
||||
auto cc_acc() {
|
||||
static shm::handle acc_h("__CA_CONN__", sizeof(acc_t));
|
||||
static ipc::shm::handle acc_h("__CA_CONN__", sizeof(acc_t));
|
||||
return static_cast<acc_t*>(acc_h.get());
|
||||
}
|
||||
|
||||
auto& cls_storages() {
|
||||
struct cls_t {
|
||||
shm::handle id_info_;
|
||||
std::array<shm::handle, id_pool<>::max_count> mems_;
|
||||
ipc::shm::handle id_info_;
|
||||
std::array<ipc::shm::handle, ipc::id_pool<>::max_count> mems_;
|
||||
};
|
||||
static ipc::unordered_map<std::size_t, cls_t> cls_s;
|
||||
return cls_s;
|
||||
}
|
||||
|
||||
auto& cls_lock() {
|
||||
static spin_lock cls_l;
|
||||
static ipc::spin_lock cls_l;
|
||||
return cls_l;
|
||||
}
|
||||
|
||||
struct cls_info_t {
|
||||
id_pool<> pool_;
|
||||
spin_lock lock_;
|
||||
ipc::id_pool<> pool_;
|
||||
ipc::spin_lock lock_;
|
||||
};
|
||||
|
||||
constexpr std::size_t calc_cls_size(std::size_t size) noexcept {
|
||||
return (((size - 1) / large_msg_limit) + 1) * large_msg_limit;
|
||||
return (((size - 1) / ipc::large_msg_limit) + 1) * ipc::large_msg_limit;
|
||||
}
|
||||
|
||||
auto& cls_storage(std::size_t cls_size) {
|
||||
@ -135,8 +134,8 @@ cls_info_t* cls_storage_info(const char* func, T& cls_shm, std::size_t cls_size)
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
byte_t* cls_storage_mem(const char* func, T& cls_shm, std::size_t cls_size, std::size_t id) {
|
||||
if (id == invalid_value) {
|
||||
ipc::byte_t* cls_storage_mem(const char* func, T& cls_shm, std::size_t cls_size, std::size_t id) {
|
||||
if (id == ipc::invalid_value) {
|
||||
return nullptr;
|
||||
}
|
||||
if (!cls_shm.mems_[id].valid() &&
|
||||
@ -147,7 +146,7 @@ byte_t* cls_storage_mem(const char* func, T& cls_shm, std::size_t cls_size, std:
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
byte_t* ptr = static_cast<byte_t*>(cls_shm.mems_[id].get());
|
||||
auto ptr = static_cast<ipc::byte_t *>(cls_shm.mems_[id].get());
|
||||
if (ptr == nullptr) {
|
||||
ipc::error("[%s] cls_shm.mems_[id].get failed: id = %zd, cls_size = %zd\n", func, id, cls_size);
|
||||
return nullptr;
|
||||
@ -190,7 +189,7 @@ void* find_storage(std::size_t id, std::size_t size) {
|
||||
}
|
||||
|
||||
void recycle_storage(std::size_t id, std::size_t size) {
|
||||
if (id == invalid_value) {
|
||||
if (id == ipc::invalid_value) {
|
||||
ipc::error("[recycle_storage] id is invalid: id = %zd, size = %zd\n", id, size);
|
||||
return;
|
||||
}
|
||||
@ -202,7 +201,7 @@ void recycle_storage(std::size_t id, std::size_t size) {
|
||||
ipc::error("[recycle_storage] should find storage first: id = %zd, cls_size = %zd\n", id, cls_size);
|
||||
return;
|
||||
}
|
||||
byte_t* ptr = static_cast<byte_t*>(cls_shm.mems_[id].get());
|
||||
auto ptr = static_cast<ipc::byte_t*>(cls_shm.mems_[id].get());
|
||||
if (ptr == nullptr) {
|
||||
ipc::error("[recycle_storage] cls_shm.mems_[id].get failed: id = %zd, cls_size = %zd\n", id, cls_size);
|
||||
return;
|
||||
@ -221,7 +220,7 @@ void recycle_storage(std::size_t id, std::size_t size) {
|
||||
}
|
||||
|
||||
void clear_storage(std::size_t id, std::size_t size) {
|
||||
if (id == invalid_value) {
|
||||
if (id == ipc::invalid_value) {
|
||||
ipc::error("[clear_storage] id is invalid: id = %zd, size = %zd\n", id, size);
|
||||
return;
|
||||
}
|
||||
@ -254,8 +253,8 @@ struct conn_info_head {
|
||||
|
||||
ipc::string name_;
|
||||
msg_id_t cc_id_; // connection-info id
|
||||
waiter cc_waiter_, wt_waiter_, rd_waiter_;
|
||||
shm::handle acc_h_;
|
||||
ipc::waiter cc_waiter_, wt_waiter_, rd_waiter_;
|
||||
ipc::shm::handle acc_h_;
|
||||
|
||||
/*
|
||||
* <Remarks> thread_local may have some bugs.
|
||||
@ -268,7 +267,7 @@ struct conn_info_head {
|
||||
* - https://developercommunity.visualstudio.com/content/problem/124121/thread-local-variables-fail-to-be-initialized-when.html
|
||||
* - https://software.intel.com/en-us/forums/intel-c-compiler/topic/684827
|
||||
*/
|
||||
tls::pointer<ipc::unordered_map<msg_id_t, cache_t>> recv_cache_;
|
||||
ipc::tls::pointer<ipc::unordered_map<msg_id_t, cache_t>> recv_cache_;
|
||||
|
||||
conn_info_head(char const * name)
|
||||
: name_ (name)
|
||||
@ -279,6 +278,12 @@ struct conn_info_head {
|
||||
, acc_h_ (("__AC_CONN__" + name_).c_str(), sizeof(acc_t)) {
|
||||
}
|
||||
|
||||
void quit_waiting() {
|
||||
cc_waiter_.quit_waiting();
|
||||
wt_waiter_.quit_waiting();
|
||||
rd_waiter_.quit_waiting();
|
||||
}
|
||||
|
||||
auto acc() {
|
||||
return static_cast<acc_t*>(acc_h_.get());
|
||||
}
|
||||
@ -295,10 +300,9 @@ bool wait_for(W& waiter, F&& pred, std::size_t tm) {
|
||||
bool loop = true, ret = true;
|
||||
ipc::sleep(k, [&k, &loop, &ret, &waiter, &pred, tm] {
|
||||
ret = waiter.wait_if([&loop, &pred] {
|
||||
return loop = pred();
|
||||
}, tm);
|
||||
return loop = pred();
|
||||
}, tm);
|
||||
k = 0;
|
||||
return true;
|
||||
});
|
||||
if (!ret ) return false; // timeout or fail
|
||||
if (!loop) break;
|
||||
@ -307,7 +311,7 @@ bool wait_for(W& waiter, F&& pred, std::size_t tm) {
|
||||
}
|
||||
|
||||
template <typename Policy,
|
||||
std::size_t DataSize = data_length,
|
||||
std::size_t DataSize = ipc::data_length,
|
||||
std::size_t AlignSize = (ipc::detail::min)(DataSize, alignof(std::max_align_t))>
|
||||
struct queue_generator {
|
||||
|
||||
@ -341,35 +345,54 @@ constexpr static queue_t* queue_of(ipc::handle_t h) {
|
||||
|
||||
/* API implementations */
|
||||
|
||||
static ipc::handle_t connect(char const * name, bool start) {
|
||||
auto h = mem::alloc<conn_info_t>(name);
|
||||
auto que = queue_of(h);
|
||||
if (que == nullptr) {
|
||||
return nullptr;
|
||||
}
|
||||
if (start) {
|
||||
if (que->connect()) { // wouldn't connect twice
|
||||
info_of(h)->cc_waiter_.broadcast();
|
||||
}
|
||||
}
|
||||
return h;
|
||||
}
|
||||
|
||||
static void disconnect(ipc::handle_t h) {
|
||||
auto que = queue_of(h);
|
||||
if (que == nullptr) {
|
||||
return;
|
||||
}
|
||||
if (que->disconnect()) {
|
||||
info_of(h)->cc_waiter_.broadcast();
|
||||
bool dis = que->disconnect();
|
||||
info_of(h)->quit_waiting();
|
||||
if (dis) {
|
||||
info_of(h)->recv_cache().clear();
|
||||
}
|
||||
mem::free(info_of(h));
|
||||
}
|
||||
|
||||
static bool reconnect(ipc::handle_t * ph, bool start) {
|
||||
assert(ph != nullptr);
|
||||
assert(*ph != nullptr);
|
||||
auto que = queue_of(*ph);
|
||||
if (que == nullptr) {
|
||||
return false;
|
||||
}
|
||||
if (start) {
|
||||
if (que->connect()) { // wouldn't connect twice
|
||||
info_of(*ph)->cc_waiter_.broadcast();
|
||||
}
|
||||
}
|
||||
// start == false
|
||||
else if (que->connected()) {
|
||||
disconnect(*ph);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
static bool connect(ipc::handle_t * ph, char const * name, bool start) {
|
||||
assert(ph != nullptr);
|
||||
if (*ph == nullptr) {
|
||||
*ph = ipc::mem::alloc<conn_info_t>(name);
|
||||
}
|
||||
return reconnect(ph, start);
|
||||
}
|
||||
|
||||
static void destroy(ipc::handle_t h) {
|
||||
disconnect(h);
|
||||
ipc::mem::free(info_of(h));
|
||||
}
|
||||
|
||||
static std::size_t recv_count(ipc::handle_t h) {
|
||||
auto que = queue_of(h);
|
||||
if (que == nullptr) {
|
||||
return invalid_value;
|
||||
return ipc::invalid_value;
|
||||
}
|
||||
return que->conn_count();
|
||||
}
|
||||
@ -411,30 +434,31 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s
|
||||
}
|
||||
auto msg_id = acc->fetch_add(1, std::memory_order_relaxed);
|
||||
auto try_push = std::forward<F>(gen_push)(info_of(h), que, msg_id);
|
||||
if (size > large_msg_limit) {
|
||||
if (size > ipc::large_msg_limit) {
|
||||
auto dat = apply_storage(que->conn_count(), size);
|
||||
void * buf = dat.second;
|
||||
if (buf != nullptr) {
|
||||
std::memcpy(buf, data, size);
|
||||
return try_push(static_cast<std::int32_t>(size) -
|
||||
static_cast<std::int32_t>(data_length), &(dat.first), 0);
|
||||
static_cast<std::int32_t>(ipc::data_length), &(dat.first), 0);
|
||||
}
|
||||
// try using message fragment
|
||||
// ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd\n", msg_id, size);
|
||||
}
|
||||
// push message fragment
|
||||
std::int32_t offset = 0;
|
||||
for (int i = 0; i < static_cast<int>(size / data_length); ++i, offset += data_length) {
|
||||
if (!try_push(static_cast<std::int32_t>(size) - offset - static_cast<std::int32_t>(data_length),
|
||||
static_cast<byte_t const *>(data) + offset, data_length)) {
|
||||
for (int i = 0; i < static_cast<int>(size / ipc::data_length); ++i, offset += ipc::data_length) {
|
||||
if (!try_push(static_cast<std::int32_t>(size) - offset - static_cast<std::int32_t>(ipc::data_length),
|
||||
static_cast<ipc::byte_t const *>(data) + offset, ipc::data_length)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
// if remain > 0, this is the last message fragment
|
||||
std::int32_t remain = static_cast<std::int32_t>(size) - offset;
|
||||
if (remain > 0) {
|
||||
if (!try_push(remain - static_cast<std::int32_t>(data_length),
|
||||
static_cast<byte_t const *>(data) + offset, static_cast<std::size_t>(remain))) {
|
||||
if (!try_push(remain - static_cast<std::int32_t>(ipc::data_length),
|
||||
static_cast<ipc::byte_t const *>(data) + offset,
|
||||
static_cast<std::size_t>(remain))) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@ -451,8 +475,9 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size, std::size
|
||||
if (!que->force_push([](void* p) {
|
||||
auto tmp_msg = static_cast<typename queue_t::value_t*>(p);
|
||||
if (tmp_msg->storage_) {
|
||||
clear_storage(*reinterpret_cast<std::size_t*>(&tmp_msg->data_),
|
||||
static_cast<std::int32_t>(data_length) + tmp_msg->remain_);
|
||||
clear_storage(
|
||||
*reinterpret_cast<std::size_t*>(&tmp_msg->data_),
|
||||
static_cast<std::int32_t>(ipc::data_length) + tmp_msg->remain_);
|
||||
}
|
||||
return true;
|
||||
}, info->cc_id_, msg_id, remain, data, size)) {
|
||||
@ -479,20 +504,22 @@ static bool try_send(ipc::handle_t h, void const * data, std::size_t size, std::
|
||||
}, h, data, size);
|
||||
}
|
||||
|
||||
static buff_t recv(ipc::handle_t h, std::size_t tm) {
|
||||
static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) {
|
||||
auto que = queue_of(h);
|
||||
if (que == nullptr) {
|
||||
ipc::error("fail: recv, queue_of(h) == nullptr\n");
|
||||
return {};
|
||||
}
|
||||
if (que->connect()) { // wouldn't connect twice
|
||||
info_of(h)->cc_waiter_.broadcast();
|
||||
if (!que->connected()) {
|
||||
// hasn't connected yet, just return.
|
||||
return {};
|
||||
}
|
||||
auto& rc = info_of(h)->recv_cache();
|
||||
while (1) {
|
||||
// pop a new message
|
||||
typename queue_t::value_t msg;
|
||||
if (!wait_for(info_of(h)->rd_waiter_, [que, &msg] { return !que->pop(msg); }, tm)) {
|
||||
// pop failed, just return.
|
||||
return {};
|
||||
}
|
||||
info_of(h)->wt_waiter_.broadcast();
|
||||
@ -500,18 +527,18 @@ static buff_t recv(ipc::handle_t h, std::size_t tm) {
|
||||
continue; // ignore message to self
|
||||
}
|
||||
// msg.remain_ may minus & abs(msg.remain_) < data_length
|
||||
std::size_t remain = static_cast<std::int32_t>(data_length) + msg.remain_;
|
||||
std::size_t remain = static_cast<std::int32_t>(ipc::data_length) + msg.remain_;
|
||||
// find cache with msg.id_
|
||||
auto cac_it = rc.find(msg.id_);
|
||||
if (cac_it == rc.end()) {
|
||||
if (remain <= data_length) {
|
||||
if (remain <= ipc::data_length) {
|
||||
return make_cache(msg.data_, remain);
|
||||
}
|
||||
if (msg.storage_) {
|
||||
std::size_t buf_id = *reinterpret_cast<std::size_t*>(&msg.data_);
|
||||
void * buf = find_storage(buf_id, remain);
|
||||
if (buf != nullptr) {
|
||||
return buff_t { buf, remain, [](void* ptr, std::size_t size) {
|
||||
return ipc::buff_t { buf, remain, [](void* ptr, std::size_t size) {
|
||||
recycle_storage(reinterpret_cast<std::size_t>(ptr) - 1, size);
|
||||
}, reinterpret_cast<void*>(buf_id + 1) };
|
||||
}
|
||||
@ -529,7 +556,7 @@ static buff_t recv(ipc::handle_t h, std::size_t tm) {
|
||||
for (auto id : need_del) rc.erase(id);
|
||||
}
|
||||
// cache the first message fragment
|
||||
rc.emplace(msg.id_, cache_t { data_length, make_cache(msg.data_, remain) });
|
||||
rc.emplace(msg.id_, cache_t { ipc::data_length, make_cache(msg.data_, remain) });
|
||||
}
|
||||
// has cached before this message
|
||||
else {
|
||||
@ -543,27 +570,32 @@ static buff_t recv(ipc::handle_t h, std::size_t tm) {
|
||||
return buff;
|
||||
}
|
||||
// there are remain datas after this message
|
||||
cac.append(&(msg.data_), data_length);
|
||||
cac.append(&(msg.data_), ipc::data_length);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static buff_t try_recv(ipc::handle_t h) {
|
||||
static ipc::buff_t try_recv(ipc::handle_t h) {
|
||||
return recv(h, 0);
|
||||
}
|
||||
|
||||
}; // detail_impl<Policy>
|
||||
|
||||
template <typename Flag>
|
||||
using policy_t = policy::choose<circ::elem_array, Flag>;
|
||||
using policy_t = ipc::policy::choose<ipc::circ::elem_array, Flag>;
|
||||
|
||||
} // internal-linkage
|
||||
|
||||
namespace ipc {
|
||||
|
||||
template <typename Flag>
|
||||
ipc::handle_t chan_impl<Flag>::connect(char const * name, unsigned mode) {
|
||||
return detail_impl<policy_t<Flag>>::connect(name, mode & receiver);
|
||||
bool chan_impl<Flag>::connect(ipc::handle_t * ph, char const * name, unsigned mode) {
|
||||
return detail_impl<policy_t<Flag>>::connect(ph, name, mode & receiver);
|
||||
}
|
||||
|
||||
template <typename Flag>
|
||||
bool chan_impl<Flag>::reconnect(ipc::handle_t * ph, unsigned mode) {
|
||||
return detail_impl<policy_t<Flag>>::reconnect(ph, mode & receiver);
|
||||
}
|
||||
|
||||
template <typename Flag>
|
||||
@ -571,6 +603,11 @@ void chan_impl<Flag>::disconnect(ipc::handle_t h) {
|
||||
detail_impl<policy_t<Flag>>::disconnect(h);
|
||||
}
|
||||
|
||||
template <typename Flag>
|
||||
void chan_impl<Flag>::destroy(ipc::handle_t h) {
|
||||
detail_impl<policy_t<Flag>>::destroy(h);
|
||||
}
|
||||
|
||||
template <typename Flag>
|
||||
char const * chan_impl<Flag>::name(ipc::handle_t h) {
|
||||
auto info = detail_impl<policy_t<Flag>>::info_of(h);
|
||||
|
||||
@ -9,8 +9,11 @@
|
||||
|
||||
#include <atomic>
|
||||
#include <tuple>
|
||||
#include <utility>
|
||||
#include <cassert>
|
||||
|
||||
#include "libipc/def.h"
|
||||
#include "libipc/waiter_helper.h"
|
||||
|
||||
#include "libipc/utility/log.h"
|
||||
#include "libipc/platform/detail.h"
|
||||
@ -176,7 +179,7 @@ public:
|
||||
return SEM_FAILED;
|
||||
}
|
||||
|
||||
static handle_t open(char const* name, long count) {
|
||||
static handle_t open(char const * name, long count) {
|
||||
handle_t sem = ::sem_open(name, O_CREAT, 0666, count);
|
||||
if (sem == SEM_FAILED) {
|
||||
ipc::error("fail sem_open[%d]: %s\n", errno, name);
|
||||
@ -199,13 +202,19 @@ public:
|
||||
IPC_SEMAPHORE_FUNC_(sem_close, h);
|
||||
}
|
||||
|
||||
static bool destroy(char const* name) {
|
||||
static bool destroy(char const * name) {
|
||||
IPC_SEMAPHORE_FUNC_(sem_unlink, name);
|
||||
}
|
||||
|
||||
static bool post(handle_t h) {
|
||||
static bool post(handle_t h, long count) {
|
||||
if (h == invalid()) return false;
|
||||
IPC_SEMAPHORE_FUNC_(sem_post, h);
|
||||
auto spost = [](handle_t h) {
|
||||
IPC_SEMAPHORE_FUNC_(sem_post, h);
|
||||
};
|
||||
for (long i = 0; i < count; ++i) {
|
||||
if (!spost(h)) return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
static bool wait(handle_t h, std::size_t tm = invalid_value) {
|
||||
@ -233,19 +242,63 @@ public:
|
||||
#pragma pop_macro("IPC_SEMAPHORE_FUNC_")
|
||||
};
|
||||
|
||||
class waiter_helper {
|
||||
mutex lock_;
|
||||
|
||||
std::atomic<unsigned> waiting_ { 0 };
|
||||
long counter_ = 0;
|
||||
|
||||
class waiter_holder {
|
||||
public:
|
||||
using handle_t = std::tuple<ipc::string, sem_helper::handle_t, sem_helper::handle_t>;
|
||||
using handle_t = std::tuple<
|
||||
ipc::string,
|
||||
sem_helper::handle_t /* sema */,
|
||||
sem_helper::handle_t /* handshake */>;
|
||||
|
||||
static handle_t invalid() noexcept {
|
||||
return std::make_tuple(ipc::string{}, sem_helper::invalid(), sem_helper::invalid());
|
||||
return std::make_tuple(
|
||||
ipc::string{},
|
||||
sem_helper::invalid(),
|
||||
sem_helper::invalid());
|
||||
}
|
||||
|
||||
private:
|
||||
using wait_flags = waiter_helper::wait_flags;
|
||||
using wait_counter = waiter_helper::wait_counter;
|
||||
|
||||
mutex lock_;
|
||||
wait_counter cnt_;
|
||||
|
||||
struct contrl {
|
||||
waiter_holder * me_;
|
||||
wait_flags * flags_;
|
||||
handle_t const & h_;
|
||||
|
||||
wait_flags & flags() noexcept {
|
||||
assert(flags_ != nullptr);
|
||||
return *flags_;
|
||||
}
|
||||
|
||||
wait_counter & counter() noexcept {
|
||||
return me_->cnt_;
|
||||
}
|
||||
|
||||
auto get_lock() {
|
||||
return ipc::detail::unique_lock(me_->lock_);
|
||||
}
|
||||
|
||||
bool sema_wait(std::size_t tm) {
|
||||
return sem_helper::wait(std::get<1>(h_), tm);
|
||||
}
|
||||
|
||||
bool sema_post(long count) {
|
||||
return sem_helper::post(std::get<1>(h_), count);
|
||||
}
|
||||
|
||||
bool handshake_wait(std::size_t tm) {
|
||||
return sem_helper::wait(std::get<2>(h_), tm);
|
||||
}
|
||||
|
||||
bool handshake_post(long count) {
|
||||
return sem_helper::post(std::get<2>(h_), count);
|
||||
}
|
||||
};
|
||||
|
||||
public:
|
||||
handle_t open_h(ipc::string && name) {
|
||||
auto sem = sem_helper::open(("__WAITER_HELPER_SEM__" + name).c_str(), 0);
|
||||
if (sem == sem_helper::invalid()) {
|
||||
@ -278,63 +331,45 @@ public:
|
||||
}
|
||||
|
||||
template <typename F>
|
||||
bool wait_if(handle_t const & h, F&& pred, std::size_t tm = invalid_value) {
|
||||
waiting_.fetch_add(1, std::memory_order_release);
|
||||
{
|
||||
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_);
|
||||
if (!std::forward<F>(pred)()) return true;
|
||||
++ counter_;
|
||||
}
|
||||
bool ret = sem_helper::wait(std::get<1>(h), tm);
|
||||
waiting_.fetch_sub(1, std::memory_order_release);
|
||||
ret = sem_helper::post(std::get<2>(h)) && ret;
|
||||
return ret;
|
||||
bool wait_if(handle_t const & h, wait_flags * flags, F&& pred, std::size_t tm = invalid_value) {
|
||||
assert(flags != nullptr);
|
||||
contrl ctrl { this, flags, h };
|
||||
|
||||
class non_mutex {
|
||||
public:
|
||||
void lock () noexcept {}
|
||||
void unlock() noexcept {}
|
||||
} nm;
|
||||
|
||||
return waiter_helper::wait_if(ctrl, nm, std::forward<F>(pred), tm);
|
||||
}
|
||||
|
||||
bool notify(handle_t const & h) {
|
||||
std::atomic_thread_fence(std::memory_order_acq_rel);
|
||||
if (waiting_.load(std::memory_order_relaxed) == 0) {
|
||||
return true;
|
||||
}
|
||||
bool ret = true;
|
||||
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_);
|
||||
if (counter_ > 0) {
|
||||
ret = sem_helper::post(std::get<1>(h));
|
||||
-- counter_;
|
||||
ret = ret && sem_helper::wait(std::get<2>(h), default_timeout);
|
||||
}
|
||||
return ret;
|
||||
contrl ctrl { this, nullptr, h };
|
||||
return waiter_helper::notify(ctrl);
|
||||
}
|
||||
|
||||
bool broadcast(handle_t const & h) {
|
||||
std::atomic_thread_fence(std::memory_order_acq_rel);
|
||||
if (waiting_.load(std::memory_order_relaxed) == 0) {
|
||||
return true;
|
||||
}
|
||||
bool ret = true;
|
||||
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_);
|
||||
if (counter_ > 0) {
|
||||
for (long i = 0; i < counter_; ++i) {
|
||||
ret = ret && sem_helper::post(std::get<1>(h));
|
||||
}
|
||||
do {
|
||||
-- counter_;
|
||||
ret = ret && sem_helper::wait(std::get<2>(h), default_timeout);
|
||||
} while (counter_ > 0);
|
||||
}
|
||||
return ret;
|
||||
contrl ctrl { this, nullptr, h };
|
||||
return waiter_helper::broadcast(ctrl);
|
||||
}
|
||||
|
||||
bool quit_waiting(handle_t const & h, wait_flags * flags) {
|
||||
assert(flags != nullptr);
|
||||
contrl ctrl { this, flags, h };
|
||||
return waiter_helper::quit_waiting(ctrl);
|
||||
}
|
||||
};
|
||||
|
||||
class waiter {
|
||||
waiter_helper helper_;
|
||||
waiter_holder helper_;
|
||||
std::atomic<unsigned> opened_ { 0 };
|
||||
|
||||
public:
|
||||
using handle_t = waiter_helper::handle_t;
|
||||
using handle_t = waiter_holder::handle_t;
|
||||
|
||||
static handle_t invalid() noexcept {
|
||||
return waiter_helper::invalid();
|
||||
return waiter_holder::invalid();
|
||||
}
|
||||
|
||||
handle_t open(char const * name) {
|
||||
@ -357,19 +392,24 @@ public:
|
||||
}
|
||||
|
||||
template <typename F>
|
||||
bool wait_if(handle_t h, F&& pred, std::size_t tm = invalid_value) {
|
||||
bool wait_if(handle_t h, waiter_helper::wait_flags * flags, F && pred, std::size_t tm = invalid_value) {
|
||||
if (h == invalid()) return false;
|
||||
return helper_.wait_if(h, std::forward<F>(pred), tm);
|
||||
return helper_.wait_if(h, flags, std::forward<F>(pred), tm);
|
||||
}
|
||||
|
||||
void notify(handle_t h) {
|
||||
if (h == invalid()) return;
|
||||
helper_.notify(h);
|
||||
bool notify(handle_t h) {
|
||||
if (h == invalid()) return false;
|
||||
return helper_.notify(h);
|
||||
}
|
||||
|
||||
void broadcast(handle_t h) {
|
||||
if (h == invalid()) return;
|
||||
helper_.broadcast(h);
|
||||
bool broadcast(handle_t h) {
|
||||
if (h == invalid()) return false;
|
||||
return helper_.broadcast(h);
|
||||
}
|
||||
|
||||
bool quit_waiting(handle_t h, waiter_helper::wait_flags * flags) {
|
||||
if (h == invalid()) return false;
|
||||
return helper_.quit_waiting(h, flags);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@ -3,11 +3,14 @@
|
||||
#include <Windows.h>
|
||||
|
||||
#include <atomic>
|
||||
#include <tuple>
|
||||
#include <utility>
|
||||
#include <limits>
|
||||
#include <cassert>
|
||||
|
||||
#include "libipc/rw_lock.h"
|
||||
#include "libipc/pool_alloc.h"
|
||||
#include "libipc/shm.h"
|
||||
#include "libipc/waiter_helper.h"
|
||||
|
||||
#include "libipc/utility/log.h"
|
||||
#include "libipc/platform/to_tchar.h"
|
||||
@ -42,8 +45,9 @@ public:
|
||||
switch ((ret = ::WaitForSingleObject(h_, ms))) {
|
||||
case WAIT_OBJECT_0:
|
||||
return true;
|
||||
case WAIT_ABANDONED:
|
||||
case WAIT_TIMEOUT:
|
||||
return false;
|
||||
case WAIT_ABANDONED:
|
||||
default:
|
||||
ipc::error("fail WaitForSingleObject[%lu]: 0x%08X\n", ::GetLastError(), ret);
|
||||
return false;
|
||||
@ -73,15 +77,51 @@ public:
|
||||
};
|
||||
|
||||
class condition {
|
||||
using wait_flags = waiter_helper::wait_flags;
|
||||
using wait_counter = waiter_helper::wait_counter;
|
||||
|
||||
mutex lock_;
|
||||
semaphore sema_, handshake_;
|
||||
wait_counter * cnt_ = nullptr;
|
||||
|
||||
std::atomic<unsigned> * waiting_ = nullptr;
|
||||
long * counter_ = nullptr;
|
||||
struct contrl {
|
||||
condition * me_;
|
||||
wait_flags * flags_;
|
||||
|
||||
wait_flags & flags() noexcept {
|
||||
assert(flags_ != nullptr);
|
||||
return *flags_;
|
||||
}
|
||||
|
||||
wait_counter & counter() noexcept {
|
||||
assert(me_->cnt_ != nullptr);
|
||||
return *(me_->cnt_);
|
||||
}
|
||||
|
||||
auto get_lock() {
|
||||
return ipc::detail::unique_lock(me_->lock_);
|
||||
}
|
||||
|
||||
bool sema_wait(std::size_t tm) {
|
||||
return me_->sema_.wait(tm);
|
||||
}
|
||||
|
||||
bool sema_post(long count) {
|
||||
return me_->sema_.post(count);
|
||||
}
|
||||
|
||||
bool handshake_wait(std::size_t tm) {
|
||||
return me_->handshake_.wait(tm);
|
||||
}
|
||||
|
||||
bool handshake_post(long count) {
|
||||
return me_->handshake_.post(count);
|
||||
}
|
||||
};
|
||||
|
||||
public:
|
||||
friend bool operator==(condition const & c1, condition const & c2) {
|
||||
return (c1.waiting_ == c2.waiting_) && (c1.counter_ == c2.counter_);
|
||||
return c1.cnt_ == c2.cnt_;
|
||||
}
|
||||
|
||||
friend bool operator!=(condition const & c1, condition const & c2) {
|
||||
@ -94,12 +134,11 @@ public:
|
||||
mutex ::remove((ipc::string{ "__COND_MTX__" } + name).c_str());
|
||||
}
|
||||
|
||||
bool open(ipc::string const & name, std::atomic<unsigned> * waiting, long * counter) {
|
||||
bool open(ipc::string const & name, wait_counter * cnt) {
|
||||
if (lock_ .open("__COND_MTX__" + name) &&
|
||||
sema_ .open("__COND_SEM__" + name) &&
|
||||
handshake_.open("__COND_HAN__" + name)) {
|
||||
waiting_ = waiting;
|
||||
counter_ = counter;
|
||||
cnt_ = cnt;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
@ -112,58 +151,31 @@ public:
|
||||
}
|
||||
|
||||
template <typename Mutex, typename F>
|
||||
bool wait_if(Mutex& mtx, F&& pred, std::size_t tm = invalid_value) {
|
||||
waiting_->fetch_add(1, std::memory_order_release);
|
||||
{
|
||||
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_);
|
||||
if (!std::forward<F>(pred)()) return true;
|
||||
++ *counter_;
|
||||
}
|
||||
mtx.unlock();
|
||||
bool ret = sema_.wait(tm);
|
||||
waiting_->fetch_sub(1, std::memory_order_release);
|
||||
ret = handshake_.post() && ret;
|
||||
mtx.lock();
|
||||
return ret;
|
||||
bool wait_if(Mutex & mtx, wait_flags * flags, F && pred, std::size_t tm = invalid_value) {
|
||||
assert(flags != nullptr);
|
||||
contrl ctrl { this, flags };
|
||||
return waiter_helper::wait_if(ctrl, mtx, std::forward<F>(pred), tm);
|
||||
}
|
||||
|
||||
bool notify() {
|
||||
std::atomic_thread_fence(std::memory_order_acq_rel);
|
||||
if (waiting_->load(std::memory_order_relaxed) == 0) {
|
||||
return true;
|
||||
}
|
||||
bool ret = true;
|
||||
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_);
|
||||
if (*counter_ > 0) {
|
||||
ret = sema_.post();
|
||||
-- *counter_;
|
||||
ret = ret && handshake_.wait(default_timeout);
|
||||
}
|
||||
return ret;
|
||||
contrl ctrl { this, nullptr };
|
||||
return waiter_helper::notify(ctrl);
|
||||
}
|
||||
|
||||
bool broadcast() {
|
||||
std::atomic_thread_fence(std::memory_order_acq_rel);
|
||||
if (waiting_->load(std::memory_order_relaxed) == 0) {
|
||||
return true;
|
||||
}
|
||||
bool ret = true;
|
||||
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_);
|
||||
if (*counter_ > 0) {
|
||||
ret = sema_.post(*counter_);
|
||||
do {
|
||||
-- *counter_;
|
||||
ret = ret && handshake_.wait(default_timeout);
|
||||
} while (*counter_ > 0);
|
||||
}
|
||||
return ret;
|
||||
contrl ctrl { this, nullptr };
|
||||
return waiter_helper::broadcast(ctrl);
|
||||
}
|
||||
|
||||
bool quit_waiting(wait_flags * flags) {
|
||||
assert(flags != nullptr);
|
||||
contrl ctrl { this, flags };
|
||||
return waiter_helper::quit_waiting(ctrl);
|
||||
}
|
||||
};
|
||||
|
||||
class waiter {
|
||||
|
||||
std::atomic<unsigned> waiting_ { 0 };
|
||||
long counter_ = 0;
|
||||
waiter_helper::wait_counter cnt_;
|
||||
|
||||
public:
|
||||
using handle_t = condition;
|
||||
@ -177,7 +189,7 @@ public:
|
||||
return invalid();
|
||||
}
|
||||
condition cond;
|
||||
if (cond.open(name, &waiting_, &counter_)) {
|
||||
if (cond.open(name, &cnt_)) {
|
||||
return cond;
|
||||
}
|
||||
return invalid();
|
||||
@ -189,7 +201,7 @@ public:
|
||||
}
|
||||
|
||||
template <typename F>
|
||||
bool wait_if(handle_t& h, F&& pred, std::size_t tm = invalid_value) {
|
||||
bool wait_if(handle_t& h, waiter_helper::wait_flags * flags, F&& pred, std::size_t tm = invalid_value) {
|
||||
if (h == invalid()) return false;
|
||||
|
||||
class non_mutex {
|
||||
@ -198,17 +210,22 @@ public:
|
||||
void unlock() noexcept {}
|
||||
} nm;
|
||||
|
||||
return h.wait_if(nm, std::forward<F>(pred), tm);
|
||||
return h.wait_if(nm, flags, std::forward<F>(pred), tm);
|
||||
}
|
||||
|
||||
void notify(handle_t& h) {
|
||||
if (h == invalid()) return;
|
||||
h.notify();
|
||||
bool notify(handle_t& h) {
|
||||
if (h == invalid()) return false;
|
||||
return h.notify();
|
||||
}
|
||||
|
||||
void broadcast(handle_t& h) {
|
||||
if (h == invalid()) return;
|
||||
h.broadcast();
|
||||
bool broadcast(handle_t& h) {
|
||||
if (h == invalid()) return false;
|
||||
return h.broadcast();
|
||||
}
|
||||
|
||||
bool quit_waiting(handle_t& h, waiter_helper::wait_flags * flags) {
|
||||
if (h == invalid()) return false;
|
||||
return h.quit_waiting(flags);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@ -21,35 +21,39 @@ using mutex_impl = ipc::detail::mutex;
|
||||
using semaphore_impl = ipc::detail::semaphore;
|
||||
|
||||
class condition_impl : public ipc::detail::condition {
|
||||
using base_t = ipc::detail::condition;
|
||||
|
||||
ipc::shm::handle wait_h_, cnt_h_;
|
||||
ipc::shm::handle cnt_h_;
|
||||
waiter_helper::wait_flags flags_;
|
||||
|
||||
public:
|
||||
static void remove(char const * name) {
|
||||
ipc::detail::condition::remove(name);
|
||||
base_t::remove(name);
|
||||
ipc::string n = name;
|
||||
ipc::shm::remove((n + "__COND_CNT__" ).c_str());
|
||||
ipc::shm::remove((n + "__COND_WAIT__").c_str());
|
||||
}
|
||||
|
||||
bool open(ipc::string const & name) {
|
||||
if (wait_h_.acquire((name + "__COND_WAIT__").c_str(), sizeof(std::atomic<unsigned>)) &&
|
||||
cnt_h_ .acquire((name + "__COND_CNT__" ).c_str(), sizeof(long))) {
|
||||
return ipc::detail::condition::open(name,
|
||||
static_cast<std::atomic<unsigned> *>(wait_h_.get()),
|
||||
static_cast<long *>(cnt_h_.get()));
|
||||
bool open(char const * name) {
|
||||
if (cnt_h_ .acquire(
|
||||
(ipc::string { name } + "__COND_CNT__" ).c_str(),
|
||||
sizeof(waiter_helper::wait_counter))) {
|
||||
flags_.is_closed_.store(false, std::memory_order_release);
|
||||
return base_t::open(name,
|
||||
static_cast<waiter_helper::wait_counter *>(cnt_h_.get()));
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void close() {
|
||||
ipc::detail::condition::close();
|
||||
cnt_h_ .release();
|
||||
wait_h_.release();
|
||||
flags_.is_closed_.store(true, std::memory_order_release);
|
||||
base_t::quit_waiting(&flags_);
|
||||
base_t::close();
|
||||
cnt_h_.release();
|
||||
}
|
||||
|
||||
bool wait(mutex_impl& mtx, std::size_t tm = invalid_value) {
|
||||
return ipc::detail::condition::wait_if(mtx, [] { return true; }, tm);
|
||||
return base_t::wait_if(mtx, &flags_, [] { return true; }, tm);
|
||||
}
|
||||
};
|
||||
|
||||
@ -165,17 +169,11 @@ public:
|
||||
}
|
||||
|
||||
bool wait(std::size_t tm = invalid_value) {
|
||||
if (h_ == sem_helper::invalid()) return false;
|
||||
return sem_helper::wait(h_, tm);
|
||||
}
|
||||
|
||||
bool post(long count) {
|
||||
if (h_ == sem_helper::invalid()) return false;
|
||||
bool ret = true;
|
||||
for (long i = 0; i < count; ++i) {
|
||||
ret = ret && sem_helper::post(h_);
|
||||
}
|
||||
return ret;
|
||||
return sem_helper::post(h_, count);
|
||||
}
|
||||
};
|
||||
|
||||
@ -194,6 +192,7 @@ public:
|
||||
private:
|
||||
waiter_t* w_ = nullptr;
|
||||
waiter_t::handle_t h_ = waiter_t::invalid();
|
||||
waiter_helper::wait_flags flags_;
|
||||
|
||||
public:
|
||||
waiter_wrapper() = default;
|
||||
@ -218,20 +217,27 @@ public:
|
||||
bool open(char const * name) {
|
||||
if (w_ == nullptr) return false;
|
||||
close();
|
||||
flags_.is_closed_.store(false, std::memory_order_release);
|
||||
h_ = w_->open(name);
|
||||
return valid();
|
||||
}
|
||||
|
||||
void close() {
|
||||
if (!valid()) return;
|
||||
flags_.is_closed_.store(true, std::memory_order_release);
|
||||
quit_waiting();
|
||||
w_->close(h_);
|
||||
h_ = waiter_t::invalid();
|
||||
}
|
||||
|
||||
void quit_waiting() {
|
||||
w_->quit_waiting(h_, &flags_);
|
||||
}
|
||||
|
||||
template <typename F>
|
||||
bool wait_if(F&& pred, std::size_t tm = invalid_value) {
|
||||
bool wait_if(F && pred, std::size_t tm = invalid_value) {
|
||||
if (!valid()) return false;
|
||||
return w_->wait_if(h_, std::forward<F>(pred), tm);
|
||||
return w_->wait_if(h_, &flags_, std::forward<F>(pred), tm);
|
||||
}
|
||||
|
||||
bool notify() {
|
||||
|
||||
@ -7,8 +7,8 @@ namespace ipc {
|
||||
namespace detail {
|
||||
|
||||
template <typename O>
|
||||
void print(O out, char const * fmt) {
|
||||
std::fprintf(out, "%s", fmt);
|
||||
void print(O out, char const * str) {
|
||||
std::fprintf(out, "%s", str);
|
||||
}
|
||||
|
||||
template <typename O, typename P1, typename... P>
|
||||
@ -27,8 +27,8 @@ void log(char const * fmt, P1&& p1, P&&... params) {
|
||||
ipc::detail::print(stdout, fmt, std::forward<P1>(p1), std::forward<P>(params)...);
|
||||
}
|
||||
|
||||
inline void error(char const * fmt) {
|
||||
ipc::detail::print(stderr, fmt);
|
||||
inline void error(char const * str) {
|
||||
ipc::detail::print(stderr, str);
|
||||
}
|
||||
|
||||
template <typename P1, typename... P>
|
||||
|
||||
64
src/libipc/utility/scope_guard.h
Normal file
64
src/libipc/utility/scope_guard.h
Normal file
@ -0,0 +1,64 @@
|
||||
#pragma once
|
||||
|
||||
#include <utility> // std::forward, std::move
|
||||
#include <algorithm> // std::swap
|
||||
#include <type_traits> // std::decay
|
||||
|
||||
namespace ipc {
|
||||
|
||||
////////////////////////////////////////////////////////////////
|
||||
/// Execute guard function when the enclosing scope exits
|
||||
////////////////////////////////////////////////////////////////
|
||||
|
||||
template <typename F>
|
||||
class scope_guard {
|
||||
F destructor_;
|
||||
mutable bool dismiss_;
|
||||
|
||||
public:
|
||||
template <typename D>
|
||||
scope_guard(D && destructor)
|
||||
: destructor_(std::forward<D>(destructor))
|
||||
, dismiss_(false) {
|
||||
}
|
||||
|
||||
scope_guard(scope_guard&& rhs)
|
||||
: destructor_(std::move(rhs.destructor_))
|
||||
, dismiss_(true) /* dismiss rhs */ {
|
||||
std::swap(dismiss_, rhs.dismiss_);
|
||||
}
|
||||
|
||||
~scope_guard() {
|
||||
try { do_exit(); }
|
||||
/**
|
||||
* In the realm of exceptions, it is fundamental that you can do nothing
|
||||
* if your "undo/recover" action fails.
|
||||
*/
|
||||
catch (...) { /* Do nothing */ }
|
||||
}
|
||||
|
||||
void swap(scope_guard & rhs) {
|
||||
std::swap(destructor_, rhs.destructor_);
|
||||
std::swap(dismiss_ , rhs.dismiss_);
|
||||
}
|
||||
|
||||
void dismiss() const noexcept {
|
||||
dismiss_ = true;
|
||||
}
|
||||
|
||||
void do_exit() {
|
||||
if (!dismiss_) {
|
||||
dismiss_ = true;
|
||||
destructor_();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
template <typename D>
|
||||
constexpr auto guard(D && destructor) noexcept {
|
||||
return scope_guard<std::decay_t<D>> {
|
||||
std::forward<D>(destructor)
|
||||
};
|
||||
}
|
||||
|
||||
} // namespace ipc
|
||||
129
src/libipc/waiter_helper.h
Normal file
129
src/libipc/waiter_helper.h
Normal file
@ -0,0 +1,129 @@
|
||||
#pragma once
|
||||
|
||||
#include <atomic>
|
||||
#include <limits>
|
||||
#include <utility>
|
||||
|
||||
#include "libipc/def.h"
|
||||
#include "libipc/utility/scope_guard.h"
|
||||
|
||||
namespace ipc {
|
||||
namespace detail {
|
||||
|
||||
class waiter_helper {
|
||||
|
||||
enum : unsigned {
|
||||
destruct_mask = (std::numeric_limits<unsigned>::max)() >> 1,
|
||||
destruct_flag = ~destruct_mask
|
||||
};
|
||||
|
||||
public:
|
||||
struct wait_counter {
|
||||
std::atomic<unsigned> waiting_ { 0 };
|
||||
long counter_ = 0;
|
||||
};
|
||||
|
||||
struct wait_flags {
|
||||
std::atomic<bool> is_waiting_ { false };
|
||||
std::atomic<bool> is_closed_ { true };
|
||||
};
|
||||
|
||||
template <typename Mutex, typename Ctrl, typename F>
|
||||
static bool wait_if(Ctrl & ctrl, Mutex & mtx, F && pred, std::size_t tm) {
|
||||
auto & flags = ctrl.flags();
|
||||
if (flags.is_closed_.load(std::memory_order_acquire)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
auto & counter = ctrl.counter();
|
||||
counter.waiting_.fetch_add(1, std::memory_order_release);
|
||||
flags.is_waiting_.store(true, std::memory_order_relaxed);
|
||||
auto finally = ipc::guard([&counter, &flags] {
|
||||
counter.waiting_.fetch_sub(1, std::memory_order_release);
|
||||
flags.is_waiting_.store(false, std::memory_order_relaxed);
|
||||
});
|
||||
{
|
||||
IPC_UNUSED_ auto guard = ctrl.get_lock();
|
||||
if (!std::forward<F>(pred)()) return true;
|
||||
counter.counter_ += 1;
|
||||
}
|
||||
mtx.unlock();
|
||||
|
||||
bool ret = false;
|
||||
do {
|
||||
bool is_waiting = flags.is_waiting_.load(std::memory_order_relaxed);
|
||||
bool is_closed = flags.is_closed_ .load(std::memory_order_acquire);
|
||||
if (!is_waiting || is_closed) {
|
||||
ret = false;
|
||||
break;
|
||||
}
|
||||
ret = ctrl.sema_wait(tm);
|
||||
} while (counter.waiting_.load(std::memory_order_acquire) & destruct_flag);
|
||||
finally.do_exit();
|
||||
ret = ctrl.handshake_post(1) && ret;
|
||||
|
||||
mtx.lock();
|
||||
return ret;
|
||||
}
|
||||
|
||||
template <typename Ctrl>
|
||||
static bool notify(Ctrl & ctrl) {
|
||||
auto & counter = ctrl.counter();
|
||||
if ((counter.waiting_.load(std::memory_order_acquire) & destruct_mask) == 0) {
|
||||
return true;
|
||||
}
|
||||
bool ret = true;
|
||||
IPC_UNUSED_ auto guard = ctrl.get_lock();
|
||||
if (counter.counter_ > 0) {
|
||||
ret = ctrl.sema_post(1);
|
||||
counter.counter_ -= 1;
|
||||
ret = ret && ctrl.handshake_wait(default_timeout);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
template <typename Ctrl>
|
||||
static bool broadcast(Ctrl & ctrl) {
|
||||
auto & counter = ctrl.counter();
|
||||
if ((counter.waiting_.load(std::memory_order_acquire) & destruct_mask) == 0) {
|
||||
return true;
|
||||
}
|
||||
bool ret = true;
|
||||
IPC_UNUSED_ auto guard = ctrl.get_lock();
|
||||
if (counter.counter_ > 0) {
|
||||
ret = ctrl.sema_post(counter.counter_);
|
||||
do {
|
||||
counter.counter_ -= 1;
|
||||
ret = ret && ctrl.handshake_wait(default_timeout);
|
||||
} while (counter.counter_ > 0);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
template <typename Ctrl>
|
||||
static bool quit_waiting(Ctrl & ctrl) {
|
||||
auto & flags = ctrl.flags();
|
||||
if (!flags.is_waiting_.exchange(false, std::memory_order_release)) {
|
||||
return true;
|
||||
}
|
||||
auto & counter = ctrl.counter();
|
||||
if ((counter.waiting_.load(std::memory_order_acquire) & destruct_mask) == 0) {
|
||||
return true;
|
||||
}
|
||||
bool ret = true;
|
||||
IPC_UNUSED_ auto guard = ctrl.get_lock();
|
||||
counter.waiting_.fetch_or(destruct_flag, std::memory_order_relaxed);
|
||||
IPC_UNUSED_ auto finally = ipc::guard([&counter] {
|
||||
counter.waiting_.fetch_and(destruct_mask, std::memory_order_relaxed);
|
||||
});
|
||||
if (counter.counter_ > 0) {
|
||||
ret = ctrl.sema_post(counter.counter_);
|
||||
counter.counter_ -= 1;
|
||||
ret = ret && ctrl.handshake_wait(default_timeout);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace detail
|
||||
} // namespace ipc
|
||||
@ -52,7 +52,7 @@ public:
|
||||
|
||||
template <relat Rp, relat Rc, trans Ts>
|
||||
void test_basic(char const * name) {
|
||||
using que_t = chan<wr<Rp, Rc, Ts>>;
|
||||
using que_t = chan<Rp, Rc, Ts>;
|
||||
rand_buf test1, test2;
|
||||
|
||||
que_t que1 { name };
|
||||
@ -69,7 +69,7 @@ void test_basic(char const * name) {
|
||||
|
||||
template <relat Rp, relat Rc, trans Ts>
|
||||
void test_sr(char const * name, int size, int s_cnt, int r_cnt) {
|
||||
using que_t = chan<wr<Rp, Rc, Ts>>;
|
||||
using que_t = chan<Rp, Rc, Ts>;
|
||||
|
||||
ipc_ut::sender().start(static_cast<std::size_t>(s_cnt));
|
||||
ipc_ut::reader().start(static_cast<std::size_t>(r_cnt));
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user