noexcept; adjust memory order

This commit is contained in:
mutouyun 2018-12-24 13:29:20 +08:00
parent b815a5e50f
commit 492d095332
5 changed files with 50 additions and 77 deletions

View File

@ -18,29 +18,30 @@ struct alignas(std::max_align_t) elem_array_head {
std::atomic<u2_t> cc_ { 0 }; // connection counter, using for broadcast std::atomic<u2_t> cc_ { 0 }; // connection counter, using for broadcast
std::atomic<u2_t> wt_ { 0 }; // write index std::atomic<u2_t> wt_ { 0 }; // write index
static u1_t index_of(u2_t c) { return static_cast<u1_t>(c); } static u1_t index_of(u2_t c) noexcept { return static_cast<u1_t>(c); }
std::size_t connect() { std::size_t connect() noexcept {
// connect should be called before cursor
return cc_.fetch_add(1, std::memory_order_relaxed); return cc_.fetch_add(1, std::memory_order_relaxed);
} }
std::size_t disconnect() { std::size_t disconnect() noexcept {
return cc_.fetch_sub(1, std::memory_order_relaxed); return cc_.fetch_sub(1, std::memory_order_release);
} }
std::size_t conn_count() const { std::size_t conn_count() const noexcept {
return cc_.load(std::memory_order_relaxed); return cc_.load(std::memory_order_acquire);
} }
u2_t cursor() const { u2_t cursor() const noexcept {
return wt_.load(std::memory_order_relaxed); return wt_.load(std::memory_order_acquire);
} }
auto acquire() { auto acquire() noexcept {
return index_of(wt_.load(std::memory_order_acquire)); return index_of(wt_.load(std::memory_order_relaxed));
} }
void commit() { void commit() noexcept {
wt_.fetch_add(1, std::memory_order_release); wt_.fetch_add(1, std::memory_order_release);
} }
}; };
@ -79,12 +80,12 @@ private:
}; };
elem_t block_[elem_max]; elem_t block_[elem_max];
elem_t* elem_start() { elem_t* elem_start() noexcept {
return block_; return block_;
} }
static elem_t* elem(void* ptr) { return reinterpret_cast<elem_t*>(static_cast<byte_t*>(ptr) - sizeof(head_t)); } static elem_t* elem(void* ptr) noexcept { return reinterpret_cast<elem_t*>(static_cast<byte_t*>(ptr) - sizeof(head_t)); }
elem_t* elem(u1_t i ) { return elem_start() + i; } elem_t* elem(u1_t i ) noexcept { return elem_start() + i; }
public: public:
elem_array() = default; elem_array() = default;
@ -99,32 +100,31 @@ public:
using base_t::conn_count; using base_t::conn_count;
using base_t::cursor; using base_t::cursor;
void* acquire() { void* acquire() noexcept {
elem_t* el = elem(base_t::acquire()); elem_t* el = elem(base_t::acquire());
// check all consumers have finished reading // check all consumers have finished reading
while(1) { while(1) {
uint_t<32> expected = 0; uint_t<32> expected = 0,
conn_cnt = cc_.load(std::memory_order_acquire);
if (el->head_.rc_.compare_exchange_weak( if (el->head_.rc_.compare_exchange_weak(
expected, expected, conn_cnt, std::memory_order_relaxed)) {
static_cast<uint_t<32>>(conn_count()),
std::memory_order_relaxed)) {
break; break;
} }
std::this_thread::yield(); std::this_thread::yield();
std::atomic_thread_fence(std::memory_order_acquire);
} }
return el->data_; return el->data_;
} }
void commit(void* /*ptr*/) { void commit(void* /*ptr*/) noexcept {
base_t::commit(); base_t::commit();
} }
void* take(u2_t cursor) { void* take(u2_t cursor) noexcept {
std::atomic_thread_fence(std::memory_order_acquire);
return elem(base_t::index_of(cursor))->data_; return elem(base_t::index_of(cursor))->data_;
} }
void put(void* ptr) { void put(void* ptr) noexcept {
elem(ptr)->head_.rc_.fetch_sub(1, std::memory_order_release); elem(ptr)->head_.rc_.fetch_sub(1, std::memory_order_release);
} }
}; };

View File

