Merge pull request #20 from mutouyun/develop

Develop
This commit is contained in:
木头云 2020-09-26 19:20:02 +08:00 committed by GitHub
commit 5eb77db4cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 37 additions and 30 deletions

View File

@ -21,7 +21,7 @@ constexpr std::size_t const min_sz = 128;
constexpr std::size_t const max_sz = 1024 * 16; constexpr std::size_t const max_sz = 1024 * 16;
std::atomic<bool> is_quit__{ false }; std::atomic<bool> is_quit__{ false };
std::atomic<std::size_t> size_per_1s__{ 0 }; std::atomic<std::size_t> size_counter__{ 0 };
using msg_que_t = ipc::chan<ipc::relat::single, ipc::relat::single, ipc::trans::unicast>; using msg_que_t = ipc::chan<ipc::relat::single, ipc::relat::single, ipc::trans::unicast>;
@ -30,13 +30,13 @@ ipc::byte_t buff__[max_sz];
capo::random<> rand__{ min_sz, max_sz }; capo::random<> rand__{ min_sz, max_sz };
inline std::string str_of_size(std::size_t sz) noexcept { inline std::string str_of_size(std::size_t sz) noexcept {
if (sz <= 1024) { if (sz > 1024 * 1024) {
return std::to_string(sz) + " bytes"; return std::to_string(sz / (1024 * 1024)) + " MB";
} }
if (sz <= 1024 * 1024) { if (sz > 1024) {
return std::to_string(sz / 1024) + " KB"; return std::to_string(sz / 1024) + " KB";
} }
return std::to_string(sz / (1024 * 1024)) + " MB"; return std::to_string(sz) + " bytes";
} }
inline std::string speed_of(std::size_t sz) noexcept { inline std::string speed_of(std::size_t sz) noexcept {
@ -49,9 +49,8 @@ void do_counting() {
if (i % 10) continue; if (i % 10) continue;
i = 0; i = 0;
std::cout std::cout
<< speed_of(size_per_1s__.load(std::memory_order_acquire)) << speed_of(size_counter__.exchange(0, std::memory_order_relaxed))
<< std::endl; << std::endl;
size_per_1s__.store(0, std::memory_order_release);
} }
} }
@ -76,7 +75,7 @@ void do_send() {
break; break;
} }
} }
size_per_1s__.fetch_add(sz, std::memory_order_release); size_counter__.fetch_add(sz, std::memory_order_relaxed);
std::this_thread::yield(); std::this_thread::yield();
} }
counting.join(); counting.join();
@ -97,7 +96,7 @@ void do_recv() {
while (!is_quit__.load(std::memory_order_acquire)) { while (!is_quit__.load(std::memory_order_acquire)) {
auto msg = que__.recv(); auto msg = que__.recv();
if (msg.empty()) break; if (msg.empty()) break;
size_per_1s__.fetch_add(msg.size(), std::memory_order_release); size_counter__.fetch_add(msg.size(), std::memory_order_relaxed);
} }
counting.join(); counting.join();
} }

View File

