assert subdig.alive(), "The single DIG instance is expected to be alive"
assert multidig.alive(), (
"The DIG instances from the set are all expected to "
- "be alive, but {} of them have completed"
- ).format(multidig.completed())
+ f"be alive, but {multidig.completed()} of them have completed"
+ )
# Let's close opened connections (in random order) to let all dig
# processes to complete
connector.disconnect_all()
def zone_mtime(zonedir, name):
try:
- si = os.stat(os.path.join(zonedir, "{}.db".format(name)))
+ si = os.stat(os.path.join(zonedir, f"{name}.db"))
except FileNotFoundError:
return dayzero
def update_expected(expected, key, msg):
msg_len = len(msg.to_wire())
bucket_num = (msg_len // 16) * 16
- bucket = "{}-{}".format(bucket_num, bucket_num + 15)
+ bucket = f"{bucket_num}-{bucket_num + 15}"
expected[key][bucket] += 1
# JSON helper functions
def fetch_zones_json(statsip, statsport):
- r = requests.get(
- "http://{}:{}/json/v1/zones".format(statsip, statsport), timeout=600
- )
+ r = requests.get(f"http://{statsip}:{statsport}/json/v1/zones", timeout=600)
assert r.status_code == 200
data = r.json()
def fetch_traffic_json(statsip, statsport):
- r = requests.get(
- "http://{}:{}/json/v1/traffic".format(statsip, statsport), timeout=600
- )
+ r = requests.get(f"http://{statsip}:{statsport}/json/v1/traffic", timeout=600)
assert r.status_code == 200
data = r.json()
# XML helper functions
def fetch_zones_xml(statsip, statsport):
- r = requests.get(
- "http://{}:{}/xml/v3/zones".format(statsip, statsport), timeout=600
- )
+ r = requests.get(f"http://{statsip}:{statsport}/xml/v3/zones", timeout=600)
assert r.status_code == 200
root = ET.fromstring(r.text)
return out
- r = requests.get(
- "http://{}:{}/xml/v3/traffic".format(statsip, statsport), timeout=600
- )
+ r = requests.get(f"http://{statsip}:{statsport}/xml/v3/traffic", timeout=600)
assert r.status_code == 200
root = ET.fromstring(r.text)
proto_root = root.find("traffic").find(ip).find(proto)
for counters in proto_root.findall("counters"):
if counters.attrib["type"] == "request-size":
- key = "dns-{}-requests-sizes-received-{}".format(proto, ip)
+ key = f"dns-{proto}-requests-sizes-received-{ip}"
else:
- key = "dns-{}-responses-sizes-sent-{}".format(proto, ip)
+ key = f"dns-{proto}-responses-sizes-sent-{ip}"
values = load_counters(counters)
traffic[key] = values
sock.setblocking(0)
err = sock.connect_ex((host, port))
if err not in (0, errno.EINPROGRESS):
- log("%s on connect for socket %s" % (errno.errorcode[err], sock))
+ log(f"{errno.errorcode[err]} on connect for socket {sock}")
errors.append(sock)
else:
queued.append(sock)
queued.remove(sock)
err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
if err:
- log("%s for socket %s" % (errno.errorcode[err], sock))
+ log(f"{errno.errorcode[err]} for socket {sock}")
errors.append(sock)
else:
sock.send(VERSION_QUERY)
def close_connections(active_conns, count):
- log("Closing %s connections..." % "all" if count == 0 else str(count))
+ log(f"Closing {'all' if count == 0 else count} connections...")
if count == 0:
count = len(active_conns)
for _ in range(count):
while True:
clientsock, _ = ctlsock.accept()
- log("Accepted control connection from %s" % clientsock)
+ log(f"Accepted control connection from {clientsock}")
cmdline = clientsock.recv(512).decode("ascii").strip()
if cmdline:
- log("Received command: %s" % cmdline)
+ log(f"Received command: {cmdline}")
cmd = cmdline.split()
if cmd[0] == "open":
count, host, port = cmd[1:]
S.p_index += 1
for k, v in S.mutexes.items():
r = re.compile(k)
- line = r.sub("M{:04d}".format(v), line)
+ line = r.sub(f"M{v:04d}", line)
for k, v in S.threads.items():
r = re.compile(k)
- line = r.sub("T{:04d}".format(v), line)
+ line = r.sub(f"T{v:04d}", line)
for k, v in S.pointers.items():
r = re.compile(k)
- line = r.sub("0x{:012d}".format(v), line)
+ line = r.sub(f"0x{v:012d}", line)
line = STACK.sub("", line)
line = PID.sub("", line)