In non-broadcast mode, connection tags are only used for counting.

This commit is contained in:
mutouyun 2025-05-10 15:14:39 +08:00
parent 87b1fa4abc
commit a1cdc9a711
3 changed files with 16 additions and 7 deletions

View File

@ -74,6 +74,10 @@ public:
return this->cc_.fetch_and(~cc_id, std::memory_order_acq_rel) & ~cc_id;
}
bool connected(cc_t cc_id) const noexcept {
return (this->connections() & cc_id) != 0;
}
std::size_t conn_count(std::memory_order order = std::memory_order_acquire) const noexcept {
cc_t cur = this->cc_.load(order);
cc_t cnt; // accumulates the total bits set in cc
@ -100,6 +104,11 @@ public:
}
}
bool connected(cc_t cc_id) const noexcept {
// In non-broadcast mode, connection tags are only used for counting.
return (this->connections() != 0) && (cc_id != 0);
}
std::size_t conn_count(std::memory_order order = std::memory_order_acquire) const noexcept {
return this->connections(order);
}

View File

@ -628,7 +628,7 @@ static ipc::buff_t recv(ipc::handle_t h, std::uint64_t tm) {
// pop a new message
typename queue_t::value_t msg {};
if (!wait_for(inf->rd_waiter_, [que, &msg, &h] {
if (!que->connected(que->elems())) {
if (!que->connected()) {
reconnect(&h, true);
}
return !que->pop(msg);

View File

@ -63,13 +63,9 @@ public:
shm::handle::clear_storage(name);
}
bool connected() const noexcept {
return connected_ != 0;
}
template <typename Elems>
bool connected(Elems* elems) noexcept {
return connected_ & elems->connections();
bool connected(Elems* elems) const noexcept {
return elems->connected(connected_);
}
circ::cc_t connected_id() const noexcept {
@ -155,6 +151,10 @@ public:
elems_->disconnect_sender();
}
bool connected() const noexcept {
return base_t::connected(elems_);
}
bool connect() noexcept {
auto tp = base_t::connect(elems_);
if (std::get<0>(tp) && std::get<1>(tp)) {