mirror of
https://github.com/mutouyun/cpp-ipc.git
synced 2025-12-07 01:06:45 +08:00
use ipc::yield
This commit is contained in:
parent
621078abbd
commit
27eb5ee99b
@ -5,6 +5,7 @@
|
|||||||
#include <cstring>
|
#include <cstring>
|
||||||
|
|
||||||
#include "def.h"
|
#include "def.h"
|
||||||
|
#include "rw_lock.h"
|
||||||
|
|
||||||
namespace ipc {
|
namespace ipc {
|
||||||
namespace circ {
|
namespace circ {
|
||||||
@ -99,7 +100,7 @@ struct prod_cons<relat::single, relat::multi, trans::unicast>
|
|||||||
template <typename E, typename F, std::size_t S>
|
template <typename E, typename F, std::size_t S>
|
||||||
bool pop(E* /*elems*/, detail::u2_t& /*cur*/, F&& f, detail::elem_t<S>* elem_start) noexcept {
|
bool pop(E* /*elems*/, detail::u2_t& /*cur*/, F&& f, detail::elem_t<S>* elem_start) noexcept {
|
||||||
byte_t buff[sizeof(detail::elem_t<S>)];
|
byte_t buff[sizeof(detail::elem_t<S>)];
|
||||||
while (1) {
|
for (unsigned k = 0;;) {
|
||||||
auto cur_rd = rd_.load(std::memory_order_acquire);
|
auto cur_rd = rd_.load(std::memory_order_acquire);
|
||||||
if (detail::index_of(cur_rd) ==
|
if (detail::index_of(cur_rd) ==
|
||||||
detail::index_of(wt_.load(std::memory_order_relaxed))) {
|
detail::index_of(wt_.load(std::memory_order_relaxed))) {
|
||||||
@ -110,7 +111,7 @@ struct prod_cons<relat::single, relat::multi, trans::unicast>
|
|||||||
std::forward<F>(f)(buff);
|
std::forward<F>(f)(buff);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
std::this_thread::yield();
|
ipc::yield(k);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -124,7 +125,7 @@ struct prod_cons<relat::multi, relat::multi, trans::unicast>
|
|||||||
template <typename E, typename F, std::size_t S>
|
template <typename E, typename F, std::size_t S>
|
||||||
bool push(E* /*elems*/, F&& f, detail::elem_t<S>* elem_start) {
|
bool push(E* /*elems*/, F&& f, detail::elem_t<S>* elem_start) {
|
||||||
detail::u2_t cur_ct, nxt_ct;
|
detail::u2_t cur_ct, nxt_ct;
|
||||||
while (1) {
|
for (unsigned k = 0;;) {
|
||||||
cur_ct = ct_.load(std::memory_order_acquire);
|
cur_ct = ct_.load(std::memory_order_acquire);
|
||||||
if (detail::index_of(nxt_ct = cur_ct + 1) ==
|
if (detail::index_of(nxt_ct = cur_ct + 1) ==
|
||||||
detail::index_of(rd_.load(std::memory_order_relaxed))) {
|
detail::index_of(rd_.load(std::memory_order_relaxed))) {
|
||||||
@ -133,15 +134,15 @@ struct prod_cons<relat::multi, relat::multi, trans::unicast>
|
|||||||
if (ct_.compare_exchange_weak(cur_ct, nxt_ct, std::memory_order_relaxed)) {
|
if (ct_.compare_exchange_weak(cur_ct, nxt_ct, std::memory_order_relaxed)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
std::this_thread::yield();
|
ipc::yield(k);
|
||||||
}
|
}
|
||||||
std::forward<F>(f)(elem_start + detail::index_of(cur_ct));
|
std::forward<F>(f)(elem_start + detail::index_of(cur_ct));
|
||||||
while (1) {
|
for (unsigned k = 0;;) {
|
||||||
auto exp_wt = cur_ct;
|
auto exp_wt = cur_ct;
|
||||||
if (wt_.compare_exchange_weak(exp_wt, nxt_ct, std::memory_order_release)) {
|
if (wt_.compare_exchange_weak(exp_wt, nxt_ct, std::memory_order_release)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
std::this_thread::yield();
|
ipc::sleep(k);
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -170,13 +171,13 @@ struct prod_cons<relat::single, relat::multi, trans::broadcast> {
|
|||||||
if (conn_cnt == 0) return false;
|
if (conn_cnt == 0) return false;
|
||||||
auto el = elem_start + detail::index_of(wt_.load(std::memory_order_relaxed));
|
auto el = elem_start + detail::index_of(wt_.load(std::memory_order_relaxed));
|
||||||
// check all consumers have finished reading this element
|
// check all consumers have finished reading this element
|
||||||
while (1) {
|
for (unsigned k = 0;;) {
|
||||||
rc_t expected = 0;
|
rc_t expected = 0;
|
||||||
if (el->head_.rc_.compare_exchange_weak(
|
if (el->head_.rc_.compare_exchange_weak(
|
||||||
expected, static_cast<rc_t>(conn_cnt), std::memory_order_relaxed)) {
|
expected, static_cast<rc_t>(conn_cnt), std::memory_order_relaxed)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
std::this_thread::yield();
|
ipc::sleep(k);
|
||||||
conn_cnt = elems->conn_count(); // acquire
|
conn_cnt = elems->conn_count(); // acquire
|
||||||
if (conn_cnt == 0) return false;
|
if (conn_cnt == 0) return false;
|
||||||
}
|
}
|
||||||
@ -190,7 +191,7 @@ struct prod_cons<relat::single, relat::multi, trans::broadcast> {
|
|||||||
if (cur == cursor()) return false; // acquire
|
if (cur == cursor()) return false; // acquire
|
||||||
auto el = elem_start + detail::index_of(cur++);
|
auto el = elem_start + detail::index_of(cur++);
|
||||||
std::forward<F>(f)(el->data_);
|
std::forward<F>(f)(el->data_);
|
||||||
while (1) {
|
for (unsigned k = 0;;) {
|
||||||
rc_t cur_rc = el->head_.rc_.load(std::memory_order_acquire);
|
rc_t cur_rc = el->head_.rc_.load(std::memory_order_acquire);
|
||||||
if (cur_rc == 0) {
|
if (cur_rc == 0) {
|
||||||
return true;
|
return true;
|
||||||
@ -199,7 +200,7 @@ struct prod_cons<relat::single, relat::multi, trans::broadcast> {
|
|||||||
cur_rc, cur_rc - 1, std::memory_order_release)) {
|
cur_rc, cur_rc - 1, std::memory_order_release)) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
std::this_thread::yield();
|
ipc::yield(k);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|||||||
@ -10,6 +10,7 @@
|
|||||||
#include <chrono>
|
#include <chrono>
|
||||||
|
|
||||||
#include "def.h"
|
#include "def.h"
|
||||||
|
#include "rw_lock.h"
|
||||||
#include "circ_elem_array.h"
|
#include "circ_elem_array.h"
|
||||||
|
|
||||||
namespace ipc {
|
namespace ipc {
|
||||||
@ -97,7 +98,7 @@ public:
|
|||||||
|
|
||||||
template <typename F>
|
template <typename F>
|
||||||
static queue* multi_wait_for(F&& upd) noexcept {
|
static queue* multi_wait_for(F&& upd) noexcept {
|
||||||
for (uint_t<32> k = 0;;) {
|
for (unsigned k = 0;;) {
|
||||||
auto [ques, size] = upd();
|
auto [ques, size] = upd();
|
||||||
for (std::size_t i = 0; i < static_cast<std::size_t>(size); ++i) {
|
for (std::size_t i = 0; i < static_cast<std::size_t>(size); ++i) {
|
||||||
queue* que = ques[i];
|
queue* que = ques[i];
|
||||||
@ -107,11 +108,7 @@ public:
|
|||||||
return que;
|
return que;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (k < 4096) {
|
ipc::sleep(k);
|
||||||
std::this_thread::yield();
|
|
||||||
++k;
|
|
||||||
}
|
|
||||||
else std::this_thread::sleep_for(std::chrono::milliseconds(1));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -102,9 +102,7 @@ public:
|
|||||||
void disconnect();
|
void disconnect();
|
||||||
|
|
||||||
std::size_t recv_count() const;
|
std::size_t recv_count() const;
|
||||||
|
void wait_for_recv(std::size_t r_count) const;
|
||||||
bool wait_for_recv(std::size_t r_count, std::size_t until) const;
|
|
||||||
bool wait_for_recv(std::size_t r_count) const;
|
|
||||||
|
|
||||||
bool send(void const * data, std::size_t size);
|
bool send(void const * data, std::size_t size);
|
||||||
bool send(buff_t const & buff);
|
bool send(buff_t const & buff);
|
||||||
|
|||||||
@ -57,12 +57,15 @@
|
|||||||
|
|
||||||
namespace ipc {
|
namespace ipc {
|
||||||
|
|
||||||
inline void yield(unsigned& k) noexcept {
|
template <std::size_t N = 32, typename K>
|
||||||
|
inline void yield(K& k) noexcept {
|
||||||
if (k < 4) { /* Do nothing */ }
|
if (k < 4) { /* Do nothing */ }
|
||||||
else
|
else
|
||||||
if (k < 16) { IPC_LOCK_PAUSE_(); }
|
if (k < 16) { IPC_LOCK_PAUSE_(); }
|
||||||
else
|
else
|
||||||
if (k < 32) { std::this_thread::yield(); }
|
if (k < static_cast<K>(N)) {
|
||||||
|
std::this_thread::yield();
|
||||||
|
}
|
||||||
else {
|
else {
|
||||||
std::this_thread::sleep_for(std::chrono::milliseconds(1));
|
std::this_thread::sleep_for(std::chrono::milliseconds(1));
|
||||||
return;
|
return;
|
||||||
@ -70,6 +73,11 @@ inline void yield(unsigned& k) noexcept {
|
|||||||
++k;
|
++k;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template <std::size_t N = 4096, typename K>
|
||||||
|
inline void sleep(K& k) noexcept {
|
||||||
|
ipc::yield<N>(k);
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace ipc
|
} // namespace ipc
|
||||||
|
|
||||||
#pragma pop_macro("IPC_LOCK_PAUSE_")
|
#pragma pop_macro("IPC_LOCK_PAUSE_")
|
||||||
|
|||||||
@ -27,14 +27,6 @@ struct ch_info_t {
|
|||||||
};
|
};
|
||||||
#pragma pack()
|
#pragma pack()
|
||||||
|
|
||||||
inline bool wait_for_recv(route const & rt, std::size_t r_count, std::size_t until) {
|
|
||||||
for (unsigned k = 0; rt.recv_count() < r_count; ++k) {
|
|
||||||
if (k > until) return false;
|
|
||||||
std::this_thread::yield();
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
} // internal-linkage
|
} // internal-linkage
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////
|
||||||
@ -154,12 +146,10 @@ std::size_t channel::recv_count() const {
|
|||||||
return impl(p_)->r_.recv_count();
|
return impl(p_)->r_.recv_count();
|
||||||
}
|
}
|
||||||
|
|
||||||
bool channel::wait_for_recv(std::size_t r_count, std::size_t until) const {
|
void channel::wait_for_recv(std::size_t r_count) const {
|
||||||
return ::wait_for_recv(impl(p_)->sender(), r_count, until);
|
for (unsigned k = 0; impl(p_)->sender().recv_count() < r_count;) {
|
||||||
}
|
ipc::sleep(k);
|
||||||
|
}
|
||||||
bool channel::wait_for_recv(std::size_t r_count) const {
|
|
||||||
return wait_for_recv(r_count, (std::numeric_limits<std::size_t>::max)());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool channel::send(void const * data, std::size_t size) {
|
bool channel::send(void const * data, std::size_t size) {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user