mirror of
https://github.com/mutouyun/cpp-ipc.git
synced 2025-12-07 01:06:45 +08:00
commit
9667d5078f
@ -555,7 +555,7 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) {
|
|||||||
recycle_storage(reinterpret_cast<std::size_t>(ptr) - 1, size);
|
recycle_storage(reinterpret_cast<std::size_t>(ptr) - 1, size);
|
||||||
}, reinterpret_cast<void*>(buf_id + 1) };
|
}, reinterpret_cast<void*>(buf_id + 1) };
|
||||||
}
|
}
|
||||||
else ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd\n", msg.id_, remain);
|
else ipc::log("fail: shm::handle for big message. msg_id: %zd, buf_id: %zd, size: %zd\n", msg.id_, buf_id, remain);
|
||||||
}
|
}
|
||||||
// gc
|
// gc
|
||||||
if (rc.size() > 1024) {
|
if (rc.size() > 1024) {
|
||||||
|
|||||||
@ -376,10 +376,15 @@ struct prod_cons_impl<wr<relat::multi, relat::multi, trans::broadcast>> {
|
|||||||
}
|
}
|
||||||
// just compare & exchange
|
// just compare & exchange
|
||||||
if (el->rc_.compare_exchange_weak(
|
if (el->rc_.compare_exchange_weak(
|
||||||
cur_rc, inc_mask(epoch | (cur_rc & ep_mask)) | static_cast<rc_t>(cc), std::memory_order_relaxed) &&
|
cur_rc, inc_mask(epoch | (cur_rc & ep_mask)) | static_cast<rc_t>(cc), std::memory_order_relaxed)) {
|
||||||
epoch_.compare_exchange_weak(epoch, epoch, std::memory_order_acq_rel)) {
|
if (epoch == epoch_.load(std::memory_order_acquire)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
else if (push(wrapper, std::forward<F>(f), elems)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
epoch = epoch_.fetch_add(ep_incr, std::memory_order_release) + ep_incr;
|
||||||
|
}
|
||||||
ipc::yield(k);
|
ipc::yield(k);
|
||||||
}
|
}
|
||||||
// only one thread/process would touch here at one time
|
// only one thread/process would touch here at one time
|
||||||
|
|||||||
@ -55,7 +55,7 @@ struct waiter_helper {
|
|||||||
}
|
}
|
||||||
else if (flags.need_dest_.exchange(false, std::memory_order_release)) {
|
else if (flags.need_dest_.exchange(false, std::memory_order_release)) {
|
||||||
ret = false;
|
ret = false;
|
||||||
ctrl.sema_wait(tm);
|
ctrl.sema_wait(default_timeout);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
@ -106,6 +106,7 @@ struct waiter_helper {
|
|||||||
template <typename Ctrl>
|
template <typename Ctrl>
|
||||||
static bool quit_waiting(Ctrl & ctrl) {
|
static bool quit_waiting(Ctrl & ctrl) {
|
||||||
auto & flags = ctrl.flags();
|
auto & flags = ctrl.flags();
|
||||||
|
flags.need_dest_.store(true, std::memory_order_relaxed);
|
||||||
if (!flags.is_waiting_.exchange(false, std::memory_order_release)) {
|
if (!flags.is_waiting_.exchange(false, std::memory_order_release)) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -116,7 +117,6 @@ struct waiter_helper {
|
|||||||
bool ret = true;
|
bool ret = true;
|
||||||
IPC_UNUSED_ auto guard = ctrl.get_lock();
|
IPC_UNUSED_ auto guard = ctrl.get_lock();
|
||||||
if (counter.counter_ > 0) {
|
if (counter.counter_ > 0) {
|
||||||
flags.need_dest_.store(true, std::memory_order_relaxed);
|
|
||||||
ret = ctrl.sema_post(counter.counter_);
|
ret = ctrl.sema_post(counter.counter_);
|
||||||
counter.counter_ -= 1;
|
counter.counter_ -= 1;
|
||||||
ret = ret && ctrl.handshake_wait(default_timeout);
|
ret = ret && ctrl.handshake_wait(default_timeout);
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user