'''Test master-slave-like replication using Redis database.'''
+import random
from dnstest.test import Test
from dnstest.utils import *
-t = Test(redis=True)
+t = Test()
master = t.server("knot")
slave = t.server("knot")
t.link(zones, master)
t.link(zones, slave)
-master.conf_zone(zones).zonefile_sync = "0"
-master.conf_zone(zones).zone_db_output = "1"
-slave.conf_zone(zones).zone_db_input = "1"
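+# A single Redis backend (TLS chosen at random) serves as the zone transport:
+# the master publishes zones into it and the slave loads them from it.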
+redis_master = t.backend("redis", tls=random.choice([True, False]))
-for z in zones:
- slave.zones[z.name].zfile.remove()
+master.db_out(zones, [redis_master], 1)
+slave.db_in(zones, [redis_master], 1)
t.start()
# Add to DB manually. Slave will diverge from master.
for z in zones:
- txn = t.redis.cli("knot.upd.begin", z.name, master.conf_zone(z).zone_db_output)
- r = t.redis.cli("knot.upd.remove", z.name, txn, "example.com. 3600 in soa dns1.example.com. hostmaster.example.com. %d 10800 3600 1209600 7200" % serials3[z.name])
- r = t.redis.cli("knot.upd.add", z.name, txn, "example.com. 3600 in soa dns1.example.com. hostmaster.example.com. %d 10800 3600 1209600 7200" % (serials3[z.name] + 1))
- r = t.redis.cli("knot.upd.add", z.name, txn, "txtadd 3600 A 1.2.3.4")
- r = t.redis.cli("knot.upd.commit", z.name, txn)
+ txn = redis_master.cli("knot.upd.begin", z.name, master.conf_zone(z).zone_db_output)
+ r = redis_master.cli("knot.upd.remove", z.name, txn, "example.com. 3600 in soa dns1.example.com. hostmaster.example.com. %d 10800 3600 1209600 7200" % serials3[z.name])
+ r = redis_master.cli("knot.upd.add", z.name, txn, "example.com. 3600 in soa dns1.example.com. hostmaster.example.com. %d 10800 3600 1209600 7200" % (serials3[z.name] + 1))
+ r = redis_master.cli("knot.upd.add", z.name, txn, "txtadd 3600 A 1.2.3.4")
+ r = redis_master.cli("knot.upd.commit", z.name, txn)
- r = t.redis.cli("knot.upd.load", z.name, master.conf_zone(z).zone_db_output, str(serials3[z.name]))
+ r = redis_master.cli("knot.upd.load", z.name, master.conf_zone(z).zone_db_output, str(serials3[z.name]))
if not "txtadd" in r:
set_err("NO TXTADD IN UPD")
--- /dev/null
+$ORIGIN example.com.
+$TTL 3600
+
+@ SOA dns1 hostmaster 1 3600 5 7200 600
+ NS dns1
+ NS dns2
+ MX 10 mail
+
+dns1 A 192.0.2.1
+ AAAA 2001:DB8::1
+
+dns2 A 192.0.2.2
+ AAAA 2001:DB8::2
+
+mail A 192.0.2.3
+ AAAA 2001:DB8::3
--- /dev/null
+#!/usr/bin/env python3
+
+'''Test of high availability using Redis replication in sentinel mode.'''
+
+import random
+from dnstest.test import Test
+from dnstest.utils import *
+
+t = Test()
+
+master = t.server("knot")
+slave1 = t.server("knot")
+slave2 = t.server("knot")
+
+ZONE = "example.com"
+zones = t.zone(ZONE, storage=".")
+
+t.link(zones, master)
+t.link(zones, slave1)
+t.link(zones, slave2)
+
+freeze_kill = random.choice([True, False])
+tls = random.choice([True, False])
+redis_sentinel = t.backend("redis", tls=tls)
+redis_master = t.backend("redis", tls=tls)
+redis_slave1 = t.backend("redis", tls=tls)
+redis_slave2 = t.backend("redis", tls=tls)
+
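+# Redis topology: one master, two replicas, and a sentinel monitoring the master
+# with quorum 1, so a single sentinel vote is enough to trigger a failover.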
+redis_slave1.slave_of(redis_master)
+redis_slave2.slave_of(redis_master)
+redis_sentinel.sentinel_of(redis_master, 1)
+
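+# The knot master's zone-db write targets are chosen at random: the sentinel only,
+# all data instances directly, or both.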
+zone_write_instances = random.choice([
+ [redis_sentinel],
+ [redis_master, redis_slave1, redis_slave2],
+ [redis_sentinel, redis_master, redis_slave1, redis_slave2]
+])
+master.db_out(zones, zone_write_instances, 1)
+slave1.db_in(zones, [redis_slave1], 1)
+slave2.db_in(zones, [redis_slave1, redis_slave2], 1)
+
+t.start()
+
+# Give sentinel some time to discover the replicas
+t.sleep(10)
+
+# Initial synchronization
+serial_init = slave1.zones_wait(zones)
+serial = slave2.zones_wait(zones)
+# serial == serial_init now
+
+# Update master, wait for replicas
+master.ctl(f"zone-serial-set {ZONE} +1")
+slave1.zones_wait(zones, serial)
+serial = slave2.zones_wait(zones, serial)
+
+# Update via replica2 (the new Redis master) - the original master is down
+redis_slave1.stop() # Ensure replica2 becomes the new master
+redis_master.stop()
+t.sleep(1)
+for i in range(10): # usually just 2 iterations
+ try:
+ master.ctl(f"zone-serial-set {ZONE} +1")
+ except Exception:
+ t.sleep(2)
+ continue
+ break
+slave2.zones_wait(zones, serial)
+redis_slave1.start() # Put replica1 back into operation
+serial = slave1.zones_wait(zones, serial)
+
+# Clog replica2 (the current master) so that failover promotes replica1 to the new master
+if freeze_kill:
+ redis_slave2.freeze(20).wait()
+else:
+ redis_slave2.stop(kill=True)
+ t.sleep(20)
+ redis_slave2.start()
+redis_slave1.run_monitor()
+redis_slave2.run_monitor()
+master.ctl(f"zone-serial-set {ZONE} +1")
+slave1.zones_wait(zones, serial)
+serial = slave2.zones_wait(zones, serial)
+
+t.xfr_diff(master, slave1, zones, serial_init)
+t.xfr_diff(master, slave2, zones, serial_init)
+
+# Add to DB manually.
+slave2.stop()
+slave2.db_in(zones, [redis_slave2], 1)
+slave2.gen_confile()
+slave2.start()
+slave2.zones_wait(zones) # note: try removing this wait and observe the behavior
+txn = redis_slave1.cli("knot.upd.begin", ZONE, "1")
+redis_slave1.cli("knot.upd.add", ZONE, txn, "test TXT test")
+redis_slave1.cli("knot.upd.commit", ZONE, txn)
+slave1.zones_wait(zones, serial)
+serial = slave2.zones_wait(zones, serial)
+
+t.xfr_diff(slave1, slave2, zones, serial_init)
+
+# Gather some information from replica2.
+redis_slave2.cli("XREAD", "BLOCK", "50", "STREAMS", b"k\x01\x01", "0-0")
+redis_slave2.cli("KNOT.ZONE.INFO", ZONE)
+
+t.end()
"libknot",
"module",
"params",
+ "redis",
"response",
"server",
"test",
import time
class Redis(object):
- def __init__(self, addr, wrk_dir, redis_bin, redis_cli, knotso):
+ counter = 0
+ def __init__(self, addr, wrk_dir, redis_bin, redis_cli, knotso, tls=False):
self.addr = addr
self.port = None
+ self.tls = tls
self.tls_port = None
self.pin = None
- self.wrk_dir = wrk_dir
+ Redis.counter += 1
+ self.wrk_dir = os.path.join(wrk_dir, str(Redis.counter))
self.redis_bin = redis_bin
self.redis_cli = redis_cli
self.knotso = knotso
self.monitor = None
self.monitor_log = None
- if not os.path.exists(wrk_dir):
- os.makedirs(wrk_dir)
+ self._master = None
+ self._sentinel_of = dict()
+
+ if not os.path.exists(self.wrk_dir):
+ os.makedirs(self.wrk_dir)
def wrk_file(self, filename):
return os.path.join(self.wrk_dir, filename)
with open(self.conf_file(), "w") as cf:
cf.write("dir " + self.wrk_dir + os.linesep)
cf.write("logfile " + self.wrk_file("redis.log") + os.linesep)
- cf.write("loadmodule " + self.knotso + os.linesep)
+ if len(self._sentinel_of) == 0:
+ cf.write("loadmodule " + self.knotso + os.linesep)
cf.write("bind " + self.addr + os.linesep)
cf.write("port " + str(self.port) + os.linesep)
cf.write("tls-port " + str(self.tls_port) + os.linesep)
cf.write("tls-protocols \"TLSv1.3\"" + os.linesep)
cf.write("tls-auth-clients no" + os.linesep)
+ cf.write("tls-ca-cert-file cert.pem" + os.linesep)
cf.write("tls-key-file key.pem" + os.linesep)
cf.write("tls-cert-file cert.pem" + os.linesep)
+ cf.write("enable-debug-command yes" + os.linesep)
+ cf.write("repl-ping-replica-period 1" + os.linesep)
if self.addr != "127.0.0.1" and self.addr != "::1":
cf.write("protected-mode no " + os.linesep)
+ if self._master is not None:
+ port = self._master.tls_port if self._master.tls else self._master.port
+ cf.write(f"replicaof {self._master.addr} {port}" + os.linesep)
+ if self.tls:
+ cf.write("tls-replication yes" + os.linesep)
+ if not self._sentinel_of:
+ cf.write("appendonly yes" + os.linesep)
+
+ for server_idx, (server, quorum) in enumerate(self._sentinel_of.items()):
+ port = server.tls_port if server.tls else server.port
+ cf.write(f"sentinel monitor master-{server_idx} {server.addr} {port} {quorum}" + os.linesep)
+ cf.write(f"sentinel down-after-milliseconds master-{server_idx} 1000" + os.linesep)
+ cf.write(f"sentinel failover-timeout master-{server_idx} 6000" + os.linesep)
shutil.copy(os.path.join(params.common_data_dir, "cert", "cert.pem"), self.wrk_dir)
shutil.copy(os.path.join(params.common_data_dir, "cert", "key.pem"), self.wrk_dir)
out = subprocess.check_output(["certtool", "--infile=" + keyfile, "-k"]).rstrip().decode('ascii')
self.pin = ssearch(out, r'pin-sha256:([^\n]*)')
+ def get_prio(self):
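+ # Ordering key for port assignment and config generation: plain instances (0)
+ # before replicas (1) before sentinels (2), so dependent configs can reference
+ # their master's already-assigned port.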
+ if len(self._sentinel_of) > 0:
+ return 2
+ elif self._master is not None:
+ return 1
+ else:
+ return 0
+
+ # Pass just the master Redis instance; replicas are auto-discovered when the server starts
+ def sentinel_of(self, master, quorum=1):
+ if self._master is not None:
+ raise AssertionError("can't be sentinel and db at once")
+ self._sentinel_of[master] = quorum
+
+ def slave_of(self, master):
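+ # Make this instance a replica of the given master ("replicaof" in the generated config).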
+ if len(self._sentinel_of) != 0:
+ raise AssertionError("can't be sentinel and db at once")
+ self._master = master
+
def start(self):
- self.proc = subprocess.Popen([ self.redis_bin, self.conf_file() ])
- time.sleep(0.3)
- monitor_cmd = [ self.redis_cli, "-h", self.addr, "-p", str(self.port), "monitor" ]
- self.monitor_log = open(os.path.join(self.wrk_dir, "monitor.log"), "a")
- self.monitor = subprocess.Popen(monitor_cmd, stdout=self.monitor_log, stderr=self.monitor_log)
+ prog = [self.redis_bin, self.conf_file()]
+ is_sentinel = len(self._sentinel_of) > 0
+ if is_sentinel:
+ prog.append('--sentinel')
+ self.proc = subprocess.Popen(prog)
+ if not is_sentinel:
+ time.sleep(0.3)
+ monitor_cmd = [ self.redis_cli, "-h", self.addr, "-p", str(self.port), "monitor" ]
+ self.monitor_log = open(os.path.join(self.wrk_dir, "monitor.log"), "a")
+ self.monitor = subprocess.Popen(monitor_cmd, stdout=self.monitor_log, stderr=self.monitor_log)
def stop(self):
if self.monitor:
if self.proc:
self.proc.terminate()
+ def freeze(self, seconds):
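+ # Pause the whole Redis server for the given time via DEBUG SLEEP
+ # (allowed by "enable-debug-command yes" in the generated config).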
+ cmd = [ self.redis_cli, "-h", self.addr, "-p", str(self.port), "DEBUG", "sleep", str(seconds) ]
+ return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+
def cli(self, *params):
cmd = [ self.redis_cli, "-h", self.addr, "-p", str(self.port) ] + list(params)
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
self.session_log = None
self.confile = None
- self.redis = None
+ self.redis_backends = list()
self.binding_errors = 0
s.item_str("timer-db-max-size", self.timer_db_size)
s.item_str("timer-db-sync", random.choice(["shutdown", "immediate", "5", "3600"]))
s.item_str("catalog-db-max-size", self.catalog_db_size)
- if self.redis is not None:
- tls = random.choice([True, False])
- port = self.redis.tls_port if tls else self.redis.port
- s.item_str("zone-db-listen", self.redis.addr + "@" + str(port))
- if tls:
- s.item_str("zone-db-cert-key", self.redis.pin)
+ if len(self.redis_backends) > 0:
+ s.item_list("zone-db-listen", map(lambda b: f"{b.addr}@{b.tls_port if b.tls else b.port}",
+ self.redis_backends))
+ if list(self.redis_backends)[0].tls:
+ s.item_list("zone-db-cert-key", map(lambda b: f"{b.pin}", self.redis_backends))
s.item_str("zone-db-tls", "on")
s.end()
return s.conf
+ def db_out(self, zones, redis_list, instance):
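+ # Publish the given zones into the listed Redis backends as zone-db output instance 'instance'.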
+ for z in zones:
+ self.conf_zone(z).zone_db_output = str(instance)
+ self.redis_backends = redis_list
+
+ def db_in(self, zones, redis_list, instance):
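+ # Load the given zones from the listed Redis backends (zone-db input instance 'instance');
+ # the local zone files are removed so the Redis DB is the only source.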
+ for z in zones:
+ self.conf_zone(z).zone_db_input = str(instance)
+ self.zones[z.name].zfile.remove()
+ self.redis_backends = redis_list
+
def ctl_sock_rnd(self, name_only=False):
sockname = random.choice(["knot.sock", "knot2.sock"])
sockpath = os.path.join(self.dir, sockname)
rel_time = time.time()
start_time = 0
- def __init__(self, address=None, tsig=None, stress=True, quic=False, tls=False, redis=False):
+ def __init__(self, address=None, tsig=None, stress=True, quic=False, tls=False):
if not os.path.exists(Context().out_dir):
raise Exception("Output directory doesn't exist")
else:
self.addr = Test.LOCAL_ADDR_MULTI[random.choice([4, 6])]
- self.redis = None
- redis_knotso = repo_file("src", "redis", ".libs", "knot.so")
- if redis:
- if params.redis_bin == "":
- raise Skip("Redis server not available")
- if redis_knotso is None:
- raise Skip("Redis knot module not available")
- self.redis = dnstest.redis.Redis(self.addr, os.path.join(self.out_dir, "redis"),
- params.redis_bin, params.redis_cli, redis_knotso)
+ self.backends = set()
self.tsig = None
if tsig != None:
if os.path.isfile(suppressions_file):
srv.valgrind.append("--suppressions=%s" % suppressions_file)
- srv.redis = self.redis
-
self.servers.add(srv)
return srv
for server in servers:
self.server_remove(server)
+ def backend(self, kind, tls=False):
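+ '''Create and register a test backend of the given kind; only "redis" is currently supported.'''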
+ if kind == "redis":
+ if params.redis_bin == "":
+ raise Skip("Redis server not available")
+
+ redis_knotso = repo_file("src", "redis", ".libs", "knot.so")
+ if redis_knotso is None:
+ raise Skip("Redis knot module not available")
+
+ backend = dnstest.redis.Redis(self.addr,
+ os.path.join(self.out_dir, "redis"),
+ params.redis_bin, params.redis_cli,
+ redis_knotso, tls)
+ self.backends.add(backend)
+ return backend
+ else:
+ raise Failed("Unsupported backend type '%s'" % kind)
+
def generate_conf(self):
# Next two loops can't be merged!
for server in self.servers:
server.xdp_cover_sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
server.xdp_cover_sock.bind((server.addr, server.xdp_port))
- if self.redis is not None:
- self.redis.port = self._gen_port()
- self.redis.tls_port = self._gen_port()
- self.redis.gen_confile()
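+ # Assign ports in dependency order (masters before replicas and sentinels)
+ # so each generated config can reference its master's already-known port.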
+ for backend in sorted(self.backends, key=lambda e: e.get_prio()):
+ backend.port = self._gen_port()
+ backend.tls_port = self._gen_port()
+ backend.gen_confile()
for server in self.servers:
server.gen_confile()
self.generate_conf()
- if self.redis:
- self.redis.start()
+ for b in self.backends:
+ b.start()
def srv_sort(server):
masters = 0
else:
server.stop(check=check)
- if self.redis:
- self.redis.stop()
+ for b in self.backends:
+ b.stop()
def end(self):
'''Finish testing'''