From 1c224b18248bb50c4006ba995b44b205e59789d4 Mon Sep 17 00:00:00 2001 From: MincYu Date: Mon, 15 Feb 2021 10:51:17 +0800 Subject: [PATCH] a demo using shared memory for kvs --- CMakeLists.txt | 1 + demo/kvs/CMakeLists.txt | 11 ++ demo/kvs/main.cpp | 376 ++++++++++++++++++++++++++++++++++++++++ demo/msg_que/main.cpp | 16 +- 4 files changed, 399 insertions(+), 5 deletions(-) create mode 100644 demo/kvs/CMakeLists.txt create mode 100644 demo/kvs/main.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 7ae1216..124d223 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -20,3 +20,4 @@ add_subdirectory(test) add_subdirectory(demo/chat) add_subdirectory(demo/msg_que) +add_subdirectory(demo/kvs) diff --git a/demo/kvs/CMakeLists.txt b/demo/kvs/CMakeLists.txt new file mode 100644 index 0000000..ddfac36 --- /dev/null +++ b/demo/kvs/CMakeLists.txt @@ -0,0 +1,11 @@ +project(kvs) + +include_directories( + ${CMAKE_SOURCE_DIR}/3rdparty) + +file(GLOB SRC_FILES ./*.cpp) +file(GLOB HEAD_FILES ./*.h) + +add_executable(${PROJECT_NAME} ${SRC_FILES} ${HEAD_FILES}) + +target_link_libraries(${PROJECT_NAME} ipc) diff --git a/demo/kvs/main.cpp b/demo/kvs/main.cpp new file mode 100644 index 0000000..f26116a --- /dev/null +++ b/demo/kvs/main.cpp @@ -0,0 +1,376 @@ + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "libipc/ipc.h" +#include "libipc/shm.h" +#include "capo/random.hpp" + +#include +#include + +using string = std::string; + +template +using map = std::unordered_map; + +using namespace ipc::shm; + +namespace { + +constexpr char const name__ [] = "ipc-kvs"; +constexpr char const mode_s__[] = "s"; +constexpr char const mode_c__[] = "c"; +constexpr char const mode_t__[] = "t"; // test mode + +// constexpr std::size_t const min_sz = 1; +// constexpr std::size_t const max_sz = 1024 * 1024 * 512; + +std::atomic is_quit__{ false }; +std::atomic size_counter__{ 0 }; + +// using msg_que_t = ipc::chan; +// msg_que_t que__{ name__ }; +// ipc::byte_t buff__[128]; + +capo::random<> rand__{ + static_cast(1), + static_cast(127) +}; + +ipc::channel shared_chan { name__, ipc::sender | ipc::receiver }; + +inline std::string str_of_size(std::size_t sz) noexcept { + if (sz > 1024 * 1024) { + return std::to_string(sz / (1024 * 1024)) + " MB"; + } + if (sz > 1024) { + return std::to_string(sz / 1024) + " KB"; + } + return std::to_string(sz) + " bytes"; +} + +inline std::string speed_of(std::size_t sz) noexcept { + return str_of_size(sz) + "/s"; +} + +void do_counting() { + for (int i = 1; !is_quit__.load(std::memory_order_acquire); ++i) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 100 ms + if (i % 10) continue; + i = 0; + std::cout + << speed_of(size_counter__.exchange(0, std::memory_order_relaxed)) + << std::endl; + } +} + +void kvs_server() { + // map key_val_map; + map key_len_map; + std::cout << "Running kvs server...\n"; + while (1) { + // std::printf("2 recving\n"); + auto dd = shared_chan.recv(); + auto str = static_cast(dd.data()); + + auto recv_stamp = std::chrono::system_clock::now(); + + // for (int i = 0; i < strlen(str); i++){ + // std::cout << (int)str[i] << " "; + // } + // std::cout << "\n"; + + if (str == nullptr) { + std::cout << "Receive null str\n"; + continue; + } + + // request addres (1 byte) | resp address (1 byte) | get/put (1 byte) | request id (1 byte) | metadata len (1 byte)| metadata | optional value + if (str[0] != 1) { + std::cout << "Not for server\n"; + continue; + } + + // std::printf("2 recv: %s\n", str); + auto resp_address = str[1]; + bool is_read = (str[2] == 1); + auto req_id = str[3]; + int meta_data_len = (int)str[4]; + + string key_name(str + 5, meta_data_len); + + string resp; + resp.push_back(resp_address); + resp.push_back(req_id); + + // response address (1 byte) | request id (1 byte) | is_success (1 byte) | optional value + if (is_read){ + // get request + std::cout << "Getting " << key_name << " ...\n"; + if (key_len_map.find(key_name) != key_len_map.end()) { + auto size_len = key_len_map[key_name]; + + resp.push_back(1); + // resp.push_back((char) size_len); + // resp.push_back((char) size_len >> 8); + // resp.push_back((char) size_len >> 16); + // resp.push_back((char) size_len >> 24); + resp += std::to_string(size_len); + } + else { + std::cout << key_name << " not exists\n"; + resp.push_back(2); + } + } + else{ + // put request + std::cout << "Putting " << key_name << " ...\n"; + + // auto size_len = (uint32_t) str[5 + meta_data_len] | + // (uint32_t) str[7 + meta_data_len] << 8 | + // (uint32_t) str[8 + meta_data_len] << 16 | + // (uint32_t) str[9 + meta_data_len] << 24; + + auto size_len = stoi(string(str + 5 + meta_data_len)); + + // handle shm_hd(key_name.c_str(), size_len); + // auto shm_ptr = (char *) shm_hd.get(); + auto shm_id = acquire(key_name.c_str(), size_len, open); + + // auto shm_ptr = (char *) get_mem(shm_id, nullptr); + // for (int i = 0; i < strlen(shm_ptr); i++){ + // std::cout << shm_ptr[i] << " "; + // } + // std::cout << "\n"; + + + if (shm_id == nullptr){ + std::cout << "Shm null ptr for " << key_name << "\n"; + resp.push_back(2); + } + else { + auto shm_ptr = (char *) get_mem(shm_id, nullptr); + // auto val_size = strlen(shm_ptr); + // std::cout << "shm_size " << size_len << " val_size " << val_size << "\n"; + + // key_val_map[key_name] = shm_ptr; + key_len_map[key_name] = size_len; + resp.push_back(1); + } + } + + auto ready_stamp = std::chrono::system_clock::now(); + auto handling_time = std::chrono::duration_cast(ready_stamp - recv_stamp).count(); + + auto req_type = is_read ? "Get" : "Put"; + std::cout << "Handled " << req_type << " " << key_name << ", handling_time: " << handling_time << "\n"; + + // try sending ack + while (!shared_chan.send(resp)) { + // waiting for connection + shared_chan.wait_for_recv(2); + } + + // std::this_thread::sleep_for(std::chrono::milliseconds(2000)); + // std::cout << "Repeat \n"; + // auto val = key_val_map[key_name]; + // for (int i = 0; i < strlen(val); i++){ + // std::cout << val[i] << " "; + // } + // std::cout << "\n"; + } + + std::cout << __func__ << ": quit...\n"; +} + +void kvs_client(char id, bool is_read, string info) { + auto client_id = 2 + id; + std::cout << "Launching client " << client_id << " ...\n"; + + // request addres (1 byte) | resp address (1 byte) | get/put (1 byte) | request id (1 byte) | metadata len (1 byte)| metadata | optional value + auto req_id = rand__(); + + auto start_stamp = std::chrono::system_clock::now(); + string req; + req.push_back(1); + req.push_back(client_id); + + string key_name = "a" + info; + + if (is_read){ + req.push_back(1); + req.push_back(req_id); + + req.push_back((char) key_name.size()); + req += key_name; + } + else { + req.push_back(2); + req.push_back(req_id); + + int data_len = stoi(info); + + req.push_back((char) key_name.size()); + req += key_name; + + auto shm_size = data_len + 1; + // handle shm_hd(key_name.c_str(), shm_size); + // auto shm_ptr = (char *) shm_hd.get(); + + auto shm_id = acquire(key_name.c_str(), shm_size); + auto shm_ptr = (char *) get_mem(shm_id, nullptr); + // for (int i = 0; i < data_len; i++){ + // shm_ptr[i] = '1'; + // } + memset(shm_ptr, '1', data_len); + shm_ptr[data_len] = '\0'; + + // req.push_back((char) shm_size); + // req.push_back((char) shm_size >> 8); + // req.push_back((char) shm_size >> 16); + // req.push_back((char) shm_size >> 24); + req += std::to_string(shm_size); + + std::cout << "shm_size " << shm_size << "\n"; + } + + auto ready_stamp = std::chrono::system_clock::now(); + + while (!shared_chan.send(req)) { + // waiting for connection + shared_chan.wait_for_recv(2); + } + + // recv ack + auto dd = shared_chan.recv(); + auto str = static_cast(dd.data()); + + // for (int i = 0; i < strlen(str); i++){ + // std::cout << (int)str[i] << " "; + // } + // std::cout << "\n"; + + // response address (1 byte) | request id (1 byte) | is_success (1 byte) | optional value + if (str == nullptr) { + std::cout << "Ack error\n"; + } + else if (client_id != (int) str[0]){ + std::cout << "Not my ack " << (int) str[0] << "\n"; + return; + } + else { + auto ack_stamp = std::chrono::system_clock::now(); + + if (str[1] == req_id) { + if (is_read){ + + // auto size_len = (uint32_t) str[3] | + // (uint32_t) str[4] << 8 | + // (uint32_t) str[5] << 16 | + // (uint32_t) str[6] << 24; + + auto size_len = stoi(string(str + 3)); + auto shm_id = acquire(key_name.c_str(), size_len); + auto shm_ptr = (char *) get_mem(shm_id, nullptr); + // for (int i = 0; i < strlen(shm_ptr); i++){ + // std::cout << shm_ptr[i] << " "; + // } + // std::cout << "\n"; + auto ptr_stamp = std::chrono::system_clock::now(); + + auto val_size = strlen(shm_ptr); + auto val_stamp = std::chrono::system_clock::now(); + + auto ready_time = std::chrono::duration_cast(ready_stamp - start_stamp).count(); + auto ack_time = std::chrono::duration_cast(ack_stamp - ready_stamp).count(); + auto ptr_time = std::chrono::duration_cast(ptr_stamp - ack_stamp).count(); + auto val_time = std::chrono::duration_cast(val_stamp - ptr_stamp).count(); + + std::cout << "Receive Get " << key_name << ", val_size: " << val_size + << ", shm_size: " << size_len + << ", ready_time: " << ready_time + << ", ack_time: " << ack_time + << ", ptr_time: " << ptr_time + << ", val_time: " << val_time + <<"\n"; + } + else { + auto ready_time = std::chrono::duration_cast(ready_stamp - start_stamp).count(); + auto ack_time = std::chrono::duration_cast(ack_stamp - ready_stamp).count(); + + std::cout << "Receive Put " << key_name << ", ready_time "<< ready_time + << ", ack_time: " << ack_time + <<"\n"; + } + } + else { + std::cout << "Request id " << req_id << " not match " << (int)str[1] << "\n"; + } + } + std::cout << __func__ << ": quit...\n"; +} + +// char test_str[1024 * 1024 * 512]; +void test(int len){ + // auto start_stamp = std::chrono::system_clock::now(); + + // memset(test_str, '1', len); + // test_str[len] = '\0'; + // auto memset_stamp = std::chrono::system_clock::now(); + + // auto val_size = strlen(test_str); + // auto size_stamp = std::chrono::system_clock::now(); + + // auto memset_time = std::chrono::duration_cast(memset_stamp - start_stamp).count(); + // auto size_time = std::chrono::duration_cast(size_stamp - memset_stamp).count(); + // std::cout << "memset " << len << ", memset_time "<< memset_time << ", size_time " << size_time << "\n"; +} + +} // namespace + +int main(int argc, char ** argv) { + if (argc < 2) return 0; + + auto exit = [](int) { + is_quit__.store(true, std::memory_order_release); + shared_chan.disconnect(); + }; + ::signal(SIGINT , exit); + ::signal(SIGABRT , exit); + ::signal(SIGSEGV , exit); + ::signal(SIGTERM , exit); +#if defined(WIN64) || defined(_WIN64) || defined(__WIN64__) || \ + defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) || \ + defined(WINCE) || defined(_WIN32_WCE) + ::signal(SIGBREAK, exit); +#else + ::signal(SIGHUP , exit); +#endif + + if (std::string{ argv[1] } == mode_s__) { + kvs_server(); + } + else if (std::string{ argv[1] } == mode_c__) { + if (argc < 5) { + std::cout << "Require indicating client id, request type, and info.\n"; + return 0; + } + int id = std::stoi(string{argv[2]}); + bool is_read = std::stoi(string{argv[3]}) == 1; + string info{ argv[4] }; + kvs_client(id, is_read, info); + } + else if (std::string{ argv[1] } == mode_t__) { + int len = std::stoi(string{argv[2]}); + test(len); + } + return 0; +} diff --git a/demo/msg_que/main.cpp b/demo/msg_que/main.cpp index ea60f39..86c8162 100644 --- a/demo/msg_que/main.cpp +++ b/demo/msg_que/main.cpp @@ -18,7 +18,7 @@ constexpr char const mode_s__[] = "s"; constexpr char const mode_r__[] = "r"; constexpr std::size_t const min_sz = 128; -constexpr std::size_t const max_sz = 1024 * 16; +constexpr std::size_t const max_sz = 1024 * 1024 * 512; std::atomic is_quit__{ false }; std::atomic size_counter__{ 0 }; @@ -57,10 +57,11 @@ void do_counting() { } } -void do_send() { +void do_send(std::size_t sz) { std::cout << __func__ << ": start [" - << str_of_size(min_sz) << " - " << str_of_size(max_sz) + // << str_of_size(min_sz) << " - " << str_of_size(max_sz) + << str_of_size(sz) << "]...\n"; if (!que__.reconnect(ipc::sender)) { std::cerr << __func__ << ": connect failed.\n"; @@ -68,7 +69,7 @@ void do_send() { else { std::thread counting{ do_counting }; while (!is_quit__.load(std::memory_order_acquire)) { - std::size_t sz = static_cast(rand__()); + // std::size_t sz = static_cast(rand__()); if (!que__.send(ipc::buff_t(buff__, sz))) { std::cerr << __func__ << ": send failed.\n"; std::cout << __func__ << ": waiting for receiver...\n"; @@ -128,7 +129,12 @@ int main(int argc, char ** argv) { #endif if (std::string{ argv[1] } == mode_s__) { - do_send(); + if (argc < 3) { + std::cout << "Require indicating size.\n"; + return 0; + } + int sz = std::stoi(std::string{ argv[2] }); + do_send(sz); } else if (std::string{ argv[1] } == mode_r__) { do_recv();