From 8e6ae4220b9351d1fb1dc30940a7c88e6de6894a Mon Sep 17 00:00:00 2001 From: mutouyun Date: Sat, 9 Jan 2021 21:42:56 +0800 Subject: [PATCH 1/3] =?UTF-8?q?mmb=20force=5Fpush=20=E5=87=BA=E7=8E=B0?= =?UTF-8?q?=E7=AB=9E=E4=BA=89=E6=97=B6=EF=BC=8C=E5=BA=94=E8=AF=A5=E5=9C=A8?= =?UTF-8?q?=E5=A4=B1=E8=B4=A5=E5=90=8E=E5=85=88=E6=AD=A3=E5=B8=B8push?= =?UTF-8?q?=E4=B8=80=E6=AC=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/libipc/prod_cons.h | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/libipc/prod_cons.h b/src/libipc/prod_cons.h index 735abbc..d5684fa 100755 --- a/src/libipc/prod_cons.h +++ b/src/libipc/prod_cons.h @@ -376,9 +376,14 @@ struct prod_cons_impl> { } // just compare & exchange if (el->rc_.compare_exchange_weak( - cur_rc, inc_mask(epoch | (cur_rc & ep_mask)) | static_cast(cc), std::memory_order_relaxed) && - epoch_.compare_exchange_weak(epoch, epoch, std::memory_order_acq_rel)) { - break; + cur_rc, inc_mask(epoch | (cur_rc & ep_mask)) | static_cast(cc), std::memory_order_relaxed)) { + if (epoch == epoch_.load(std::memory_order_acquire)) { + break; + } + else if (push(wrapper, std::forward(f), elems)) { + return true; + } + epoch = epoch_.fetch_add(ep_incr, std::memory_order_release) + ep_incr; } ipc::yield(k); } From 152b5515d27a5b089c5ace83ffadb4123d9a4f4c Mon Sep 17 00:00:00 2001 From: mutouyun Date: Sun, 10 Jan 2021 14:29:23 +0800 Subject: [PATCH 2/3] fix: recv may block after disconnect --- src/ipc.cpp | 2 +- src/libipc/waiter_helper.h | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/ipc.cpp b/src/ipc.cpp index cd15a88..6731ba5 100755 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -555,7 +555,7 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) { recycle_storage(reinterpret_cast(ptr) - 1, size); }, reinterpret_cast(buf_id + 1) }; } - else ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd\n", msg.id_, remain); + else ipc::log("fail: shm::handle for big message. msg_id: %zd, buf_id: %zd, size: %zd\n", msg.id_, buf_id, remain); } // gc if (rc.size() > 1024) { diff --git a/src/libipc/waiter_helper.h b/src/libipc/waiter_helper.h index ed267b6..c4dfc02 100644 --- a/src/libipc/waiter_helper.h +++ b/src/libipc/waiter_helper.h @@ -55,7 +55,7 @@ struct waiter_helper { } else if (flags.need_dest_.exchange(false, std::memory_order_release)) { ret = false; - ctrl.sema_wait(tm); + ctrl.sema_wait(default_timeout); break; } else { @@ -100,12 +100,13 @@ struct waiter_helper { ret = ret && ctrl.handshake_wait(default_timeout); } while (counter.counter_ > 0); } - return ret; + return ret; } template static bool quit_waiting(Ctrl & ctrl) { auto & flags = ctrl.flags(); + flags.need_dest_.store(true, std::memory_order_relaxed); if (!flags.is_waiting_.exchange(false, std::memory_order_release)) { return true; } @@ -116,7 +117,6 @@ struct waiter_helper { bool ret = true; IPC_UNUSED_ auto guard = ctrl.get_lock(); if (counter.counter_ > 0) { - flags.need_dest_.store(true, std::memory_order_relaxed); ret = ctrl.sema_post(counter.counter_); counter.counter_ -= 1; ret = ret && ctrl.handshake_wait(default_timeout); From a7bec6348b47a67ae30af866f51c7469e1907fbb Mon Sep 17 00:00:00 2001 From: mutouyun Date: Sun, 10 Jan 2021 14:45:17 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E6=A0=BC=E5=BC=8F=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/libipc/waiter_helper.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libipc/waiter_helper.h b/src/libipc/waiter_helper.h index c4dfc02..a32035b 100644 --- a/src/libipc/waiter_helper.h +++ b/src/libipc/waiter_helper.h @@ -100,7 +100,7 @@ struct waiter_helper { ret = ret && ctrl.handshake_wait(default_timeout); } while (counter.counter_ > 0); } - return ret; + return ret; } template