merge issue-61; add condition for linux

This commit is contained in:
mutouyun 2021-09-19 17:21:39 +08:00
parent 4ca300b3e5
commit 0cccdac868
13 changed files with 191 additions and 33 deletions

View File

@ -0,0 +1,39 @@
#pragma once
#include <cstdint> // std::uint64_t
#include "libipc/export.h"
#include "libipc/def.h"
#include "libipc/mutex.h"
namespace ipc {
namespace sync {
class IPC_EXPORT condition {
condition(condition const &) = delete;
condition &operator=(condition const &) = delete;
public:
condition();
explicit condition(char const *name);
~condition();
void const *native() const noexcept;
void *native() noexcept;
bool valid() const noexcept;
bool open(char const *name) noexcept;
void close() noexcept;
bool wait(ipc::sync::mutex &mtx, std::uint64_t tm = ipc::invalid_value) noexcept;
bool notify() noexcept;
bool broadcast() noexcept;
private:
class condition_;
condition_* p_;
};
} // namespace sync
} // namespace ipc

View File

@ -5,7 +5,8 @@ if(UNIX)
else()
file(GLOB SRC_FILES ${LIBIPC_PROJECT_DIR}/src/libipc/platform/*_win.cpp)
endif()
aux_source_directory(${LIBIPC_PROJECT_DIR}/src SRC_FILES)
aux_source_directory(${LIBIPC_PROJECT_DIR}/src/libipc SRC_FILES)
aux_source_directory(${LIBIPC_PROJECT_DIR}/src/libipc/sync SRC_FILES)
file(GLOB HEAD_FILES
${LIBIPC_PROJECT_DIR}/include/libipc/*.h
@ -37,8 +38,8 @@ set_target_properties(${PROJECT_NAME}
# set version
set_target_properties(${PROJECT_NAME}
PROPERTIES
VERSION 1.0.0
SOVERSION 1)
VERSION 1.1.0
SOVERSION 2)
target_include_directories(${PROJECT_NAME}
PUBLIC ${LIBIPC_PROJECT_DIR}/include

View File

@ -1,12 +1,15 @@
#pragma once
#include <cstdint>
#include <cstring>
#include <pthread.h>
#include "libipc/utility/log.h"
#include "libipc/platform/get_wait_time.h"
#include "libipc/utility/log.h"
#include "libipc/utility/scope_guard.h"
#include "libipc/mutex.h"
#include "libipc/shm.h"
namespace ipc {
namespace detail {
@ -89,7 +92,7 @@ public:
return true;
case invalid_value: {
int eno;
if ((eno = ::pthread_cond_wait(cond_, mtx.native())) != 0) {
if ((eno = ::pthread_cond_wait(cond_, static_cast<pthread_mutex_t *>(mtx.native()))) != 0) {
ipc::error("fail pthread_cond_wait[%d]\n", eno);
return false;
}
@ -98,7 +101,7 @@ public:
default: {
auto ts = detail::make_timespec(tm);
int eno;
if ((eno = ::pthread_cond_timedwait(cond_, mtx.native(), &ts)) != 0) {
if ((eno = ::pthread_cond_timedwait(cond_, static_cast<pthread_mutex_t *>(mtx.native()), &ts)) != 0) {
if (eno != ETIMEDOUT) {
ipc::error("fail pthread_cond_timedwait[%d]: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n",
eno, tm, ts.tv_sec, ts.tv_nsec);

View File

@ -0,0 +1,70 @@
#include "libipc/condition.h"
#include "libipc/utility/pimpl.h"
#include "libipc/memory/resource.h"
#include "libipc/platform/detail.h"
#if defined(IPC_OS_WINDOWS_)
#include "libipc/platform/condition_win.h"
#elif defined(IPC_OS_LINUX_)
#include "libipc/platform/condition_linux.h"
#else/*linux*/
# error "Unsupported platform."
#endif
namespace ipc {
namespace sync {
class condition::condition_ : public ipc::pimpl<condition_> {
public:
ipc::detail::sync::condition cond_;
};
condition::condition()
: p_(p_->make()) {
}
condition::condition(char const * name)
: condition() {
open(name);
}
condition::~condition() {
close();
p_->clear();
}
void const *condition::native() const noexcept {
return impl(p_)->cond_.native();
}
void *condition::native() noexcept {
return impl(p_)->cond_.native();
}
bool condition::valid() const noexcept {
return impl(p_)->cond_.valid();
}
bool condition::open(char const *name) noexcept {
return impl(p_)->cond_.open(name);
}
void condition::close() noexcept {
impl(p_)->cond_.close();
}
bool condition::wait(ipc::sync::mutex &mtx, std::uint64_t tm) noexcept {
return impl(p_)->cond_.wait(mtx, tm);
}
bool condition::notify() noexcept {
return impl(p_)->cond_.notify();
}
bool condition::broadcast() noexcept {
return impl(p_)->cond_.broadcast();
}
} // namespace sync
} // namespace ipc

View File

@ -99,22 +99,6 @@ public:
}
} const data_set__;
#define IPC_ASSERT_TRUE(condition) \
do { \
bool check = !!(condition); \
GTEST_TEST_BOOLEAN_(check, #condition, false, true, \
GTEST_NONFATAL_FAILURE_); \
if (!check) return; \
} while (0)
#define IPC_ASSERT_FALSE(condition) \
do { \
bool check = !!(condition); \
GTEST_TEST_BOOLEAN_(!check, #condition, true, false, \
GTEST_NONFATAL_FAILURE_); \
if (check) return; \
} while (0)
template <relat Rp, relat Rc, trans Ts, typename Que = chan<Rp, Rc, Ts>>
void test_sr(char const * name, int s_cnt, int r_cnt) {
ipc_ut::sender().start(static_cast<std::size_t>(s_cnt));
@ -126,10 +110,10 @@ void test_sr(char const * name, int s_cnt, int r_cnt) {
for (int k = 0; k < s_cnt; ++k) {
ipc_ut::sender() << [name, &sw, r_cnt, k] {
Que que { name, ipc::sender };
IPC_ASSERT_TRUE(que.wait_for_recv(r_cnt));
ASSERT_TRUE(que.wait_for_recv(r_cnt));
sw.start();
for (int i = 0; i < (int)data_set__.get().size(); ++i) {
IPC_ASSERT_TRUE(que.send(data_set__.get()[i]));
ASSERT_TRUE(que.send(data_set__.get()[i]));
}
};
}
@ -139,17 +123,17 @@ void test_sr(char const * name, int s_cnt, int r_cnt) {
Que que { name, ipc::receiver };
for (;;) {
rand_buf got { que.recv() };
IPC_ASSERT_FALSE(got.empty());
ASSERT_FALSE(got.empty());
int i = got.get_id();
if (i == -1) {
return;
}
IPC_ASSERT_TRUE((i >= 0) && (i < (int)data_set__.get().size()));
ASSERT_TRUE((i >= 0) && (i < (int)data_set__.get().size()));
auto const &data_set = data_set__.get()[i];
if (data_set != got) {
printf("data_set__.get()[%d] != got, size = %zd/%zd\n",
i, data_set.size(), got.size());
IPC_ASSERT_TRUE(false);
ASSERT_TRUE(false);
}
}
};
@ -157,7 +141,7 @@ void test_sr(char const * name, int s_cnt, int r_cnt) {
ipc_ut::sender().wait_for_done();
Que que { name };
IPC_ASSERT_TRUE(que.wait_for_recv(r_cnt));
ASSERT_TRUE(que.wait_for_recv(r_cnt));
for (int k = 0; k < r_cnt; ++k) {
que.send(rand_buf{msg_head{-1}});
}

View File

@ -3,6 +3,8 @@
#include <iostream>
#include <mutex>
#include <chrono>
#include <deque>
#include <cstdio>
#include "test.h"
@ -57,7 +59,7 @@ TEST(Sync, Mutex) {
ipc::sync::mutex lock;
EXPECT_TRUE(lock.open("test-mutex-robust"));
std::thread{[] {
ipc::sync::mutex lock{"test-mutex-robust"};
ipc::sync::mutex lock {"test-mutex-robust"};
EXPECT_TRUE(lock.valid());
EXPECT_TRUE(lock.lock());
}}.join();
@ -68,7 +70,7 @@ TEST(Sync, Mutex) {
EXPECT_TRUE(lock.lock());
i = 100;
auto t2 = std::thread{[&i] {
ipc::sync::mutex lock{"test-mutex-robust"};
ipc::sync::mutex lock {"test-mutex-robust"};
EXPECT_TRUE(lock.valid());
EXPECT_FALSE(lock.try_lock());
EXPECT_TRUE(lock.lock());
@ -88,12 +90,71 @@ TEST(Sync, Semaphore) {
ipc::sync::semaphore sem;
EXPECT_TRUE(sem.open("test-sem"));
std::thread{[] {
ipc::sync::semaphore sem{"test-sem"};
EXPECT_TRUE(sem.post(10));
ipc::sync::semaphore sem {"test-sem"};
EXPECT_TRUE(sem.post(1000));
}}.join();
for (int i = 0; i < 10; ++i) {
for (int i = 0; i < 1000; ++i) {
EXPECT_TRUE(sem.wait(0));
}
EXPECT_FALSE(sem.wait(0));
}
#include "libipc/condition.h"
TEST(Sync, Condition) {
ipc::sync::condition cond;
EXPECT_TRUE(cond.open("test-cond"));
ipc::sync::mutex lock;
EXPECT_TRUE(lock.open("test-mutex"));
std::deque<int> que;
auto job = [&que](int num) {
ipc::sync::condition cond {"test-cond"};
ipc::sync::mutex lock {"test-mutex"};
for (;;) {
int val = 0;
{
std::lock_guard<ipc::sync::mutex> guard {lock};
while (que.empty()) {
ASSERT_TRUE(cond.wait(lock));
}
val = que.front();
que.pop_front();
}
if (val == 0) {
std::printf("test-cond-%d: exit.\n", num);
return;
}
std::printf("test-cond-%d: %d\n", num, val);
}
};
std::thread test_cond1 {job, 1};
std::thread test_cond2 {job, 2};
for (int i = 1; i < 100; ++i) {
{
std::lock_guard<ipc::sync::mutex> guard {lock};
que.push_back(i);
}
cond.notify();
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
for (int i = 1; i < 100; ++i) {
{
std::lock_guard<ipc::sync::mutex> guard {lock};
que.push_back(i);
}
cond.broadcast();
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
{
std::lock_guard<ipc::sync::mutex> guard {lock};
que.push_back(0);
que.push_back(0);
}
cond.broadcast();
test_cond1.join();
test_cond2.join();
}