# Create 4 worker threads running "rndc" commands in a loop.
with concurrent.futures.ThreadPoolExecutor() as executor:
for i in range(1, 5):
- domain = "example%d" % i
+ domain = f"example{i}"
executor.submit(rndc_loop, test_state, domain, ns3)
# Run "rndc status" 10 times, with 1-second pauses between attempts.
"--no-ocsp",
"--alpn=dot",
"--logfile=gnutls-cli.log",
- "--port=%d" % named_tlsport,
+ f"--port={named_tlsport}",
"10.53.0.1",
]
with open("gnutls-cli.err", "wb") as gnutls_cli_stderr, subprocess.Popen(
def build_synthetic_name_v4(prefix, ip, domain):
- return dns.name.from_text(
- "{0}{1}.{2}".format(prefix, format(ip).replace(".", "-"), domain)
- )
+ return dns.name.from_text(f"{prefix}{format(ip).replace('.', '-')}.{domain}")
def build_synthetic_name_v6(prefix, ip, domain):
except socket.error:
family = socket.AF_INET6
- log("Opening %d connections..." % count)
+ log(f"Opening {count} connections...")
for _ in range(count):
sock = socket.socket(family, socket.SOCK_STREAM)
active_conns.append(sock)
if errors:
- log("result=FAIL: %d connection(s) failed" % len(errors))
+ log(f"result=FAIL: {len(errors)} connection(s) failed")
elif queued:
- log("result=FAIL: Timed out, aborting %d pending connections" % len(queued))
+ log(f"result=FAIL: Timed out, aborting {len(queued)} pending connections")
for sock in queued:
sock.close()
else:
- log("result=OK: Successfully opened %d connections" % count)
+ log(f"result=OK: Successfully opened {count} connections")
def close_connections(active_conns, count):
- log("Closing {} connections...".format("all") if count == 0 else str(count))
+ log(f"Closing {'all' if count == 0 else count} connections...")
if count == 0:
count = len(active_conns)
for _ in range(count):
sock = active_conns.pop(0)
sock.close()
- log("result=OK: Successfully closed %d connections" % count)
+ log(f"result=OK: Successfully closed {count} connections")
def sigterm(*_):
except KeyError:
port = 5309
- log("Listening on %s:%d" % (listenip, port))
+ log(f"Listening on {listenip}:{port}")
ctlsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ctlsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
[tool.pylint.messages_control]
disable = [
"C0103", # invalid-name
- "C0209", # consider-using-f-string
"C0114", # missing-module-docstring
"C0115", # missing-class-docstring
"C0116", # missing-function-docstring
[tool.ruff]
target-version = "py310"
+lint.select = [
+ # f-strings
+ "UP031",
+ "UP032",
+]
extend-exclude = [
"bin/tests/system/vulture_ignore_list.py",
"contrib",