using 'signal' to quit waiting explicitly

This commit is contained in:
mutouyun 2021-09-21 13:09:59 +08:00
parent f6bd578c8a
commit a457a8975f
4 changed files with 40 additions and 13 deletions

View File

@ -20,8 +20,8 @@ constexpr char const mode_r__[] = "r";
constexpr std::size_t const min_sz = 128; constexpr std::size_t const min_sz = 128;
constexpr std::size_t const max_sz = 1024 * 16; constexpr std::size_t const max_sz = 1024 * 16;
std::atomic<bool> is_quit__{ false }; std::atomic<bool> is_quit__ {false};
std::atomic<std::size_t> size_counter__{ 0 }; std::atomic<std::size_t> size_counter__ {0};
using msg_que_t = ipc::chan<ipc::relat::single, ipc::relat::multi, ipc::trans::broadcast>; using msg_que_t = ipc::chan<ipc::relat::single, ipc::relat::multi, ipc::trans::broadcast>;

View File

@ -1,17 +1,24 @@
#include <signal.h>
#include <iostream> #include <iostream>
#include <string> #include <string>
#include <thread> #include <thread>
#include <chrono> #include <chrono>
#include <atomic>
#include "libipc/ipc.h" #include "libipc/ipc.h"
namespace { namespace {
std::atomic<bool> is_quit__ {false};
ipc::channel *ipc__ = nullptr;
void do_send(int size, int interval) { void do_send(int size, int interval) {
ipc::channel ipc {"ipc", ipc::sender}; ipc::channel ipc {"ipc", ipc::sender};
ipc__ = &ipc;
std::string buffer(size, 'A'); std::string buffer(size, 'A');
while (true) { while (!is_quit__.load(std::memory_order_acquire)) {
std::cout << "send size: " << buffer.size() + 1 << "\n"; std::cout << "send size: " << buffer.size() + 1 << "\n";
ipc.send(buffer, 0/*tm*/); ipc.send(buffer, 0/*tm*/);
std::this_thread::sleep_for(std::chrono::milliseconds(interval)); std::this_thread::sleep_for(std::chrono::milliseconds(interval));
@ -20,11 +27,13 @@ void do_send(int size, int interval) {
void do_recv(int interval) { void do_recv(int interval) {
ipc::channel ipc {"ipc", ipc::receiver}; ipc::channel ipc {"ipc", ipc::receiver};
while (true) { ipc__ = &ipc;
while (!is_quit__.load(std::memory_order_acquire)) {
ipc::buff_t recv; ipc::buff_t recv;
for (int k = 1; recv.empty(); ++k) { for (int k = 1; recv.empty(); ++k) {
std::cout << "recv waiting... " << k << "\n"; std::cout << "recv waiting... " << k << "\n";
recv = ipc.recv(interval); recv = ipc.recv(interval);
if (is_quit__.load(std::memory_order_acquire)) return;
} }
std::cout << "recv size: " << recv.size() << "\n"; std::cout << "recv size: " << recv.size() << "\n";
} }
@ -34,6 +43,23 @@ void do_recv(int interval) {
int main(int argc, char ** argv) { int main(int argc, char ** argv) {
if (argc < 3) return -1; if (argc < 3) return -1;
auto exit = [](int) {
is_quit__.store(true, std::memory_order_release);
if (ipc__ != nullptr) ipc__->disconnect();
};
::signal(SIGINT , exit);
::signal(SIGABRT , exit);
::signal(SIGSEGV , exit);
::signal(SIGTERM , exit);
#if defined(WIN64) || defined(_WIN64) || defined(__WIN64__) || \
defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) || \
defined(WINCE) || defined(_WIN32_WCE)
::signal(SIGBREAK, exit);
#else
::signal(SIGHUP , exit);
#endif
std::string mode {argv[1]}; std::string mode {argv[1]};
if (mode == "send") { if (mode == "send") {
if (argc < 4) return -1; if (argc < 4) return -1;

View File

@ -301,15 +301,13 @@ template <typename W, typename F>
bool wait_for(W& waiter, F&& pred, std::uint64_t tm) { bool wait_for(W& waiter, F&& pred, std::uint64_t tm) {
if (tm == 0) return !pred(); if (tm == 0) return !pred();
for (unsigned k = 0; pred();) { for (unsigned k = 0; pred();) {
bool loop = true, ret = true; bool ret = true;
ipc::sleep(k, [&k, &loop, &ret, &waiter, &pred, tm] { ipc::sleep(k, [&k, &ret, &waiter, &pred, tm] {
ret = waiter.wait_if([&loop, &pred] { ret = waiter.wait_if(std::forward<F>(pred), tm);
return loop = pred();
}, tm);
k = 0; k = 0;
}); });
if (!ret ) return false; // timeout or fail if (!ret) return false; // timeout or fail
if (!loop) break; if (k == 0) break; // k has been reset
} }
return true; return true;
} }

View File

@ -162,6 +162,9 @@ TEST(Sync, Condition) {
for (auto &t : test_conds) t.join(); for (auto &t : test_conds) t.join();
} }
/**
* https://stackoverflow.com/questions/51730660/is-this-a-bug-in-glibc-pthread
*/
TEST(Sync, ConditionRobust) { TEST(Sync, ConditionRobust) {
printf("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX 1\n"); printf("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX 1\n");
ipc::sync::condition cond {"test-cond"}; ipc::sync::condition cond {"test-cond"};
@ -180,7 +183,7 @@ TEST(Sync, ConditionRobust) {
} }
std::this_thread::sleep_for(std::chrono::seconds(1)); std::this_thread::sleep_for(std::chrono::seconds(1));
printf("WWWWWWWWWWWWWWWWWWWWWWWWWWWWWW 4\n"); printf("WWWWWWWWWWWWWWWWWWWWWWWWWWWWWW 4\n");
cond.notify(); cond.broadcast();
printf("WWWWWWWWWWWWWWWWWWWWWWWWWWWWWW 5\n"); printf("WWWWWWWWWWWWWWWWWWWWWWWWWWWWWW 5\n");
}}; }};
printf("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX 4\n"); printf("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX 4\n");