From af1caa88fd1f2da58ed6b6fb67f0a64da7e4f347 Mon Sep 17 00:00:00 2001 From: Sergei Date: Thu, 22 Jan 2026 11:22:23 +0200 Subject: [PATCH] Fix for issue 1276 "Data corruption in the etl::bip_buffer_spsc_atomic" (#1277) * Reproduce data corruption bug in the `etl::bip_buffer_spsc_atomic`. * Fix data corruption bug in the `etl::bip_buffer_spsc_atomic`. --- include/etl/bip_buffer_spsc_atomic.h | 3 + test/test_bip_buffer_spsc_atomic.cpp | 119 +++++++++++++++++++++++++++ 2 files changed, 122 insertions(+) diff --git a/include/etl/bip_buffer_spsc_atomic.h b/include/etl/bip_buffer_spsc_atomic.h index 14d03ca2..9a557eae 100644 --- a/include/etl/bip_buffer_spsc_atomic.h +++ b/include/etl/bip_buffer_spsc_atomic.h @@ -281,6 +281,9 @@ namespace etl else { ETL_ASSERT_OR_RETURN((windex == 0) && ((wsize + 1) <= read_index), ETL_ERROR(bip_buffer_reserve_invalid)); + + // Correct wrapping point + last.store(write_index, etl::memory_order_release); } // Always update write index diff --git a/test/test_bip_buffer_spsc_atomic.cpp b/test/test_bip_buffer_spsc_atomic.cpp index 9711685d..c5f3aec5 100644 --- a/test/test_bip_buffer_spsc_atomic.cpp +++ b/test/test_bip_buffer_spsc_atomic.cpp @@ -28,6 +28,8 @@ SOFTWARE. #include "unit_test_framework.h" +#include +#include #include #include #include @@ -332,6 +334,123 @@ namespace CHECK(stream.empty()); } + //************************************************************************* + TEST(test_optimal_write_issue_1276) + { + etl::bip_buffer_spsc_atomic stream; + etl::ibip_buffer_spsc_atomic& istream = stream; + + // 1. Make all `read`, `write` and `last` in the end of the buffer. + { + const auto writer = istream.write_reserve_optimal(5); + CHECK_EQUAL(5U, writer.size()); + writer[0] = '0'; + writer[1] = '1'; + writer[2] = '2'; + writer[3] = '3'; + writer[4] = '4'; + CHECK_NO_THROW(istream.write_commit(writer)); // [0 1 2 3 4] + const auto reader = istream.read_reserve(); + CHECK_EQUAL(5U, reader.size()); + CHECK_NO_THROW(istream.read_commit(reader)); // * * * * *[] + } + // 2. Write & read 4 bytes. + { + const auto writer = istream.write_reserve_optimal(4); + CHECK_EQUAL(4U, writer.size()); + writer[0] = '5'; + writer[1] = '6'; + writer[2] = '7'; + writer[3] = '8'; + CHECK_NO_THROW(istream.write_commit(writer.subspan(0U, 4U))); // 5 6 7 8] *[ + + const auto reader = istream.read_reserve(); + CHECK_EQUAL(4U, reader.size()); + CHECK_EQUAL('5', reader[0]); + CHECK_EQUAL('6', reader[1]); + CHECK_EQUAL('7', reader[2]); + CHECK_EQUAL('8', reader[3]); + CHECK_NO_THROW(istream.read_commit(reader)); // * * * *[]* + } + // 3. Write and read 2 bytes. + { + const auto writer = istream.write_reserve_optimal(2); + CHECK_EQUAL(3U, writer.size()); + writer[0] = '9'; + writer[1] = 'A'; + CHECK_NO_THROW(istream.write_commit(writer.subspan(0, 2))); // 9 A]* *[* + + const auto reader = istream.read_reserve(); + CHECK_EQUAL(2U, reader.size()); + CHECK_EQUAL('9', reader[0]); + CHECK_EQUAL('A', reader[1]); + } + } + + //************************************************************************* + TEST(test_write_random_back_pressure) + { + // Deliberately seeded with fixed number, so that if it fails then always in the same way. + std::mt19937 mte(123); + + constexpr size_t N = 256; + etl::bip_buffer_spsc_atomic stream; + etl::ibip_buffer_spsc_atomic& istream = stream; + + auto makeRandomNumber = [&mte](const size_t n) -> size_t { return mte() % n; }; + + auto verifyIota = [&](const etl::span& seq) + { + if (!seq.empty()) + { + auto prev = seq[0]; + for (auto i = 1U; i < seq.size(); ++i) + { + const auto curr = seq[i]; + if (prev > curr) + { + CHECK(prev <= curr); + std::cerr << "prev(" << prev << ") should not be bigger than curr(" << curr << ")!\n"; + } + prev = curr; + } + } + }; + + const auto all = istream.write_reserve(N); + CHECK_EQUAL(N, all.size()); + std::fill_n(all.begin(), all.size(), 0); + + int iota = 0; + // Loop writing a bit more than reading. + // 10K iterations is enough to detect 2 failures. + for (int i = 0; i < 10000; ++i) + { + // Write [0...N/16] chunks - on average N/32. + { + const size_t toWrite = makeRandomNumber(N / 16 + 1); + const auto reserve = istream.write_reserve(toWrite); + if (reserve.size() >= toWrite) + { + ++iota; + const auto written = makeRandomNumber(toWrite + 1); + std::fill_n(reserve.begin(), written, iota); + istream.write_commit(reserve.first(written)); + verifyIota(reserve.first(written)); + } + } + + // Read a bit less [0...N/17] chunks - on average N/34. + if (const size_t toRead = makeRandomNumber(N / 17 + 1)) + { + const auto reserve = istream.read_reserve(toRead); + verifyIota(reserve); + const auto read = makeRandomNumber(reserve.size() + 1); + istream.read_commit(reserve.first(read)); + } + } + } + //************************************************************************* #if REALTIME_TEST && defined(ETL_COMPILER_MICROSOFT) #if defined(ETL_TARGET_OS_WINDOWS) // Only Windows priority is currently supported