From 7a067ba8167cccc60009e9be3cc685ec6ef4ff6d Mon Sep 17 00:00:00 2001 From: Benedek Kupper <57992871+bku-sue@users.noreply.github.com> Date: Tue, 6 Jul 2021 16:42:37 +0200 Subject: [PATCH] introduce bip buffer spsc atomic (#402) Based on the works of Andrea Lattuada and James Munns: https://blog.systems.ethz.ch/blog/2019/the-design-and-implementation-of-a-lock-free-ring-buffer-with-contiguous-reservations.html Whose design was inspired by Simon Cooke: https://www.codeproject.com/Articles/3479/The-Bip-Buffer-The-Circular-Buffer-with-a-Twist Signed-off-by: Benedek Kupper --- .gitignore | 3 + include/etl/bip_buffer_spsc_atomic.h | 400 +++++++++++++++++++++++++++ test/CMakeLists.txt | 1 + test/codeblocks/ETL.cbp | 2 + test/test_bip_buffer_spsc_atomic.cpp | 356 ++++++++++++++++++++++++ test/vs2019/etl.vcxproj | 2 + test/vs2019/etl.vcxproj.filters | 297 ++++++++++++++++++++ 7 files changed, 1061 insertions(+) create mode 100644 include/etl/bip_buffer_spsc_atomic.h create mode 100644 test/test_bip_buffer_spsc_atomic.cpp diff --git a/.gitignore b/.gitignore index 870ab075..a3f4ac99 100644 --- a/.gitignore +++ b/.gitignore @@ -318,3 +318,6 @@ test/vs2019/DebugNoSTL test/vs2022/Debug LLVM test/vs2019/Debug MSVC - No STL - Built-ins/etl.exe test/vs2019/Debug-LLVM-NoSTL-Builtins/etl.exe +test/vs2019/cmake-build +test/vs2019/Debug MSVC - No STL - Built-ins +test/vs2019/Test1 diff --git a/include/etl/bip_buffer_spsc_atomic.h b/include/etl/bip_buffer_spsc_atomic.h new file mode 100644 index 00000000..517436ac --- /dev/null +++ b/include/etl/bip_buffer_spsc_atomic.h @@ -0,0 +1,400 @@ +///\file + +/** + * @note Based on the works of Andrea Lattuada and James Munns: + * https://blog.systems.ethz.ch/blog/2019/the-design-and-implementation-of-a-lock-free-ring-buffer-with-contiguous-reservations.html + * Whose design was inspired by Simon Cooke: + * https://www.codeproject.com/Articles/3479/The-Bip-Buffer-The-Circular-Buffer-with-a-Twist + */ +#ifndef ETL_BIP_BUFFER_SPSC_ATOMIC_INCLUDED +#define ETL_BIP_BUFFER_SPSC_ATOMIC_INCLUDED + +#include +#include + +#include "platform.h" +#include "alignment.h" +#include "parameter_type.h" +#include "atomic.h" +#include "memory_model.h" +#include "integral_limits.h" +#include "utility.h" +#include "error_handler.h" +#include "span.h" + +#if ETL_HAS_ATOMIC + +namespace etl +{ + class bip_buffer_exception: public exception + { + public: + bip_buffer_exception(string_type reason_, string_type file_name_, numeric_type line_number_) + : exception(reason_, file_name_, line_number_) + { + } + }; + + class bip_buffer_reserve_invalid: public bip_buffer_exception + { + public: + bip_buffer_reserve_invalid(string_type file_name_, numeric_type line_number_) + : bip_buffer_exception("bip_buffer:reserve", file_name_, line_number_) + { + } + }; + + template + class bip_buffer_spsc_atomic_base + { + public: + /// The type used for determining the size of buffer. + typedef typename etl::size_type_lookup::type size_type; + + bool empty() const + { + return size() == 0; + } + + bool full() const + { + return available() == 0; + } + + // returns the total used size, which may be split in two blocks + // so the size will always be smaller after a read commit + size_type size() const + { + size_type write_index = write.load(etl::memory_order_acquire); + size_type read_index = read.load(etl::memory_order_acquire); + + // no wraparound + if (write_index >= read_index) + { + // size is distance between read and write + return write_index - read_index; + } + else + { + size_type last_index = last.load(etl::memory_order_acquire); + + // size is distance between beginning and write, plus read and last + return (write_index - 0) + (last_index - read_index); + } + } + + // returns the largest contiguous(!) available block size + size_type available() const + { + size_type write_index = write.load(etl::memory_order_acquire); + size_type read_index = read.load(etl::memory_order_acquire); + + // no wraparound + if (write_index >= read_index) + { + size_type forward_size = capacity() - write_index; + + // check if there's more space if wrapping around + if (read_index > (forward_size + 1)) + { + return read_index - 1; + } + else + { + return forward_size; + } + } + else // read_index > write_index + { + return read_index - write_index - 1; + } + } + + size_type capacity() const + { + return RESERVED; + } + + size_type max_size() const + { + return RESERVED; + } + + void clear() + { + read.store(0, etl::memory_order_release); + write.store(0, etl::memory_order_release); + last.store(0, etl::memory_order_release); + } + + protected: + bip_buffer_spsc_atomic_base(size_type reserved_) + : read(0), + write(0), + last(0), + RESERVED(reserved_) + { + } + + size_type get_write_reserve(size_type* psize) + { + size_type write_index = write.load(etl::memory_order_relaxed); + size_type read_index = read.load(etl::memory_order_acquire); + + // no wraparound + if (write_index >= read_index) + { + size_type forward_size = capacity() - write_index; + + // we still fit in linearly + if (*psize <= forward_size) + { + return write_index; + } + // there isn't more space even when wrapping around + else if (read_index <= (forward_size + 1)) + { + *psize = forward_size; + return write_index; + } + // better wrap around now + else + { + // check if size fits + // when wrapping, the write index cannot reach read index, + // then we'd not be able to distinguish wrapped situation from linear + if (*psize >= read_index) + { + if (read_index > 0) + { + *psize = read_index - 1; + } + else + { + *psize = 0; + } + } + return 0; + } + } + else // read_index > write_index + { + // doesn't fit + if ((write_index + *psize) >= read_index) + { + *psize = read_index - write_index - 1; + } + return write_index; + } + } + + void apply_write_reserve(size_type windex, size_type wsize) + { + if (wsize > 0) + { + size_type write_index = write.load(etl::memory_order_relaxed); + size_type read_index = read.load(etl::memory_order_acquire); + + // wrapped around already + if (write_index < read_index) + { + ETL_ASSERT_AND_RETURN((windex == write_index) && ((wsize + 1) <= read_index), + ETL_ERROR(bip_buffer_reserve_invalid)); + } + // no wraparound so far, also not wrapping around with this block + else if (windex == write_index) + { + ETL_ASSERT_AND_RETURN(wsize <= (capacity() - write_index), + ETL_ERROR(bip_buffer_reserve_invalid)); + + // move both indexes forward + last.store(windex + wsize, etl::memory_order_release); + } + // wrapping around now + else + { + ETL_ASSERT_AND_RETURN((windex == 0) && ((wsize + 1) <= read_index), + ETL_ERROR(bip_buffer_reserve_invalid)); + } + // always update write index + write.store(windex + wsize, etl::memory_order_release); + } + } + + size_type get_read_reserve(size_type* psize) + { + size_type read_index = read.load(etl::memory_order_relaxed); + size_type write_index = write.load(etl::memory_order_acquire); + + if (read_index > write_index) + { + // writer has wrapped around + + size_type last_index = last.load(etl::memory_order_relaxed); + if (read_index == last_index) + { + // reader reached the end, start read from 0 + read_index = 0; + } + else // (read_index < last_index) + { + // use the remaining buffer at the end + write_index = last_index; + } + } + else + { + // no wraparound, nothing to adjust + } + + // limit to max available size + if ((write_index - read_index) < *psize) + { + *psize = write_index - read_index; + } + return read_index; + } + + void apply_read_reserve(size_type rindex, size_type rsize) + { + if (rsize > 0) + { + size_type rsize_checker = rsize; + ETL_ASSERT_AND_RETURN((rindex == get_read_reserve(&rsize_checker)) && (rsize == rsize_checker), + ETL_ERROR(bip_buffer_reserve_invalid)); + + read.store(rindex + rsize, etl::memory_order_release); + } + } + + private: + etl::atomic read; + etl::atomic write; + etl::atomic last; + const size_type RESERVED; + +#if defined(ETL_POLYMORPHIC_SPSC_BIP_BUFFER_ATOMIC) || defined(ETL_POLYMORPHIC_CONTAINERS) + public: + virtual ~bip_buffer_spsc_atomic_base() + { + } +#else + protected: + ~bip_buffer_spsc_atomic_base() + { + } +#endif + }; + + template + class ibip_buffer_spsc_atomic : public bip_buffer_spsc_atomic_base + { + private: + typedef typename etl::bip_buffer_spsc_atomic_base base_t; + using base_t::get_read_reserve; + using base_t::apply_read_reserve; + using base_t::get_write_reserve; + using base_t::apply_write_reserve; + + public: + typedef T value_type; ///< The type stored in the buffer. + typedef T& reference; ///< A reference to the type used in the buffer. + typedef const T& const_reference; ///< A const reference to the type used in the buffer. +#if ETL_CPP11_SUPPORTED + typedef T&& rvalue_reference;///< An rvalue_reference to the type used in the buffer. +#endif + typedef typename base_t::size_type size_type; ///< The type used for determining the size of the buffer. + + // reserves a memory area for reading up to the max_reserve_size + span read_reserve(size_type max_reserve_size) + { + size_type reserve_size = max_reserve_size; + auto rindex = get_read_reserve(&reserve_size); + return span(p_buffer + rindex, reserve_size); + } + + // commits the previously reserved read memory area + // the reserve can be trimmed at the end before committing + // throws bip_buffer_reserve_invalid + void read_commit(const span &reserve) + { + size_type rindex = etl::distance(p_buffer, reserve.data()); + apply_read_reserve(rindex, reserve.size()); + } + + // reserves a memory area for writing up to the max_reserve_size + span write_reserve(size_type max_reserve_size) + { + size_type reserve_size = max_reserve_size; + auto windex = get_write_reserve(&reserve_size); + return span(p_buffer + windex, reserve_size); + } + + // commits the previously reserved write memory area + // the reserve can be trimmed at the end before committing + // throws bip_buffer_reserve_invalid + void write_commit(const span &reserve) + { + size_type windex = etl::distance(p_buffer, reserve.data()); + apply_write_reserve(windex, reserve.size()); + } + + protected: + ibip_buffer_spsc_atomic(T* p_buffer_, size_type reserved_) + : base_t(reserved_), + p_buffer(p_buffer_) + { + } + + private: + // Disable copy construction and assignment. + ibip_buffer_spsc_atomic(const ibip_buffer_spsc_atomic&) ETL_DELETE; + ibip_buffer_spsc_atomic& operator =(const ibip_buffer_spsc_atomic&) ETL_DELETE; + +#if ETL_CPP11_SUPPORTED + ibip_buffer_spsc_atomic(ibip_buffer_spsc_atomic&&) = delete; + ibip_buffer_spsc_atomic& operator =(ibip_buffer_spsc_atomic&&) = delete; +#endif + + T* const p_buffer; + }; + + //*************************************************************************** + /// A fixed capacity bipartite buffer. + /// This buffer supports concurrent access by one producer and one consumer. + /// \tparam T The type this buffer should support. + /// \tparam SIZE The maximum capacity of the buffer. + /// \tparam MEMORY_MODEL The memory model for the buffer. Determines the type of the internal counter variables. + //*************************************************************************** + template + class bip_buffer_spsc_atomic : public ibip_buffer_spsc_atomic + { + private: + typedef typename etl::ibip_buffer_spsc_atomic base_t; + + public: + typedef typename base_t::size_type size_type; + + private: + static const size_type RESERVED_SIZE = size_type(SIZE); + + public: + ETL_STATIC_ASSERT((SIZE <= (etl::integral_limits::max)), "Size too large for memory model"); + + static const size_type MAX_SIZE = size_type(SIZE); + + bip_buffer_spsc_atomic() + : base_t(reinterpret_cast(&buffer[0]), RESERVED_SIZE) + { + } + + private: + + /// The uninitialised buffer of T used in the bip_buffer_spsc. + typename etl::aligned_storage::value>::type buffer[RESERVED_SIZE]; + }; +} + +#endif /* ETL_HAS_ATOMIC */ + +#endif /* ETL_BIP_BUFFER_SPSC_ATOMIC_INCLUDED */ diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 74cf97b1..a9780307 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -32,6 +32,7 @@ set(TEST_SOURCE_FILES test_atomic_gcc_sync.cpp test_atomic_std.cpp test_binary.cpp + test_bip_buffer_spsc_atomic.cpp test_bitset.cpp test_bit_stream.cpp test_bloom_filter.cpp diff --git a/test/codeblocks/ETL.cbp b/test/codeblocks/ETL.cbp index a4be44b9..e6572c4b 100644 --- a/test/codeblocks/ETL.cbp +++ b/test/codeblocks/ETL.cbp @@ -255,6 +255,7 @@ + @@ -514,6 +515,7 @@ + diff --git a/test/test_bip_buffer_spsc_atomic.cpp b/test/test_bip_buffer_spsc_atomic.cpp new file mode 100644 index 00000000..f1891040 --- /dev/null +++ b/test/test_bip_buffer_spsc_atomic.cpp @@ -0,0 +1,356 @@ +/****************************************************************************** +The MIT License(MIT) + +Embedded Template Library. +https://github.com/ETLCPP/etl +https://www.etlcpp.com + +Copyright(c) 2021 jwellbelove + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files(the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and / or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions : + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +******************************************************************************/ + +#include "unit_test_framework.h" + +#include +#include +#include + +#include "etl/bip_buffer_spsc_atomic.h" + +#include "data.h" + +#if ETL_HAS_ATOMIC + +#if defined(ETL_TARGET_OS_WINDOWS) + #include +#endif + +#define REALTIME_TEST 0 + +namespace +{ + SUITE(test_bip_buffer_spsc_atomic) + { + //************************************************************************* + TEST(test_constructor) + { + etl::bip_buffer_spsc_atomic stream; + + CHECK_EQUAL(5U, stream.max_size()); + CHECK_EQUAL(5U, stream.capacity()); + } + + //************************************************************************* + TEST(test_size_write_read) + { + etl::bip_buffer_spsc_atomic stream; + etl::ibip_buffer_spsc_atomic& istream = stream; + + // Verify empty buffer + CHECK_EQUAL(0U, stream.size()); + CHECK(stream.empty()); + CHECK((stream.max_size() / 2U) <= stream.available()); + CHECK(stream.available() <= stream.max_size()); + + auto reader = istream.read_reserve(1U); + CHECK_EQUAL(0U, reader.size()); + + // Write partially + auto writer = istream.write_reserve(1U); + CHECK_EQUAL(1U, writer.size()); + writer[0] = 1; + + CHECK(stream.empty()); + + istream.write_commit(writer); // 1 * * * * + CHECK_EQUAL(1U, stream.size()); + + writer = istream.write_reserve(1U); + CHECK_EQUAL(1U, writer.size()); + writer[0] = 2; + + istream.write_commit(writer); // 1 2 * * * + CHECK_EQUAL(2U, stream.size()); + + // Write to capacity + writer = istream.write_reserve(istream.available()); + CHECK_EQUAL(3U, writer.size()); + writer[0] = 3; + writer[1] = 4; + writer[2] = 5; + + istream.write_commit(writer); // 1 2 3 4 5 + + // Verify full buffer + CHECK_EQUAL(0U, stream.available()); + CHECK(stream.full()); + CHECK((stream.max_size() - 1) <= stream.size()); + CHECK(stream.size() <= stream.max_size()); + + writer = istream.write_reserve(1U); + CHECK_EQUAL(0U, writer.size()); + + // Read partially + reader = istream.read_reserve(1U); + CHECK_EQUAL(1U, reader.size()); + CHECK_EQUAL(1, reader[0]); + CHECK_EQUAL(5U, stream.size()); + + istream.read_commit(reader); // * 2 3 4 5 + CHECK_EQUAL(4U, stream.size()); + + reader = istream.read_reserve(1U); + CHECK_EQUAL(1U, reader.size()); + CHECK_EQUAL(2, reader[0]); + CHECK_EQUAL(4U, stream.size()); + + istream.read_commit(reader); // * * 3 4 5 + CHECK_EQUAL(3U, stream.size()); + + // Write to wraparound area (one element remains reserved) + writer = istream.write_reserve(istream.available()); + CHECK_EQUAL(1U, writer.size()); + CHECK_EQUAL(1U, stream.available()); + writer[0] = 6; + + istream.write_commit(writer); // 6 * 3 4 5 + + // Verify full buffer + CHECK_EQUAL(0U, stream.available()); + CHECK(stream.full()); + CHECK((stream.max_size() - 1) <= stream.size()); + CHECK(stream.size() <= stream.max_size()); + + writer = istream.write_reserve(1U); + CHECK_EQUAL(0U, writer.size()); + + // Read to capacity + reader = istream.read_reserve(istream.size()); + CHECK_EQUAL(3U, reader.size()); + CHECK_EQUAL(3, reader[0]); + CHECK_EQUAL(4, reader[1]); + CHECK_EQUAL(5, reader[2]); + CHECK_EQUAL(4U, stream.size()); + + istream.read_commit(reader); // * 2 * * * + CHECK_EQUAL(1U, stream.size()); + + reader = istream.read_reserve(istream.size()); + CHECK_EQUAL(1U, reader.size()); + CHECK_EQUAL(6, reader[0]); + CHECK_EQUAL(1U, stream.size()); + + istream.read_commit(reader); // * * * * * + + // Verify empty buffer + CHECK_EQUAL(0U, stream.size()); + CHECK(stream.empty()); + CHECK((stream.max_size() / 2U) <= stream.available()); + CHECK(stream.available() <= stream.max_size()); + } + + //************************************************************************* + TEST(test_clear) + { + etl::bip_buffer_spsc_atomic stream; + etl::ibip_buffer_spsc_atomic& istream = stream; + + CHECK(stream.empty()); + + // Write the whole buffer + auto writer = istream.write_reserve(istream.capacity()); + // data is committed without set to valid value (it won't be read anyway) + istream.write_commit(writer); + CHECK(stream.full()); + + istream.clear(); + CHECK(stream.empty()); + + // Repeat to see that clear() resets the internal state completely and correctly + writer = istream.write_reserve(istream.capacity()); + // data is committed without set to valid value (it won't be read anyway) + istream.write_commit(writer); + CHECK(stream.full()); + + istream.clear(); + CHECK(stream.empty()); + } + + //************************************************************************* + TEST(test_partial_commits) + { + etl::bip_buffer_spsc_atomic stream; + etl::ibip_buffer_spsc_atomic& istream = stream; + + // Write reserve available + auto writer_1 = istream.write_reserve(istream.capacity()); + CHECK_EQUAL(5U, stream.available()); + CHECK_EQUAL(5U, writer_1.size()); + + // Write and commit partially + writer_1[0] = 1; + writer_1[1] = 2; + writer_1[2] = 3; + writer_1[3] = 4; + + // Cannot commit subspans with offset + CHECK_THROW(istream.write_commit(writer_1.subspan(1U, 3U)), etl::bip_buffer_reserve_invalid); + CHECK_THROW(istream.write_commit(writer_1.subspan(2U, 2U)), etl::bip_buffer_reserve_invalid); + CHECK_THROW(istream.write_commit(writer_1.subspan(3U, 1U)), etl::bip_buffer_reserve_invalid); + + CHECK_NO_THROW(istream.write_commit(writer_1.subspan(0U, 4U))); // 1 2 3 4 * + CHECK_EQUAL(4U, stream.size()); + + // Can only commit once for each reserve (provided they don't cover a valid area) + CHECK_THROW(istream.write_commit(writer_1), etl::bip_buffer_reserve_invalid); + CHECK_THROW(istream.write_commit(writer_1.subspan(0U, 4U)), etl::bip_buffer_reserve_invalid); + + // Read reserve available + auto reader_1 = istream.read_reserve(istream.capacity()); + CHECK_EQUAL(4U, reader_1.size()); + + // Read and commit partially + CHECK_EQUAL(1, reader_1[0]); + CHECK_EQUAL(2, reader_1[1]); + CHECK_EQUAL(3, reader_1[2]); + + // Cannot commit subspans with offset + CHECK_THROW(istream.read_commit(reader_1.subspan(1U, 2U)), etl::bip_buffer_reserve_invalid); + CHECK_THROW(istream.read_commit(reader_1.subspan(2U, 1U)), etl::bip_buffer_reserve_invalid); + + CHECK_NO_THROW(istream.read_commit(reader_1.subspan(0U, 3U))); // * * * 4 * + CHECK_EQUAL(1U, stream.size()); + + // Can only commit once for each reserve (provided they don't cover a valid area) + CHECK_THROW(istream.read_commit(reader_1), etl::bip_buffer_reserve_invalid); + CHECK_THROW(istream.read_commit(reader_1.subspan(0U, 3U)), etl::bip_buffer_reserve_invalid); + + // Write reserve available + auto writer_2 = istream.write_reserve(istream.capacity()); + CHECK_EQUAL(2U, stream.available()); + CHECK_EQUAL(2U, writer_2.size()); + + // Write and commit partially + writer_2[0] = 5; + CHECK_NO_THROW(istream.write_commit(writer_2.subspan(0U, 1U))); // 5 * * 4 * + CHECK_EQUAL(2U, stream.size()); + // Even though the second committed span could have fit at the end, + // the reservation asked for the largest consecutive block, + // which resulted in a wraparound span to be allocated + + // Can only commit once for each reserve (provided they don't cover a valid area) + CHECK_THROW(istream.write_commit(writer_1), etl::bip_buffer_reserve_invalid); + CHECK_THROW(istream.write_commit(writer_2), etl::bip_buffer_reserve_invalid); + CHECK_THROW(istream.write_commit(writer_2.subspan(0U, 1U)), etl::bip_buffer_reserve_invalid); + + // Read reserve available + auto reader_2 = istream.read_reserve(istream.capacity()); + CHECK_EQUAL(1U, reader_2.size()); + + // Read and commit + CHECK_EQUAL(4, reader_2[0]); + CHECK_NO_THROW(istream.read_commit(reader_2)); // 5 * * * * + CHECK_EQUAL(1U, stream.size()); + + // Can only commit once for each reserve (provided they don't cover a valid area) + CHECK_THROW(istream.read_commit(reader_1), etl::bip_buffer_reserve_invalid); + CHECK_THROW(istream.read_commit(reader_2), etl::bip_buffer_reserve_invalid); + + // Read reserve available + auto reader_3 = istream.read_reserve(istream.capacity()); + CHECK_EQUAL(1U, reader_3.size()); + + // Read and commit + CHECK_EQUAL(5, reader_3[0]); + CHECK_NO_THROW(istream.read_commit(reader_3)); // * * * * * + CHECK_EQUAL(0U, stream.size()); + + // Can only commit once for each reserve (provided they don't cover a valid area) + CHECK_THROW(istream.read_commit(reader_1), etl::bip_buffer_reserve_invalid); + CHECK_THROW(istream.read_commit(reader_2), etl::bip_buffer_reserve_invalid); + CHECK_THROW(istream.read_commit(reader_3), etl::bip_buffer_reserve_invalid); + + CHECK(stream.empty()); + } + + //************************************************************************* +#if REALTIME_TEST && defined(ETL_COMPILER_MICROSOFT) + #if defined(ETL_TARGET_OS_WINDOWS) // Only Windows priority is currently supported + #define FIX_PROCESSOR_AFFINITY1 SetThreadAffinityMask(GetCurrentThread(), 1); + #define FIX_PROCESSOR_AFFINITY2 SetThreadAffinityMask(GetCurrentThread(), 2); + #else + #error No thread priority modifier defined + #endif + + etl::bip_buffer_spsc_atomic stream; + + const size_t LENGTH = 1000000; + + void timer_event() + { + FIX_PROCESSOR_AFFINITY1; + + const size_t write_chunk_size = 7; + size_t tick = 0; + + while (tick < LENGTH) + { + auto writer = stream.write_reserve(std::min(write_chunk_size, LENGTH - tick)); + for (auto& item : writer) + { + item = tick++; + } + stream.write_commit(writer); + } + } + + TEST(bip_buffer_threads) + { + FIX_PROCESSOR_AFFINITY2; + + const size_t read_chunk_size = stream.capacity(); + + std::vector tick_list; + tick_list.reserve(LENGTH); + + std::thread t1(timer_event); + + while (tick_list.size() < LENGTH) + { + reader = stream.read_reserve(read_chunk_size); + tick_list.insert(tick_list.end(), reader.begin(), reader.end()); + stream.read_commit(reader); + } + + // Join the thread with the main thread + t1.join(); + + CHECK_EQUAL(LENGTH, tick_list.size()); + + for (size_t i = 0; i < LENGTH; i++) + { + CHECK_EQUAL(i, tick_list[i]); + } + } +#endif + }; +} + +#endif // ETL_HAS_ATOMIC diff --git a/test/vs2019/etl.vcxproj b/test/vs2019/etl.vcxproj index 5f65af23..8a0ac45e 100644 --- a/test/vs2019/etl.vcxproj +++ b/test/vs2019/etl.vcxproj @@ -1489,6 +1489,7 @@ + @@ -6560,6 +6561,7 @@ + diff --git a/test/vs2019/etl.vcxproj.filters b/test/vs2019/etl.vcxproj.filters index d1b3f4b4..07cbbfa0 100644 --- a/test/vs2019/etl.vcxproj.filters +++ b/test/vs2019/etl.vcxproj.filters @@ -1125,6 +1125,9 @@ ETL\Utilities + + ETL\Containers + ETL\Private @@ -2561,6 +2564,300 @@ Source Files\Sanity Checks + + Source Files\Sanity Checks + + + Source Files\Sanity Checks + + + Source Files\Sanity Checks + + + Source Files\Sanity Checks + + + Source Files\Sanity Checks + + + Source Files\Sanity Checks + + + Source Files\Sanity Checks + + + Source Files\Sanity Checks + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files\Sanity Checks + + + Source Files\Sanity Checks + + + Source Files\Sanity Checks + + + Source Files\Sanity Checks + + + Source Files\Sanity Checks + + + Source Files\Sanity Checks + + + Source Files\Sanity Checks + + + Source Files\Sanity Checks + + + Source Files\Sanity Checks + + + Source Files\Sanity Checks + + + Source Files\Sanity Checks + + + Source Files\Sanity Checks + + + Source Files\Sanity Checks + + + Source Files\Sanity Checks + + + Source Files\Sanity Checks + + + Source Files\Sanity Checks + + + Source Files\Sanity Checks + + + Source Files\Sanity Checks + + + Source Files\Sanity Checks + + + Source Files\Sanity Checks + + + Source Files\Sanity Checks + + + Source Files\Sanity Checks + + + Source Files\Sanity Checks + + + Source Files\Sanity Checks + + + Source Files\Sanity Checks + + + Source Files\Sanity Checks + + + Source Files\Sanity Checks + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files\Sanity Checks + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files\Sanity Checks + + + Source Files\Sanity Checks + + + Source Files\Sanity Checks + Source Files\Sanity Checks