@ -26,7 +26,7 @@ inline static bool calc_wait_time(timespec& ts, std::size_t tm /*ms*/) {
timeval now; timeval now;
int eno = ::gettimeofday(&now, NULL); int eno = ::gettimeofday(&now, NULL);
if (eno != 0) { if (eno != 0) {
ipc::error("fail gettimeofday[%d]\n", eno); ipc::error("fail gettimeofday [%d]\n", eno);
return false; return false;
} }
ts.tv_nsec = (now.tv_usec + (tm % 1000) * 1000) * 1000; ts.tv_nsec = (now.tv_usec + (tm % 1000) * 1000) * 1000;
@ -40,7 +40,7 @@ inline static bool calc_wait_time(timespec& ts, std::size_t tm /*ms*/) {
#define IPC_PTHREAD_FUNC_(CALL, ...) \ #define IPC_PTHREAD_FUNC_(CALL, ...) \
int eno; \ int eno; \
if ((eno = ::CALL(__VA_ARGS__)) != 0) { \ if ((eno = ::CALL(__VA_ARGS__)) != 0) { \
ipc::error("fail " #CALL "[%d]\n", eno); \ ipc::error("fail " #CALL " [%d]\n", eno); \
return false; \ return false; \
} \ } \
return true return true
@ -146,7 +146,11 @@ public:
IPC_PTHREAD_FUNC_(pthread_cond_wait, &cond_, &mtx.native()); IPC_PTHREAD_FUNC_(pthread_cond_wait, &cond_, &mtx.native());
default: { default: {
timespec ts; timespec ts;
calc_wait_time(ts, tm); if (!calc_wait_time(ts, tm)) {
ipc::error("fail calc_wait_time: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n",
tm, ts.tv_sec, ts.tv_nsec);
return false;
}
int eno; int eno;
if ((eno = ::pthread_cond_timedwait(&cond_, &mtx.native(), &ts)) != 0) { if ((eno = ::pthread_cond_timedwait(&cond_, &mtx.native(), &ts)) != 0) {
if (eno != ETIMEDOUT) { if (eno != ETIMEDOUT) {
@ -226,10 +230,14 @@ public:
IPC_SEMAPHORE_FUNC_(sem_wait, h); IPC_SEMAPHORE_FUNC_(sem_wait, h);
default: { default: {
timespec ts; timespec ts;
calc_wait_time(ts, tm); if (!calc_wait_time(ts, tm)) {
ipc::error("fail calc_wait_time: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n",
tm, ts.tv_sec, ts.tv_nsec);
return false;
}
if (::sem_timedwait(h, &ts) != 0) { if (::sem_timedwait(h, &ts) != 0) {
if (errno != ETIMEDOUT) { if (errno != ETIMEDOUT) {
ipc::error("fail sem_timedwait[%d]: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n", ipc::error("fail sem_timedwait [%d]: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n",
errno, tm, ts.tv_sec, ts.tv_nsec); errno, tm, ts.tv_sec, ts.tv_nsec);
} }
return false; return false;

View File

@ -10,14 +10,8 @@
namespace ipc { namespace ipc {
namespace detail { namespace detail {
class waiter_helper { struct waiter_helper {
enum : unsigned {
destruct_mask = (std::numeric_limits<unsigned>::max)() >> 1,
destruct_flag = ~destruct_mask
};
public:
struct wait_counter { struct wait_counter {
std::atomic<unsigned> waiting_ { 0 }; std::atomic<unsigned> waiting_ { 0 };
long counter_ = 0; long counter_ = 0;
@ -26,6 +20,7 @@ public:
struct wait_flags { struct wait_flags {
std::atomic<bool> is_waiting_ { false }; std::atomic<bool> is_waiting_ { false };
std::atomic<bool> is_closed_ { true }; std::atomic<bool> is_closed_ { true };
std::atomic<bool> need_dest_ { false };
}; };
template <typename Mutex, typename Ctrl, typename F> template <typename Mutex, typename Ctrl, typename F>
@ -54,11 +49,19 @@ public:
bool is_waiting = flags.is_waiting_.load(std::memory_order_relaxed); bool is_waiting = flags.is_waiting_.load(std::memory_order_relaxed);
bool is_closed = flags.is_closed_ .load(std::memory_order_acquire); bool is_closed = flags.is_closed_ .load(std::memory_order_acquire);
if (!is_waiting || is_closed) { if (!is_waiting || is_closed) {
flags.need_dest_.store(false, std::memory_order_release);
ret = false; ret = false;
break; break;
} }
ret = ctrl.sema_wait(tm); else if (flags.need_dest_.exchange(false, std::memory_order_release)) {
} while (counter.waiting_.load(std::memory_order_acquire) & destruct_flag); ret = false;
ctrl.sema_wait(tm);
break;
}
else {
ret = ctrl.sema_wait(tm);
}
} while (flags.need_dest_.load(std::memory_order_acquire));
finally.do_exit(); finally.do_exit();
ret = ctrl.handshake_post(1) && ret; ret = ctrl.handshake_post(1) && ret;
@ -69,7 +72,7 @@ public:
template <typename Ctrl> template <typename Ctrl>
static bool notify(Ctrl & ctrl) { static bool notify(Ctrl & ctrl) {
auto & counter = ctrl.counter(); auto & counter = ctrl.counter();
if ((counter.waiting_.load(std::memory_order_acquire) & destruct_mask) == 0) { if ((counter.waiting_.load(std::memory_order_acquire)) == 0) {
return true; return true;
} }
bool ret = true; bool ret = true;
@ -85,7 +88,7 @@ public:
template <typename Ctrl> template <typename Ctrl>
static bool broadcast(Ctrl & ctrl) { static bool broadcast(Ctrl & ctrl) {
auto & counter = ctrl.counter(); auto & counter = ctrl.counter();
if ((counter.waiting_.load(std::memory_order_acquire) & destruct_mask) == 0) { if ((counter.waiting_.load(std::memory_order_acquire)) == 0) {
return true; return true;
} }
bool ret = true; bool ret = true;
@ -107,16 +110,13 @@ public:
return true; return true;
} }
auto & counter = ctrl.counter(); auto & counter = ctrl.counter();
if ((counter.waiting_.load(std::memory_order_acquire) & destruct_mask) == 0) { if ((counter.waiting_.load(std::memory_order_acquire)) == 0) {
return true; return true;
} }
bool ret = true; bool ret = true;
IPC_UNUSED_ auto guard = ctrl.get_lock(); IPC_UNUSED_ auto guard = ctrl.get_lock();
counter.waiting_.fetch_or(destruct_flag, std::memory_order_relaxed);
IPC_UNUSED_ auto finally = ipc::guard([&counter] {
counter.waiting_.fetch_and(destruct_mask, std::memory_order_relaxed);
});
if (counter.counter_ > 0) { if (counter.counter_ > 0) {
flags.need_dest_.store(true, std::memory_order_relaxed);
ret = ctrl.sema_post(counter.counter_); ret = ctrl.sema_post(counter.counter_);
counter.counter_ -= 1; counter.counter_ -= 1;
ret = ret && ctrl.handshake_wait(default_timeout); ret = ret && ctrl.handshake_wait(default_timeout);