try send/recv; support sending timeout; fix bugs

This commit is contained in:
zhangyi 2019-03-26 19:12:59 +08:00
parent 785abd1845
commit 5affd26da6
10 changed files with 226 additions and 110 deletions

View File

@ -2,27 +2,25 @@
#include <string>
#include <thread>
#include <regex>
#include <atomic>
#include "ipc.h"
namespace ipc {
namespace detail {
IPC_EXPORT std::size_t calc_unique_id();
} // namespace detail
} // namespace ipc
namespace {
char name__[] = "ipc-chat";
char quit__[] = "q";
char id__ [] = "c";
std::size_t calc_unique_id() {
static ipc::shm::handle g_shm { "__CHAT_ACC_STORAGE__", sizeof(std::atomic<std::size_t>) };
return static_cast<std::atomic<std::size_t>*>(g_shm.get())->fetch_add(1, std::memory_order_relaxed);
}
} // namespace
int main() {
std::string buf, id = id__ + std::to_string(ipc::detail::calc_unique_id());
std::string buf, id = id__ + std::to_string(calc_unique_id());
std::regex reg { "(c\\d+)> (.*)" };
ipc::channel cc { name__ };

View File

@ -30,7 +30,7 @@ enum : std::size_t {
invalid_value = (std::numeric_limits<std::size_t>::max)(),
data_length = 64,
name_length = 64,
send_wait_for = 100 // ms
send_wait = 100 // ms
};
enum class relat { // multiplicity of the relationship

View File

@ -23,6 +23,9 @@ struct IPC_EXPORT chan_impl {
static bool send(handle_t h, void const * data, std::size_t size);
static buff_t recv(handle_t h, std::size_t tm);
static bool try_send(handle_t h, void const * data, std::size_t size);
static buff_t try_recv(handle_t h);
};
template <typename Flag>
@ -100,21 +103,21 @@ public:
return chan_wrapper(name).wait_for_recv(r_count, tm);
}
bool send(void const * data, std::size_t size) {
return detail_t::send(h_, data, size);
}
bool send (void const * data, std::size_t size) { return detail_t::send(h_, data, size) ; }
bool send (buff_t const & buff) { return this->send(buff.data(), buff.size()) ; }
bool send (std::string const & str) { return this->send(str.c_str(), str.size() + 1); }
bool send(buff_t const & buff) {
return this->send(buff.data(), buff.size());
}
bool send(std::string const & str) {
return this->send(str.c_str(), str.size() + 1);
}
bool try_send(void const * data, std::size_t size) { return detail_t::try_send(h_, data, size) ; }
bool try_send(buff_t const & buff) { return this->try_send(buff.data(), buff.size()) ; }
bool try_send(std::string const & str) { return this->try_send(str.c_str(), str.size() + 1); }
buff_t recv(std::size_t tm = invalid_value) {
return detail_t::recv(h_, tm);
}
buff_t try_recv() {
return detail_t::try_recv(h_);
}
};
template <typename Flag>

View File

