From 5affd26da6e38045deb13aa3ac136efa514ddf71 Mon Sep 17 00:00:00 2001 From: zhangyi Date: Tue, 26 Mar 2019 19:12:59 +0800 Subject: [PATCH] try send/recv; support sending timeout; fix bugs --- demo/chat/main.cpp | 16 ++-- include/def.h | 2 +- include/ipc.h | 23 +++--- src/circ/elem_array.h | 54 +++---------- src/ipc.cpp | 148 +++++++++++++++++++++++++--------- src/platform/detail.h | 2 - src/platform/waiter_linux.h | 20 ++++- src/platform/waiter_wrapper.h | 5 ++ src/prod_cons.h | 51 +++++++++++- src/queue.h | 15 +++- 10 files changed, 226 insertions(+), 110 deletions(-) diff --git a/demo/chat/main.cpp b/demo/chat/main.cpp index eb71573..06098bb 100644 --- a/demo/chat/main.cpp +++ b/demo/chat/main.cpp @@ -2,27 +2,25 @@ #include #include #include +#include #include "ipc.h" -namespace ipc { -namespace detail { - -IPC_EXPORT std::size_t calc_unique_id(); - -} // namespace detail -} // namespace ipc - namespace { char name__[] = "ipc-chat"; char quit__[] = "q"; char id__ [] = "c"; +std::size_t calc_unique_id() { + static ipc::shm::handle g_shm { "__CHAT_ACC_STORAGE__", sizeof(std::atomic) }; + return static_cast*>(g_shm.get())->fetch_add(1, std::memory_order_relaxed); +} + } // namespace int main() { - std::string buf, id = id__ + std::to_string(ipc::detail::calc_unique_id()); + std::string buf, id = id__ + std::to_string(calc_unique_id()); std::regex reg { "(c\\d+)> (.*)" }; ipc::channel cc { name__ }; diff --git a/include/def.h b/include/def.h index 945c0ef..3845e37 100644 --- a/include/def.h +++ b/include/def.h @@ -30,7 +30,7 @@ enum : std::size_t { invalid_value = (std::numeric_limits::max)(), data_length = 64, name_length = 64, - send_wait_for = 100 // ms + send_wait = 100 // ms }; enum class relat { // multiplicity of the relationship diff --git a/include/ipc.h b/include/ipc.h index b851d70..dca60cc 100644 --- a/include/ipc.h +++ b/include/ipc.h @@ -23,6 +23,9 @@ struct IPC_EXPORT chan_impl { static bool send(handle_t h, void const * data, std::size_t size); static buff_t recv(handle_t h, std::size_t tm); + + static bool try_send(handle_t h, void const * data, std::size_t size); + static buff_t try_recv(handle_t h); }; template @@ -100,21 +103,21 @@ public: return chan_wrapper(name).wait_for_recv(r_count, tm); } - bool send(void const * data, std::size_t size) { - return detail_t::send(h_, data, size); - } + bool send (void const * data, std::size_t size) { return detail_t::send(h_, data, size) ; } + bool send (buff_t const & buff) { return this->send(buff.data(), buff.size()) ; } + bool send (std::string const & str) { return this->send(str.c_str(), str.size() + 1); } - bool send(buff_t const & buff) { - return this->send(buff.data(), buff.size()); - } - - bool send(std::string const & str) { - return this->send(str.c_str(), str.size() + 1); - } + bool try_send(void const * data, std::size_t size) { return detail_t::try_send(h_, data, size) ; } + bool try_send(buff_t const & buff) { return this->try_send(buff.data(), buff.size()) ; } + bool try_send(std::string const & str) { return this->try_send(str.c_str(), str.size() + 1); } buff_t recv(std::size_t tm = invalid_value) { return detail_t::recv(h_, tm); } + + buff_t try_recv() { + return detail_t::try_recv(h_); + } }; template diff --git a/src/circ/elem_array.h b/src/circ/elem_array.h index b5ef71f..8ed8116 100644 --- a/src/circ/elem_array.h +++ b/src/circ/elem_array.h @@ -11,16 +11,17 @@ namespace ipc { namespace circ { -namespace detail { template -class elem_array { +class elem_array : public ipc::circ::conn_head { public: + using base_t = ipc::circ::conn_head; using policy_t = Policy; using cursor_t = decltype(std::declval().cursor()); using elem_t = typename policy_t::template elem_t; enum : std::size_t { + head_size = sizeof(base_t) + sizeof(policy_t), data_size = DataSize, elem_max = (std::numeric_limits>::max)() + 1, // default is 255 + 1 elem_size = sizeof(elem_t), @@ -29,60 +30,27 @@ public: private: policy_t head_; - elem_t block_[elem_max]; + elem_t block_[elem_max]; public: cursor_t cursor() const noexcept { return head_.cursor(); } - template - bool push(E* elems, F&& f) { - return head_.push(elems, std::forward(f), block_); - } - - template - bool pop(E* elems, cursor_t* cur, F&& f) { - if (cur == nullptr) return false; - return head_.pop(elems, *cur, std::forward(f), block_); - } -}; - -} // namespace detail - -template -class elem_array : public ipc::circ::conn_head { -public: - using base_t = ipc::circ::conn_head; - using array_t = detail::elem_array; - using policy_t = typename array_t::policy_t; - using cursor_t = typename array_t::cursor_t; - using elem_t = typename array_t::elem_t; - - enum : std::size_t { - head_size = sizeof(base_t) + sizeof(policy_t), - data_size = array_t::data_size, - elem_max = array_t::elem_max, - elem_size = array_t::elem_size, - block_size = array_t::block_size - }; - -private: - array_t array_; - -public: - cursor_t cursor() const noexcept { - return array_.cursor(); + template + bool push(F&& f) { + return head_.push(this, std::forward(f), block_); } template - bool push(F&& f) { - return array_.push(this, std::forward(f)); + bool force_push(F&& f) { + return head_.force_push(this, std::forward(f), block_); } template bool pop(cursor_t* cur, F&& f) { - return array_.pop(this, cur, std::forward(f)); + if (cur == nullptr) return false; + return head_.pop(this, *cur, std::forward(f), block_); } }; diff --git a/src/ipc.cpp b/src/ipc.cpp index 2d8cf2b..05d5bad 100644 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include "def.h" #include "shm.h" @@ -15,6 +16,7 @@ #include "queue.h" #include "policy.h" #include "rw_lock.h" +#include "log.h" #include "memory/resource.h" @@ -75,14 +77,33 @@ struct cache_t { } }; +struct conn_info_head { + using acc_t = std::atomic; + + waiter cc_waiter_, wt_waiter_, rd_waiter_; + shm::handle acc_h_; + + conn_info_head(char const * name) + : cc_waiter_((std::string{ "__CC_CONN__" } + name).c_str()) + , wt_waiter_((std::string{ "__WT_CONN__" } + name).c_str()) + , rd_waiter_((std::string{ "__RD_CONN__" } + name).c_str()) + , acc_h_ ((std::string{ "__AC_CONN__" } + name).c_str(), sizeof(acc_t)) { + } + + auto acc() { + return static_cast(acc_h_.get()); + } +}; + template bool wait_for(W& waiter, F&& pred, std::size_t tm) { for (unsigned k = 0; pred();) { bool loop = true, ret = true; - ipc::sleep(k, [&loop, &ret, &waiter, &pred, tm] { + ipc::sleep(k, [&k, &loop, &ret, &waiter, &pred, tm] { ret = waiter.wait_if([&loop, &ret, &pred] { return loop = pred(); }, tm); + k = 0; return true; }); if (!ret ) return false; // timeout or fail @@ -96,15 +117,12 @@ struct detail_impl { using queue_t = ipc::queue, Policy>; -struct conn_info_t { +struct conn_info_t : conn_info_head { queue_t que_; - waiter cc_waiter_, wt_waiter_, rd_waiter_; conn_info_t(char const * name) - : que_ ((std::string{ "__QU_CONN__" } + name).c_str()) { - cc_waiter_.open((std::string{ "__CC_CONN__" } + name).c_str()); - wt_waiter_.open((std::string{ "__WT_CONN__" } + name).c_str()); - rd_waiter_.open((std::string{ "__RD_CONN__" } + name).c_str()); + : conn_info_head(name) + , que_((std::string{ "__QU_CONN__" } + name).c_str()) { } }; @@ -169,43 +187,80 @@ static bool wait_for_recv(ipc::handle_t h, std::size_t r_count, std::size_t tm) }, tm); } -static bool send(ipc::handle_t h, void const * data, std::size_t size) { - if (data == nullptr) { - return false; - } - if (size == 0) { +template +static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t size) { + if (data == nullptr || size == 0) { + ipc::error("fail: send(%p, %zd)\n", data, size); return false; } auto que = queue_of(h); if (que == nullptr) { + ipc::error("fail: send, queue_of(h) == nullptr\n"); return false; } // calc a new message id - auto msg_id = ipc::detail::calc_unique_id(); + auto acc = info_of(h)->acc(); + if (acc == nullptr) { + ipc::error("fail: send, info_of(h)->acc() == nullptr\n"); + return false; + } + auto msg_id = acc->fetch_add(1, std::memory_order_relaxed); + auto try_push = std::forward(gen_push)(info_of(h), que, msg_id); // push message fragment int offset = 0; for (int i = 0; i < static_cast(size / data_length); ++i, offset += data_length) { - while (!que->push(que, msg_id, static_cast(size) - offset - static_cast(data_length), - static_cast(data) + offset, data_length)) { - std::this_thread::yield(); + if (!try_push(static_cast(size) - offset - static_cast(data_length), + static_cast(data) + offset, data_length)) { + return false; } - info_of(h)->rd_waiter_.broadcast(); } // if remain > 0, this is the last message fragment int remain = static_cast(size) - offset; if (remain > 0) { - while (!que->push(que, msg_id, remain - static_cast(data_length), - static_cast(data) + offset, static_cast(remain))) { - std::this_thread::yield(); + if (!try_push(remain - static_cast(data_length), + static_cast(data) + offset, static_cast(remain))) { + return false; } - info_of(h)->rd_waiter_.broadcast(); } return true; } +static bool send(ipc::handle_t h, void const * data, std::size_t size) { + return send([](auto info, auto que, auto msg_id) { + return [info, que, msg_id](int remain, void const * data, std::size_t size) { + if (!wait_for(info->wt_waiter_, [&] { + return !que->push(que, msg_id, remain, data, size); + }, send_wait)) { + if (!que->force_push(que, msg_id, remain, data, size)) { + return false; + } + } + info->rd_waiter_.broadcast(); + return true; + }; + }, h, data, size); +} + +static bool try_send(ipc::handle_t h, void const * data, std::size_t size) { + return send([](auto info, auto que, auto msg_id) { + return [info, que, msg_id](int remain, void const * data, std::size_t size) { + if (!wait_for(info->wt_waiter_, [&] { + return !que->push(que, msg_id, remain, data, size); + }, 0)) { + return false; + } + info->rd_waiter_.broadcast(); + return true; + }; + }, h, data, size); +} + static buff_t recv(ipc::handle_t h, std::size_t tm) { auto que = queue_of(h); - if (que == nullptr) return {}; + if (que == nullptr) { + ipc::error("fail: recv, queue_of(h) == nullptr\n"); + return {}; + } if (que->connect()) { // wouldn't connect twice info_of(h)->cc_waiter_.broadcast(); } @@ -217,19 +272,35 @@ static buff_t recv(ipc::handle_t h, std::size_t tm) { [que, &msg] { return !que->pop(msg); }, tm)) { return {}; } - if (msg.head_.que_ == nullptr) return {}; + info_of(h)->wt_waiter_.broadcast(); + if (msg.head_.que_ == nullptr) { + ipc::error("fail: recv, msg.head_.que_ == nullptr\n"); + return {}; + } if (msg.head_.que_ == que) continue; // pop next // msg.head_.remain_ may minus & abs(msg.head_.remain_) < data_length - std::size_t remain = static_cast( - static_cast(data_length) + msg.head_.remain_); + auto remain = static_cast(static_cast(data_length) + msg.head_.remain_); // find cache with msg.head_.id_ auto cac_it = rc.find(msg.head_.id_); if (cac_it == rc.end()) { if (remain <= data_length) { return make_cache(&(msg.data_), remain); } - // cache the first message fragment - else rc.emplace(msg.head_.id_, cache_t { data_length, make_cache(&(msg.data_), remain) }); + else { + // gc + if (rc.size() > 1024) { + std::vector need_del; + for (auto const & pair : rc) { + auto cmp = std::minmax(msg.head_.id_, pair.first); + if (cmp.second - cmp.first > 8192) { + need_del.push_back(pair.first); + } + } + for (auto id : need_del) rc.erase(id); + } + // cache the first message fragment + rc.emplace(msg.head_.id_, cache_t { data_length, make_cache(&(msg.data_), remain) }); + } } // has cached before this message else { @@ -248,6 +319,10 @@ static buff_t recv(ipc::handle_t h, std::size_t tm) { } } +static buff_t try_recv(ipc::handle_t h) { + return recv(h, 0); +} + }; // detail_impl template @@ -257,15 +332,6 @@ using policy_t = policy::choose; namespace ipc { -namespace detail { - -std::size_t calc_unique_id() { - static shm::handle g_shm { "__IPC_GLOBAL_ACC_STORAGE__", sizeof(std::atomic) }; - return static_cast*>(g_shm.get())->fetch_add(1, std::memory_order_relaxed); -} - -} // namespace detail - template ipc::handle_t chan_impl::connect(char const * name) { return detail_impl>::connect(name); @@ -296,6 +362,16 @@ buff_t chan_impl::recv(ipc::handle_t h, std::size_t tm) { return detail_impl>::recv(h, tm); } +template +bool chan_impl::try_send(ipc::handle_t h, void const * data, std::size_t size) { + return detail_impl>::try_send(h, data, size); +} + +template +buff_t chan_impl::try_recv(ipc::handle_t h) { + return detail_impl>::try_recv(h); +} + template struct chan_impl>; template struct chan_impl>; template struct chan_impl>; diff --git a/src/platform/detail.h b/src/platform/detail.h index 54312f8..85be6a3 100644 --- a/src/platform/detail.h +++ b/src/platform/detail.h @@ -83,8 +83,6 @@ constexpr auto shared_lock(T&& lc) { #endif/*__cplusplus < 201703L*/ -IPC_EXPORT std::size_t calc_unique_id(); - template constexpr decltype(auto) static_switch(std::size_t /*i*/, std::index_sequence<>, F&& /*f*/, D&& def) { return def(); diff --git a/src/platform/waiter_linux.h b/src/platform/waiter_linux.h index 516d9fa..42216d5 100644 --- a/src/platform/waiter_linux.h +++ b/src/platform/waiter_linux.h @@ -20,8 +20,9 @@ namespace detail { inline static void calc_wait_time(timespec& ts, std::size_t tm) { ::clock_gettime(CLOCK_REALTIME, &ts); - ts.tv_sec += (tm / 1000); // seconds - ts.tv_nsec += (tm % 1000) * 1000000; // nanoseconds + ts.tv_nsec += tm * 1000000; // nanoseconds + ts.tv_sec += ts.tv_nsec / 1000000000; + ts.tv_nsec %= 1000000000; } #pragma push_macro("IPC_PTHREAD_FUNC_") @@ -110,7 +111,13 @@ public: else { timespec ts; calc_wait_time(ts, tm); - IPC_PTHREAD_FUNC_(pthread_cond_timedwait, &cond_, &mtx.native(), &ts); + int eno; + if ((eno = ::pthread_cond_timedwait(&cond_, &mtx.native(), &ts)) != 0) { + ipc::error("fail pthread_cond_timedwait[%d]: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n", + eno, tm, ts.tv_sec, ts.tv_nsec); + return false; + } + return true; } } @@ -173,7 +180,12 @@ public: else { timespec ts; calc_wait_time(ts, tm); - IPC_SEMAPHORE_FUNC_(sem_timedwait, h, &ts); + if (::sem_timedwait(h, &ts) != 0) { + ipc::error("fail sem_timedwait[%d]: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n", + errno, tm, ts.tv_sec, ts.tv_nsec); + return false; + } + return true; } } diff --git a/src/platform/waiter_wrapper.h b/src/platform/waiter_wrapper.h index 6bcb6d3..e81676c 100644 --- a/src/platform/waiter_wrapper.h +++ b/src/platform/waiter_wrapper.h @@ -232,6 +232,11 @@ class waiter : public detail::waiter_wrapper { using detail::waiter_wrapper::attach; public: + waiter() = default; + waiter(char const * name) { + open(name); + } + ~waiter() { close(); } diff --git a/src/prod_cons.h b/src/prod_cons.h index ff751e1..3bda83f 100644 --- a/src/prod_cons.h +++ b/src/prod_cons.h @@ -44,6 +44,11 @@ struct prod_cons_impl> { return true; } + template + bool force_push(W* wrapper, F&& f, E* elems) { + return push(wrapper, std::forward(f), elems); + } + template bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E* elems) { auto cur_rd = circ::index_of(rd_.load(std::memory_order_relaxed)); @@ -130,6 +135,11 @@ struct prod_cons_impl> return true; } + template + bool force_push(W* wrapper, F&& f, E* elems) { + return push(wrapper, std::forward(f), elems); /* TBD */ + } + template class E, std::size_t DS, std::size_t AS> bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E* elems) { byte_t buff[DS]; @@ -186,7 +196,7 @@ struct prod_cons_impl> { // check all consumers have finished reading this element rc_t expected = 0; if (!el->rc_.compare_exchange_strong( - expected, static_cast(conn_cnt), std::memory_order_release)) { + expected, static_cast(conn_cnt), std::memory_order_acq_rel)) { return false; // full } std::forward(f)(&(el->data_)); @@ -194,6 +204,18 @@ struct prod_cons_impl> { return true; } + template + bool force_push(W* wrapper, F&& f, E* elems) { + auto conn_cnt = wrapper->conn_count(std::memory_order_relaxed); + if (conn_cnt == 0) return false; + auto* el = elems + circ::index_of(wt_.load(std::memory_order_acquire)); + // reset reading flag + el->rc_.store(static_cast(conn_cnt), std::memory_order_relaxed); + std::forward(f)(&(el->data_)); + wt_.fetch_add(1, std::memory_order_release); + return true; + } + template bool pop(W* /*wrapper*/, circ::u2_t& cur, F&& f, E* elems) { if (cur == cursor()) return false; // acquire @@ -240,7 +262,7 @@ struct prod_cons_impl> { template bool push(W* wrapper, F&& f, E* elems) { E* el; - circ::u2_t cur_ct, nxt_ct; + circ::u2_t cur_ct; for (unsigned k = 0;;) { auto cc = wrapper->conn_count(std::memory_order_relaxed); if (cc == 0) { @@ -263,7 +285,30 @@ struct prod_cons_impl> { ipc::yield(k); } // only one thread/process would touch here at one time - ct_.store(nxt_ct = cur_ct + 1, std::memory_order_release); + ct_.store(cur_ct + 1, std::memory_order_release); + std::forward(f)(&(el->data_)); + // set flag & try update wt + el->f_ct_.store(~static_cast(cur_ct), std::memory_order_release); + return true; + } + + template + bool force_push(W* wrapper, F&& f, E* elems) { + E* el; + circ::u2_t cur_ct; + for (unsigned k = 0;;) { + auto cc = wrapper->conn_count(std::memory_order_relaxed); + if (cc == 0) { + return false; // no reader + } + el = elems + circ::index_of(cur_ct = ct_.load(std::memory_order_relaxed)); + auto cur_rc = el->rc_.load(std::memory_order_acquire); + el->rc_.store(static_cast(cc) | ((cur_rc & ~rc_mask) + rc_incr), std::memory_order_relaxed); + if (ct_.compare_exchange_weak(cur_ct, cur_ct + 1, std::memory_order_release)) { + break; + } + ipc::yield(k); + } std::forward(f)(&(el->data_)); // set flag & try update wt el->f_ct_.store(~static_cast(cur_ct), std::memory_order_release); diff --git a/src/queue.h b/src/queue.h index 521afe3..aa35fd2 100644 --- a/src/queue.h +++ b/src/queue.h @@ -145,6 +145,14 @@ public: }); } + template + auto force_push(P&&... params) { + if (elems_ == nullptr) return false; + return elems_->force_push([&](void* p) { + ::new (p) T(std::forward

(params)...); + }); + } + template bool pop(T& item) { if (elems_ == nullptr) { @@ -166,14 +174,17 @@ public: using value_t = T; using base_t::base_t; - using base_t::push; - using base_t::pop; template auto push(P&&... params) { return base_t::template push(std::forward

(params)...); } + template + auto force_push(P&&... params) { + return base_t::template force_push(std::forward

(params)...); + } + bool pop(T& item) { return base_t::pop(item); }