@ -2,7 +2,6 @@
#include <type_traits> #include <type_traits>
#include <new> #include <new>
#include <exception>
#include <utility> #include <utility>
#include <algorithm> #include <algorithm>
#include <atomic> #include <atomic>
@ -38,11 +37,11 @@ public:
queue(queue&&) = delete; queue(queue&&) = delete;
queue& operator=(queue&&) = delete; queue& operator=(queue&&) = delete;
constexpr array_t * elems() const { constexpr array_t * elems() const noexcept {
return elems_; return elems_;
} }
std::size_t connect() { std::size_t connect() noexcept {
if (elems_ == nullptr) return invalid_value; if (elems_ == nullptr) return invalid_value;
if (connected_.exchange(true, std::memory_order_relaxed)) { if (connected_.exchange(true, std::memory_order_relaxed)) {
// if it's already connected, just return an error count // if it's already connected, just return an error count
@ -51,7 +50,7 @@ public:
return elems_->connect(); return elems_->connect();
} }
std::size_t disconnect() { std::size_t disconnect() noexcept {
if (elems_ == nullptr) return invalid_value; if (elems_ == nullptr) return invalid_value;
if (!connected_.exchange(false, std::memory_order_relaxed)) { if (!connected_.exchange(false, std::memory_order_relaxed)) {
// if it's already disconnected, just return an error count // if it's already disconnected, just return an error count
@ -60,15 +59,15 @@ public:
return elems_->disconnect(); return elems_->disconnect();
} }
std::size_t conn_count() const { std::size_t conn_count() const noexcept {
return (elems_ == nullptr) ? invalid_value : elems_->conn_count(); return (elems_ == nullptr) ? invalid_value : elems_->conn_count();
} }
bool connected() const { bool connected() const noexcept {
return connected_.load(std::memory_order_relaxed); return connected_.load(std::memory_order_relaxed);
} }
array_t* attach(array_t* arr) { array_t* attach(array_t* arr) noexcept {
if (arr == nullptr) return nullptr; if (arr == nullptr) return nullptr;
auto old = elems_; auto old = elems_;
elems_ = arr; elems_ = arr;
@ -76,14 +75,14 @@ public:
return old; return old;
} }
array_t* detach() { array_t* detach() noexcept {
if (elems_ == nullptr) return nullptr; if (elems_ == nullptr) return nullptr;
auto old = elems_; auto old = elems_;
elems_ = nullptr; elems_ = nullptr;
return old; return old;
} }
bool push(T const & item) { bool push(T const & item) noexcept {
if (elems_ == nullptr) return false; if (elems_ == nullptr) return false;
auto ptr = elems_->acquire(); auto ptr = elems_->acquire();
::new (ptr) T(item); ::new (ptr) T(item);
@ -92,7 +91,7 @@ public:
} }
template <typename P> template <typename P>
auto push(P&& param) // disable this if P is as same as T auto push(P&& param) noexcept // disable this if P is as same as T
-> Requires<!std::is_same<std::remove_reference_t<P>, T>::value, bool> { -> Requires<!std::is_same<std::remove_reference_t<P>, T>::value, bool> {
if (elems_ == nullptr) return false; if (elems_ == nullptr) return false;
auto ptr = elems_->acquire(); auto ptr = elems_->acquire();
@ -102,7 +101,7 @@ public:
} }
template <typename... P> template <typename... P>
auto push(P&&... params) // some compilers are not support this well auto push(P&&... params) noexcept // some compilers are not support this well
-> Requires<(sizeof...(P) != 1), bool> { -> Requires<(sizeof...(P) != 1), bool> {
if (elems_ == nullptr) return false; if (elems_ == nullptr) return false;
auto ptr = elems_->acquire(); auto ptr = elems_->acquire();
@ -112,15 +111,13 @@ public:
} }
template <typename F> template <typename F>
static queue* multi_wait_for(F&& upd) { static queue* multi_wait_for(F&& upd) noexcept {
for (unsigned k = 0;; ++k) { for (unsigned k = 0;; ++k) {
auto [ques, size] = upd(); auto [ques, size] = upd();
for (std::size_t i = 0; i < static_cast<std::size_t>(size); ++i) { for (std::size_t i = 0; i < static_cast<std::size_t>(size); ++i) {
queue* que = ques[i]; queue* que = ques[i];
if (que == nullptr) continue; if (que == nullptr) continue;
if (que->elems_ == nullptr) throw std::logic_error { if (que->elems_ == nullptr) return nullptr;
"This queue hasn't attached any elem_array."
};
if (que->cursor_ != que->elems_->cursor()) { if (que->cursor_ != que->elems_->cursor()) {
return que; return que;
} }
@ -131,20 +128,16 @@ public:
} }
} }
static T pop(queue* que) { static T pop(queue* que) noexcept {
if (que == nullptr) throw std::invalid_argument { if (que == nullptr) return {};
"Invalid ques pointer." if (que->elems_ == nullptr) return {};
};
if (que->elems_ == nullptr) throw std::logic_error {
"This queue hasn't attached any elem_array."
};
auto item_ptr = static_cast<T*>(que->elems_->take(que->cursor_++)); auto item_ptr = static_cast<T*>(que->elems_->take(que->cursor_++));
T item = std::move(*item_ptr); T item = std::move(*item_ptr);
que->elems_->put(item_ptr); que->elems_->put(item_ptr);
return item; return item;
} }
T pop() { T pop() noexcept {
return pop(multi_wait_for([que = this] { return pop(multi_wait_for([que = this] {
return std::make_tuple(&que, 1); return std::make_tuple(&que, 1);
})); }));

View File

@ -56,7 +56,7 @@
namespace ipc { namespace ipc {
inline void yield(unsigned k) { inline void yield(unsigned k) noexcept {
if (k < 4) { /* Do nothing */ } if (k < 4) { /* Do nothing */ }
else else
if (k < 16) { IPC_LOCK_PAUSE_(); } if (k < 16) { IPC_LOCK_PAUSE_(); }
@ -80,7 +80,7 @@ class rw_lock {
}; };
public: public:
void lock() { void lock() noexcept {
for (unsigned k = 0;; ++k) { for (unsigned k = 0;; ++k) {
auto old = lc_.fetch_or(w_flag, std::memory_order_acquire); auto old = lc_.fetch_or(w_flag, std::memory_order_acquire);
if (!old) return; // got w-lock if (!old) return; // got w-lock
@ -93,11 +93,11 @@ public:
} }
} }
void unlock() { void unlock() noexcept {
lc_.store(0, std::memory_order_release); lc_.store(0, std::memory_order_release);
} }
void lock_shared() { void lock_shared() noexcept {
auto old = lc_.load(std::memory_order_relaxed); auto old = lc_.load(std::memory_order_relaxed);
for (unsigned k = 0;; ++k) { for (unsigned k = 0;; ++k) {
// if w_flag set, just continue // if w_flag set, just continue
@ -113,7 +113,7 @@ public:
} }
} }
void unlock_shared() { void unlock_shared() noexcept {
lc_.fetch_sub(1, std::memory_order_release); lc_.fetch_sub(1, std::memory_order_release);
} }
}; };

View File

@ -134,7 +134,7 @@ std::size_t channel::recv_count() const {
template <typename... P> template <typename... P>
inline bool channel_send(route& rt, P&&... params) { inline bool channel_send(route& rt, P&&... params) {
for (int k = 0; rt.recv_count() == 0; ++k) { for (unsigned k = 0; rt.recv_count() == 0; ++k) {
if (k >= 1024) return false; if (k >= 1024) return false;
std::this_thread::yield(); std::this_thread::yield();
} }

View File

@ -20,6 +20,10 @@ struct msg_t {
int dat_; int dat_;
}; };
bool operator==(msg_t const & m1, msg_t const & m2) {
return (m1.pid_ == m2.pid_) && (m1.dat_ == m2.dat_);
}
} // internal-linkage } // internal-linkage
template <> template <>
@ -254,34 +258,10 @@ void Unit::test_prod_cons_performance() {
test_prod_cons <1, 10>(); // test & verify test_prod_cons <1, 10>(); // test & verify
} }
#ifndef QVERIFY_EXCEPTION_THROWN
#define QVERIFY_EXCEPTION_THROWN(expression, exceptiontype) \
do {\
QT_TRY {\
QT_TRY {\
expression;\
QTest::qFail("Expected exception of type " #exceptiontype " to be thrown" \
" but no exception caught", __FILE__, __LINE__);\
return;\
} QT_CATCH (const exceptiontype &) {\
}\
} QT_CATCH (const std::exception &e) {\
QByteArray msg = QByteArray() + "Expected exception of type " #exceptiontype \
" to be thrown but std::exception caught with message: " + e.what(); \
QTest::qFail(msg.constData(), __FILE__, __LINE__);\
return;\
} QT_CATCH (...) {\
QTest::qFail("Expected exception of type " #exceptiontype " to be thrown" \
" but unknown exception caught", __FILE__, __LINE__);\
return;\
}\
} while (false)
#endif/*!QVERIFY_EXCEPTION_THROWN*/
void Unit::test_queue() { void Unit::test_queue() {
ipc::circ::queue<msg_t> queue; ipc::circ::queue<msg_t> queue;
queue.push(1, 2); queue.push(1, 2);
QVERIFY_EXCEPTION_THROWN(queue.pop(), std::exception); QCOMPARE(queue.pop(), msg_t{});
QVERIFY(sizeof(decltype(queue)::array_t) <= sizeof(*cq__)); QVERIFY(sizeof(decltype(queue)::array_t) <= sizeof(*cq__));
auto cq = ::new (cq__) decltype(queue)::array_t; auto cq = ::new (cq__) decltype(queue)::array_t;