fix some bugs, adjust the test cases

This commit is contained in:
mutouyun 2021-06-27 15:46:21 +08:00
parent bbd241948b
commit 2179ce2a19
6 changed files with 148 additions and 106 deletions

View File

@ -29,7 +29,7 @@ enum : std::size_t {
invalid_value = (std::numeric_limits<std::size_t>::max)(), invalid_value = (std::numeric_limits<std::size_t>::max)(),
data_length = 64, data_length = 64,
large_msg_limit = data_length, large_msg_limit = data_length,
large_msg_align = 512, large_msg_align = 1024,
large_msg_cache = 32, large_msg_cache = 32,
default_timeout = 100 // ms default_timeout = 100 // ms
}; };

View File

@ -51,15 +51,16 @@ struct msg_t : msg_t<0, AlignSize> {
std::aligned_storage_t<DataSize, AlignSize> data_ {}; std::aligned_storage_t<DataSize, AlignSize> data_ {};
msg_t() = default; msg_t() = default;
msg_t(msg_id_t c, msg_id_t i, std::int32_t r, void const * d, std::size_t s) msg_t(msg_id_t conn, msg_id_t id, std::int32_t remain, void const * data, std::size_t size)
: msg_t<0, AlignSize> { c, i, r, (d == nullptr) || (s == 0) } { : msg_t<0, AlignSize> {conn, id, remain, (data == nullptr) || (size == 0)} {
if (this->storage_) { if (this->storage_) {
if (d != nullptr) { if (data != nullptr) {
// copy storage-id // copy storage-id
*reinterpret_cast<std::size_t*>(&data_) = *static_cast<std::size_t const *>(d); *reinterpret_cast<ipc::storage_id_t*>(&data_) =
*static_cast<ipc::storage_id_t const *>(data);
} }
} }
else std::memcpy(&data_, d, s); else std::memcpy(&data_, data, size);
} }
}; };
@ -95,17 +96,13 @@ struct chunk_info_t {
ipc::id_pool<> pool_; ipc::id_pool<> pool_;
ipc::spin_lock lock_; ipc::spin_lock lock_;
IPC_CONSTEXPR_ static std::size_t chunks_elem_size(std::size_t chunk_size) noexcept {
return ipc::make_align(alignof(std::max_align_t), chunk_size);
}
IPC_CONSTEXPR_ static std::size_t chunks_mem_size(std::size_t chunk_size) noexcept { IPC_CONSTEXPR_ static std::size_t chunks_mem_size(std::size_t chunk_size) noexcept {
return ipc::id_pool<>::max_count * chunks_elem_size(chunk_size); return ipc::id_pool<>::max_count * chunk_size;
} }
ipc::byte_t *at(std::size_t chunk_size, std::size_t id) noexcept { ipc::byte_t *at(std::size_t chunk_size, ipc::storage_id_t id) noexcept {
if (id == ipc::invalid_value) return nullptr; if (id < 0) return nullptr;
return reinterpret_cast<ipc::byte_t *>(this + 1) + (chunks_elem_size(chunk_size) * id); return reinterpret_cast<ipc::byte_t *>(this + 1) + (chunk_size * id);
} }
}; };
@ -129,29 +126,22 @@ auto& chunk_storages() {
return info; return info;
} }
}; };
static ipc::unordered_map<std::size_t, chunk_t> chunk_s; thread_local ipc::unordered_map<std::size_t, chunk_t> chunk_s;
return chunk_s; return chunk_s;
} }
auto& chunk_lock() { IPC_CONSTEXPR_ std::size_t calc_chunk_size(std::size_t size) noexcept {
static ipc::spin_lock chunk_l; return ipc::make_align(alignof(std::max_align_t),
return chunk_l; (((size - 1) / ipc::large_msg_align) + 1) * ipc::large_msg_align);
} }
constexpr std::size_t calc_chunk_size(std::size_t size) noexcept { chunk_info_t *chunk_storage_info(std::size_t chunk_size) {
return ( ((size - 1) / ipc::large_msg_align) + 1 ) * ipc::large_msg_align; return chunk_storages()[chunk_size].get_info(chunk_size);
} }
auto& chunk_storage(std::size_t chunk_size) { std::pair<ipc::storage_id_t, void*> apply_storage(std::size_t size) {
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(chunk_lock());
return chunk_storages()[chunk_size];
}
std::pair<std::size_t, void*> apply_storage(std::size_t size) {
std::size_t chunk_size = calc_chunk_size(size); std::size_t chunk_size = calc_chunk_size(size);
auto & chunk_shm = chunk_storage(chunk_size); auto info = chunk_storage_info(chunk_size);
auto info = chunk_shm.get_info(chunk_size);
if (info == nullptr) return {}; if (info == nullptr) return {};
info->lock_.lock(); info->lock_.lock();
@ -163,27 +153,25 @@ std::pair<std::size_t, void*> apply_storage(std::size_t size) {
return { id, info->at(chunk_size, id) }; return { id, info->at(chunk_size, id) };
} }
void *find_storage(std::size_t id, std::size_t size) { void *find_storage(ipc::storage_id_t id, std::size_t size) {
if (id == ipc::invalid_value) { if (id < 0) {
ipc::error("[find_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size); ipc::error("[find_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size);
return nullptr; return nullptr;
} }
std::size_t chunk_size = calc_chunk_size(size); std::size_t chunk_size = calc_chunk_size(size);
auto & chunk_shm = chunk_storage(chunk_size); auto info = chunk_storage_info(chunk_size);
auto info = chunk_shm.get_info(chunk_size);
if (info == nullptr) return nullptr; if (info == nullptr) return nullptr;
return info->at(chunk_size, id); return info->at(chunk_size, id);
} }
void clear_storage(std::size_t id, std::size_t size) { void release_storage(ipc::storage_id_t id, std::size_t size) {
if (id == ipc::invalid_value) { if (id < 0) {
ipc::error("[clear_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size); ipc::error("[clear_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size);
return; return;
} }
std::size_t chunk_size = calc_chunk_size(size); std::size_t chunk_size = calc_chunk_size(size);
auto & chunk_shm = chunk_storage(chunk_size); auto info = chunk_storage_info(chunk_size);
auto info = chunk_shm.get_info(chunk_size);
if (info == nullptr) return; if (info == nullptr) return;
info->lock_.lock(); info->lock_.lock();
@ -195,9 +183,14 @@ template <typename MsgT>
bool recycle_message(void* p) { bool recycle_message(void* p) {
auto msg = static_cast<MsgT*>(p); auto msg = static_cast<MsgT*>(p);
if (msg->storage_) { if (msg->storage_) {
clear_storage( std::int32_t r_size = static_cast<std::int32_t>(ipc::data_length) + msg->remain_;
*reinterpret_cast<std::size_t*>(&msg->data_), if (r_size <= 0) {
static_cast<std::int32_t>(ipc::data_length) + msg->remain_); ipc::error("[recycle_message] invalid msg size: %d\n", (int)r_size);
return true;
}
release_storage(
*reinterpret_cast<ipc::storage_id_t*>(&msg->data_),
static_cast<std::size_t>(r_size));
} }
return true; return true;
} }
@ -220,7 +213,7 @@ struct conn_info_head {
* - https://developercommunity.visualstudio.com/content/problem/124121/thread-local-variables-fail-to-be-initialized-when.html * - https://developercommunity.visualstudio.com/content/problem/124121/thread-local-variables-fail-to-be-initialized-when.html
* - https://software.intel.com/en-us/forums/intel-c-compiler/topic/684827 * - https://software.intel.com/en-us/forums/intel-c-compiler/topic/684827
*/ */
ipc::tls::pointer<ipc::unordered_map<msg_id_t, cache_t>> recv_cache_; ipc::tls::pointer<ipc::map<msg_id_t, cache_t>> recv_cache_;
conn_info_head(char const * name) conn_info_head(char const * name)
: name_ {name} : name_ {name}
@ -409,11 +402,11 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s
static_cast<std::int32_t>(ipc::data_length), &(dat.first), 0); static_cast<std::int32_t>(ipc::data_length), &(dat.first), 0);
} }
// try using message fragment // try using message fragment
// ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd\n", msg_id, size); //ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd\n", msg_id, size);
} }
// push message fragment // push message fragment
std::int32_t offset = 0; std::int32_t offset = 0;
for (int i = 0; i < static_cast<int>(size / ipc::data_length); ++i, offset += ipc::data_length) { for (std::int32_t i = 0; i < static_cast<std::int32_t>(size / ipc::data_length); ++i, offset += ipc::data_length) {
if (!try_push(static_cast<std::int32_t>(size) - offset - static_cast<std::int32_t>(ipc::data_length), if (!try_push(static_cast<std::int32_t>(size) - offset - static_cast<std::int32_t>(ipc::data_length),
static_cast<ipc::byte_t const *>(data) + offset, ipc::data_length)) { static_cast<ipc::byte_t const *>(data) + offset, ipc::data_length)) {
return false; return false;
@ -479,7 +472,7 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) {
return {}; return {};
} }
auto& rc = info_of(h)->recv_cache(); auto& rc = info_of(h)->recv_cache();
while (1) { for (;;) {
// pop a new message // pop a new message
typename queue_t::value_t msg; typename queue_t::value_t msg;
if (!wait_for(info_of(h)->rd_waiter_, [que, &msg] { return !que->pop(msg); }, tm)) { if (!wait_for(info_of(h)->rd_waiter_, [que, &msg] { return !que->pop(msg); }, tm)) {
@ -491,20 +484,28 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) {
continue; // ignore message to self continue; // ignore message to self
} }
// msg.remain_ may minus & abs(msg.remain_) < data_length // msg.remain_ may minus & abs(msg.remain_) < data_length
std::size_t remain = static_cast<std::int32_t>(ipc::data_length) + msg.remain_; std::int32_t r_size = static_cast<std::int32_t>(ipc::data_length) + msg.remain_;
if (r_size <= 0) {
ipc::error("fail: recv, r_size = %d\n", (int)r_size);
return {};
}
std::size_t msg_size = static_cast<std::size_t>(r_size);
// find cache with msg.id_ // find cache with msg.id_
auto cac_it = rc.find(msg.id_); auto cac_it = rc.find(msg.id_);
if (cac_it == rc.end()) { if (cac_it == rc.end()) {
if (remain <= ipc::data_length) { if (msg_size <= ipc::data_length) {
return make_cache(msg.data_, remain); return make_cache(msg.data_, msg_size);
} }
if (msg.storage_) { if (msg.storage_) {
std::size_t buf_id = *reinterpret_cast<std::size_t*>(&msg.data_); std::size_t buf_id = *reinterpret_cast<std::size_t*>(&msg.data_);
void * buf = find_storage(buf_id, remain); void * buf = find_storage(buf_id, msg_size);
if (buf != nullptr) { if (buf != nullptr) {
return ipc::buff_t{buf, remain}; return ipc::buff_t{buf, msg_size};
}
else {
ipc::log("fail: shm::handle for big message. msg_id: %zd, buf_id: %zd, size: %zd\n", msg.id_, buf_id, msg_size);
continue;
} }
else ipc::log("fail: shm::handle for big message. msg_id: %zd, buf_id: %zd, size: %zd\n", msg.id_, buf_id, remain);
} }
// gc // gc
if (rc.size() > 1024) { if (rc.size() > 1024) {
@ -518,14 +519,14 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) {
for (auto id : need_del) rc.erase(id); for (auto id : need_del) rc.erase(id);
} }
// cache the first message fragment // cache the first message fragment
rc.emplace(msg.id_, cache_t { ipc::data_length, make_cache(msg.data_, remain) }); rc.emplace(msg.id_, cache_t { ipc::data_length, make_cache(msg.data_, msg_size) });
} }
// has cached before this message // has cached before this message
else { else {
auto& cac = cac_it->second; auto& cac = cac_it->second;
// this is the last message fragment // this is the last message fragment
if (msg.remain_ <= 0) { if (msg.remain_ <= 0) {
cac.append(&(msg.data_), remain); cac.append(&(msg.data_), msg_size);
// finish this message, erase it from cache // finish this message, erase it from cache
auto buff = std::move(cac.buff_); auto buff = std::move(cac.buff_);
rc.erase(cac_it); rc.erase(cac_it);

View File

@ -5,6 +5,7 @@
#include <utility> #include <utility>
#include <functional> #include <functional>
#include <unordered_map> #include <unordered_map>
#include <map>
#include <string> #include <string>
#include <cstdio> #include <cstdio>
@ -49,6 +50,11 @@ using unordered_map = std::unordered_map<
Key, T, std::hash<Key>, std::equal_to<Key>, ipc::mem::allocator<std::pair<const Key, T>> Key, T, std::hash<Key>, std::equal_to<Key>, ipc::mem::allocator<std::pair<const Key, T>>
>; >;
template <typename Key, typename T>
using map = std::map<
Key, T, std::less<Key>, ipc::mem::allocator<std::pair<const Key, T>>
>;
template <typename Char> template <typename Char>
using basic_string = std::basic_string< using basic_string = std::basic_string<
Char, std::char_traits<Char>, ipc::mem::allocator<Char> Char, std::char_traits<Char>, ipc::mem::allocator<Char>

View File

@ -2,13 +2,15 @@
#include <type_traits> // std::aligned_storage_t #include <type_traits> // std::aligned_storage_t
#include <cstring> // std::memcmp #include <cstring> // std::memcmp
#include <cstdint>
#include "libipc/def.h" #include "libipc/def.h"
#include "libipc/platform/detail.h" #include "libipc/platform/detail.h"
namespace ipc { namespace ipc {
using storage_id_t = std::int32_t;
template <std::size_t DataSize, std::size_t AlignSize> template <std::size_t DataSize, std::size_t AlignSize>
struct id_type; struct id_type;
@ -16,7 +18,7 @@ template <std::size_t AlignSize>
struct id_type<0, AlignSize> { struct id_type<0, AlignSize> {
uint_t<8> id_; uint_t<8> id_;
id_type& operator=(std::size_t val) { id_type& operator=(storage_id_t val) {
id_ = static_cast<uint_t<8>>(val); id_ = static_cast<uint_t<8>>(val);
return (*this); return (*this);
} }
@ -57,7 +59,7 @@ public:
} }
void init() { void init() {
for (std::size_t i = 0; i < max_count;) { for (storage_id_t i = 0; i < max_count;) {
i = next_[i] = (i + 1); i = next_[i] = (i + 1);
} }
} }
@ -71,22 +73,22 @@ public:
return cursor_ == max_count; return cursor_ == max_count;
} }
std::size_t acquire() { storage_id_t acquire() {
if (empty()) return invalid_value; if (empty()) return -1;
std::size_t id = cursor_; storage_id_t id = cursor_;
cursor_ = next_[id]; // point to next cursor_ = next_[id]; // point to next
return id; return id;
} }
bool release(std::size_t id) { bool release(storage_id_t id) {
if (id == invalid_value) return false; if (id < 0) return false;
next_[id] = cursor_; next_[id] = cursor_;
cursor_ = static_cast<uint_t<8>>(id); // put it back cursor_ = static_cast<uint_t<8>>(id); // put it back
return true; return true;
} }
void * at(std::size_t id) { return &(next_[id].data_); } void * at(storage_id_t id) { return &(next_[id].data_); }
void const * at(std::size_t id) const { return &(next_[id].data_); } void const * at(storage_id_t id) const { return &(next_[id].data_); }
}; };
template <typename T> template <typename T>
@ -94,8 +96,8 @@ class obj_pool : public id_pool<sizeof(T), alignof(T)> {
using base_t = id_pool<sizeof(T), alignof(T)>; using base_t = id_pool<sizeof(T), alignof(T)>;
public: public:
T * at(std::size_t id) { return reinterpret_cast<T *>(base_t::at(id)); } T * at(storage_id_t id) { return reinterpret_cast<T *>(base_t::at(id)); }
T const * at(std::size_t id) const { return reinterpret_cast<T const *>(base_t::at(id)); } T const * at(storage_id_t id) const { return reinterpret_cast<T const *>(base_t::at(id)); }
}; };
} // namespace ipc } // namespace ipc

View File

@ -1,6 +1,8 @@
#include <vector> #include <vector>
#include <iostream> #include <iostream>
#include <mutex>
#include <atomic>
#include <cstring> #include <cstring>
#include "libipc/ipc.h" #include "libipc/ipc.h"
@ -16,15 +18,28 @@ using namespace ipc;
namespace { namespace {
constexpr int LoopCount = 10000;
constexpr int MultiMax = 8;
struct msg_head {
int id_;
};
class rand_buf : public buffer { class rand_buf : public buffer {
public: public:
rand_buf() { rand_buf() {
int size = capo::random<>{1, 65536}(); int size = capo::random<>{sizeof(msg_head), 65536}();
*this = buffer(new char[size], size, [](void * p, std::size_t) { *this = buffer(new char[size], size, [](void * p, std::size_t) {
delete [] static_cast<char *>(p); delete [] static_cast<char *>(p);
}); });
} }
rand_buf(msg_head const &msg) {
*this = buffer(new msg_head{msg}, sizeof(msg), [](void * p, std::size_t) {
delete static_cast<msg_head *>(p);
});
}
rand_buf(rand_buf &&) = default; rand_buf(rand_buf &&) = default;
rand_buf(rand_buf const & rhs) { rand_buf(rand_buf const & rhs) {
if (rhs.empty()) return; if (rhs.empty()) return;
@ -40,11 +55,11 @@ public:
} }
void set_id(int k) noexcept { void set_id(int k) noexcept {
*get<char *>() = static_cast<char>(k); get<msg_head *>()->id_ = k;
} }
int get_id() const noexcept { int get_id() const noexcept {
return static_cast<int>(*get<char *>()); return get<msg_head *>()->id_;
} }
using buffer::operator=; using buffer::operator=;
@ -67,57 +82,67 @@ void test_basic(char const * name) {
EXPECT_EQ(que2.recv(), test2); EXPECT_EQ(que2.recv(), test2);
} }
template <relat Rp, relat Rc, trans Ts> class data_set {
void test_sr(char const * name, int size, int s_cnt, int r_cnt) { std::vector<rand_buf> datas_;
using que_t = chan<Rp, Rc, Ts>;
public:
data_set() {
datas_.resize(LoopCount);
for (int i = 0; i < LoopCount; ++i) {
datas_[i].set_id(i);
}
}
std::vector<rand_buf> const &get() const noexcept {
return datas_;
}
} const data_set__;
template <relat Rp, relat Rc, trans Ts, typename Que = chan<Rp, Rc, Ts>>
void test_sr(char const * name, int s_cnt, int r_cnt) {
ipc_ut::sender().start(static_cast<std::size_t>(s_cnt)); ipc_ut::sender().start(static_cast<std::size_t>(s_cnt));
ipc_ut::reader().start(static_cast<std::size_t>(r_cnt)); ipc_ut::reader().start(static_cast<std::size_t>(r_cnt));
std::atomic_thread_fence(std::memory_order_seq_cst);
ipc_ut::test_stopwatch sw; ipc_ut::test_stopwatch sw;
std::vector<rand_buf> tests(size);
for (int k = 0; k < s_cnt; ++k) { for (int k = 0; k < s_cnt; ++k) {
ipc_ut::sender() << [name, &tests, &sw, r_cnt, k] { ipc_ut::sender() << [name, &sw, r_cnt, k] {
que_t que1 { name }; Que que { name, ipc::sender };
EXPECT_TRUE(que1.wait_for_recv(r_cnt)); EXPECT_TRUE(que.wait_for_recv(r_cnt));
sw.start(); sw.start();
for (auto & buf : tests) { for (int i = 0; i < (int)data_set__.get().size(); ++i) {
rand_buf data { buf }; EXPECT_TRUE(que.send(data_set__.get()[i]));
data.set_id(k);
EXPECT_TRUE(que1.send(data));
} }
}; };
} }
for (int k = 0; k < r_cnt; ++k) { for (int k = 0; k < r_cnt; ++k) {
ipc_ut::reader() << [name, &tests, s_cnt] { ipc_ut::reader() << [name] {
que_t que2 { name, ipc::receiver }; Que que { name, ipc::receiver };
std::vector<int> cursors(s_cnt);
for (;;) { for (;;) {
rand_buf got { que2.recv() }; rand_buf got { que.recv() };
ASSERT_FALSE(got.empty()); ASSERT_FALSE(got.empty());
int & cur = cursors.at(got.get_id()); int i = got.get_id();
ASSERT_TRUE((cur >= 0) && (cur < static_cast<int>(tests.size()))); if (i == -1) {
rand_buf buf { tests.at(cur++) }; return;
buf.set_id(got.get_id());
EXPECT_EQ(got, buf);
int n = 0;
for (; n < static_cast<int>(cursors.size()); ++n) {
if (cursors[n] < static_cast<int>(tests.size())) break;
} }
if (n == static_cast<int>(cursors.size())) break; ASSERT_TRUE((i >= 0) && (i < (int)data_set__.get().size()));
EXPECT_EQ(data_set__.get()[i], got);
} }
}; };
} }
ipc_ut::sender().wait_for_done(); ipc_ut::sender().wait_for_done();
Que que { name };
EXPECT_TRUE(que.wait_for_recv(r_cnt));
for (int k = 0; k < r_cnt; ++k) {
que.send(rand_buf{msg_head{-1}});
}
ipc_ut::reader().wait_for_done(); ipc_ut::reader().wait_for_done();
sw.print_elapsed<std::chrono::microseconds>(s_cnt, r_cnt, size, name); sw.print_elapsed<std::chrono::microseconds>(s_cnt, r_cnt, (int)data_set__.get().size(), name);
} }
constexpr int LoopCount = 10000;
constexpr int MultiMax = 8;
} // internal-linkage } // internal-linkage
TEST(IPC, basic) { TEST(IPC, basic) {
@ -129,22 +154,26 @@ TEST(IPC, basic) {
} }
TEST(IPC, 1v1) { TEST(IPC, 1v1) {
test_sr<relat::single, relat::single, trans::unicast >("ssu", LoopCount, 1, 1); test_sr<relat::single, relat::single, trans::unicast >("ssu", 1, 1);
test_sr<relat::single, relat::multi , trans::unicast >("smu", LoopCount, 1, 1); test_sr<relat::single, relat::multi , trans::unicast >("smu", 1, 1);
test_sr<relat::multi , relat::multi , trans::unicast >("mmu", LoopCount, 1, 1); test_sr<relat::multi , relat::multi , trans::unicast >("mmu", 1, 1);
test_sr<relat::single, relat::multi , trans::broadcast>("smb", LoopCount, 1, 1); test_sr<relat::single, relat::multi , trans::broadcast>("smb", 1, 1);
test_sr<relat::multi , relat::multi , trans::broadcast>("mmb", LoopCount, 1, 1); test_sr<relat::multi , relat::multi , trans::broadcast>("mmb", 1, 1);
} }
TEST(IPC, 1vN) { TEST(IPC, 1vN) {
test_sr<relat::single, relat::multi , trans::broadcast>("smb", LoopCount, 1, MultiMax); //test_sr<relat::single, relat::multi , trans::unicast >("smu", 1, MultiMax);
test_sr<relat::multi , relat::multi , trans::broadcast>("mmb", LoopCount, 1, MultiMax); //test_sr<relat::multi , relat::multi , trans::unicast >("mmu", 1, MultiMax);
test_sr<relat::single, relat::multi , trans::broadcast>("smb", 1, MultiMax);
test_sr<relat::multi , relat::multi , trans::broadcast>("mmb", 1, MultiMax);
} }
TEST(IPC, Nv1) { TEST(IPC, Nv1) {
test_sr<relat::multi , relat::multi , trans::broadcast>("mmb", LoopCount, MultiMax, 1); //test_sr<relat::multi , relat::multi , trans::unicast >("mmu", MultiMax, 1);
test_sr<relat::multi , relat::multi , trans::broadcast>("mmb", MultiMax, 1);
} }
TEST(IPC, NvN) { TEST(IPC, NvN) {
test_sr<relat::multi , relat::multi , trans::broadcast>("mmb", LoopCount, MultiMax, MultiMax); //test_sr<relat::multi , relat::multi , trans::unicast >("mmu", MultiMax, MultiMax);
test_sr<relat::multi , relat::multi , trans::broadcast>("mmb", MultiMax, MultiMax);
} }

View File

@ -9,6 +9,8 @@
#include <cstddef> // std::size_t #include <cstddef> // std::size_t
#include <cassert> // assert #include <cassert> // assert
#include "capo/scope_guard.hpp"
namespace ipc_ut { namespace ipc_ut {
class thread_pool final { class thread_pool final {
@ -32,6 +34,9 @@ class thread_pool final {
if (pool->quit_) return; if (pool->quit_) return;
if (pool->jobs_.empty()) { if (pool->jobs_.empty()) {
pool->waiting_cnt_ += 1; pool->waiting_cnt_ += 1;
CAPO_SCOPE_GUARD_ = [pool] {
pool->waiting_cnt_ -= 1;
};
if (pool->waiting_cnt_ == pool->workers_.size()) { if (pool->waiting_cnt_ == pool->workers_.size()) {
pool->cv_empty_.notify_all(); pool->cv_empty_.notify_all();
@ -41,8 +46,6 @@ class thread_pool final {
pool->cv_jobs_.wait(guard); pool->cv_jobs_.wait(guard);
if (pool->quit_) return; if (pool->quit_) return;
} while (pool->jobs_.empty()); } while (pool->jobs_.empty());
pool->waiting_cnt_ -= 1;
} }
assert(!pool->jobs_.empty()); assert(!pool->jobs_.empty());
job = std::move(pool->jobs_.front()); job = std::move(pool->jobs_.front());
@ -71,6 +74,7 @@ public:
} }
void start(std::size_t n) { void start(std::size_t n) {
std::unique_lock<std::mutex> guard { lock_ };
if (n <= workers_.size()) return; if (n <= workers_.size()) return;
for (std::size_t i = workers_.size(); i < n; ++i) { for (std::size_t i = workers_.size(); i < n; ++i) {
workers_.push_back(std::thread { &thread_pool::proc, this }); workers_.push_back(std::thread { &thread_pool::proc, this });