return s_opcodes.at(opcode);
}
+std::optional<uint8_t> Opcode::from_lowercase_string(const std::string_view& opcode_string)
+{
+ static const std::array<std::string, 6> s_opcodes = { "query", "iquery", "status", "3", "notify", "update" };
+ const auto* position = std::find(s_opcodes.begin(), s_opcodes.end(), opcode_string);
+ if (position == s_opcodes.end()) {
+ return std::nullopt;
+ }
+ return std::distance(s_opcodes.begin(), position);
+}
+
// goal is to hash based purely on the question name, and turn error into 'default'
uint32_t hashQuestion(const uint8_t* packet, uint16_t packet_len, uint32_t init, bool& wasOK)
{
public:
enum opcodes_ : uint8_t { Query=0, IQuery=1, Status=2, Notify=4, Update=5 };
static std::string to_s(uint8_t opcode);
+ static std::optional<uint8_t> from_lowercase_string(const std::string_view& opcode_string);
};
// This needs to be a signed type, so that serialization of UnknownDomainID
return qtype;
}
+static uint8_t strToOpcode(const std::string& context, const std::string& parameterName, const ::rust::String& opcode_rust_string)
+{
+ auto opcode_str = std::string(opcode_rust_string);
+ boost::to_lower(opcode_str);
+ auto opcode = Opcode::from_lowercase_string(opcode_str);
+ if (!opcode) {
+ return checkedConversionFromStr<uint8_t>(context, parameterName, opcode_rust_string);
+ }
+ return *opcode;
+}
+
static std::optional<std::string> loadContentFromConfigurationFile(const std::string& fileName)
{
/* no check on the file size, don't do this with just any file! */
return 'const std::string&'
if type_str == 'RCode':
return 'uint8_t'
+ if type_str == 'Opcode':
+ return 'uint8_t'
return type_str
def get_cpp_object_name(name, is_class=True):
field = f'convertSOAParams({field})'
elif ptype == 'RCode':
field = f'dnsdist::configuration::yaml::strToRCode("{struct_name}", "{name}", {field})'
+ elif ptype == 'Opcode':
+ field = f'dnsdist::configuration::yaml::strToOpcode("{struct_name}", "{name}", {field})'
output += field
return output
description: "Matches queries with opcode equals to ``code``"
parameters:
- name: "code"
- type: "u8"
+ type: "Opcode"
+ rust-type: "String"
description: "The opcode to match"
- name: "Or"
description: "Matches the traffic if one or more of the selectors Rules does match"
#define BOOST_TEST_NO_MAIN
+#include <boost/algorithm/string.hpp>
#include <boost/test/unit_test.hpp>
#include "dns.hh"
for (uint8_t idx = Opcode::Query; idx <= Opcode::Update; idx++) {
auto long_s = Opcode::to_s(idx);
BOOST_CHECK(long_s.size() > 0);
+ boost::to_lower(long_s);
+ auto opcode = Opcode::from_lowercase_string(long_s);
+ BOOST_CHECK(opcode);
+ BOOST_CHECK_EQUAL(*opcode, idx);
}
BOOST_CHECK_EQUAL(Opcode::to_s(Opcode::Update + 1), std::to_string(Opcode::Update + 1));
+ BOOST_CHECK(!Opcode::from_lowercase_string("notanopcode"));
}
BOOST_AUTO_TEST_CASE(test_resource_record_place)
sender = getattr(self, method)
(_, receivedResponse) = sender(query, response=None, useQueue=False)
self.assertEqual(receivedResponse, expectedResponse)
+
+class TestYamlOpcode(DNSDistTest):
+
+ _yaml_config_template = """---
+binds:
+ - listen_address: "127.0.0.1:%d"
+ protocol: Do53
+
+backends:
+ - address: "127.0.0.1:%d"
+ protocol: Do53
+
+query_rules:
+ - name: "refuse queries from specific opcode"
+ selector:
+ type: "Opcode"
+ code: "NOTIFY"
+ action:
+ type: "RCode"
+ rcode: "Refused"
+"""
+ _yaml_config_params = ['_dnsDistPort', '_testServerPort']
+ _config_params = []
+
+ def testRefuseOpcodeNotify(self):
+ """
+ YAML: Refuse Opcode NOTIFY
+ """
+ name = 'opcodenotify.yaml.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ query.set_opcode(dns.opcode.NOTIFY)
+ query.flags &= ~dns.flags.RD
+ expectedResponse = dns.message.make_response(query)
+ expectedResponse.set_rcode(dns.rcode.REFUSED)
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertEqual(receivedResponse, expectedResponse)
+
+ def testAllowOpcodeUpdate(self):
+ """
+ YAML: Allow Opcode UPDATE
+ """
+ name = 'opcodeupdate.yaml.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'SOA', 'IN')
+ query.set_opcode(dns.opcode.UPDATE)
+ response = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 3600,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '127.0.0.1')
+ response.answer.append(rrset)
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (receivedQuery, receivedResponse) = sender(query, response)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = query.id
+ self.assertEqual(query, receivedQuery)
+ self.assertEqual(response, receivedResponse)