Modified accepts() message router functions to interrogate subscribed and successor routers to achieve consistency.

This commit is contained in:
John Wellbelove 2024-03-15 13:01:59 +00:00
parent 80613827c9
commit cb2700e75d
4 changed files with 187 additions and 28 deletions

View File

@ -274,11 +274,48 @@ namespace etl
using imessage_router::accepts;
//*******************************************
/// Message brokers accept all messages.
/// Message brokers accept messages determined
/// by the subscribed routers.
//*******************************************
virtual bool accepts(etl::message_id_t) const ETL_OVERRIDE
virtual bool accepts(etl::message_id_t id) const ETL_OVERRIDE
{
return true;
if (!empty())
{
// Scan the subscription lists.
subscription* sub = static_cast<subscription*>(head.get_next());
while (sub != ETL_NULLPTR)
{
message_id_span_t message_ids = sub->message_id_list();
message_id_span_t::iterator itr = etl::find(message_ids.begin(), message_ids.end(), id);
if (itr != message_ids.end())
{
etl::imessage_router* router = sub->get_router();
if (router->accepts(id))
{
return true;
}
}
sub = sub->next_subscription();
}
}
// Check any successor.
if (has_successor())
{
if (get_successor().accepts(id))
{
return true;
}
}
return false;
//return true;
}
//*******************************************

View File

@ -315,11 +315,35 @@ namespace etl
//*******************************************
/// Does this message bus accept the message id?
/// Yes!, it accepts everything!
/// Returns <b>true</b> on the first router that does.
//*******************************************
bool accepts(etl::message_id_t) const ETL_OVERRIDE
bool accepts(etl::message_id_t id) const ETL_OVERRIDE
{
return true;
// Check the list of subscribed routers.
router_list_t::iterator irouter = router_list.begin();
while (irouter != router_list.end())
{
etl::imessage_router& router = **irouter;
if (router.accepts(id))
{
return true;
}
++irouter;
}
// Check any successor.
if (has_successor())
{
if (get_successor().accepts(id))
{
return true;
}
}
return false;
}
//*******************************************

View File

@ -232,7 +232,7 @@ namespace
SUITE(test_message_broker)
{
//*************************************************************************
TEST(message_check_broker_id)
TEST(test_message_check_broker_id)
{
Broker broker1;
Broker broker2(2);
@ -245,7 +245,7 @@ namespace
}
//*************************************************************************
TEST(message_broker_subscribe_then_unsubscribe)
TEST(test_message_broker_subscribe_then_unsubscribe)
{
Router router1(1);
Router router2(2);
@ -255,33 +255,27 @@ namespace
Subscription subscription3{ router2, { Message1::ID, Message3::ID } };
Broker broker;
CHECK(broker.empty());
CHECK(broker.accepts(MESSAGE1));
CHECK(broker.accepts(MESSAGE2));
CHECK(broker.accepts(MESSAGE3));
CHECK(broker.accepts(MESSAGE4));
CHECK(broker.accepts(MESSAGE5));
CHECK(broker.accepts(MESSAGE6));
CHECK_TRUE(broker.empty());
broker.subscribe(subscription1);
CHECK(!broker.empty());
CHECK_FALSE(broker.empty());
broker.subscribe(subscription3);
CHECK(!broker.empty());
CHECK_FALSE(broker.empty());
broker.unsubscribe(router1);
CHECK(!broker.empty());
CHECK_FALSE(broker.empty());
broker.unsubscribe(router2);
CHECK(broker.empty());
CHECK_TRUE(broker.empty());
// There is no router3 subscription in 'broker'.
broker.unsubscribe(router3);
CHECK(broker.empty());
CHECK_TRUE(broker.empty());
}
//*************************************************************************
TEST(message_broker_subscribe_then_clear)
TEST(test_message_broker_subscribe_then_clear)
{
Router router1(1);
Router router2(2);
@ -337,7 +331,7 @@ namespace
}
//*************************************************************************
TEST(message_broker_send_messages_to_broker_with_no_subscribers)
TEST(test_message_broker_send_messages_to_broker_with_no_subscribers)
{
Broker broker;
Router router1(1);
@ -382,7 +376,7 @@ namespace
}
//*************************************************************************
TEST(message_broker_send_messages_to_subscribers)
TEST(test_message_broker_send_messages_to_subscribers)
{
Broker broker;
Router router1(1);
@ -439,7 +433,7 @@ namespace
}
//*************************************************************************
TEST(message_broker_send_messages_to_specific_subscribers)
TEST(test_message_broker_send_messages_to_specific_subscribers)
{
Broker broker;
Router router1(1);
@ -460,7 +454,7 @@ namespace
}
//*************************************************************************
TEST(message_broker_send_messages_to_subscribers_after_unsubscribe)
TEST(test_message_broker_send_messages_to_subscribers_after_unsubscribe)
{
Broker broker;
Router router1(1);
@ -516,5 +510,53 @@ namespace
CHECK_EQUAL(0, router2.message_unknown_count);
CHECK_EQUAL(1, router3.message_unknown_count);
}
//*************************************************************************
TEST(test_message_broker_accepts)
{
Router router1(1);
Router router2(2);
Router router3(3);
Subscription subscription1{ router1, { Message1::ID, Message3::ID } };
Subscription subscription2{ router2, { Message1::ID, Message2::ID, Message3::ID, Message4::ID } };
// Default constructed broker.
Broker broker;
CHECK_FALSE(broker.accepts(MESSAGE1));
CHECK_FALSE(broker.accepts(MESSAGE2));
CHECK_FALSE(broker.accepts(MESSAGE3));
CHECK_FALSE(broker.accepts(MESSAGE4));
CHECK_FALSE(broker.accepts(MESSAGE5));
CHECK_FALSE(broker.accepts(MESSAGE6));
// Subscribe router1.
broker.subscribe(subscription1);
CHECK_TRUE(broker.accepts(MESSAGE1));
CHECK_FALSE(broker.accepts(MESSAGE2));
CHECK_TRUE(broker.accepts(MESSAGE3));
CHECK_FALSE(broker.accepts(MESSAGE4));
CHECK_FALSE(broker.accepts(MESSAGE5));
CHECK_FALSE(broker.accepts(MESSAGE6));
// Subscribe router2.
broker.subscribe(subscription2);
CHECK_TRUE(broker.accepts(MESSAGE1));
CHECK_TRUE(broker.accepts(MESSAGE2));
CHECK_TRUE(broker.accepts(MESSAGE3));
CHECK_TRUE(broker.accepts(MESSAGE4));
CHECK_FALSE(broker.accepts(MESSAGE5));
CHECK_FALSE(broker.accepts(MESSAGE6));
// Set router3 as a successor.
broker.set_successor(router3);
CHECK_TRUE(broker.accepts(MESSAGE1));
CHECK_TRUE(broker.accepts(MESSAGE2));
CHECK_TRUE(broker.accepts(MESSAGE3));
CHECK_TRUE(broker.accepts(MESSAGE4));
CHECK_TRUE(broker.accepts(MESSAGE5));
CHECK_TRUE(broker.accepts(MESSAGE6));
}
};
}

