Finished message_broker code

This commit is contained in:
John Wellbelove 2022-08-01 13:00:20 +01:00
parent 40290c4fa2
commit 838cdc24af
2 changed files with 285 additions and 160 deletions

View File

@ -66,13 +66,13 @@ namespace etl
}
//*******************************
void next(subscription_node* sub)
void set_next(subscription_node* sub)
{
p_next = sub;
}
//*******************************
subscription_node* next() const
subscription_node* get_next() const
{
return p_next;
}
@ -80,7 +80,7 @@ namespace etl
//*******************************
void terminate()
{
next(ETL_NULLPTR);
set_next(ETL_NULLPTR);
}
//*******************************
@ -88,24 +88,18 @@ namespace etl
{
if (sub != ETL_NULLPTR)
{
sub->next(next());
sub->set_next(get_next());
}
next(sub);
}
//*******************************
void append(subscription_node& sub)
{
append(&sub);
set_next(sub);
}
subscription_node* p_next;
};
typedef etl::span<const etl::message_id_t> message_id_span_t;
public:
typedef etl::span<const etl::message_id_t> message_id_span_t;
//*******************************************
class subscription : public subscription_node
{
@ -114,72 +108,29 @@ namespace etl
friend class message_broker;
//*******************************
subscription(etl::imessage_router& router_,
etl::message_id_t* p_message_ids_,
size_t n_messages_)
subscription(etl::imessage_router& router_)
: p_router(&router_)
, message_ids(p_message_ids_, p_message_ids_ + n_messages_)
{
}
//*******************************
subscription(etl::imessage_router& router_,
etl::message_id_t* first,
etl::message_id_t* last)
: p_router(&router_)
, message_ids(first, last)
{
}
//*******************************
template <size_t NMessages>
subscription(etl::imessage_router& router_,
const etl::array<etl::message_id_t, NMessages>& message_ids_)
: p_router(&router_)
, message_ids(message_ids_.data(), message_ids_.data() + NMessages)
{
}
#if ETL_USING_STL && ETL_USING_CPP11
//*******************************
template <size_t NMessages>
subscription(etl::imessage_router& router_,
const std::array<etl::message_id_t, NMessages>& message_ids_)
: p_router(&router_)
, message_ids(message_ids_.data(), message_ids_.data() + NMessages)
{
}
#endif
private:
//*******************************
etl::imessage_router* router() const
virtual message_id_span_t message_id_list() const = 0;
//*******************************
etl::imessage_router* get_router() const
{
return p_router;
}
//*******************************
message_id_span_t message_id_list() const
{
return message_ids;
}
//*******************************
subscription* next_subscription() const
{
return static_cast<subscription*>(next());
return static_cast<subscription*>(get_next());
}
////*******************************
//subscription()
// : p_router(ETL_NULLPTR)
// , message_ids()
//{
//}
etl::imessage_router* const p_router;
message_id_span_t message_ids;
};
using etl::imessage_router::receive;
@ -202,30 +153,12 @@ namespace etl
{
}
//*******************************************
/// Constructor.
//*******************************************
message_broker(etl::message_broker::subscription & subscription_list_)
: imessage_router(etl::imessage_router::MESSAGE_BROKER)
, head()
{
}
//*******************************************
/// Constructor.
//*******************************************
message_broker(etl::message_router_id_t id_, etl::message_broker::subscription& subscription_list_)
: imessage_router(id_)
, head()
{
}
//*******************************************
/// Subscribe to the broker.
//*******************************************
void subscribe(etl::message_broker::subscription& new_sub)
{
initialise_insertion_point(new_sub.router(), &new_sub);
initialise_insertion_point(new_sub.get_router(), &new_sub);
}
//*******************************************
@ -242,7 +175,7 @@ namespace etl
if (!empty())
{
// Scan the subscription lists.
subscription* sub = static_cast<subscription*>(head.next());
subscription* sub = static_cast<subscription*>(head.get_next());
while (sub != ETL_NULLPTR)
{
@ -252,7 +185,7 @@ namespace etl
if (itr != message_ids.end())
{
sub->router()->receive(msg);
sub->get_router()->receive(msg);
}
sub = sub->next_subscription();
@ -276,7 +209,7 @@ namespace etl
if (!empty())
{
// Scan the subscription lists.
subscription* sub = static_cast<subscription*>(head.next());
subscription* sub = static_cast<subscription*>(head.get_next());
while (sub != ETL_NULLPTR)
{
@ -286,7 +219,7 @@ namespace etl
if (itr != message_ids.end())
{
sub->router()->receive(shared_msg);
sub->get_router()->receive(shared_msg);
}
sub = sub->next_subscription();
@ -337,7 +270,7 @@ namespace etl
//********************************************
bool empty() const
{
return head.next() == ETL_NULLPTR;
return head.get_next() == ETL_NULLPTR;
}
private:
@ -347,25 +280,25 @@ namespace etl
{
const etl::imessage_router* p_target_router = p_router;
subscription_node* p_sub = head.next();
subscription_node* p_sub = head.get_next();
subscription_node* p_sub_previous = &head;
while (p_sub != ETL_NULLPTR)
{
// Do we already have a subscription for the router?
if (static_cast<subscription*>(p_sub)->router() == p_target_router)
if (static_cast<subscription*>(p_sub)->get_router() == p_target_router)
{
// Then unlink it.
p_sub_previous->next(p_sub->next()); // Jump over the subscription.
p_sub->terminate(); // Terminate the unlinked subscription.
p_sub_previous->set_next(p_sub->get_next()); // Jump over the subscription.
p_sub->terminate(); // Terminate the unlinked subscription.
// We're done now.
break;
}
// Move on up the list.
p_sub = p_sub->next();
p_sub_previous = p_sub_previous->next();
p_sub = p_sub->get_next();
p_sub_previous = p_sub_previous->get_next();
}
if (p_new_sub != ETL_NULLPTR)
@ -375,54 +308,26 @@ namespace etl
}
}
//*******************************************
/// Check if the message id is in the message list.
//*******************************************
bool is_in_subscription_list(etl::message_id_t id) const
{
if (!empty())
{
// Scan the subscription lists.
subscription* sub = static_cast<subscription*>(head.next());
while (sub != ETL_NULLPTR)
{
message_id_span_t message_ids = sub->message_id_list();
message_id_span_t::const_iterator itr = etl::find(message_ids.begin(), message_ids.end(), id);
if (itr != message_ids.end())
{
return true;
}
sub = sub->next_subscription();
}
}
return false;
}
subscription_node head;
};
//***************************************************************************
/// Send a message to a broker.
//***************************************************************************
//static inline void send_message(etl::imessage_broker& broker,
// const etl::imessage& message)
//{
// broker.receive(message);
//}
static inline void send_message(etl::message_broker& broker,
const etl::imessage& message)
{
broker.receive(message);
}
////***************************************************************************
///// Send a shared message to a broker.
////***************************************************************************
//static inline void send_message(etl::imessage_broker& broker,
// etl::shared_message message)
//{
// broker.receive(message);
//}
//***************************************************************************
/// Send a shared message to a broker.
//***************************************************************************
static inline void send_message(etl::message_broker& broker,
etl::shared_message message)
{
broker.receive(message);
}
}
#endif

View File

@ -31,6 +31,7 @@ SOFTWARE.
#include "etl/message_broker.h"
#include <array>
#include <vector>
//***************************************************************************
// The set of messages.
@ -70,11 +71,8 @@ namespace
struct Message3 : public etl::message<MESSAGE3>
{
Message3()
: value()
{
}
int value[10];
};
struct Message4 : public etl::message<MESSAGE4>
@ -112,6 +110,16 @@ namespace
{
public:
Broker()
: message_broker()
{
}
Broker(etl::message_router_id_t id)
: message_broker(id)
{
}
using etl::message_broker::receive;
// Hook incoming messages and translate Message5 to Message4.
@ -147,6 +155,17 @@ namespace
{
}
void clear()
{
message1_count = 0;
message2_count = 0;
message3_count = 0;
message4_count = 0;
message5_count = 0;
message6_count = 0;
message_unknown_count = 0;
}
void on_receive(const Message1&)
{
++message1_count;
@ -191,43 +210,196 @@ namespace
int message_unknown_count;
};
//*************************************************************************
class Subscription : public etl::message_broker::subscription
{
public:
Subscription(etl::imessage_router& router, std::initializer_list<etl::message_id_t> init)
: etl::message_broker::subscription(router)
, id_list(init)
{
}
virtual etl::message_broker::message_id_span_t message_id_list() const
{
return etl::message_broker::message_id_span_t(id_list.begin(), id_list.end());
}
std::vector<etl::message_id_t> id_list;
};
SUITE(test_message_broker)
{
//*************************************************************************
TEST(message_bus_subscribe_unsubscribe)
TEST(message_check_broker_id)
{
Broker broker1;
Broker broker2(2);
CHECK_EQUAL(etl::imessage_router::MESSAGE_BROKER, broker1.get_message_router_id());
CHECK_EQUAL(2, broker2.get_message_router_id());
CHECK(broker1.is_consumer());
CHECK(broker1.is_producer());
CHECK(!broker1.is_null_router());
}
//*************************************************************************
TEST(message_broker_subscribe_then_unsubscribe)
{
Router router1(1);
Router router2(2);
Router router3(3);
Subscription subscription1{ router1, { Message1::ID, Message2::ID, Message3::ID, Message4::ID } };
Subscription subscription3{ router2, { Message1::ID, Message3::ID } };
Broker broker;
CHECK(broker.empty());
CHECK(broker.accepts(MESSAGE1));
CHECK(broker.accepts(MESSAGE2));
CHECK(broker.accepts(MESSAGE3));
CHECK(broker.accepts(MESSAGE4));
CHECK(broker.accepts(MESSAGE5));
CHECK(broker.accepts(MESSAGE6));
broker.subscribe(subscription1);
CHECK(!broker.empty());
broker.subscribe(subscription3);
CHECK(!broker.empty());
broker.unsubscribe(router1);
CHECK(!broker.empty());
broker.unsubscribe(router2);
CHECK(broker.empty());
// There is no router3 subscription in 'broker'.
broker.unsubscribe(router3);
CHECK(broker.empty());
}
//*************************************************************************
TEST(message_broker_subscribe_then_clear)
{
Router router1(1);
Router router2(2);
Router router3(3);
Subscription subscription1{ router1, { Message1::ID, Message2::ID, Message3::ID, Message4::ID } };
Subscription subscription3{ router2, { Message1::ID, Message3::ID } };
Broker broker;
broker.subscribe(subscription1);
broker.subscribe(subscription3); // Duplicate router. Replace the old subscription.
broker.unsubscribe(router1);
broker.clear();
CHECK(broker.empty());
broker.receive(Message1());
broker.receive(Message2());
broker.receive(Message3());
broker.receive(Message4());
broker.receive(Message5());
broker.receive(Message6());
broker.receive(UnknownMessage());
CHECK_EQUAL(0, router1.message1_count);
CHECK_EQUAL(0, router2.message1_count);
CHECK_EQUAL(0, router3.message1_count);
CHECK_EQUAL(0, router1.message2_count);
CHECK_EQUAL(0, router2.message2_count);
CHECK_EQUAL(0, router3.message2_count);
CHECK_EQUAL(0, router1.message3_count);
CHECK_EQUAL(0, router2.message3_count);
CHECK_EQUAL(0, router3.message3_count);
CHECK_EQUAL(0, router1.message4_count);
CHECK_EQUAL(0, router2.message4_count);
CHECK_EQUAL(0, router3.message4_count);
CHECK_EQUAL(0, router1.message5_count);
CHECK_EQUAL(0, router2.message5_count);
CHECK_EQUAL(0, router3.message5_count);
CHECK_EQUAL(0, router1.message6_count);
CHECK_EQUAL(0, router2.message6_count);
CHECK_EQUAL(0, router3.message6_count);
CHECK_EQUAL(0, router1.message_unknown_count);
CHECK_EQUAL(0, router2.message_unknown_count);
CHECK_EQUAL(0, router3.message_unknown_count);
}
//*************************************************************************
TEST(message_broker_send_messages_to_broker_with_no_subscribers)
{
Broker broker;
Router router1(1);
Router router2(2);
Router router3(3);
broker.set_successor(router3);
broker.receive(Message1());
broker.receive(Message2());
broker.receive(Message3());
broker.receive(Message4());
broker.receive(Message5());
broker.receive(Message6());
broker.receive(UnknownMessage());
CHECK(broker.empty());
CHECK_EQUAL(0, router1.message1_count);
CHECK_EQUAL(0, router2.message1_count);
CHECK_EQUAL(0, router3.message1_count);
std::array<etl::message_id_t, 4U> subscription1_message_list = { Message1::ID, Message2::ID, Message3::ID, Message4::ID };
Broker::subscription subscription1(router1, subscription1_message_list.data(), subscription1_message_list.size());
CHECK_EQUAL(0, router1.message2_count);
CHECK_EQUAL(0, router2.message2_count);
CHECK_EQUAL(0, router3.message2_count);
etl::array<etl::message_id_t, 2U> subscription2_message_list = { Message1::ID, Message2::ID };
Broker::subscription subscription2(router2, subscription2_message_list);
CHECK_EQUAL(0, router1.message3_count);
CHECK_EQUAL(0, router2.message3_count);
CHECK_EQUAL(0, router3.message3_count);
etl::array<etl::message_id_t, 2U> subscription3_message_list = { Message1::ID, Message3::ID };
Broker::subscription subscription3(router2, subscription3_message_list);
CHECK_EQUAL(0, router1.message4_count);
CHECK_EQUAL(0, router2.message4_count);
CHECK_EQUAL(0, router3.message4_count);
std::array<etl::message_id_t, 2U> subscription4_message_list = { Message5::ID, Message6::ID };
Broker::subscription subscription4(router3, subscription4_message_list.data(), subscription4_message_list.size());
CHECK_EQUAL(0, router1.message5_count);
CHECK_EQUAL(0, router2.message5_count);
CHECK_EQUAL(0, router3.message5_count);
CHECK_EQUAL(0, router1.message6_count);
CHECK_EQUAL(0, router2.message6_count);
CHECK_EQUAL(0, router3.message6_count);
CHECK_EQUAL(0, router1.message_unknown_count);
CHECK_EQUAL(0, router2.message_unknown_count);
CHECK_EQUAL(0, router3.message_unknown_count);
}
//*************************************************************************
TEST(message_broker_send_messages_to_subscribers)
{
Broker broker;
Router router1(1);
Router router2(2);
Router router3(3);
Subscription subscription1{ router1, { Message1::ID, Message2::ID, Message3::ID, Message4::ID } };
Subscription subscription2{ router2, { Message1::ID, Message2::ID } };
Subscription subscription3{ router2, { Message1::ID, Message3::ID } };
broker.subscribe(subscription1);
CHECK(!broker.empty());
broker.subscribe(subscription2);
CHECK(!broker.empty());
broker.subscribe(subscription3); // Duplicate router. Replace the old subscription.
CHECK(!broker.empty());
broker.subscribe(subscription1); // Do subscription1 again to see if it breaks.
CHECK(!broker.empty());
broker.set_successor(router3);
broker.receive(Message1());
broker.receive(Message2());
@ -265,16 +437,64 @@ namespace
CHECK_EQUAL(0, router1.message_unknown_count);
CHECK_EQUAL(0, router2.message_unknown_count);
CHECK_EQUAL(1, router3.message_unknown_count);
}
broker.unsubscribe(router1);
CHECK(!broker.empty());
//*************************************************************************
TEST(message_broker_send_messages_to_subscribers_after_unsubscribe)
{
Broker broker;
Router router1(1);
Router router2(2);
Router router3(3);
Subscription subscription1{ router1, { Message1::ID, Message2::ID, Message3::ID, Message4::ID } };
Subscription subscription3{ router2, { Message1::ID, Message3::ID } };
broker.subscribe(subscription1);
broker.subscribe(subscription3);
broker.set_successor(router3);
broker.unsubscribe(router2);
CHECK(broker.empty());
// There is no router3 subscription in 'broker'.
broker.unsubscribe(router3);
CHECK(broker.empty());
broker.receive(Message1());
broker.receive(Message2());
broker.receive(Message3());
broker.receive(Message4());
broker.receive(Message5());
broker.receive(Message6());
broker.receive(UnknownMessage());
CHECK(broker.has_successor());
CHECK_EQUAL(1, router1.message1_count);
CHECK_EQUAL(0, router2.message1_count);
CHECK_EQUAL(1, router3.message1_count);
CHECK_EQUAL(1, router1.message2_count);
CHECK_EQUAL(0, router2.message2_count);
CHECK_EQUAL(1, router3.message2_count);
CHECK_EQUAL(1, router1.message3_count);
CHECK_EQUAL(0, router2.message3_count);
CHECK_EQUAL(1, router3.message3_count);
CHECK_EQUAL(2, router1.message4_count);
CHECK_EQUAL(0, router2.message4_count);
CHECK_EQUAL(2, router3.message4_count);
// Message5 is translated to Message4 in 'broker'.
CHECK_EQUAL(0, router1.message5_count);
CHECK_EQUAL(0, router2.message5_count);
CHECK_EQUAL(0, router3.message5_count);
CHECK_EQUAL(0, router1.message6_count);
CHECK_EQUAL(0, router2.message6_count);
CHECK_EQUAL(1, router3.message6_count);
CHECK_EQUAL(0, router1.message_unknown_count);
CHECK_EQUAL(0, router2.message_unknown_count);
CHECK_EQUAL(1, router3.message_unknown_count);
}
};
}