调整接口;添加 msg_que demo

This commit is contained in:
mutouyun 2020-09-20 14:55:47 +08:00
parent 85c9eecdfd
commit 2255ae685a
5 changed files with 149 additions and 22 deletions

View File

@ -1,6 +1,4 @@
#include <signal.h>
#include <iostream>
#include <string>
#include <thread>

View File

@ -1,5 +1,8 @@
project(msg_que)
include_directories(
${CMAKE_SOURCE_DIR}/3rdparty)
file(GLOB SRC_FILES ./*.cpp)
file(GLOB HEAD_FILES ./*.h)

View File

@ -1,19 +1,124 @@
#include <signal.h>
#include <iostream>
#include <string>
#include <atomic>
#include <thread>
#include <chrono>
#include <cstddef>
#include "libipc/ipc.h"
#include "capo/random.hpp"
namespace {
constexpr char const name__[] = "ipc-msg-que";
constexpr char const name__ [] = "ipc-msg-que";
constexpr char const mode_s__[] = "s";
constexpr char const mode_r__[] = "r";
constexpr std::size_t const min_sz = 128;
constexpr std::size_t const max_sz = 1024 * 16;
std::atomic<bool> is_quit__{ false };
std::atomic<std::size_t> size_per_1s__{ 0 };
using msg_que_t = ipc::chan<ipc::relat::single, ipc::relat::single, ipc::trans::unicast>;
msg_que_t que__{ name__ };
ipc::byte_t buff__[max_sz];
capo::random<> rand__{ min_sz, max_sz };
inline std::string str_of_size(std::size_t sz) noexcept {
if (sz <= 1024) {
return std::to_string(sz) + " bytes";
}
if (sz <= 1024 * 1024) {
return std::to_string(sz / 1024) + " KB";
}
return std::to_string(sz / (1024 * 1024)) + " MB";
}
inline std::string speed_of(std::size_t sz) noexcept {
return str_of_size(sz) + " /s";
}
void do_counting() {
for (int i = 1; !is_quit__.load(std::memory_order_acquire); ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 100 ms
if (i % 10) continue;
i = 0;
std::cout
<< speed_of(size_per_1s__.load(std::memory_order_acquire))
<< std::endl;
size_per_1s__.store(0, std::memory_order_release);
}
}
void do_send() {
std::cout
<< __func__ << ": start ["
<< str_of_size(min_sz) << " - " << str_of_size(max_sz)
<< "]...\n";
if (!que__.reconnect(ipc::sender)) {
std::cerr << __func__ << ": connect failed.\n";
}
else {
std::thread counting{ do_counting };
while (!is_quit__.load(std::memory_order_acquire)) {
std::size_t sz = static_cast<std::size_t>(rand__());
if (!que__.send(ipc::buff_t(buff__, sz))) {
std::cerr << __func__ << ": send failed.\n";
std::cout << __func__ << ": waiting for receiver...\n";
if (!que__.wait_for_recv(1)) {
std::cerr << __func__ << ": wait receiver failed.\n";
is_quit__.store(true, std::memory_order_release);
break;
}
}
size_per_1s__.fetch_add(sz, std::memory_order_release);
std::this_thread::yield();
}
counting.join();
}
std::cout << __func__ << ": quit...\n";
}
void do_recv() {
std::cout
<< __func__ << ": start ["
<< str_of_size(min_sz) << " - " << str_of_size(max_sz)
<< "]...\n";
if (!que__.reconnect(ipc::receiver)) {
std::cerr << __func__ << ": connect failed.\n";
}
else {
std::thread counting{ do_counting };
while (!is_quit__.load(std::memory_order_acquire)) {
auto msg = que__.recv();
if (msg.empty()) break;
size_per_1s__.fetch_add(msg.size(), std::memory_order_release);
}
counting.join();
}
std::cout << __func__ << ": quit...\n";
}
} // namespace
int main(int argc, char ** argv) {
if (argc < 2) return 0;
::signal(SIGINT, [](int) {
is_quit__.store(true, std::memory_order_release);
que__.disconnect();
});
if (std::string{ argv[1] } == mode_s__) {
do_send();
}
else if (std::string{ argv[1] } == mode_r__) {
do_recv();
}
return 0;
}

View File

@ -20,6 +20,7 @@ enum : unsigned {
template <typename Flag>
struct IPC_EXPORT chan_impl {
static bool connect (ipc::handle_t * ph, char const * name, unsigned mode);
static bool reconnect (ipc::handle_t * ph, unsigned mode);
static void disconnect(ipc::handle_t h);
static void destroy (ipc::handle_t h);
@ -90,8 +91,13 @@ public:
bool connect(char const * name, unsigned mode = ipc::sender | ipc::receiver) {
if (name == nullptr || name[0] == '\0') return false;
this->disconnect();
detail_t::connect(&h_, name, mode_ = mode);
return valid();
return detail_t::connect(&h_, name, mode_ = mode);
}
bool reconnect(unsigned mode) {
if (!valid()) return false;
if (mode_ == mode) return true;
return detail_t::reconnect(&h_, mode_ = mode);
}
void disconnect() {

View File

@ -345,23 +345,6 @@ constexpr static queue_t* queue_of(ipc::handle_t h) {
/* API implementations */
static bool connect(ipc::handle_t * ph, char const * name, bool start) {
assert(ph != nullptr);
if (*ph == nullptr) {
*ph = ipc::mem::alloc<conn_info_t>(name);
}
auto que = queue_of(*ph);
if (que == nullptr) {
return false;
}
if (start) {
if (que->connect()) { // wouldn't connect twice
info_of(*ph)->cc_waiter_.broadcast();
}
}
return true;
}
static void disconnect(ipc::handle_t h) {
auto que = queue_of(h);
if (que == nullptr) {
@ -374,6 +357,33 @@ static void disconnect(ipc::handle_t h) {
}
}
static bool reconnect(ipc::handle_t * ph, bool start) {
assert(ph != nullptr);
assert(*ph != nullptr);
auto que = queue_of(*ph);
if (que == nullptr) {
return false;
}
if (start) {
if (que->connect()) { // wouldn't connect twice
info_of(*ph)->cc_waiter_.broadcast();
}
}
// start == false
else if (que->connected()) {
disconnect(*ph);
}
return true;
}
static bool connect(ipc::handle_t * ph, char const * name, bool start) {
assert(ph != nullptr);
if (*ph == nullptr) {
*ph = ipc::mem::alloc<conn_info_t>(name);
}
return reconnect(ph, start);
}
static void destroy(ipc::handle_t h) {
disconnect(h);
ipc::mem::free(info_of(h));
@ -583,6 +593,11 @@ bool chan_impl<Flag>::connect(ipc::handle_t * ph, char const * name, unsigned mo
return detail_impl<policy_t<Flag>>::connect(ph, name, mode & receiver);
}
template <typename Flag>
bool chan_impl<Flag>::reconnect(ipc::handle_t * ph, unsigned mode) {
return detail_impl<policy_t<Flag>>::reconnect(ph, mode & receiver);
}
template <typename Flag>
void chan_impl<Flag>::disconnect(ipc::handle_t h) {
detail_impl<policy_t<Flag>>::disconnect(h);