Partial implementation

This commit is contained in:
John Wellbelove 2022-07-31 13:55:23 +01:00
parent e5fea50fab
commit 40290c4fa2
7 changed files with 265 additions and 187 deletions

View File

@ -169,6 +169,7 @@ namespace etl
NULL_MESSAGE_ROUTER = 255,
MESSAGE_BUS = 254,
ALL_MESSAGE_ROUTERS = 253,
MESSAGE_BROKER = 252,
MAX_MESSAGE_ROUTER = 249
};

View File

@ -39,153 +39,147 @@ SOFTWARE.
#include "array.h"
#include "span.h"
#if ETL_USING_STL && ETL_USING_CPP11
#include <array>
#endif
namespace etl
{
//***************************************************************************
/// Base exception class for message broker
//***************************************************************************
class message_broker_exception : public etl::exception
{
public:
message_broker_exception(string_type reason_, string_type file_name_, numeric_type line_number_)
: etl::exception(reason_, file_name_, line_number_)
{
}
};
//***************************************************************************
/// Too many messages.
//***************************************************************************
class message_broker_too_many_mesaages : public etl::message_broker_exception
{
public:
message_broker_too_many_mesaages(string_type file_name_, numeric_type line_number_)
: message_broker_exception(ETL_ERROR_TEXT("message broker:too many messages", ETL_MESSAGE_BROKER_FILE_ID"A"), file_name_, line_number_)
{
}
};
//***************************************************************************
/// Too many subscribers.
//***************************************************************************
class message_broker_too_many_subscribers : public etl::message_broker_exception
{
public:
message_broker_too_many_subscribers(string_type file_name_, numeric_type line_number_)
: message_broker_exception(ETL_ERROR_TEXT("message broker:too many subscribers", ETL_MESSAGE_BROKER_FILE_ID"B"), file_name_, line_number_)
{
}
};
//***************************************************************************
/// Message broker
//***************************************************************************
class message_broker : public etl::imessage_router
{
public:
private:
//*******************************************
class subscription
class subscription_node
{
public:
friend class message_broker;
subscription(etl::imessage_router& router_,
etl::message_id_t* p_message_ids_,
size_t n_messages_)
: p_router(&router_)
, p_message_ids(p_message_ids_)
, p_next(ETL_NULLPTR)
, n_messages(uint16_t(n_messages_))
protected:
//*******************************
subscription_node()
: p_next(ETL_NULLPTR)
{
}
subscription(etl::imessage_router& router_,
etl::message_id_t* first,
etl::message_id_t* last)
: p_router(&router_)
, p_message_ids(first)
, p_next(ETL_NULLPTR)
, n_messages(uint16_t(etl::distance(first, last)))
{
}
template <size_t NMessages>
subscription(etl::imessage_router& router_,
etl::array<etl::message_id_t, NMessages> message_ids_)
: p_router(&router_)
, p_message_ids(message_ids_.data())
, p_next(ETL_NULLPTR)
, n_messages(uint16_t(NMessages))
{
}
#if ETL_USING_STL && ETL_USING_CPP11
template <size_t NMessages>
subscription(etl::imessage_router& router_,
std::array<etl::message_id_t, NMessages> message_ids_)
: p_router(&router_)
, p_message_ids(message_ids_.data())
, p_next(ETL_NULLPTR)
, n_messages(uint16_t(NMessages))
{
}
#endif
etl::imessage_router* router() const
{
return p_router;
}
etl::span<const etl::message_id_t> message_id_list() const
{
return etl::span<const etl::message_id_t>(p_message_ids, p_message_ids + n_messages);
}
void next(subscription* sub)
//*******************************
void next(subscription_node* sub)
{
p_next = sub;
}
subscription* next() const
//*******************************
subscription_node* next() const
{
return p_next;
}
//*******************************
void terminate()
{
next(ETL_NULLPTR);
}
void append(subscription* sub)
//*******************************
void append(subscription_node* sub)
{
sub->next(next());
if (sub != ETL_NULLPTR)
{
sub->next(next());
}
next(sub);
}
void append(subscription& sub)
//*******************************
void append(subscription_node& sub)
{
append(&sub);
}
private:
subscription_node* p_next;
};
subscription()
: p_router(ETL_NULLPTR)
, p_message_ids(ETL_NULLPTR)
, p_next(ETL_NULLPTR)
, n_messages(0U)
typedef etl::span<const etl::message_id_t> message_id_span_t;
public:
//*******************************************
class subscription : public subscription_node
{
public:
friend class message_broker;
//*******************************
subscription(etl::imessage_router& router_,
etl::message_id_t* p_message_ids_,
size_t n_messages_)
: p_router(&router_)
, message_ids(p_message_ids_, p_message_ids_ + n_messages_)
{
}
etl::imessage_router* const p_router;
const etl::message_id_t* const p_message_ids;
subscription* p_next;
const uint16_t n_messages;
//*******************************
subscription(etl::imessage_router& router_,
etl::message_id_t* first,
etl::message_id_t* last)
: p_router(&router_)
, message_ids(first, last)
{
}
//*******************************
template <size_t NMessages>
subscription(etl::imessage_router& router_,
const etl::array<etl::message_id_t, NMessages>& message_ids_)
: p_router(&router_)
, message_ids(message_ids_.data(), message_ids_.data() + NMessages)
{
}
#if ETL_USING_STL && ETL_USING_CPP11
//*******************************
template <size_t NMessages>
subscription(etl::imessage_router& router_,
const std::array<etl::message_id_t, NMessages>& message_ids_)
: p_router(&router_)
, message_ids(message_ids_.data(), message_ids_.data() + NMessages)
{
}
#endif
private:
//*******************************
etl::imessage_router* router() const
{
return p_router;
}
//*******************************
message_id_span_t message_id_list() const
{
return message_ids;
}
//*******************************
subscription* next_subscription() const
{
return static_cast<subscription*>(next());
}
////*******************************
//subscription()
// : p_router(ETL_NULLPTR)
// , message_ids()
//{
//}
etl::imessage_router* const p_router;
message_id_span_t message_ids;
};
using etl::imessage_router::receive;
@ -243,79 +237,83 @@ namespace etl
//*******************************************
virtual void receive(const etl::imessage& msg) ETL_OVERRIDE
{
//const etl::message_id_t id = msg.get_message_id();
const etl::message_id_t id = msg.get_message_id();
//// Find the range of routers that are subscribed to this message.
//ETL_OR_STD::pair<message_list_t, message_list_t> range = etl::equal_range(message_list_begin, message_list_end, message_element{ id, 0 }, CompareMessageElements);
if (!empty())
{
// Scan the subscription lists.
subscription* sub = static_cast<subscription*>(head.next());
//// We have a valid range?
//if (range.first != range.second)
//{
// // Call receive() for each.
// for (message_list_t itr = range.first; itr != range.second; ++itr)
// {
// etl::imessage_router& router = *router_list_begin[itr->router_index];
while (sub != ETL_NULLPTR)
{
message_id_span_t message_ids = sub->message_id_list();
// router.receive(msg);
// }
//}
//else
//{
// // Pass on to the successor, if present.
// if (has_successor())
// {
// etl::imessage_router& successor = get_successor();
message_id_span_t::const_iterator itr = etl::find(message_ids.begin(), message_ids.end(), id);
// if (successor.accepts(msg.get_message_id()))
// {
// successor.receive(msg);
// }
// }
//}
if (itr != message_ids.end())
{
sub->router()->receive(msg);
}
sub = sub->next_subscription();
}
}
// Always pass the message on to the successor.
if (has_successor())
{
etl::imessage_router& successor = get_successor();
successor.receive(msg);
}
}
//*******************************************
virtual void receive(etl::shared_message shared_msg) ETL_OVERRIDE
{
//if (has_successor())
//{
// etl::imessage_router& successor = get_successor();
const etl::message_id_t id = shared_msg.get_message().get_message_id();
// if (successor.accepts(shared_msg.get_message().get_message_id()))
// {
// successor.receive(shared_msg);
// }
//}
if (!empty())
{
// Scan the subscription lists.
subscription* sub = static_cast<subscription*>(head.next());
while (sub != ETL_NULLPTR)
{
message_id_span_t message_ids = sub->message_id_list();
message_id_span_t::const_iterator itr = etl::find(message_ids.begin(), message_ids.end(), id);
if (itr != message_ids.end())
{
sub->router()->receive(shared_msg);
}
sub = sub->next_subscription();
}
}
// Always pass the message on to a successor.
if (has_successor())
{
get_successor().receive(shared_msg);
}
}
using imessage_router::accepts;
//*******************************************
/// Does this message broker accept the message id?
/// Message brokers accept all messages.
//*******************************************
virtual bool accepts(etl::message_id_t id) const ETL_OVERRIDE
{
if (is_in_subscription_list(id))
{
return true;
}
else
{
if (has_successor())
{
return get_successor().accepts(id);
}
else
{
return false;
}
}
return true;
}
//*******************************************
void clear()
{
head.next(ETL_NULLPTR);
head.terminate();
}
//********************************************
@ -349,13 +347,13 @@ namespace etl
{
const etl::imessage_router* p_target_router = p_router;
etl::message_broker::subscription* p_sub = head.next();
etl::message_broker::subscription* p_sub_previous = &head;
subscription_node* p_sub = head.next();
subscription_node* p_sub_previous = &head;
while (p_sub != ETL_NULLPTR)
{
// Do we already have a subscription for the router?
if (p_sub->router() == p_target_router)
if (static_cast<subscription*>(p_sub)->router() == p_target_router)
{
// Then unlink it.
p_sub_previous->next(p_sub->next()); // Jump over the subscription.
@ -382,18 +380,30 @@ namespace etl
//*******************************************
bool is_in_subscription_list(etl::message_id_t id) const
{
//if (!empty())
//{
// // Scan the subscription lists.
// subscription* list = subscription_list;
//}
if (!empty())
{
// Scan the subscription lists.
subscription* sub = static_cast<subscription*>(head.next());
while (sub != ETL_NULLPTR)
{
message_id_span_t message_ids = sub->message_id_list();
message_id_span_t::const_iterator itr = etl::find(message_ids.begin(), message_ids.end(), id);
if (itr != message_ids.end())
{
return true;
}
sub = sub->next_subscription();
}
}
return false;
}
static ETL_CONSTANT uint_least8_t Unused = etl::integral_limits<uint_least8_t>::max;
subscription head;
subscription_node head;
};
//***************************************************************************

View File

@ -26,12 +26,10 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
******************************************************************************/
#ifndef ETL_MESSAGE_BUS_
#define ETL_MESSAGE_BUS_
#ifndef ETL_MESSAGE_BUS_INCLUDED
#define ETL_MESSAGE_BUS_INCLUDED
#include <stdint.h>
#include "algorithm.h"
#include "platform.h"
#include "algorithm.h"
#include "vector.h"
@ -208,9 +206,9 @@ namespace etl
// Do any message buses.
// These are always at the end of the list.
router_list_t::iterator irouter = etl::lower_bound(router_list.begin(),
router_list.end(),
etl::imessage_bus::MESSAGE_BUS,
compare_router_id());
router_list.end(),
etl::imessage_bus::MESSAGE_BUS,
compare_router_id());
while (irouter != router_list.end())
{
@ -336,7 +334,7 @@ namespace etl
//*******************************************
void clear()
{
return router_list.clear();
router_list.clear();
}
//********************************************

View File

@ -157,6 +157,7 @@ namespace etl
NULL_MESSAGE_ROUTER = 255,
MESSAGE_BUS = 254,
ALL_MESSAGE_ROUTERS = 253,
MESSAGE_BROKER = 252,
MAX_MESSAGE_ROUTER = 249
};

View File

@ -43,7 +43,9 @@ namespace
MESSAGE2,
MESSAGE3,
MESSAGE4,
MESSAGE5
MESSAGE5,
MESSAGE6,
UNKNOWN_MESSAGE
};
enum
@ -89,10 +91,19 @@ namespace
}
};
int call_order;
struct Message6 : public etl::message<MESSAGE6>
{
Message6()
{
}
};
constexpr uint16_t N_Routers = 4U;
constexpr uint16_t N_Messages = 5U;
struct UnknownMessage : public etl::message<UNKNOWN_MESSAGE>
{
UnknownMessage()
{
}
};
//***************************************************************************
// Broker
@ -118,9 +129,9 @@ namespace
};
//***************************************************************************
// Router that handles messages 1, 2, 3, 4.
// Router that handles messages 1, 2, 3, 4, 5.
//***************************************************************************
class Router : public etl::message_router<Router, Message1, Message2, Message3, Message4>
class Router : public etl::message_router<Router, Message1, Message2, Message3, Message4, Message5, Message6>
{
public:
@ -131,15 +142,14 @@ namespace
, message3_count(0)
, message4_count(0)
, message5_count(0)
, message6_count(0)
, message_unknown_count(0)
, order(0)
{
}
void on_receive(const Message1&)
{
++message1_count;
order = call_order++;
}
void on_receive(const Message2&)
@ -162,6 +172,11 @@ namespace
++message5_count;
}
void on_receive(const Message6&)
{
++message6_count;
}
void on_receive_unknown(const etl::imessage&)
{
++message_unknown_count;
@ -172,11 +187,10 @@ namespace
int message3_count;
int message4_count;
int message5_count;
int message6_count;
int message_unknown_count;
int order;
};
SUITE(test_message_broker)
{
//*************************************************************************
@ -187,9 +201,11 @@ namespace
Router router2(2);
Router router3(3);
broker.set_successor(router3);
CHECK(broker.empty());
std::array<etl::message_id_t, 3U> subscription1_message_list = { Message1::ID, Message2::ID, Message3::ID };
std::array<etl::message_id_t, 4U> subscription1_message_list = { Message1::ID, Message2::ID, Message3::ID, Message4::ID };
Broker::subscription subscription1(router1, subscription1_message_list.data(), subscription1_message_list.size());
etl::array<etl::message_id_t, 2U> subscription2_message_list = { Message1::ID, Message2::ID };
@ -198,6 +214,9 @@ namespace
etl::array<etl::message_id_t, 2U> subscription3_message_list = { Message1::ID, Message3::ID };
Broker::subscription subscription3(router2, subscription3_message_list);
std::array<etl::message_id_t, 2U> subscription4_message_list = { Message5::ID, Message6::ID };
Broker::subscription subscription4(router3, subscription4_message_list.data(), subscription4_message_list.size());
broker.subscribe(subscription1);
CHECK(!broker.empty());
@ -207,12 +226,53 @@ namespace
broker.subscribe(subscription3); // Duplicate router. Replace the old subscription.
CHECK(!broker.empty());
broker.subscribe(subscription1); // Do subscription1 again to see if it breaks.
CHECK(!broker.empty());
broker.receive(Message1());
broker.receive(Message2());
broker.receive(Message3());
broker.receive(Message4());
broker.receive(Message5());
broker.receive(Message6());
broker.receive(UnknownMessage());
CHECK_EQUAL(1, router1.message1_count);
CHECK_EQUAL(1, router2.message1_count);
CHECK_EQUAL(1, router3.message1_count);
CHECK_EQUAL(1, router1.message2_count);
CHECK_EQUAL(0, router2.message2_count);
CHECK_EQUAL(1, router3.message2_count);
CHECK_EQUAL(1, router1.message3_count);
CHECK_EQUAL(1, router2.message3_count);
CHECK_EQUAL(1, router3.message3_count);
CHECK_EQUAL(2, router1.message4_count);
CHECK_EQUAL(0, router2.message4_count);
CHECK_EQUAL(2, router3.message4_count);
// Message5 is translated to Message4 in 'broker'.
CHECK_EQUAL(0, router1.message5_count);
CHECK_EQUAL(0, router2.message5_count);
CHECK_EQUAL(0, router3.message5_count);
CHECK_EQUAL(0, router1.message6_count);
CHECK_EQUAL(0, router2.message6_count);
CHECK_EQUAL(1, router3.message6_count);
CHECK_EQUAL(0, router1.message_unknown_count);
CHECK_EQUAL(0, router2.message_unknown_count);
CHECK_EQUAL(1, router3.message_unknown_count);
broker.unsubscribe(router1);
CHECK(!broker.empty());
broker.unsubscribe(router2);
CHECK(broker.empty());
// There is no router3 subscription in 'broker'.
broker.unsubscribe(router3);
CHECK(broker.empty());
}

