reconnect cannot reconnect when you are out of authority

This commit is contained in:
mutouyun 2023-10-21 15:46:16 +08:00
parent fb52c813fa
commit 76d9b41005
4 changed files with 78 additions and 36 deletions

View File

@ -11,14 +11,27 @@ int _tmain (int argc, TCHAR *argv[]) {
ipc::channel ipc_r{ipc::prefix{"Global\\"}, "service ipc r", ipc::receiver}; ipc::channel ipc_r{ipc::prefix{"Global\\"}, "service ipc r", ipc::receiver};
ipc::channel ipc_w{ipc::prefix{"Global\\"}, "service ipc w", ipc::sender}; ipc::channel ipc_w{ipc::prefix{"Global\\"}, "service ipc w", ipc::sender};
while (1) { while (1) {
if (!ipc_r.reconnect(ipc::receiver)) {
Sleep(1000);
continue;
}
auto msg = ipc_r.recv(); auto msg = ipc_r.recv();
if (msg.empty()) { if (msg.empty()) {
_tprintf(_T("My Sample Client: message recv error\n")); _tprintf(_T("My Sample Client: message recv error\n"));
return -1; ipc_r.disconnect();
continue;
} }
printf("My Sample Client: message recv: [%s]\n", (char const *)msg.data()); printf("My Sample Client: message recv: [%s]\n", (char const *)msg.data());
while (!ipc_w.send("Copy.")) { for (;;) {
if (!ipc_w.reconnect(ipc::sender)) {
Sleep(1000);
continue;
}
if (ipc_w.send("Copy.")) {
break;
}
_tprintf(_T("My Sample Client: message send error\n")); _tprintf(_T("My Sample Client: message send error\n"));
ipc_w.disconnect();
Sleep(1000); Sleep(1000);
} }
_tprintf(_T("My Sample Client: message send [Copy]\n")); _tprintf(_T("My Sample Client: message send [Copy]\n"));

View File

@ -112,16 +112,27 @@ struct conn_info_head {
ipc::shm::handle acc_h_; ipc::shm::handle acc_h_;
conn_info_head(char const * prefix, char const * name) conn_info_head(char const * prefix, char const * name)
: prefix_ {ipc::make_string(prefix)} : prefix_{ipc::make_string(prefix)}
, name_ {ipc::make_string(name)} , name_ {ipc::make_string(name)}
, cc_id_ {} , cc_id_ {} {}
, cc_waiter_{ipc::make_prefix(prefix_, {"CC_CONN__", name_}).c_str()}
, wt_waiter_{ipc::make_prefix(prefix_, {"WT_CONN__", name_}).c_str()} void init() {
, rd_waiter_{ipc::make_prefix(prefix_, {"RD_CONN__", name_}).c_str()} if (!cc_waiter_.valid()) cc_waiter_.open(ipc::make_prefix(prefix_, {"CC_CONN__", name_}).c_str());
, acc_h_ {ipc::make_prefix(prefix_, {"AC_CONN__", name_}).c_str(), sizeof(acc_t)} { if (!wt_waiter_.valid()) wt_waiter_.open(ipc::make_prefix(prefix_, {"WT_CONN__", name_}).c_str());
if (!rd_waiter_.valid()) rd_waiter_.open(ipc::make_prefix(prefix_, {"RD_CONN__", name_}).c_str());
if (!acc_h_.valid()) acc_h_.acquire(ipc::make_prefix(prefix_, {"AC_CONN__", name_}).c_str(), sizeof(acc_t));
if (cc_id_ != 0) {
return;
}
acc_t *pacc = cc_acc(prefix_); acc_t *pacc = cc_acc(prefix_);
if (pacc != nullptr) { if (pacc == nullptr) {
cc_id_ = pacc->fetch_add(1, std::memory_order_relaxed); // Failed to obtain the global accumulator.
return;
}
cc_id_ = pacc->fetch_add(1, std::memory_order_relaxed) + 1;
if (cc_id_ == 0) {
// The identity cannot be 0.
cc_id_ = pacc->fetch_add(1, std::memory_order_relaxed) + 1;
} }
} }
@ -362,12 +373,18 @@ struct queue_generator {
queue_t que_; queue_t que_;
conn_info_t(char const * pref, char const * name) conn_info_t(char const * pref, char const * name)
: conn_info_head{pref, name} : conn_info_head{pref, name} { init(); }
, que_{ipc::make_prefix(prefix_, {
"QU_CONN__", void init() {
ipc::to_string(DataSize), "__", conn_info_head::init();
ipc::to_string(AlignSize), "__", if (!que_.valid()) {
name}).c_str()} {} que_.open(ipc::make_prefix(prefix_, {
"QU_CONN__",
ipc::to_string(DataSize), "__",
ipc::to_string(AlignSize), "__",
this->name_}).c_str());
}
}
void disconnect_receiver() { void disconnect_receiver() {
bool dis = que_.disconnect(); bool dis = que_.disconnect();
@ -397,6 +414,18 @@ constexpr static queue_t* queue_of(ipc::handle_t h) noexcept {
/* API implementations */ /* API implementations */
static bool connect(ipc::handle_t * ph, ipc::prefix pref, char const * name, bool start_to_recv) {
assert(ph != nullptr);
if (*ph == nullptr) {
*ph = ipc::mem::alloc<conn_info_t>(pref.str, name);
}
return reconnect(ph, start_to_recv);
}
static bool connect(ipc::handle_t * ph, char const * name, bool start_to_recv) {
return connect(ph, {nullptr}, name, start_to_recv);
}
static void disconnect(ipc::handle_t h) { static void disconnect(ipc::handle_t h) {
auto que = queue_of(h); auto que = queue_of(h);
if (que == nullptr) { if (que == nullptr) {
@ -414,6 +443,7 @@ static bool reconnect(ipc::handle_t * ph, bool start_to_recv) {
if (que == nullptr) { if (que == nullptr) {
return false; return false;
} }
info_of(*ph)->init();
if (start_to_recv) { if (start_to_recv) {
que->shut_sending(); que->shut_sending();
if (que->connect()) { // wouldn't connect twice if (que->connect()) { // wouldn't connect twice
@ -429,18 +459,6 @@ static bool reconnect(ipc::handle_t * ph, bool start_to_recv) {
return que->ready_sending(); return que->ready_sending();
} }
static bool connect(ipc::handle_t * ph, ipc::prefix pref, char const * name, bool start_to_recv) {
assert(ph != nullptr);
if (*ph == nullptr) {
*ph = ipc::mem::alloc<conn_info_t>(pref.str, name);
}
return reconnect(ph, start_to_recv);
}
static bool connect(ipc::handle_t * ph, char const * name, bool start_to_recv) {
return connect(ph, {nullptr}, name, start_to_recv);
}
static void destroy(ipc::handle_t h) { static void destroy(ipc::handle_t h) {
disconnect(h); disconnect(h);
ipc::mem::free(info_of(h)); ipc::mem::free(info_of(h));

View File

@ -37,21 +37,26 @@ id_t acquire(char const * name, std::size_t size, unsigned mode) {
// Opens a named file mapping object. // Opens a named file mapping object.
if (mode == open) { if (mode == open) {
h = ::OpenFileMapping(FILE_MAP_ALL_ACCESS, FALSE, fmt_name.c_str()); h = ::OpenFileMapping(FILE_MAP_ALL_ACCESS, FALSE, fmt_name.c_str());
if (h == NULL) {
ipc::error("fail OpenFileMapping[%d]: %s\n", static_cast<int>(::GetLastError()), name);
return nullptr;
}
} }
// Creates or opens a named file mapping object for a specified file. // Creates or opens a named file mapping object for a specified file.
else { else {
h = ::CreateFileMapping(INVALID_HANDLE_VALUE, detail::get_sa(), PAGE_READWRITE | SEC_COMMIT, h = ::CreateFileMapping(INVALID_HANDLE_VALUE, detail::get_sa(), PAGE_READWRITE | SEC_COMMIT,
0, static_cast<DWORD>(size), fmt_name.c_str()); 0, static_cast<DWORD>(size), fmt_name.c_str());
DWORD err = ::GetLastError();
// If the object exists before the function call, the function returns a handle to the existing object // If the object exists before the function call, the function returns a handle to the existing object
// (with its current size, not the specified size), and GetLastError returns ERROR_ALREADY_EXISTS. // (with its current size, not the specified size), and GetLastError returns ERROR_ALREADY_EXISTS.
if ((mode == create) && (::GetLastError() == ERROR_ALREADY_EXISTS)) { if ((mode == create) && (err == ERROR_ALREADY_EXISTS)) {
::CloseHandle(h); if (h != NULL) ::CloseHandle(h);
h = NULL; h = NULL;
} }
} if (h == NULL) {
if (h == NULL) { ipc::error("fail CreateFileMapping[%d]: %s\n", static_cast<int>(err), name);
ipc::error("fail CreateFileMapping/OpenFileMapping[%d]: %s\n", static_cast<int>(::GetLastError()), name); return nullptr;
return nullptr; }
} }
auto ii = mem::alloc<id_info_t>(); auto ii = mem::alloc<id_info_t>();
ii->h_ = h; ii->h_ = h;

View File

@ -104,7 +104,7 @@ public:
explicit queue_base(char const * name) explicit queue_base(char const * name)
: queue_base{} { : queue_base{} {
elems_ = open<elems_t>(name); elems_ = queue_conn::template open<elems_t>(name);
} }
explicit queue_base(elems_t * elems) noexcept explicit queue_base(elems_t * elems) noexcept
@ -117,6 +117,12 @@ public:
base_t::close(); base_t::close();
} }
bool open(char const * name) noexcept {
base_t::close();
elems_ = queue_conn::template open<elems_t>(name);
return elems_ != nullptr;
}
elems_t * elems() noexcept { return elems_; } elems_t * elems() noexcept { return elems_; }
elems_t const * elems() const noexcept { return elems_; } elems_t const * elems() const noexcept { return elems_; }