View File

@ -45,7 +45,9 @@ namespace
MESSAGE2,
MESSAGE3,
MESSAGE4,
MESSAGE5
MESSAGE5,
MESSAGE6,
MESSAGE7
};
enum
@ -105,6 +107,14 @@ namespace
Response response;
struct Message6 : public etl::message<MESSAGE6>
{
};
struct Message7 : public etl::message<MESSAGE7>
{
};
int call_order;
//***************************************************************************
@ -224,6 +234,27 @@ namespace
int message_unknown_count;
};
//***************************************************************************
// Router that handles message 6 and returns nothing.
//***************************************************************************
class RouterC : public etl::message_router<RouterC, Message6>
{
public:
RouterC(etl::message_router_id_t id)
: message_router(id)
{
}
void on_receive(const Message6&)
{
}
void on_receive_unknown(const etl::imessage&)
{
}
};
//***************************************************************************
template <size_t Size>
class MessageBus : public etl::message_bus<Size>
@ -237,7 +268,7 @@ namespace
using etl::message_bus<Size>::receive;
// Hook 'receive' to count the incomimg messages.
// Hook 'receive' to count the incoming messages.
void receive(etl::message_router_id_t id, const etl::imessage& msg)
{
++message_count;
@ -951,5 +982,30 @@ namespace
CHECK_EQUAL(1, bus.message_count);
}
//*************************************************************************
TEST(message_bus_accepts)
{
MessageBus<2> bus1;
MessageBus<2> bus2;
RouterA router1(ROUTER1);
RouterB router2(ROUTER2);
RouterC router3(ROUTER3);
bus1.subscribe(router1);
bus1.subscribe(router2);
bus1.set_successor(bus2);
bus2.subscribe(router3);
CHECK_TRUE(bus1.accepts(MESSAGE1));
CHECK_TRUE(bus1.accepts(MESSAGE2));
CHECK_TRUE(bus1.accepts(MESSAGE3));
CHECK_TRUE(bus1.accepts(MESSAGE4));
CHECK_TRUE(bus1.accepts(MESSAGE5));
CHECK_TRUE(bus1.accepts(MESSAGE6));
CHECK_FALSE(bus1.accepts(MESSAGE7));
}
};
}