View File

@ -2437,6 +2437,7 @@
<ClInclude Include="..\..\include\etl\crc8_maxim.h" />
<ClInclude Include="..\..\include\etl\crc8_rohc.h" />
<ClInclude Include="..\..\include\etl\crc8_wcdma.h" />
<ClInclude Include="..\..\include\etl\message_broker.h" />
<ClInclude Include="..\..\include\etl\poly_span.h" />
<ClInclude Include="..\..\include\etl\private\diagnostic_pessimizing-move_push.h" />
<ClInclude Include="..\..\include\etl\private\diagnostic_pop.h" />
@ -11256,6 +11257,7 @@
<ClCompile Include="..\test_crc8_maxim.cpp" />
<ClCompile Include="..\test_crc8_rohc.cpp" />
<ClCompile Include="..\test_crc8_wcdma.cpp" />
<ClCompile Include="..\test_message_broker.cpp" />
<ClCompile Include="..\test_poly_span_dynamic_extent.cpp" />
<ClCompile Include="..\test_poly_span_fixed_extent.cpp" />
<ClCompile Include="..\test_pseudo_moving_average.cpp" />

View File

@ -1323,6 +1323,9 @@
<ClInclude Include="..\..\include\etl\private\diagnostic_pessimizing-move_push.h">
<Filter>ETL\Private</Filter>
</ClInclude>
<ClInclude Include="..\..\include\etl\message_broker.h">
<Filter>ETL\Messaging</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<ClCompile Include="..\test_string_char.cpp">
@ -3326,6 +3329,9 @@
<ClCompile Include="..\etl_error_handler\exceptions_and_log_errors\test_error_handler.cpp">
<Filter>Tests\Error Handler\Exceptions_And_Log_Errors</Filter>
</ClCompile>
<ClCompile Include="..\test_message_broker.cpp">
<Filter>Tests\Messaging</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<None Include="..\..\library.properties">