return flow;
}
-static Flow* consume_ha_message(HAMessage& msg, Packet* p = nullptr)
+static Flow* consume_ha_message(HAMessage& msg,
+ FlowKey* packet_key = nullptr, Packet* p = nullptr)
{
ha_stats.msgs_recv++;
if (read_flow_key(msg, hdr, key) == 0)
return nullptr;
+ if (packet_key and !FlowKey::is_equal(packet_key, &key, 0))
+ {
+ ha_stats.key_mismatch++;
+ return nullptr;
+ }
+
Flow* flow = nullptr;
switch (hdr->event)
{
if (p.daq_instance->ioctl(DIOCTL_GET_FLOW_HA_STATE, &fhs, sizeof(fhs)) == DAQ_SUCCESS)
{
HAMessage ha_msg(fhs.data, fhs.length);
- flow = consume_ha_message(ha_msg, &p);
+ flow = consume_ha_message(ha_msg, &key, &p);
ha_stats.daq_imports++;
// Validate that the imported flow matches up with the given flow key.
if (flow)
{
- if (FlowKey::is_equal(&key, flow->key, 0))
+ if (flow->flow_state == Flow::FlowState::BLOCK
+ or flow->flow_state == Flow::FlowState::RESET)
{
- if (flow->flow_state == Flow::FlowState::BLOCK
- or flow->flow_state == Flow::FlowState::RESET)
- {
- flow->disable_inspection();
- p.disable_inspect = true;
- }
-
- // Clear the standby bit so that we don't immediately trigger a new data store
- // FIXIT-L streamline the consume process so this doesn't have to be done here
- flow->ha_state->clear(FlowHAState::STANDBY);
+ flow->disable_inspection();
+ p.disable_inspect = true;
}
- else
- flow = nullptr;
+ // Clear the standby bit so that we don't immediately trigger a new data store
+ // FIXIT-L streamline the consume process so this doesn't have to be done here
+ flow->ha_state->clear(FlowHAState::STANDBY);
}
}
}
{ CountType::SUM, "delete_msgs_consumed", "deletion messages consumed" },
{ CountType::SUM, "daq_stores", "states stored via daq" },
{ CountType::SUM, "daq_imports", "states imported via daq" },
+ { CountType::SUM, "key_mismatch", "messages received with a flow key mismatch" },
{ CountType::SUM, "msg_version_mismatch", "messages received with a version mismatch" },
{ CountType::SUM, "msg_length_mismatch", "messages received with an inconsistent total length" },
{ CountType::SUM, "truncated_msgs", "truncated messages received" },
PegCount delete_msgs_consumed;
PegCount daq_stores;
PegCount daq_imports;
+ PegCount key_mismatch;
PegCount msg_version_mismatch;
PegCount msg_length_mismatch;
PegCount truncated_msgs;
CHECK(ha_stats.client_consume_errors == 1);
}
+TEST(high_availability_test, consume_error_key_mismatch)
+{
+ HAMessageHeader hdr[10] = { 0, HA_MESSAGE_VERSION, 0x32, KEY_TYPE_IP4 };
+ HAMessage msg((uint8_t*) &hdr, sizeof(hdr));
+
+ FlowKey packet_key;
+ FlowKey* key = &packet_key;
+ CHECK(consume_ha_message(msg, key, &s_pkt) == nullptr);
+ CHECK(ha_stats.key_mismatch == 1);
+}
+
TEST(high_availability_test, consume_error_truncated_msg_hdr)
{
HAMessageHeader hdr = { };
HAMessage msg((uint8_t*) &hdr, sizeof(hdr) / 2);
- CHECK(consume_ha_message(msg, &s_pkt) == nullptr);
+ FlowKey* key = nullptr;
+ CHECK(consume_ha_message(msg, key, &s_pkt) == nullptr);
CHECK(ha_stats.truncated_msgs == 1);
}
HAMessageHeader hdr = { 0, HA_MESSAGE_VERSION + 1, 0, 0 };
HAMessage msg((uint8_t*) &hdr, sizeof(hdr));
- CHECK(consume_ha_message(msg, &s_pkt) == nullptr);
+ FlowKey* key = nullptr;
+ CHECK(consume_ha_message(msg, key, &s_pkt) == nullptr);
CHECK(ha_stats.msg_version_mismatch == 1);
}
HAMessageHeader hdr = { 0, HA_MESSAGE_VERSION, 0x42, 0 };
HAMessage msg((uint8_t*) &hdr, sizeof(hdr));
- CHECK(consume_ha_message(msg, &s_pkt) == nullptr);
+ FlowKey* key = nullptr;
+ CHECK(consume_ha_message(msg, key, &s_pkt) == nullptr);
CHECK(ha_stats.msg_length_mismatch == 1);
}