@ -11,16 +11,17 @@
namespace ipc {
namespace circ {
namespace detail {
template <typename Policy, std::size_t DataSize, std::size_t AlignSize>
class elem_array {
class elem_array : public ipc::circ::conn_head {
public:
using base_t = ipc::circ::conn_head;
using policy_t = Policy;
using cursor_t = decltype(std::declval<policy_t>().cursor());
using elem_t = typename policy_t::template elem_t<DataSize, AlignSize>;
enum : std::size_t {
head_size = sizeof(base_t) + sizeof(policy_t),
data_size = DataSize,
elem_max = (std::numeric_limits<uint_t<8>>::max)() + 1, // default is 255 + 1
elem_size = sizeof(elem_t),
@ -36,53 +37,20 @@ public:
return head_.cursor();
}
template <typename E, typename F>
bool push(E* elems, F&& f) {
return head_.push(elems, std::forward<F>(f), block_);
}
template <typename E, typename F>
bool pop(E* elems, cursor_t* cur, F&& f) {
if (cur == nullptr) return false;
return head_.pop(elems, *cur, std::forward<F>(f), block_);
}
};
} // namespace detail
template <typename Policy, std::size_t DataSize, std::size_t AlignSize>
class elem_array : public ipc::circ::conn_head {
public:
using base_t = ipc::circ::conn_head;
using array_t = detail::elem_array<Policy, DataSize, AlignSize>;
using policy_t = typename array_t::policy_t;
using cursor_t = typename array_t::cursor_t;
using elem_t = typename array_t::elem_t;
enum : std::size_t {
head_size = sizeof(base_t) + sizeof(policy_t),
data_size = array_t::data_size,
elem_max = array_t::elem_max,
elem_size = array_t::elem_size,
block_size = array_t::block_size
};
private:
array_t array_;
public:
cursor_t cursor() const noexcept {
return array_.cursor();
template <typename F>
bool push(F&& f) {
return head_.push(this, std::forward<F>(f), block_);
}
template <typename F>
bool push(F&& f) {
return array_.push(this, std::forward<F>(f));
bool force_push(F&& f) {
return head_.force_push(this, std::forward<F>(f), block_);
}
template <typename F>
bool pop(cursor_t* cur, F&& f) {
return array_.pop(this, cur, std::forward<F>(f));
if (cur == nullptr) return false;
return head_.pop(this, *cur, std::forward<F>(f), block_);
}
};

View File

@ -7,6 +7,7 @@
#include <atomic>
#include <type_traits>
#include <string>
#include <vector>
#include "def.h"
#include "shm.h"
@ -15,6 +16,7 @@
#include "queue.h"
#include "policy.h"
#include "rw_lock.h"
#include "log.h"
#include "memory/resource.h"
@ -75,14 +77,33 @@ struct cache_t {
}
};
struct conn_info_head {
using acc_t = std::atomic<msg_id_t>;
waiter cc_waiter_, wt_waiter_, rd_waiter_;
shm::handle acc_h_;
conn_info_head(char const * name)
: cc_waiter_((std::string{ "__CC_CONN__" } + name).c_str())
, wt_waiter_((std::string{ "__WT_CONN__" } + name).c_str())
, rd_waiter_((std::string{ "__RD_CONN__" } + name).c_str())
, acc_h_ ((std::string{ "__AC_CONN__" } + name).c_str(), sizeof(acc_t)) {
}
auto acc() {
return static_cast<acc_t*>(acc_h_.get());
}
};
template <typename W, typename F>
bool wait_for(W& waiter, F&& pred, std::size_t tm) {
for (unsigned k = 0; pred();) {
bool loop = true, ret = true;
ipc::sleep(k, [&loop, &ret, &waiter, &pred, tm] {
ipc::sleep(k, [&k, &loop, &ret, &waiter, &pred, tm] {
ret = waiter.wait_if([&loop, &ret, &pred] {
return loop = pred();
}, tm);
k = 0;
return true;
});
if (!ret ) return false; // timeout or fail
@ -96,15 +117,12 @@ struct detail_impl {
using queue_t = ipc::queue<msg_t<data_length>, Policy>;
struct conn_info_t {
struct conn_info_t : conn_info_head {
queue_t que_;
waiter cc_waiter_, wt_waiter_, rd_waiter_;
conn_info_t(char const * name)
: que_ ((std::string{ "__QU_CONN__" } + name).c_str()) {
cc_waiter_.open((std::string{ "__CC_CONN__" } + name).c_str());
wt_waiter_.open((std::string{ "__WT_CONN__" } + name).c_str());
rd_waiter_.open((std::string{ "__RD_CONN__" } + name).c_str());
: conn_info_head(name)
, que_((std::string{ "__QU_CONN__" } + name).c_str()) {
}
};
@ -169,43 +187,80 @@ static bool wait_for_recv(ipc::handle_t h, std::size_t r_count, std::size_t tm)
}, tm);
}
static bool send(ipc::handle_t h, void const * data, std::size_t size) {
if (data == nullptr) {
return false;
}
if (size == 0) {
template <typename F>
static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t size) {
if (data == nullptr || size == 0) {
ipc::error("fail: send(%p, %zd)\n", data, size);
return false;
}
auto que = queue_of(h);
if (que == nullptr) {
ipc::error("fail: send, queue_of(h) == nullptr\n");
return false;
}
// calc a new message id
auto msg_id = ipc::detail::calc_unique_id();
auto acc = info_of(h)->acc();
if (acc == nullptr) {
ipc::error("fail: send, info_of(h)->acc() == nullptr\n");
return false;
}
auto msg_id = acc->fetch_add(1, std::memory_order_relaxed);
auto try_push = std::forward<F>(gen_push)(info_of(h), que, msg_id);
// push message fragment
int offset = 0;
for (int i = 0; i < static_cast<int>(size / data_length); ++i, offset += data_length) {
while (!que->push(que, msg_id, static_cast<int>(size) - offset - static_cast<int>(data_length),
if (!try_push(static_cast<int>(size) - offset - static_cast<int>(data_length),
static_cast<byte_t const *>(data) + offset, data_length)) {
std::this_thread::yield();
return false;
}
info_of(h)->rd_waiter_.broadcast();
}
// if remain > 0, this is the last message fragment
int remain = static_cast<int>(size) - offset;
if (remain > 0) {
while (!que->push(que, msg_id, remain - static_cast<int>(data_length),
if (!try_push(remain - static_cast<int>(data_length),
static_cast<byte_t const *>(data) + offset, static_cast<std::size_t>(remain))) {
std::this_thread::yield();
return false;
}
info_of(h)->rd_waiter_.broadcast();
}
return true;
}
static bool send(ipc::handle_t h, void const * data, std::size_t size) {
return send([](auto info, auto que, auto msg_id) {
return [info, que, msg_id](int remain, void const * data, std::size_t size) {
if (!wait_for(info->wt_waiter_, [&] {
return !que->push(que, msg_id, remain, data, size);
}, send_wait)) {
if (!que->force_push(que, msg_id, remain, data, size)) {
return false;
}
}
info->rd_waiter_.broadcast();
return true;
};
}, h, data, size);
}
static bool try_send(ipc::handle_t h, void const * data, std::size_t size) {
return send([](auto info, auto que, auto msg_id) {
return [info, que, msg_id](int remain, void const * data, std::size_t size) {
if (!wait_for(info->wt_waiter_, [&] {
return !que->push(que, msg_id, remain, data, size);
}, 0)) {
return false;
}
info->rd_waiter_.broadcast();
return true;
};
}, h, data, size);
}
static buff_t recv(ipc::handle_t h, std::size_t tm) {
auto que = queue_of(h);
if (que == nullptr) return {};
if (que == nullptr) {
ipc::error("fail: recv, queue_of(h) == nullptr\n");
return {};
}
if (que->connect()) { // wouldn't connect twice
info_of(h)->cc_waiter_.broadcast();
}
@ -217,19 +272,35 @@ static buff_t recv(ipc::handle_t h, std::size_t tm) {
[que, &msg] { return !que->pop(msg); }, tm)) {
return {};
}
if (msg.head_.que_ == nullptr) return {};
info_of(h)->wt_waiter_.broadcast();
if (msg.head_.que_ == nullptr) {
ipc::error("fail: recv, msg.head_.que_ == nullptr\n");
return {};
}
if (msg.head_.que_ == que) continue; // pop next
// msg.head_.remain_ may minus & abs(msg.head_.remain_) < data_length
std::size_t remain = static_cast<std::size_t>(
static_cast<int>(data_length) + msg.head_.remain_);
auto remain = static_cast<std::size_t>(static_cast<int>(data_length) + msg.head_.remain_);
// find cache with msg.head_.id_
auto cac_it = rc.find(msg.head_.id_);
if (cac_it == rc.end()) {
if (remain <= data_length) {
return make_cache(&(msg.data_), remain);
}
else {
// gc
if (rc.size() > 1024) {
std::vector<msg_id_t> need_del;
for (auto const & pair : rc) {
auto cmp = std::minmax(msg.head_.id_, pair.first);
if (cmp.second - cmp.first > 8192) {
need_del.push_back(pair.first);
}
}
for (auto id : need_del) rc.erase(id);
}
// cache the first message fragment
else rc.emplace(msg.head_.id_, cache_t { data_length, make_cache(&(msg.data_), remain) });
rc.emplace(msg.head_.id_, cache_t { data_length, make_cache(&(msg.data_), remain) });
}
}
// has cached before this message
else {
@ -248,6 +319,10 @@ static buff_t recv(ipc::handle_t h, std::size_t tm) {
}
}
static buff_t try_recv(ipc::handle_t h) {
return recv(h, 0);
}
}; // detail_impl<Policy>
template <typename Flag>
@ -257,15 +332,6 @@ using policy_t = policy::choose<circ::elem_array, Flag>;
namespace ipc {
namespace detail {
std::size_t calc_unique_id() {
static shm::handle g_shm { "__IPC_GLOBAL_ACC_STORAGE__", sizeof(std::atomic<std::size_t>) };
return static_cast<std::atomic<std::size_t>*>(g_shm.get())->fetch_add(1, std::memory_order_relaxed);
}
} // namespace detail
template <typename Flag>
ipc::handle_t chan_impl<Flag>::connect(char const * name) {
return detail_impl<policy_t<Flag>>::connect(name);
@ -296,6 +362,16 @@ buff_t chan_impl<Flag>::recv(ipc::handle_t h, std::size_t tm) {
return detail_impl<policy_t<Flag>>::recv(h, tm);
}
template <typename Flag>
bool chan_impl<Flag>::try_send(ipc::handle_t h, void const * data, std::size_t size) {
return detail_impl<policy_t<Flag>>::try_send(h, data, size);
}
template <typename Flag>
buff_t chan_impl<Flag>::try_recv(ipc::handle_t h) {
return detail_impl<policy_t<Flag>>::try_recv(h);
}
template struct chan_impl<ipc::wr<relat::single, relat::single, trans::unicast >>;
template struct chan_impl<ipc::wr<relat::single, relat::multi , trans::unicast >>;
template struct chan_impl<ipc::wr<relat::multi , relat::multi , trans::unicast >>;

View File

@ -83,8 +83,6 @@ constexpr auto shared_lock(T&& lc) {
#endif/*__cplusplus < 201703L*/
IPC_EXPORT std::size_t calc_unique_id();
template <typename F, typename D>
constexpr decltype(auto) static_switch(std::size_t /*i*/, std::index_sequence<>, F&& /*f*/, D&& def) {
return def();

View File

@ -20,8 +20,9 @@ namespace detail {
inline static void calc_wait_time(timespec& ts, std::size_t tm) {
::clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += (tm / 1000); // seconds
ts.tv_nsec += (tm % 1000) * 1000000; // nanoseconds
ts.tv_nsec += tm * 1000000; // nanoseconds
ts.tv_sec += ts.tv_nsec / 1000000000;
ts.tv_nsec %= 1000000000;
}
#pragma push_macro("IPC_PTHREAD_FUNC_")
@ -110,7 +111,13 @@ public:
else {
timespec ts;
calc_wait_time(ts, tm);
IPC_PTHREAD_FUNC_(pthread_cond_timedwait, &cond_, &mtx.native(), &ts);
int eno;
if ((eno = ::pthread_cond_timedwait(&cond_, &mtx.native(), &ts)) != 0) {
ipc::error("fail pthread_cond_timedwait[%d]: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n",
eno, tm, ts.tv_sec, ts.tv_nsec);
return false;
}
return true;
}
}
@ -173,7 +180,12 @@ public:
else {
timespec ts;
calc_wait_time(ts, tm);
IPC_SEMAPHORE_FUNC_(sem_timedwait, h, &ts);
if (::sem_timedwait(h, &ts) != 0) {
ipc::error("fail sem_timedwait[%d]: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n",
errno, tm, ts.tv_sec, ts.tv_nsec);
return false;
}
return true;
}
}

View File

@ -232,6 +232,11 @@ class waiter : public detail::waiter_wrapper {
using detail::waiter_wrapper::attach;
public:
waiter() = default;
waiter(char const * name) {
open(name);
}
~waiter() {
close();
}

View File

@ -44,6 +44,11 @@ struct prod_cons_impl<wr<relat::single, relat::single, trans::unicast>> {
return true;
}
template <typename W, typename F, typename E>
bool force_push(W* wrapper, F&& f, E* elems) {
return push(wrapper, std::forward<F>(f), elems);
}
template <typename W, typename F, typename E>
bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E* elems) {
auto cur_rd = circ::index_of(rd_.load(std::memory_order_relaxed));
@ -130,6 +135,11 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::unicast>>
return true;
}
template <typename W, typename F, typename E>
bool force_push(W* wrapper, F&& f, E* elems) {
return push(wrapper, std::forward<F>(f), elems); /* TBD */
}
template <typename W, typename F, template <std::size_t, std::size_t> class E, std::size_t DS, std::size_t AS>
bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E<DS, AS>* elems) {
byte_t buff[DS];
@ -186,7 +196,7 @@ struct prod_cons_impl<wr<relat::single, relat::multi, trans::broadcast>> {
// check all consumers have finished reading this element
rc_t expected = 0;
if (!el->rc_.compare_exchange_strong(
expected, static_cast<rc_t>(conn_cnt), std::memory_order_release)) {
expected, static_cast<rc_t>(conn_cnt), std::memory_order_acq_rel)) {
return false; // full
}
std::forward<F>(f)(&(el->data_));
@ -194,6 +204,18 @@ struct prod_cons_impl<wr<relat::single, relat::multi, trans::broadcast>> {
return true;
}
template <typename W, typename F, typename E>
bool force_push(W* wrapper, F&& f, E* elems) {
auto conn_cnt = wrapper->conn_count(std::memory_order_relaxed);
if (conn_cnt == 0) return false;
auto* el = elems + circ::index_of(wt_.load(std::memory_order_acquire));
// reset reading flag
el->rc_.store(static_cast<rc_t>(conn_cnt), std::memory_order_relaxed);
std::forward<F>(f)(&(el->data_));
wt_.fetch_add(1, std::memory_order_release);
return true;
}
template <typename W, typename F, typename E>
bool pop(W* /*wrapper*/, circ::u2_t& cur, F&& f, E* elems) {
if (cur == cursor()) return false; // acquire
@ -240,7 +262,7 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::broadcast>> {
template <typename W, typename F, typename E>
bool push(W* wrapper, F&& f, E* elems) {
E* el;
circ::u2_t cur_ct, nxt_ct;
circ::u2_t cur_ct;
for (unsigned k = 0;;) {
auto cc = wrapper->conn_count(std::memory_order_relaxed);
if (cc == 0) {
@ -263,7 +285,30 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::broadcast>> {
ipc::yield(k);
}
// only one thread/process would touch here at one time
ct_.store(nxt_ct = cur_ct + 1, std::memory_order_release);
ct_.store(cur_ct + 1, std::memory_order_release);
std::forward<F>(f)(&(el->data_));
// set flag & try update wt
el->f_ct_.store(~static_cast<flag_t>(cur_ct), std::memory_order_release);
return true;
}
template <typename W, typename F, typename E>
bool force_push(W* wrapper, F&& f, E* elems) {
E* el;
circ::u2_t cur_ct;
for (unsigned k = 0;;) {
auto cc = wrapper->conn_count(std::memory_order_relaxed);
if (cc == 0) {
return false; // no reader
}
el = elems + circ::index_of(cur_ct = ct_.load(std::memory_order_relaxed));
auto cur_rc = el->rc_.load(std::memory_order_acquire);
el->rc_.store(static_cast<rc_t>(cc) | ((cur_rc & ~rc_mask) + rc_incr), std::memory_order_relaxed);
if (ct_.compare_exchange_weak(cur_ct, cur_ct + 1, std::memory_order_release)) {
break;
}
ipc::yield(k);
}
std::forward<F>(f)(&(el->data_));
// set flag & try update wt
el->f_ct_.store(~static_cast<flag_t>(cur_ct), std::memory_order_release);

View File

@ -145,6 +145,14 @@ public:
});
}
template <typename T, typename... P>
auto force_push(P&&... params) {
if (elems_ == nullptr) return false;
return elems_->force_push([&](void* p) {
::new (p) T(std::forward<P>(params)...);
});
}
template <typename T>
bool pop(T& item) {
if (elems_ == nullptr) {
@ -166,14 +174,17 @@ public:
using value_t = T;
using base_t::base_t;
using base_t::push;
using base_t::pop;
template <typename... P>
auto push(P&&... params) {
return base_t::template push<T>(std::forward<P>(params)...);
}
template <typename... P>
auto force_push(P&&... params) {
return base_t::template force_push<T>(std::forward<P>(params)...);
}
bool pop(T& item) {
return base_t::pop(item);
}