mirror of
https://github.com/mutouyun/cpp-ipc.git
synced 2025-12-06 16:56:45 +08:00
reconnect cannot reconnect when you are out of authority
This commit is contained in:
parent
a3b0a968f8
commit
ac54be7083
@ -11,14 +11,27 @@ int _tmain (int argc, TCHAR *argv[]) {
|
|||||||
ipc::channel ipc_r{ipc::prefix{"Global\\"}, "service ipc r", ipc::receiver};
|
ipc::channel ipc_r{ipc::prefix{"Global\\"}, "service ipc r", ipc::receiver};
|
||||||
ipc::channel ipc_w{ipc::prefix{"Global\\"}, "service ipc w", ipc::sender};
|
ipc::channel ipc_w{ipc::prefix{"Global\\"}, "service ipc w", ipc::sender};
|
||||||
while (1) {
|
while (1) {
|
||||||
|
if (!ipc_r.reconnect(ipc::receiver)) {
|
||||||
|
Sleep(1000);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
auto msg = ipc_r.recv();
|
auto msg = ipc_r.recv();
|
||||||
if (msg.empty()) {
|
if (msg.empty()) {
|
||||||
_tprintf(_T("My Sample Client: message recv error\n"));
|
_tprintf(_T("My Sample Client: message recv error\n"));
|
||||||
return -1;
|
ipc_r.disconnect();
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
printf("My Sample Client: message recv: [%s]\n", (char const *)msg.data());
|
printf("My Sample Client: message recv: [%s]\n", (char const *)msg.data());
|
||||||
while (!ipc_w.send("Copy.")) {
|
for (;;) {
|
||||||
|
if (!ipc_w.reconnect(ipc::sender)) {
|
||||||
|
Sleep(1000);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (ipc_w.send("Copy.")) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
_tprintf(_T("My Sample Client: message send error\n"));
|
_tprintf(_T("My Sample Client: message send error\n"));
|
||||||
|
ipc_w.disconnect();
|
||||||
Sleep(1000);
|
Sleep(1000);
|
||||||
}
|
}
|
||||||
_tprintf(_T("My Sample Client: message send [Copy]\n"));
|
_tprintf(_T("My Sample Client: message send [Copy]\n"));
|
||||||
|
|||||||
@ -114,14 +114,25 @@ struct conn_info_head {
|
|||||||
conn_info_head(char const * prefix, char const * name)
|
conn_info_head(char const * prefix, char const * name)
|
||||||
: prefix_{ipc::make_string(prefix)}
|
: prefix_{ipc::make_string(prefix)}
|
||||||
, name_ {ipc::make_string(name)}
|
, name_ {ipc::make_string(name)}
|
||||||
, cc_id_ {}
|
, cc_id_ {} {}
|
||||||
, cc_waiter_{ipc::make_prefix(prefix_, {"CC_CONN__", name_}).c_str()}
|
|
||||||
, wt_waiter_{ipc::make_prefix(prefix_, {"WT_CONN__", name_}).c_str()}
|
void init() {
|
||||||
, rd_waiter_{ipc::make_prefix(prefix_, {"RD_CONN__", name_}).c_str()}
|
if (!cc_waiter_.valid()) cc_waiter_.open(ipc::make_prefix(prefix_, {"CC_CONN__", name_}).c_str());
|
||||||
, acc_h_ {ipc::make_prefix(prefix_, {"AC_CONN__", name_}).c_str(), sizeof(acc_t)} {
|
if (!wt_waiter_.valid()) wt_waiter_.open(ipc::make_prefix(prefix_, {"WT_CONN__", name_}).c_str());
|
||||||
|
if (!rd_waiter_.valid()) rd_waiter_.open(ipc::make_prefix(prefix_, {"RD_CONN__", name_}).c_str());
|
||||||
|
if (!acc_h_.valid()) acc_h_.acquire(ipc::make_prefix(prefix_, {"AC_CONN__", name_}).c_str(), sizeof(acc_t));
|
||||||
|
if (cc_id_ != 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
acc_t *pacc = cc_acc(prefix_);
|
acc_t *pacc = cc_acc(prefix_);
|
||||||
if (pacc != nullptr) {
|
if (pacc == nullptr) {
|
||||||
cc_id_ = pacc->fetch_add(1, std::memory_order_relaxed);
|
// Failed to obtain the global accumulator.
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
cc_id_ = pacc->fetch_add(1, std::memory_order_relaxed) + 1;
|
||||||
|
if (cc_id_ == 0) {
|
||||||
|
// The identity cannot be 0.
|
||||||
|
cc_id_ = pacc->fetch_add(1, std::memory_order_relaxed) + 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -362,12 +373,18 @@ struct queue_generator {
|
|||||||
queue_t que_;
|
queue_t que_;
|
||||||
|
|
||||||
conn_info_t(char const * pref, char const * name)
|
conn_info_t(char const * pref, char const * name)
|
||||||
: conn_info_head{pref, name}
|
: conn_info_head{pref, name} { init(); }
|
||||||
, que_{ipc::make_prefix(prefix_, {
|
|
||||||
|
void init() {
|
||||||
|
conn_info_head::init();
|
||||||
|
if (!que_.valid()) {
|
||||||
|
que_.open(ipc::make_prefix(prefix_, {
|
||||||
"QU_CONN__",
|
"QU_CONN__",
|
||||||
ipc::to_string(DataSize), "__",
|
ipc::to_string(DataSize), "__",
|
||||||
ipc::to_string(AlignSize), "__",
|
ipc::to_string(AlignSize), "__",
|
||||||
name}).c_str()} {}
|
this->name_}).c_str());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void disconnect_receiver() {
|
void disconnect_receiver() {
|
||||||
bool dis = que_.disconnect();
|
bool dis = que_.disconnect();
|
||||||
@ -397,6 +414,18 @@ constexpr static queue_t* queue_of(ipc::handle_t h) noexcept {
|
|||||||
|
|
||||||
/* API implementations */
|
/* API implementations */
|
||||||
|
|
||||||
|
static bool connect(ipc::handle_t * ph, ipc::prefix pref, char const * name, bool start_to_recv) {
|
||||||
|
assert(ph != nullptr);
|
||||||
|
if (*ph == nullptr) {
|
||||||
|
*ph = ipc::mem::alloc<conn_info_t>(pref.str, name);
|
||||||
|
}
|
||||||
|
return reconnect(ph, start_to_recv);
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool connect(ipc::handle_t * ph, char const * name, bool start_to_recv) {
|
||||||
|
return connect(ph, {nullptr}, name, start_to_recv);
|
||||||
|
}
|
||||||
|
|
||||||
static void disconnect(ipc::handle_t h) {
|
static void disconnect(ipc::handle_t h) {
|
||||||
auto que = queue_of(h);
|
auto que = queue_of(h);
|
||||||
if (que == nullptr) {
|
if (que == nullptr) {
|
||||||
@ -414,6 +443,7 @@ static bool reconnect(ipc::handle_t * ph, bool start_to_recv) {
|
|||||||
if (que == nullptr) {
|
if (que == nullptr) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
info_of(*ph)->init();
|
||||||
if (start_to_recv) {
|
if (start_to_recv) {
|
||||||
que->shut_sending();
|
que->shut_sending();
|
||||||
if (que->connect()) { // wouldn't connect twice
|
if (que->connect()) { // wouldn't connect twice
|
||||||
@ -429,18 +459,6 @@ static bool reconnect(ipc::handle_t * ph, bool start_to_recv) {
|
|||||||
return que->ready_sending();
|
return que->ready_sending();
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool connect(ipc::handle_t * ph, ipc::prefix pref, char const * name, bool start_to_recv) {
|
|
||||||
assert(ph != nullptr);
|
|
||||||
if (*ph == nullptr) {
|
|
||||||
*ph = ipc::mem::alloc<conn_info_t>(pref.str, name);
|
|
||||||
}
|
|
||||||
return reconnect(ph, start_to_recv);
|
|
||||||
}
|
|
||||||
|
|
||||||
static bool connect(ipc::handle_t * ph, char const * name, bool start_to_recv) {
|
|
||||||
return connect(ph, {nullptr}, name, start_to_recv);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void destroy(ipc::handle_t h) {
|
static void destroy(ipc::handle_t h) {
|
||||||
disconnect(h);
|
disconnect(h);
|
||||||
ipc::mem::free(info_of(h));
|
ipc::mem::free(info_of(h));
|
||||||
|
|||||||
@ -37,22 +37,27 @@ id_t acquire(char const * name, std::size_t size, unsigned mode) {
|
|||||||
// Opens a named file mapping object.
|
// Opens a named file mapping object.
|
||||||
if (mode == open) {
|
if (mode == open) {
|
||||||
h = ::OpenFileMapping(FILE_MAP_ALL_ACCESS, FALSE, fmt_name.c_str());
|
h = ::OpenFileMapping(FILE_MAP_ALL_ACCESS, FALSE, fmt_name.c_str());
|
||||||
|
if (h == NULL) {
|
||||||
|
ipc::error("fail OpenFileMapping[%d]: %s\n", static_cast<int>(::GetLastError()), name);
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// Creates or opens a named file mapping object for a specified file.
|
// Creates or opens a named file mapping object for a specified file.
|
||||||
else {
|
else {
|
||||||
h = ::CreateFileMapping(INVALID_HANDLE_VALUE, detail::get_sa(), PAGE_READWRITE | SEC_COMMIT,
|
h = ::CreateFileMapping(INVALID_HANDLE_VALUE, detail::get_sa(), PAGE_READWRITE | SEC_COMMIT,
|
||||||
0, static_cast<DWORD>(size), fmt_name.c_str());
|
0, static_cast<DWORD>(size), fmt_name.c_str());
|
||||||
|
DWORD err = ::GetLastError();
|
||||||
// If the object exists before the function call, the function returns a handle to the existing object
|
// If the object exists before the function call, the function returns a handle to the existing object
|
||||||
// (with its current size, not the specified size), and GetLastError returns ERROR_ALREADY_EXISTS.
|
// (with its current size, not the specified size), and GetLastError returns ERROR_ALREADY_EXISTS.
|
||||||
if ((mode == create) && (::GetLastError() == ERROR_ALREADY_EXISTS)) {
|
if ((mode == create) && (err == ERROR_ALREADY_EXISTS)) {
|
||||||
::CloseHandle(h);
|
if (h != NULL) ::CloseHandle(h);
|
||||||
h = NULL;
|
h = NULL;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
if (h == NULL) {
|
if (h == NULL) {
|
||||||
ipc::error("fail CreateFileMapping/OpenFileMapping[%d]: %s\n", static_cast<int>(::GetLastError()), name);
|
ipc::error("fail CreateFileMapping[%d]: %s\n", static_cast<int>(err), name);
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
auto ii = mem::alloc<id_info_t>();
|
auto ii = mem::alloc<id_info_t>();
|
||||||
ii->h_ = h;
|
ii->h_ = h;
|
||||||
ii->size_ = size;
|
ii->size_ = size;
|
||||||
|
|||||||
@ -104,7 +104,7 @@ public:
|
|||||||
|
|
||||||
explicit queue_base(char const * name)
|
explicit queue_base(char const * name)
|
||||||
: queue_base{} {
|
: queue_base{} {
|
||||||
elems_ = open<elems_t>(name);
|
elems_ = queue_conn::template open<elems_t>(name);
|
||||||
}
|
}
|
||||||
|
|
||||||
explicit queue_base(elems_t * elems) noexcept
|
explicit queue_base(elems_t * elems) noexcept
|
||||||
@ -117,6 +117,12 @@ public:
|
|||||||
base_t::close();
|
base_t::close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool open(char const * name) noexcept {
|
||||||
|
base_t::close();
|
||||||
|
elems_ = queue_conn::template open<elems_t>(name);
|
||||||
|
return elems_ != nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
elems_t * elems() noexcept { return elems_; }
|
elems_t * elems() noexcept { return elems_; }
|
||||||
elems_t const * elems() const noexcept { return elems_; }
|
elems_t const * elems() const noexcept { return elems_; }
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user