Merge pull request #148 from mutouyun/yonker-yk-master

Yonker yk master
This commit is contained in:
木头云 2025-05-11 21:40:37 +08:00 committed by GitHub
commit a0c7725a14
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 23 additions and 6 deletions

View File

@ -74,6 +74,10 @@ public:
return this->cc_.fetch_and(~cc_id, std::memory_order_acq_rel) & ~cc_id;
}
bool connected(cc_t cc_id) const noexcept {
return (this->connections() & cc_id) != 0;
}
std::size_t conn_count(std::memory_order order = std::memory_order_acquire) const noexcept {
cc_t cur = this->cc_.load(order);
cc_t cnt; // accumulates the total bits set in cc
@ -100,6 +104,11 @@ public:
}
}
bool connected(cc_t cc_id) const noexcept {
// In non-broadcast mode, connection tags are only used for counting.
return (this->connections() != 0) && (cc_id != 0);
}
std::size_t conn_count(std::memory_order order = std::memory_order_acquire) const noexcept {
return this->connections(order);
}

View File

@ -627,7 +627,10 @@ static ipc::buff_t recv(ipc::handle_t h, std::uint64_t tm) {
for (;;) {
// pop a new message
typename queue_t::value_t msg {};
if (!wait_for(inf->rd_waiter_, [que, &msg] {
if (!wait_for(inf->rd_waiter_, [que, &msg, &h] {
if (!que->connected()) {
reconnect(&h, true);
}
return !que->pop(msg);
}, tm)) {
// pop failed, just return.

View File

@ -63,8 +63,9 @@ public:
shm::handle::clear_storage(name);
}
bool connected() const noexcept {
return connected_ != 0;
template <typename Elems>
bool connected(Elems* elems) const noexcept {
return elems->connected(connected_);
}
circ::cc_t connected_id() const noexcept {
@ -77,16 +78,16 @@ public:
-> std::tuple<bool, bool, decltype(std::declval<Elems>().cursor())> {
if (elems == nullptr) return {};
// if it's already connected, just return
if (connected()) return {connected(), false, 0};
if (connected(elems)) return {connected(elems), false, 0};
connected_ = elems->connect_receiver();
return {connected(), true, elems->cursor()};
return {connected(elems), true, elems->cursor()};
}
template <typename Elems>
bool disconnect(Elems* elems) noexcept {
if (elems == nullptr) return false;
// if it's already disconnected, just return false
if (!connected()) return false;
if (!connected(elems)) return false;
elems->disconnect_receiver(std::exchange(connected_, 0));
return true;
}
@ -150,6 +151,10 @@ public:
elems_->disconnect_sender();
}
bool connected() const noexcept {
return base_t::connected(elems_);
}
bool connect() noexcept {
auto tp = base_t::connect(elems_);
if (std::get<0>(tp) && std::get<1>(tp)) {