from . import check
from . import instance
from . import query
+from . import name
from . import rndc
from . import run
from . import log
def is_executable(cmd: str, errmsg: str) -> None:
executable = shutil.which(cmd)
assert executable is not None, errmsg
+
+
+def nxdomain(message: dns.message.Message) -> None:
+ rcode(message, dns.rcode.NXDOMAIN)
+
+
+def single_question(message: dns.message.Message) -> None:
+ assert len(message.question) == 1, str(message)
+
+
+def empty_answer(message: dns.message.Message) -> None:
+ assert not message.answer, str(message)
+
+
+def is_response_to(response: dns.message.Message, query: dns.message.Message) -> None:
+ single_question(response)
+ single_question(query)
+ assert query.is_response(response), str(response)
--- /dev/null
+# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
+#
+# SPDX-License-Identifier: MPL-2.0
+#
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, you can obtain one at https://mozilla.org/MPL/2.0/.
+#
+# See the COPYRIGHT file distributed with this work for additional
+# information regarding copyright ownership.
+
+import dns.name
+
+
+def prepend_label(label: str, name: dns.name.Name) -> dns.name.Name:
+ return dns.name.Name((label,) + name.labels)
ip: str,
port: Optional[int] = None,
source: Optional[str] = None,
+ timeout: int = QUERY_TIMEOUT,
) -> dns.message.Message:
if port is None:
port = int(os.environ["PORT"])
- return dns.query.udp(message, ip, QUERY_TIMEOUT, port=port, source=source)
+ return dns.query.udp(message, ip, timeout, port=port, source=source)
def tcp(
ip: str,
port: Optional[int] = None,
source: Optional[str] = None,
+ timeout: int = QUERY_TIMEOUT,
) -> dns.message.Message:
if port is None:
port = int(os.environ["PORT"])
- return dns.query.tcp(message, ip, QUERY_TIMEOUT, port=port, source=source)
+ return dns.query.tcp(message, ip, timeout, port=port, source=source)