diff --git a/src/ipc.cpp b/src/ipc.cpp index cd15a88..6731ba5 100755 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -555,7 +555,7 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) { recycle_storage(reinterpret_cast(ptr) - 1, size); }, reinterpret_cast(buf_id + 1) }; } - else ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd\n", msg.id_, remain); + else ipc::log("fail: shm::handle for big message. msg_id: %zd, buf_id: %zd, size: %zd\n", msg.id_, buf_id, remain); } // gc if (rc.size() > 1024) { diff --git a/src/libipc/prod_cons.h b/src/libipc/prod_cons.h index 735abbc..d5684fa 100755 --- a/src/libipc/prod_cons.h +++ b/src/libipc/prod_cons.h @@ -376,9 +376,14 @@ struct prod_cons_impl> { } // just compare & exchange if (el->rc_.compare_exchange_weak( - cur_rc, inc_mask(epoch | (cur_rc & ep_mask)) | static_cast(cc), std::memory_order_relaxed) && - epoch_.compare_exchange_weak(epoch, epoch, std::memory_order_acq_rel)) { - break; + cur_rc, inc_mask(epoch | (cur_rc & ep_mask)) | static_cast(cc), std::memory_order_relaxed)) { + if (epoch == epoch_.load(std::memory_order_acquire)) { + break; + } + else if (push(wrapper, std::forward(f), elems)) { + return true; + } + epoch = epoch_.fetch_add(ep_incr, std::memory_order_release) + ep_incr; } ipc::yield(k); } diff --git a/src/libipc/waiter_helper.h b/src/libipc/waiter_helper.h index ed267b6..a32035b 100644 --- a/src/libipc/waiter_helper.h +++ b/src/libipc/waiter_helper.h @@ -55,7 +55,7 @@ struct waiter_helper { } else if (flags.need_dest_.exchange(false, std::memory_order_release)) { ret = false; - ctrl.sema_wait(tm); + ctrl.sema_wait(default_timeout); break; } else { @@ -106,6 +106,7 @@ struct waiter_helper { template static bool quit_waiting(Ctrl & ctrl) { auto & flags = ctrl.flags(); + flags.need_dest_.store(true, std::memory_order_relaxed); if (!flags.is_waiting_.exchange(false, std::memory_order_release)) { return true; } @@ -116,7 +117,6 @@ struct waiter_helper { bool ret = true; IPC_UNUSED_ auto guard = ctrl.get_lock(); if (counter.counter_ > 0) { - flags.need_dest_.store(true, std::memory_order_relaxed); ret = ctrl.sema_post(counter.counter_); counter.counter_ -= 1; ret = ret && ctrl.handshake_wait(default_timeout);