mirror of
https://github.com/mutouyun/cpp-ipc.git
synced 2025-12-06 16:56:45 +08:00
still have bugs
This commit is contained in:
parent
2d86727be1
commit
db35146542
@ -36,8 +36,9 @@ enum : std::size_t {
|
|||||||
template <std::size_t DataSize>
|
template <std::size_t DataSize>
|
||||||
class elem_array : private elem_array_head {
|
class elem_array : private elem_array_head {
|
||||||
struct head_t {
|
struct head_t {
|
||||||
ac_t rf_; // read flag
|
ac_t rf_; // read flag
|
||||||
ac_t wf_; // write flag
|
std::atomic_bool wf_; // write flag
|
||||||
|
std::atomic_flag acq_; // acquire flag
|
||||||
};
|
};
|
||||||
|
|
||||||
public:
|
public:
|
||||||
@ -102,18 +103,27 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
void* acquire(void) {
|
void* acquire(void) {
|
||||||
auto el = elem(wt_.load(std::memory_order_consume));
|
elem_t* el;
|
||||||
// check read finished by all consumers
|
while (1) {
|
||||||
do {
|
// searching an available element
|
||||||
uc_t expected = 0;
|
el = elem(wt_.fetch_add(1, std::memory_order_acquire));
|
||||||
std::atomic_thread_fence(std::memory_order_acquire);
|
if (el->head_.acq_.test_and_set(std::memory_order_release)) {
|
||||||
if (el->head_.rf_.compare_exchange_weak(
|
std::this_thread::yield();
|
||||||
expected, cc_.load(std::memory_order_relaxed), std::memory_order_release)) {
|
continue;
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
std::this_thread::yield();
|
// check read finished by all consumers
|
||||||
} while(1);
|
while(1) {
|
||||||
wt_.fetch_add(1, std::memory_order_release);
|
uc_t expected = 0;
|
||||||
|
std::atomic_thread_fence(std::memory_order_acquire);
|
||||||
|
if (el->head_.rf_.compare_exchange_weak(
|
||||||
|
expected, cc_.load(std::memory_order_relaxed), std::memory_order_release)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
std::this_thread::yield();
|
||||||
|
}
|
||||||
|
el->head_.acq_.clear(std::memory_order_release);
|
||||||
|
break;
|
||||||
|
}
|
||||||
return el->data_;
|
return el->data_;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -131,7 +141,7 @@ public:
|
|||||||
* set wf_ for the other producer thread which is commiting
|
* set wf_ for the other producer thread which is commiting
|
||||||
* the element matches cr_ could see it has commited
|
* the element matches cr_ could see it has commited
|
||||||
*/
|
*/
|
||||||
el->head_.wf_.store(1, std::memory_order_release);
|
el->head_.wf_.store(true, std::memory_order_release);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
/*
|
/*
|
||||||
@ -139,7 +149,7 @@ public:
|
|||||||
* so we just increase the cursor & go check the next
|
* so we just increase the cursor & go check the next
|
||||||
*/
|
*/
|
||||||
++next;
|
++next;
|
||||||
el->head_.wf_.store(0, std::memory_order_release);
|
el->head_.wf_.store(false, std::memory_order_release);
|
||||||
}
|
}
|
||||||
/*
|
/*
|
||||||
* it needs to go back and judge again
|
* it needs to go back and judge again
|
||||||
@ -155,7 +165,7 @@ public:
|
|||||||
/*
|
/*
|
||||||
* check next element has commited or not
|
* check next element has commited or not
|
||||||
*/
|
*/
|
||||||
} while(el = elem(++wi), el->head_.wf_.exchange(0, std::memory_order_acq_rel));
|
} while(el = elem(++wi), el->head_.wf_.exchange(false, std::memory_order_acq_rel));
|
||||||
}
|
}
|
||||||
|
|
||||||
uc_t cursor(void) const {
|
uc_t cursor(void) const {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user