Generated by black 26.1.0 which got updated in CI.
from bailiwick_ans import ResponseSpoofer, spoofing_server
-
ATTACKER_IP = "10.53.0.3"
TTL = 3600
from bailiwick_ans import ResponseSpoofer, spoofing_server
-
ATTACKER_IP = "10.53.0.3"
TTL = 3600
import dns.rdataclass
import dns.rdatatype
-
pytestmark = [
pytest.mark.skipif(
sys.version_info < (3, 7), reason="Python >= 3.7 required [GL #3001]"
import isctest
import isctest.mark
-
pytestmark = pytest.mark.extra_artifacts(
[
"ns*/example*.db",
import isctest
from isctest.vars.build import SYSTEM_TEST_DIR_GIT_PATH
-
# Silence warnings caused by passing a pytest fixture to another fixture.
# pylint: disable=redefined-outer-name
import isctest
from isctest.util import param
-
pytestmark = pytest.mark.extra_artifacts(
[
"*/K*",
import isctest
-
pytestmark = pytest.mark.extra_artifacts(
[
"*/K*",
import isctest
-
pytestmark = pytest.mark.extra_artifacts(
[
"*/K*",
from isctest.kasp import SettimeOptions
import isctest
-
pytestmark = pytest.mark.extra_artifacts(
[
"*/K*",
import isctest
-
pytestmark = pytest.mark.extra_artifacts(
[
"*/K*",
import isctest.mark
from isctest.util import param
-
pytestmark = pytest.mark.extra_artifacts(
[
"*/K*",
import isctest
-
pytestmark = pytest.mark.extra_artifacts(
[
"ns*/trusted.conf",
import pytest
-
pytestmark = pytest.mark.extra_artifacts(
[
"dig.out.*",
import isctest
-
ARTIFACTS = [
"conf/*.conf",
"ns*/trusted.conf",
prime_cache,
)
-
pytestmark = pytest.mark.extra_artifacts(ARTIFACTS)
prime_cache,
)
-
pytestmark = pytest.mark.extra_artifacts(ARTIFACTS)
prime_cache,
)
-
pytestmark = pytest.mark.extra_artifacts(ARTIFACTS)
prime_cache,
)
-
pytestmark = pytest.mark.extra_artifacts(ARTIFACTS)
from filters.common import ARTIFACTS
-
pytestmark = pytest.mark.extra_artifacts(ARTIFACTS)
from filters.common import ARTIFACTS
-
pytestmark = pytest.mark.extra_artifacts(ARTIFACTS)
import pytest
-
pytestmark = pytest.mark.extra_artifacts(
[
"dig.out.*",
import isctest
-
pytestmark = pytest.mark.extra_artifacts(
[
"ns1/*",
import pytest
import isctest
-
pytestmark = pytest.mark.extra_artifacts(["conf/*.conf"])
from . import log
from .vars import ALL, init_vars
-
if __name__ == "__main__":
# use root logger as fallback - we're not interested in proper logs here
log.basic.LOGGERS["conftest"] = logging.getLogger()
import dns.version
import dns.zone
-
_UdpHandler = Callable[
[bytes, Tuple[str, int], asyncio.DatagramTransport], Coroutine[Any, Any, None]
]
import textwrap
from typing import Dict, Optional
-
LOG_FORMAT = "%(asctime)s %(levelname)7s:%(name)s %(message)s"
LOG_INDENT = 4
from isctest.text import compile_pattern, FlexPattern, LineReader
-
T = TypeVar("T")
OneOrMore = Union[T, List[T]]
from re import compile as Re
from typing import Iterator, List, Match, Optional, Pattern, TextIO, Union
-
FlexPattern = Union[str, Pattern]
from pathlib import Path
from typing import Dict
-
SYSTEM_TEST_DIR_GIT_PATH = "bin/tests/system"
from .basic import BASIC_VARS
-
FEATURES = {
"DNSTAP": "--enable-dnstap",
"EXTENDED_DS_DIGEST": "--extended-ds-digest",
from .. import log
-
OPENSSL_VARS = {
"OPENSSL_CONF": os.getenv("OPENSSL_CONF", None),
"SOFTHSM2_CONF": os.getenv("SOFTHSM2_CONF", None),
import isctest
import pytest
-
pytestmark = pytest.mark.extra_artifacts(
["ns2/named.stats"],
)
import isctest.mark
-
pytestmark = [
isctest.mark.softhsm2_environment,
pytest.mark.extra_artifacts(
check_nsec3_case,
)
-
# include the following zones when rendering named configs
ZONES = {
"nsec3-change.kasp",
check_nsec3_case,
)
-
# include the following zones when rendering named configs
ZONES = {
"nsec-to-nsec3.kasp",
check_nsec3_case,
)
-
# include the following zones when rendering named configs
ZONES = {
"nsec3-to-nsec.kasp",
check_nsec3_case,
)
-
# include the following zones when rendering named configs
ZONES = {
"nsec3-fails-to-load.kasp",
check_nsec3param,
)
-
# include the following zones when rendering named configs
ZONES = {
"nsec3.kasp",
import dns.rdataclass
import dns.rdatatype
-
pytestmark = [
pytest.mark.skipif(
sys.version_info < (3, 7), reason="Python >= 3.7 required [GL #3001]"
while True:
try:
- (clientsock, _) = sock.accept()
+ clientsock, _ = sock.accept()
log("Accepted connection from %s" % clientsock)
thread = TCPDelayer(clientsock, serverip, port)
thread.start()
import pytest
-
pytestmark = pytest.mark.extra_artifacts(
[
"dig.out.*",
import isctest
-
pytestmark = pytest.mark.extra_artifacts(
[
"ns*/named.pid",
import pytest
-
pytestmark = pytest.mark.extra_artifacts(
[
".digrc",
import isctest
-
pytestmark = pytest.mark.extra_artifacts(
[
"ns2/nil.db",
configure_ksk_3crowd,
)
-
CDSS = ["CDS (SHA-256)"]
POLICY = "ksk-doubleksk-autosign"
OFFSET1 = -int(timedelta(days=60).total_seconds())
import isctest
-
pytestmark = pytest.mark.extra_artifacts(
[
"ns3/*-rpz-external.local.db",
import pytest
-
pytestmark = pytest.mark.extra_artifacts(
[
"dig.out.*",
Generate insane test zone and check expected output of ZoneAnalyzer utility class
"""
-
import collections
import itertools
from pathlib import Path
import isctest
import isctest.name
-
# set of properies present in the tested zone - read by tests_zone_analyzer.py
CATEGORIES = frozenset(
[
import pytest
-
pytestmark = pytest.mark.extra_artifacts(
[
"curl.out.*",
import isctest
-
# ISO datetime format without msec
fmt = "%Y-%m-%dT%H:%M:%SZ"
zones = fetch_zones(statsip, statsport)
for zone in zones:
- (name, loaded, expires, refresh) = load_timers(zone, True)
+ name, loaded, expires, refresh = load_timers(zone, True)
mtime = zone_mtime(zonedir, name)
check_zone_timers(loaded, expires, refresh, mtime)
zones = fetch_zones(statsip, statsport)
again = False
for zone in zones:
- (name, loaded, expires, refresh) = load_timers(zone, False)
+ name, loaded, expires, refresh = load_timers(zone, False)
mtime = zone_mtime(zonedir, name)
if (mtime != dayzero) or (tries == 0):
# mtime was either retrieved successfully or no tries were
pytest.register_assert_rewrite("generic")
import generic
-
pytestmark = [
isctest.mark.with_json_c,
pytest.mark.extra_artifacts(
pytest.register_assert_rewrite("generic")
import generic
-
pytestmark = [
isctest.mark.with_libxml2,
pytest.mark.extra_artifacts(
import sys
import time
-
# Timeout for establishing all connections requested by a single 'open' command.
OPEN_TIMEOUT = 2
VERSION_QUERY = b"\x00\x1e\xaf\xb8\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x07version\x04bind\x00\x00\x10\x00\x03"
ctlsock.listen(1)
while True:
- (clientsock, _) = ctlsock.accept()
+ clientsock, _ = ctlsock.accept()
log("Accepted control connection from %s" % clientsock)
cmdline = clientsock.recv(512).decode("ascii").strip()
if cmdline:
dns.query.send_tcp(sock, msg, timeout())
# Receive the initial DNS message with SOA
- (response, _) = dns.query.receive_tcp(sock, timeout(), one_rr_per_rrset=True)
+ response, _ = dns.query.receive_tcp(sock, timeout(), one_rr_per_rrset=True)
soa = response.get_rrset(
dns.message.ANSWER, name, dns.rdataclass.IN, dns.rdatatype.SOA
)
# Pull DNS message from wire until the second SOA is received
while True:
- (response, _) = dns.query.receive_tcp(
- sock, timeout(), one_rr_per_rrset=True
- )
+ response, _ = dns.query.receive_tcp(sock, timeout(), one_rr_per_rrset=True)
soa = response.get_rrset(
dns.message.ANSWER, name, dns.rdataclass.IN, dns.rdatatype.SOA
)
dns.query.send_tcp(sock, msg, timeout())
# Receive the initial DNS message with SOA
- (response, _) = dns.query.receive_tcp(sock, timeout(), one_rr_per_rrset=True)
+ response, _ = dns.query.receive_tcp(sock, timeout(), one_rr_per_rrset=True)
soa = response.get_rrset(
dns.message.ANSWER, name, dns.rdataclass.IN, dns.rdatatype.SOA
)
with pytest.raises(ConnectionResetError):
# Process queued TCP messages
while True:
- (response, _) = dns.query.receive_tcp(
+ response, _ = dns.query.receive_tcp(
sock, timeout(), one_rr_per_rrset=True
)
soa = response.get_rrset(
dns.query.send_tcp(sock, msg, timeout())
# Receive the initial DNS message with SOA
- (response, _) = dns.query.receive_tcp(sock, timeout(), one_rr_per_rrset=True)
+ response, _ = dns.query.receive_tcp(sock, timeout(), one_rr_per_rrset=True)
soa = response.get_rrset(
dns.message.ANSWER, name, dns.rdataclass.IN, dns.rdatatype.SOA
)
with pytest.raises(EOFError):
while True:
time.sleep(1)
- (response, _) = dns.query.receive_tcp(
+ response, _ = dns.query.receive_tcp(
sock, timeout(), one_rr_per_rrset=True
)
soa = response.get_rrset(
from hypothesis import assume, example, given
from hypothesis.strategies import binary, booleans, composite, just, sampled_from
-
pytestmark = pytest.mark.extra_artifacts(
[
"ans*/ans.run",
import pytest
-
pytestmark = pytest.mark.extra_artifacts(
[
"Ksig0.example2*",
import pytest
-
pytestmark = pytest.mark.extra_artifacts(
[
"axfr.out",
for z in range(zones):
zn = f"zone{z:06d}.example"
with open(f"ns1/{zn}.db", "w", encoding="utf-8") as f:
- f.write(
- """$TTL 300
+ f.write("""$TTL 300
@ IN SOA ns1 . 1 300 120 3600 86400
NS ns1
NS ns2
MX 20 mail2.isp.example.
www A 10.0.0.1
xyzzy A 10.0.0.2
-"""
- )
+""")
with open("ns1/zones.conf", "w", encoding="utf-8") as priconf, open(
"ns2/zones.conf", "w", encoding="utf-8"
import pytest
-
pytestmark = pytest.mark.extra_artifacts(
[
"dig.out*",
return TextProc(lambda text: value.fun(self.fun(text)))
import inspect
- (_frame, filename, lineno, _function_name, lines, _index) = inspect.stack()[1]
+ _frame, filename, lineno, _function_name, lines, _index = inspect.stack()[1]
raise SyntaxError(
"Invalid syntax in config file",
(
raise
## XXXvlab: should use $COLUMNS in bash and for windows:
## http://stackoverflow.com/questions/14978548
- stderr(
- paragraph_wrap(
- textwrap.dedent(
- """\
+ stderr(paragraph_wrap(textwrap.dedent("""\
UnicodeEncodeError:
There was a problem outputing the resulting changelog to
your console.
This probably means that the changelog contains characters
that can't be translated to characters in your current charset
(%s).
- """
- )
- % sys.stdout.encoding
- )
- )
+ """) % sys.stdout.encoding))
if WIN32 and PY_VERSION < 3.6 and sys.stdout.encoding != "utf-8":
## As of PY 3.6, encoding is now ``utf-8`` regardless of
## PYTHONIOENCODING
import checkgrammar
-
logger = logging.getLogger(__name__)
import parsegrammar
-
misc_path = Path(__file__).resolve().parent.parent.parent / "misc"
options_path = misc_path / "options"
}
}
"""
+
import fileinput
import json
import re