self._targetSerial = 1
self._serverPort = port
listener = threading.Thread(name='RPZ Listener', target=self._listener, args=[])
- listener.setDaemon(True)
+ listener.daemon = True
listener.start()
def getCurrentSerial(self):
thread = threading.Thread(name='RPZ Connection Handler',
target=self._connectionHandler,
args=[conn])
- thread.setDaemon(True)
+ thread.daemon = True
thread.start()
except socket.error as e:
log-rpz-changes=yes
""" % (_confdir, _wsPort, _wsPassword, _apiKey)
+ def sendNotify(self):
+ notify = dns.message.make_query('zone.rpz', 'SOA', want_dnssec=False)
+ notify.set_opcode(4) # notify
+ res = self.sendUDPQuery(notify)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertEqual(res.opcode(), 4)
+ self.assertEqual(res.question[0].to_text(), 'zone.rpz. IN SOA')
+
def assertAdditionalHasSOA(self, msg):
if not isinstance(msg, dns.message.Message):
raise TypeError("msg is not a dns.message.Message but a %s" % type(msg))
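Review bot: The NOTIFY opcode is written as the bare literal `4` in both `notify.set_opcode(4)` and `self.assertEqual(res.opcode(), 4)`; dnspython names it `dns.opcode.NOTIFY`, which would make the `# notify` comment unnecessary and keep the two lines in step. `sendNotify` also has no docstring; a one-liner such as `"""Send a NOTIFY for zone.rpz over UDP and check that the recursor acknowledges it."""` would state what the three assertions establish. If `sendUDPQuery` can return `None` on a timeout (its definition is outside this hunk), a `self.assertIsNotNone(res)` before the rcode check would give a clearer failure than an attribute error.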
global rpzServerPort
_lua_config_file = """
-- The first server is a bogus one, to test that we correctly fail over to the second one
- rpzMaster({'127.0.0.1:9999', '127.0.0.1:%d'}, 'zone.rpz.', { refresh=1, includeSOA=true})
+ rpzMaster({'127.0.0.1:9999', '127.0.0.1:%d'}, 'zone.rpz.', { refresh=3600, includeSOA=true})
""" % (rpzServerPort)
_confdir = 'RPZXFR'
_wsPort = 8042
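Review bot: The change from `refresh=1` to `refresh=3600` in the `rpzMaster` line carries no explanation, so a later reader may take the long interval for a mistake and shorten it again. A Lua comment next to the existing fail-over one would record the intent, for example `-- refresh is deliberately long so that only a NOTIFY can trigger a transfer during the test`. Without it, nothing in the config says that the test now depends on NOTIFY rather than polling.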
webserver-password=%s
api-key=%s
disable-packetcache
+allow-notify-from=127.0.0.0/8
+allow-notify-for=zone.rpz
""" % (_confdir, _wsPort, _wsPassword, _apiKey)
_xfrDone = 0
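Review bot: Only the accepting path of the new `allow-notify-from` / `allow-notify-for` settings is exercised here; nothing visible checks that a NOTIFY for a zone other than `zone.rpz`, or one sent with these settings absent, is refused and leaves the loaded serial unchanged. NOTIFY over UDP is unauthenticated, so the refusal path is the one that matters for security. A companion test with `allow-notify-for` left unset could assert that the notify is not answered with NOERROR and that no transfer follows. `127.0.0.0/8` is also wider than the test appears to need; `allow-notify-from=127.0.0.1/32` would be enough if the harness always sends from 127.0.0.1, which should be confirmed.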
raise AssertionError("Waited %d seconds for the serial to be updated to %d but the serial is still %d" % (timeout, serial, currentSerial))
def testRPZ(self):
+ # Fresh RPZ does not need a notify
self.waitForTCPSocket("127.0.0.1", self._wsPort)
# first zone, only a should be blocked
self.waitUntilCorrectSerialIsLoaded(1)
self.checkNotBlocked('c.example.')
# second zone, a and b should be blocked
+ self.sendNotify()
self.waitUntilCorrectSerialIsLoaded(2)
self.checkRPZStats(2, 2, 1, self._xfrDone)
self.checkBlocked('a.example.', soa=True)
self.checkNotBlocked('c.example.')
# third zone, only b should be blocked
+ self.sendNotify()
self.waitUntilCorrectSerialIsLoaded(3)
self.checkRPZStats(3, 1, 1, self._xfrDone)
self.checkNotBlocked('a.example.')
self.checkNotBlocked('c.example.')
# fourth zone, only c should be blocked
+ self.sendNotify()
self.waitUntilCorrectSerialIsLoaded(4)
self.checkRPZStats(4, 1, 1, self._xfrDone)
self.checkNotBlocked('a.example.')
self.checkBlocked('c.example.', soa=True)
# fifth zone, we should get a full AXFR this time, and only d should be blocked
+ self.sendNotify()
self.waitUntilCorrectSerialIsLoaded(5)
self.checkRPZStats(5, 3, 2, self._xfrDone)
self.checkNotBlocked('a.example.')
self.checkBlocked('d.example.', soa=True)
# sixth zone, only e should be blocked, f is a local data record
+ self.sendNotify()
self.waitUntilCorrectSerialIsLoaded(6)
self.checkRPZStats(6, 2, 2, self._xfrDone)
self.checkNotBlocked('a.example.')
self.checkCustom('f.example.', 'A', dns.rrset.from_text('f.example.', 0, dns.rdataclass.IN, 'CNAME', 'e.example.'), soa=True)
# seventh zone, e should only have one A
+ self.sendNotify()
self.waitUntilCorrectSerialIsLoaded(7)
self.checkRPZStats(7, 4, 2, self._xfrDone)
self.checkNotBlocked('a.example.')
self.checkDropped('drop.example.')
# eighth zone, all entries should be gone
+ self.sendNotify()
self.waitUntilCorrectSerialIsLoaded(8)
self.checkRPZStats(8, 0, 3, self._xfrDone)
self.checkNotBlocked('a.example.')
# 9th zone is a duplicate, it might get skipped
global rpzServer
rpzServer.moveToSerial(9)
+ self.sendNotify()
time.sleep(3)
+ self.sendNotify()
self.waitUntilCorrectSerialIsLoaded(10)
self.checkRPZStats(10, 1, 4, self._xfrDone)
self.checkNotBlocked('a.example.')
# the next update will update the zone twice
rpzServer.moveToSerial(11)
+ self.sendNotify()
time.sleep(3)
+ self.sendNotify()
self.waitUntilCorrectSerialIsLoaded(12)
self.checkRPZStats(12, 1, 4, self._xfrDone)
self.checkNotBlocked('a.example.')
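Review bot: The two `time.sleep(3)` calls after `moveToSerial(9)` and `moveToSerial(11)` look sized for the old `refresh=1` polling; with transfers now driven by `sendNotify()`, the second notify only helps if the first transfer has finished within those 3 seconds, which is a timing assumption that can flake on a loaded machine. A bounded poll on the serial between the two notifies would be more deterministic than a fixed sleep, though it would have to tolerate the 9th zone being skipped, as the existing comment notes. Separately, the added comment `# Fresh RPZ does not need a notify` sits above `waitForTCPSocket` but describes the `waitUntilCorrectSerialIsLoaded(1)` step two lines below it; moving it next to that call would read better.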
def __init__(self, port):
self._serverPort = port
listener = threading.Thread(name='RPZ Simple Auth Listener', target=self._listener, args=[])
- listener.setDaemon(True)
+ listener.daemon = True
listener.start()
def _getAnswer(self, message):