diff --git a/include/libipc/condition.h b/include/libipc/condition.h new file mode 100644 index 0000000..d3b3a59 --- /dev/null +++ b/include/libipc/condition.h @@ -0,0 +1,39 @@ +#pragma once + +#include // std::uint64_t + +#include "libipc/export.h" +#include "libipc/def.h" +#include "libipc/mutex.h" + +namespace ipc { +namespace sync { + +class IPC_EXPORT condition { + condition(condition const &) = delete; + condition &operator=(condition const &) = delete; + +public: + condition(); + explicit condition(char const *name); + ~condition(); + + void const *native() const noexcept; + void *native() noexcept; + + bool valid() const noexcept; + + bool open(char const *name) noexcept; + void close() noexcept; + + bool wait(ipc::sync::mutex &mtx, std::uint64_t tm = ipc::invalid_value) noexcept; + bool notify() noexcept; + bool broadcast() noexcept; + +private: + class condition_; + condition_* p_; +}; + +} // namespace sync +} // namespace ipc diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index ad60120..00dd78e 100755 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -5,7 +5,8 @@ if(UNIX) else() file(GLOB SRC_FILES ${LIBIPC_PROJECT_DIR}/src/libipc/platform/*_win.cpp) endif() -aux_source_directory(${LIBIPC_PROJECT_DIR}/src SRC_FILES) +aux_source_directory(${LIBIPC_PROJECT_DIR}/src/libipc SRC_FILES) +aux_source_directory(${LIBIPC_PROJECT_DIR}/src/libipc/sync SRC_FILES) file(GLOB HEAD_FILES ${LIBIPC_PROJECT_DIR}/include/libipc/*.h @@ -37,8 +38,8 @@ set_target_properties(${PROJECT_NAME} # set version set_target_properties(${PROJECT_NAME} PROPERTIES - VERSION 1.0.0 - SOVERSION 1) + VERSION 1.1.0 + SOVERSION 2) target_include_directories(${PROJECT_NAME} PUBLIC ${LIBIPC_PROJECT_DIR}/include diff --git a/src/buffer.cpp b/src/libipc/buffer.cpp similarity index 100% rename from src/buffer.cpp rename to src/libipc/buffer.cpp diff --git a/src/ipc.cpp b/src/libipc/ipc.cpp similarity index 100% rename from src/ipc.cpp rename to src/libipc/ipc.cpp diff --git a/src/libipc/platform/condition_linux.h b/src/libipc/platform/condition_linux.h index 509c552..2c824a9 100644 --- a/src/libipc/platform/condition_linux.h +++ b/src/libipc/platform/condition_linux.h @@ -1,12 +1,15 @@ #pragma once #include +#include #include -#include "libipc/utility/log.h" #include "libipc/platform/get_wait_time.h" +#include "libipc/utility/log.h" +#include "libipc/utility/scope_guard.h" #include "libipc/mutex.h" +#include "libipc/shm.h" namespace ipc { namespace detail { @@ -89,7 +92,7 @@ public: return true; case invalid_value: { int eno; - if ((eno = ::pthread_cond_wait(cond_, mtx.native())) != 0) { + if ((eno = ::pthread_cond_wait(cond_, static_cast(mtx.native()))) != 0) { ipc::error("fail pthread_cond_wait[%d]\n", eno); return false; } @@ -98,7 +101,7 @@ public: default: { auto ts = detail::make_timespec(tm); int eno; - if ((eno = ::pthread_cond_timedwait(cond_, mtx.native(), &ts)) != 0) { + if ((eno = ::pthread_cond_timedwait(cond_, static_cast(mtx.native()), &ts)) != 0) { if (eno != ETIMEDOUT) { ipc::error("fail pthread_cond_timedwait[%d]: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n", eno, tm, ts.tv_sec, ts.tv_nsec); diff --git a/src/pool_alloc.cpp b/src/libipc/pool_alloc.cpp similarity index 100% rename from src/pool_alloc.cpp rename to src/libipc/pool_alloc.cpp diff --git a/src/shm.cpp b/src/libipc/shm.cpp similarity index 100% rename from src/shm.cpp rename to src/libipc/shm.cpp diff --git a/src/libipc/sync/condition.cpp b/src/libipc/sync/condition.cpp new file mode 100644 index 0000000..2859d21 --- /dev/null +++ b/src/libipc/sync/condition.cpp @@ -0,0 +1,70 @@ + +#include "libipc/condition.h" + +#include "libipc/utility/pimpl.h" +#include "libipc/memory/resource.h" +#include "libipc/platform/detail.h" +#if defined(IPC_OS_WINDOWS_) +#include "libipc/platform/condition_win.h" +#elif defined(IPC_OS_LINUX_) +#include "libipc/platform/condition_linux.h" +#else/*linux*/ +# error "Unsupported platform." +#endif + +namespace ipc { +namespace sync { + +class condition::condition_ : public ipc::pimpl { +public: + ipc::detail::sync::condition cond_; +}; + +condition::condition() + : p_(p_->make()) { +} + +condition::condition(char const * name) + : condition() { + open(name); +} + +condition::~condition() { + close(); + p_->clear(); +} + +void const *condition::native() const noexcept { + return impl(p_)->cond_.native(); +} + +void *condition::native() noexcept { + return impl(p_)->cond_.native(); +} + +bool condition::valid() const noexcept { + return impl(p_)->cond_.valid(); +} + +bool condition::open(char const *name) noexcept { + return impl(p_)->cond_.open(name); +} + +void condition::close() noexcept { + impl(p_)->cond_.close(); +} + +bool condition::wait(ipc::sync::mutex &mtx, std::uint64_t tm) noexcept { + return impl(p_)->cond_.wait(mtx, tm); +} + +bool condition::notify() noexcept { + return impl(p_)->cond_.notify(); +} + +bool condition::broadcast() noexcept { + return impl(p_)->cond_.broadcast(); +} + +} // namespace sync +} // namespace ipc diff --git a/src/mutex.cpp b/src/libipc/sync/mutex.cpp similarity index 100% rename from src/mutex.cpp rename to src/libipc/sync/mutex.cpp diff --git a/src/semaphore.cpp b/src/libipc/sync/semaphore.cpp similarity index 100% rename from src/semaphore.cpp rename to src/libipc/sync/semaphore.cpp diff --git a/src/waiter.cpp b/src/libipc/waiter.cpp similarity index 100% rename from src/waiter.cpp rename to src/libipc/waiter.cpp diff --git a/test/test_ipc.cpp b/test/test_ipc.cpp index 79f3b26..715fa16 100755 --- a/test/test_ipc.cpp +++ b/test/test_ipc.cpp @@ -99,22 +99,6 @@ public: } } const data_set__; -#define IPC_ASSERT_TRUE(condition) \ - do { \ - bool check = !!(condition); \ - GTEST_TEST_BOOLEAN_(check, #condition, false, true, \ - GTEST_NONFATAL_FAILURE_); \ - if (!check) return; \ - } while (0) - -#define IPC_ASSERT_FALSE(condition) \ - do { \ - bool check = !!(condition); \ - GTEST_TEST_BOOLEAN_(!check, #condition, true, false, \ - GTEST_NONFATAL_FAILURE_); \ - if (check) return; \ - } while (0) - template > void test_sr(char const * name, int s_cnt, int r_cnt) { ipc_ut::sender().start(static_cast(s_cnt)); @@ -126,10 +110,10 @@ void test_sr(char const * name, int s_cnt, int r_cnt) { for (int k = 0; k < s_cnt; ++k) { ipc_ut::sender() << [name, &sw, r_cnt, k] { Que que { name, ipc::sender }; - IPC_ASSERT_TRUE(que.wait_for_recv(r_cnt)); + ASSERT_TRUE(que.wait_for_recv(r_cnt)); sw.start(); for (int i = 0; i < (int)data_set__.get().size(); ++i) { - IPC_ASSERT_TRUE(que.send(data_set__.get()[i])); + ASSERT_TRUE(que.send(data_set__.get()[i])); } }; } @@ -139,17 +123,17 @@ void test_sr(char const * name, int s_cnt, int r_cnt) { Que que { name, ipc::receiver }; for (;;) { rand_buf got { que.recv() }; - IPC_ASSERT_FALSE(got.empty()); + ASSERT_FALSE(got.empty()); int i = got.get_id(); if (i == -1) { return; } - IPC_ASSERT_TRUE((i >= 0) && (i < (int)data_set__.get().size())); + ASSERT_TRUE((i >= 0) && (i < (int)data_set__.get().size())); auto const &data_set = data_set__.get()[i]; if (data_set != got) { printf("data_set__.get()[%d] != got, size = %zd/%zd\n", i, data_set.size(), got.size()); - IPC_ASSERT_TRUE(false); + ASSERT_TRUE(false); } } }; @@ -157,7 +141,7 @@ void test_sr(char const * name, int s_cnt, int r_cnt) { ipc_ut::sender().wait_for_done(); Que que { name }; - IPC_ASSERT_TRUE(que.wait_for_recv(r_cnt)); + ASSERT_TRUE(que.wait_for_recv(r_cnt)); for (int k = 0; k < r_cnt; ++k) { que.send(rand_buf{msg_head{-1}}); } diff --git a/test/test_sync.cpp b/test/test_sync.cpp index 50ad929..907437e 100644 --- a/test/test_sync.cpp +++ b/test/test_sync.cpp @@ -3,6 +3,8 @@ #include #include #include +#include +#include #include "test.h" @@ -57,7 +59,7 @@ TEST(Sync, Mutex) { ipc::sync::mutex lock; EXPECT_TRUE(lock.open("test-mutex-robust")); std::thread{[] { - ipc::sync::mutex lock{"test-mutex-robust"}; + ipc::sync::mutex lock {"test-mutex-robust"}; EXPECT_TRUE(lock.valid()); EXPECT_TRUE(lock.lock()); }}.join(); @@ -68,7 +70,7 @@ TEST(Sync, Mutex) { EXPECT_TRUE(lock.lock()); i = 100; auto t2 = std::thread{[&i] { - ipc::sync::mutex lock{"test-mutex-robust"}; + ipc::sync::mutex lock {"test-mutex-robust"}; EXPECT_TRUE(lock.valid()); EXPECT_FALSE(lock.try_lock()); EXPECT_TRUE(lock.lock()); @@ -88,12 +90,71 @@ TEST(Sync, Semaphore) { ipc::sync::semaphore sem; EXPECT_TRUE(sem.open("test-sem")); std::thread{[] { - ipc::sync::semaphore sem{"test-sem"}; - EXPECT_TRUE(sem.post(10)); + ipc::sync::semaphore sem {"test-sem"}; + EXPECT_TRUE(sem.post(1000)); }}.join(); - for (int i = 0; i < 10; ++i) { + for (int i = 0; i < 1000; ++i) { EXPECT_TRUE(sem.wait(0)); } EXPECT_FALSE(sem.wait(0)); +} + +#include "libipc/condition.h" + +TEST(Sync, Condition) { + ipc::sync::condition cond; + EXPECT_TRUE(cond.open("test-cond")); + ipc::sync::mutex lock; + EXPECT_TRUE(lock.open("test-mutex")); + std::deque que; + + auto job = [&que](int num) { + ipc::sync::condition cond {"test-cond"}; + ipc::sync::mutex lock {"test-mutex"}; + for (;;) { + int val = 0; + { + std::lock_guard guard {lock}; + while (que.empty()) { + ASSERT_TRUE(cond.wait(lock)); + } + val = que.front(); + que.pop_front(); + } + if (val == 0) { + std::printf("test-cond-%d: exit.\n", num); + return; + } + std::printf("test-cond-%d: %d\n", num, val); + } + }; + std::thread test_cond1 {job, 1}; + std::thread test_cond2 {job, 2}; + + for (int i = 1; i < 100; ++i) { + { + std::lock_guard guard {lock}; + que.push_back(i); + } + cond.notify(); + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + } + for (int i = 1; i < 100; ++i) { + { + std::lock_guard guard {lock}; + que.push_back(i); + } + cond.broadcast(); + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + } + { + std::lock_guard guard {lock}; + que.push_back(0); + que.push_back(0); + } + cond.broadcast(); + + test_cond1.join(); + test_cond2.join(); } \ No newline at end of file