mirror of
https://github.com/mutouyun/cpp-ipc.git
synced 2026-02-12 21:29:49 +08:00
refactor(log): replace utility/log with imp/log in ipc.cpp
- Replace include "libipc/utility/log.h" with "libipc/imp/log.h" - Add LIBIPC_LOG() at the beginning of functions that use logging - Replace all ipc::error() calls with log.error() - Replace all ipc::log() calls with log.debug() or log.error() based on context - Modified functions: - cc_acc(): error logging for shm acquire failure - make_handle(): error logging for chunk storage operations - find_storage(): error logging for invalid storage id - release_storage(): error logging for invalid storage id - recycle_storage(): error logging for invalid storage id - clear_message(): error logging for invalid message size - send(): error logging for various send failures, debug logging for force_push - recv(): error logging for various recv failures - Use type-safe streaming interface instead of printf-style formatting - Remove manual newline characters from log messages - Total changes: 19 log call sites updated
This commit is contained in:
parent
309baf77bc
commit
6143c23fd3
@ -19,7 +19,7 @@
|
|||||||
#include "libipc/rw_lock.h"
|
#include "libipc/rw_lock.h"
|
||||||
#include "libipc/waiter.h"
|
#include "libipc/waiter.h"
|
||||||
|
|
||||||
#include "libipc/utility/log.h"
|
#include "libipc/imp/log.h"
|
||||||
#include "libipc/utility/id_pool.h"
|
#include "libipc/utility/id_pool.h"
|
||||||
#include "libipc/utility/scope_guard.h"
|
#include "libipc/utility/scope_guard.h"
|
||||||
#include "libipc/utility/utility.h"
|
#include "libipc/utility/utility.h"
|
||||||
@ -76,6 +76,7 @@ ipc::buff_t make_cache(T &data, std::size_t size) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
acc_t *cc_acc(std::string const &pref) {
|
acc_t *cc_acc(std::string const &pref) {
|
||||||
|
LIBIPC_LOG();
|
||||||
static auto *phs = new ipc::unordered_map<std::string, ipc::shm::handle>; // no delete
|
static auto *phs = new ipc::unordered_map<std::string, ipc::shm::handle>; // no delete
|
||||||
static std::mutex lock;
|
static std::mutex lock;
|
||||||
std::lock_guard<std::mutex> guard {lock};
|
std::lock_guard<std::mutex> guard {lock};
|
||||||
@ -84,7 +85,7 @@ acc_t *cc_acc(std::string const &pref) {
|
|||||||
std::string shm_name {ipc::make_prefix(pref, "CA_CONN__")};
|
std::string shm_name {ipc::make_prefix(pref, "CA_CONN__")};
|
||||||
ipc::shm::handle h;
|
ipc::shm::handle h;
|
||||||
if (!h.acquire(shm_name.c_str(), sizeof(acc_t))) {
|
if (!h.acquire(shm_name.c_str(), sizeof(acc_t))) {
|
||||||
ipc::error("[cc_acc] acquire failed: %s\n", shm_name.c_str());
|
log.error("[cc_acc] acquire failed: ", shm_name);
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
it = phs->emplace(pref, std::move(h)).first;
|
it = phs->emplace(pref, std::move(h)).first;
|
||||||
@ -217,10 +218,11 @@ auto& chunk_storages() {
|
|||||||
std::mutex lock_;
|
std::mutex lock_;
|
||||||
|
|
||||||
static bool make_handle(ipc::shm::handle &h, std::string const &shm_name, std::size_t chunk_size) {
|
static bool make_handle(ipc::shm::handle &h, std::string const &shm_name, std::size_t chunk_size) {
|
||||||
|
LIBIPC_LOG();
|
||||||
if (!h.valid() &&
|
if (!h.valid() &&
|
||||||
!h.acquire( shm_name.c_str(),
|
!h.acquire( shm_name.c_str(),
|
||||||
sizeof(chunk_info_t) + chunk_info_t::chunks_mem_size(chunk_size) )) {
|
sizeof(chunk_info_t) + chunk_info_t::chunks_mem_size(chunk_size) )) {
|
||||||
ipc::error("[chunk_storages] chunk_shm.id_info_.acquire failed: chunk_size = %zd\n", chunk_size);
|
log.error("[chunk_storages] chunk_shm.id_info_.acquire failed: chunk_size = ", chunk_size);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
@ -240,7 +242,7 @@ auto& chunk_storages() {
|
|||||||
}
|
}
|
||||||
auto *info = static_cast<chunk_info_t*>(h->get());
|
auto *info = static_cast<chunk_info_t*>(h->get());
|
||||||
if (info == nullptr) {
|
if (info == nullptr) {
|
||||||
ipc::error("[chunk_storages] chunk_shm.id_info_.get failed: chunk_size = %zd\n", chunk_size);
|
log.error("[chunk_storages] chunk_shm.id_info_.get failed: chunk_size = ", chunk_size);
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
return info;
|
return info;
|
||||||
@ -290,8 +292,9 @@ std::pair<ipc::storage_id_t, void*> acquire_storage(conn_info_head *inf, std::si
|
|||||||
}
|
}
|
||||||
|
|
||||||
void *find_storage(ipc::storage_id_t id, conn_info_head *inf, std::size_t size) {
|
void *find_storage(ipc::storage_id_t id, conn_info_head *inf, std::size_t size) {
|
||||||
|
LIBIPC_LOG();
|
||||||
if (id < 0) {
|
if (id < 0) {
|
||||||
ipc::error("[find_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size);
|
log.error("[find_storage] id is invalid: id = ", (long)id, ", size = ", size);
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
std::size_t chunk_size = calc_chunk_size(size);
|
std::size_t chunk_size = calc_chunk_size(size);
|
||||||
@ -301,8 +304,9 @@ void *find_storage(ipc::storage_id_t id, conn_info_head *inf, std::size_t size)
|
|||||||
}
|
}
|
||||||
|
|
||||||
void release_storage(ipc::storage_id_t id, conn_info_head *inf, std::size_t size) {
|
void release_storage(ipc::storage_id_t id, conn_info_head *inf, std::size_t size) {
|
||||||
|
LIBIPC_LOG();
|
||||||
if (id < 0) {
|
if (id < 0) {
|
||||||
ipc::error("[release_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size);
|
log.error("[release_storage] id is invalid: id = ", (long)id, ", size = ", size);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
std::size_t chunk_size = calc_chunk_size(size);
|
std::size_t chunk_size = calc_chunk_size(size);
|
||||||
@ -334,8 +338,9 @@ bool sub_rc(ipc::wr<Rp, Rc, ipc::trans::broadcast>,
|
|||||||
|
|
||||||
template <typename Flag>
|
template <typename Flag>
|
||||||
void recycle_storage(ipc::storage_id_t id, conn_info_head *inf, std::size_t size, ipc::circ::cc_t curr_conns, ipc::circ::cc_t conn_id) {
|
void recycle_storage(ipc::storage_id_t id, conn_info_head *inf, std::size_t size, ipc::circ::cc_t curr_conns, ipc::circ::cc_t conn_id) {
|
||||||
|
LIBIPC_LOG();
|
||||||
if (id < 0) {
|
if (id < 0) {
|
||||||
ipc::error("[recycle_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size);
|
log.error("[recycle_storage] id is invalid: id = ", (long)id, ", size = ", size);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
std::size_t chunk_size = calc_chunk_size(size);
|
std::size_t chunk_size = calc_chunk_size(size);
|
||||||
@ -355,11 +360,12 @@ void recycle_storage(ipc::storage_id_t id, conn_info_head *inf, std::size_t size
|
|||||||
|
|
||||||
template <typename MsgT>
|
template <typename MsgT>
|
||||||
bool clear_message(conn_info_head *inf, void* p) {
|
bool clear_message(conn_info_head *inf, void* p) {
|
||||||
|
LIBIPC_LOG();
|
||||||
auto msg = static_cast<MsgT*>(p);
|
auto msg = static_cast<MsgT*>(p);
|
||||||
if (msg->storage_) {
|
if (msg->storage_) {
|
||||||
std::int32_t r_size = static_cast<std::int32_t>(ipc::data_length) + msg->remain_;
|
std::int32_t r_size = static_cast<std::int32_t>(ipc::data_length) + msg->remain_;
|
||||||
if (r_size <= 0) {
|
if (r_size <= 0) {
|
||||||
ipc::error("[clear_message] invalid msg size: %d\n", (int)r_size);
|
log.error("[clear_message] invalid msg size: ", (int)r_size);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
release_storage(*reinterpret_cast<ipc::storage_id_t*>(&msg->data_),
|
release_storage(*reinterpret_cast<ipc::storage_id_t*>(&msg->data_),
|
||||||
@ -518,33 +524,34 @@ static bool wait_for_recv(ipc::handle_t h, std::size_t r_count, std::uint64_t tm
|
|||||||
|
|
||||||
template <typename F>
|
template <typename F>
|
||||||
static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t size) {
|
static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t size) {
|
||||||
|
LIBIPC_LOG();
|
||||||
if (data == nullptr || size == 0) {
|
if (data == nullptr || size == 0) {
|
||||||
ipc::error("fail: send(%p, %zd)\n", data, size);
|
log.error("fail: send(", data, ", ", size, ")");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
auto que = queue_of(h);
|
auto que = queue_of(h);
|
||||||
if (que == nullptr) {
|
if (que == nullptr) {
|
||||||
ipc::error("fail: send, queue_of(h) == nullptr\n");
|
log.error("fail: send, queue_of(h) == nullptr");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (que->elems() == nullptr) {
|
if (que->elems() == nullptr) {
|
||||||
ipc::error("fail: send, queue_of(h)->elems() == nullptr\n");
|
log.error("fail: send, queue_of(h)->elems() == nullptr");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (!que->ready_sending()) {
|
if (!que->ready_sending()) {
|
||||||
ipc::error("fail: send, que->ready_sending() == false\n");
|
log.error("fail: send, que->ready_sending() == false");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
ipc::circ::cc_t conns = que->elems()->connections(std::memory_order_relaxed);
|
ipc::circ::cc_t conns = que->elems()->connections(std::memory_order_relaxed);
|
||||||
if (conns == 0) {
|
if (conns == 0) {
|
||||||
ipc::error("fail: send, there is no receiver on this connection.\n");
|
log.error("fail: send, there is no receiver on this connection.");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
// calc a new message id
|
// calc a new message id
|
||||||
conn_info_t *inf = info_of(h);
|
conn_info_t *inf = info_of(h);
|
||||||
auto acc = inf->acc();
|
auto acc = inf->acc();
|
||||||
if (acc == nullptr) {
|
if (acc == nullptr) {
|
||||||
ipc::error("fail: send, info_of(h)->acc() == nullptr\n");
|
log.error("fail: send, info_of(h)->acc() == nullptr");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
auto msg_id = acc->fetch_add(1, std::memory_order_relaxed);
|
auto msg_id = acc->fetch_add(1, std::memory_order_relaxed);
|
||||||
@ -558,7 +565,7 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s
|
|||||||
static_cast<std::int32_t>(ipc::data_length), &(dat.first), 0);
|
static_cast<std::int32_t>(ipc::data_length), &(dat.first), 0);
|
||||||
}
|
}
|
||||||
// try using message fragment
|
// try using message fragment
|
||||||
//ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd\n", msg_id, size);
|
//log.debug("fail: shm::handle for big message. msg_id: ", msg_id, ", size: ", size);
|
||||||
}
|
}
|
||||||
// push message fragment
|
// push message fragment
|
||||||
std::int32_t offset = 0;
|
std::int32_t offset = 0;
|
||||||
@ -588,7 +595,7 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size, std::uint
|
|||||||
[](void*) { return true; },
|
[](void*) { return true; },
|
||||||
info->cc_id_, msg_id, remain, data, size);
|
info->cc_id_, msg_id, remain, data, size);
|
||||||
}, tm)) {
|
}, tm)) {
|
||||||
ipc::log("force_push: msg_id = %zd, remain = %d, size = %zd\n", msg_id, remain, size);
|
log.debug("force_push: msg_id = ", msg_id, ", remain = ", remain, ", size = ", size);
|
||||||
if (!que->force_push(
|
if (!que->force_push(
|
||||||
[info](void* p) { return clear_message<typename queue_t::value_t>(info, p); },
|
[info](void* p) { return clear_message<typename queue_t::value_t>(info, p); },
|
||||||
info->cc_id_, msg_id, remain, data, size)) {
|
info->cc_id_, msg_id, remain, data, size)) {
|
||||||
@ -618,9 +625,10 @@ static bool try_send(ipc::handle_t h, void const * data, std::size_t size, std::
|
|||||||
}
|
}
|
||||||
|
|
||||||
static ipc::buff_t recv(ipc::handle_t h, std::uint64_t tm) {
|
static ipc::buff_t recv(ipc::handle_t h, std::uint64_t tm) {
|
||||||
|
LIBIPC_LOG();
|
||||||
auto que = queue_of(h);
|
auto que = queue_of(h);
|
||||||
if (que == nullptr) {
|
if (que == nullptr) {
|
||||||
ipc::error("fail: recv, queue_of(h) == nullptr\n");
|
log.error("fail: recv, queue_of(h) == nullptr");
|
||||||
return {};
|
return {};
|
||||||
}
|
}
|
||||||
if (!que->connected()) {
|
if (!que->connected()) {
|
||||||
@ -648,7 +656,7 @@ static ipc::buff_t recv(ipc::handle_t h, std::uint64_t tm) {
|
|||||||
// msg.remain_ may minus & abs(msg.remain_) < data_length
|
// msg.remain_ may minus & abs(msg.remain_) < data_length
|
||||||
std::int32_t r_size = static_cast<std::int32_t>(ipc::data_length) + msg.remain_;
|
std::int32_t r_size = static_cast<std::int32_t>(ipc::data_length) + msg.remain_;
|
||||||
if (r_size <= 0) {
|
if (r_size <= 0) {
|
||||||
ipc::error("fail: recv, r_size = %d\n", (int)r_size);
|
log.error("fail: recv, r_size = ", (int)r_size);
|
||||||
return {};
|
return {};
|
||||||
}
|
}
|
||||||
std::size_t msg_size = static_cast<std::size_t>(r_size);
|
std::size_t msg_size = static_cast<std::size_t>(r_size);
|
||||||
@ -669,7 +677,7 @@ static ipc::buff_t recv(ipc::handle_t h, std::uint64_t tm) {
|
|||||||
que->connected_id()
|
que->connected_id()
|
||||||
});
|
});
|
||||||
if (r_info == nullptr) {
|
if (r_info == nullptr) {
|
||||||
ipc::log("fail: ipc::mem::$new<recycle_t>.\n");
|
log.error("fail: ipc::mem::$new<recycle_t>.");
|
||||||
return ipc::buff_t{buf, msg_size}; // no recycle
|
return ipc::buff_t{buf, msg_size}; // no recycle
|
||||||
} else {
|
} else {
|
||||||
return ipc::buff_t{buf, msg_size, [](void* p_info, std::size_t size) {
|
return ipc::buff_t{buf, msg_size, [](void* p_info, std::size_t size) {
|
||||||
@ -685,7 +693,7 @@ static ipc::buff_t recv(ipc::handle_t h, std::uint64_t tm) {
|
|||||||
}, r_info};
|
}, r_info};
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
ipc::log("fail: shm::handle for large message. msg_id: %zd, buf_id: %zd, size: %zd\n", msg.id_, buf_id, msg_size);
|
log.error("fail: shm::handle for large message. msg_id: ", msg.id_, ", buf_id: ", buf_id, ", size: ", msg_size);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user