It's a fairly common pattern to use regular expression in our tests.
Instead of using the fairly verbose re.compile(), import that function
as Re() instead to allow for more brevity in the test syntax.
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
-import re
+from re import compile as Re
import pytest
@pytest.fixture(scope="module")
def transfers_complete(servers):
for zone in ["example", "example-aes-128", "example-aes-256", "example-chacha-20"]:
- pattern = re.compile(
+ pattern = Re(
f"transfer of '{zone}/IN' from 10.53.0.1#[0-9]+: Transfer completed"
)
for ns in ["ns2", "ns3", "ns4", "ns5"]:
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
-import re
-
+from re import compile as Re
import isctest
"apply_configuration",
"loop exclusive mode: starting",
"apply_configuration: configure_views",
- re.compile(r".*port '9999999' out of range"),
+ Re(r".*port '9999999' out of range"),
"apply_configuration: detaching views",
"loop exclusive mode: ending",
"reloading configuration failed",
import filecmp
import os
from pathlib import Path
-import re
+from re import compile as Re
import shutil
import subprocess
import tempfile
XDIST_WORKER = os.environ.get("PYTEST_XDIST_WORKER", "")
FILE_DIR = os.path.abspath(Path(__file__).parent)
-ENV_RE = re.compile(b"([^=]+)=(.*)")
+ENV_RE = Re(b"([^=]+)=(.*)")
PRIORITY_TESTS = [
# Tests that are scheduled first. Speeds up parallel execution.
"rpz/",
"timeouts/",
"upforwd/",
]
-PRIORITY_TESTS_RE = re.compile("|".join(PRIORITY_TESTS))
-SYSTEM_TEST_NAME_RE = re.compile(f"{SYSTEM_TEST_DIR_GIT_PATH}" + r"/([^/]+)")
-SYMLINK_REPLACEMENT_RE = re.compile(r"/tests(_.*)\.py")
+PRIORITY_TESTS_RE = Re("|".join(PRIORITY_TESTS))
+SYSTEM_TEST_NAME_RE = Re(f"{SYSTEM_TEST_DIR_GIT_PATH}" + r"/([^/]+)")
+SYMLINK_REPLACEMENT_RE = Re(r"/tests(_.*)\.py")
# ----------------------- Global requirements ----------------------------
from collections import namedtuple
import os
import re
+from re import compile as Re
import struct
import time
os.rename(f"ns2/{KSK}.private.bak", f"ns2/{KSK}.private")
def loadkeys():
- pattern = re.compile(f"{zone}/IN.*next key event")
+ pattern = Re(f"{zone}/IN.*next key event")
with ns2.watch_log_from_here() as watcher:
ns2.rndc(f"loadkeys {zone}", log=False)
watcher.wait_for_line(pattern)
# information regarding copyright ownership.
import os
-import re
+from re import compile as Re
from dns import edns
msg = isctest.query.create(".", "DNSKEY")
opt = edns.GenericOption(14, b"\xff\xff")
msg.use_edns(edns=True, options=[opt])
- pattern = re.compile("trust-anchor-telemetry './IN' from .* 65535")
+ pattern = Re("trust-anchor-telemetry './IN' from .* 65535")
with ns1.watch_log_from_here() as watcher:
res = isctest.query.tcp(msg, "10.53.0.1")
watcher.wait_for_line(pattern)
opt1 = edns.GenericOption(14, b"\xff\xff")
opt2 = edns.GenericOption(14, b"\xff\xfe")
msg.use_edns(edns=True, options=[opt2, opt1])
- pattern = re.compile("trust-anchor-telemetry './IN' from .* 65534")
+ pattern = Re("trust-anchor-telemetry './IN' from .* 65534")
with ns1.watch_log_from_here() as watcher:
res = isctest.query.tcp(msg, "10.53.0.1")
isctest.check.noerror(res)
# information regarding copyright ownership.
import os
-import re
+from re import compile as Re
import isctest
def test_validator_logging(ns4):
# check that validator logging includes the view name with multiple views
- pattern = re.compile("view rec: *validat")
+ pattern = Re("view rec: *validat")
with ns4.watch_log_from_start() as watcher:
msg = isctest.query.create("secure.example", "NS")
isctest.query.tcp(msg, "10.53.0.4")
import os
from pathlib import Path
import re
+from re import compile as Re
import time
from typing import Dict, List, Optional, Tuple, Union
waitfor = rf".*zone {zone}.*: next key event in (?!3600$)(.*) seconds"
with server.watch_log_from_start() as watcher:
- watcher.wait_for_line(re.compile(waitfor))
+ watcher.wait_for_line(Re(waitfor))
# WMM: The with code below is extracting the line the watcher was
# waiting for. If WatchLog.wait_for_line()` returned the matched string,
import abc
import os
import re
+from re import compile as Re
import time
if isinstance(string, Pattern):
patterns.append(string)
elif isinstance(string, str):
- pattern = re.compile(re.escape(string))
+ pattern = Re(re.escape(string))
patterns.append(pattern)
else:
raise WatchLogException(
Recommended use:
```python
+ from re import compile as Re
import isctest
def test_foo(servers):
with servers["ns1"].watch_log_from_start() as watcher:
watcher.wait_for_line("all zones loaded")
- pattern = re.compile(r"next key event in ([0-9]+) seconds")
+ pattern = Re(r"next key event in ([0-9]+) seconds")
with servers["ns1"].watch_log_from_here() as watcher:
# ... do stuff here ...
match = watcher.wait_for_line(pattern)
>>> # Different values must be returned depending on which line is
>>> # found in the log file.
>>> import tempfile
- >>> patterns = [re.compile(r"bar ([0-9])"), "qux"]
+ >>> from re import compile as Re
+ >>> patterns = [Re(r"bar ([0-9])"), "qux"]
>>> with tempfile.NamedTemporaryFile("w") as file:
... print("foo bar 3", file=file, flush=True)
... with WatchLogFromStart(file.name) as watcher:
>>> assert ret[1].group(0) == "foo"
>>> import tempfile
- >>> bar_pattern = re.compile('bar')
+ >>> from re import compile as Re
+ >>> bar_pattern = Re('bar')
>>> patterns = ['foo', bar_pattern]
>>> with tempfile.NamedTemporaryFile("w") as file:
... print("bar", file=file, flush=True)
# information regarding copyright ownership.
import os
-import re
+from re import compile as Re
from typing import Optional
from .. import log
return
assert os.path.isfile(path), f"{path} exists, but it's not a file"
- regex = re.compile(r"([^=]+)=(.*)")
+ regex = Re(r"([^=]+)=(.*)")
log.debug(f"parsing openssl config: {path}")
with open(path, "r", encoding="utf-8") as conf:
for line in conf:
from datetime import timedelta
import os
import re
+from re import compile as Re
import pytest
]
cmd = isctest.run.cmd(journalprint)
- pattern = re.compile(
- r"^\s*(?:\S+\s+){4}(NSEC|NSEC3|NSEC3PARAM|RRSIG)", flags=re.MULTILINE
- )
+ pattern = Re(r"^\s*(?:\S+\s+){4}(NSEC|NSEC3|NSEC3PARAM|RRSIG)", flags=re.MULTILINE)
match = pattern.search(cmd.out)
assert not match, f"{match.group(1)} record found in journal"
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
-import re
+from re import compile as Re
import isctest
def wait_for_sending_notify(ns1, ns, key_name):
- pattern = re.compile(
+ pattern = Re(
f"zone test/IN: sending notify to {ns.ip}#[0-9]+ : TSIG \\({key_name}\\)"
)
with ns1.watch_log_from_start() as watcher:
import glob
import os
import re
+from re import compile as Re
import shutil
import signal
import time
isctest.check.rrsets_equal(ns1response.answer, ns2response.answer)
query_and_compare(axfr_msg)
- pattern = re.compile(
+ pattern = Re(
f"transfer of 'changing/IN' from 10.53.0.1#{named_port}: "
f"Transfer completed: .*\\(serial 2\\)"
)