import dns.query
import dns.rcode
+import isctest
+
# ISO datetime format without msec
fmt = "%Y-%m-%dT%H:%M:%SZ"
max_expires = timedelta(seconds=14515200) # 24 weeks
dayzero = datetime.utcfromtimestamp(0).replace(microsecond=0)
-TIMEOUT = 10
-
# Generic helper functions
def check_expires(expires, min_time, max_time):
return msg
-def udp_query(ip, port, msg):
- ans = dns.query.udp(msg, ip, TIMEOUT, port=port)
- assert ans.rcode() == dns.rcode.NOERROR
-
- return ans
-
-
-def tcp_query(ip, port, msg):
- ans = dns.query.tcp(msg, ip, TIMEOUT, port=port)
- assert ans.rcode() == dns.rcode.NOERROR
-
- return ans
-
-
def create_expected(data):
expected = {
"dns-tcp-requests-sizes-received-ipv4": defaultdict(int),
def test_traffic(fetch_traffic, **kwargs):
statsip = kwargs["statsip"]
statsport = kwargs["statsport"]
- port = kwargs["port"]
data = fetch_traffic(statsip, statsport)
exp = create_expected(data)
msg = create_msg("short.example.", "TXT")
update_expected(exp, "dns-udp-requests-sizes-received-ipv4", msg)
- ans = udp_query(statsip, port, msg)
+ ans = isctest.query.udp(msg, statsip)
+ isctest.check.noerror(ans)
update_expected(exp, "dns-udp-responses-sizes-sent-ipv4", ans)
data = fetch_traffic(statsip, statsport)
msg = create_msg("long.example.", "TXT")
update_expected(exp, "dns-udp-requests-sizes-received-ipv4", msg)
- ans = udp_query(statsip, port, msg)
+ ans = isctest.query.udp(msg, statsip)
+ isctest.check.noerror(ans)
update_expected(exp, "dns-udp-responses-sizes-sent-ipv4", ans)
data = fetch_traffic(statsip, statsport)
msg = create_msg("short.example.", "TXT")
update_expected(exp, "dns-tcp-requests-sizes-received-ipv4", msg)
- ans = tcp_query(statsip, port, msg)
+ ans = isctest.query.tcp(msg, statsip)
+ isctest.check.noerror(ans)
update_expected(exp, "dns-tcp-responses-sizes-sent-ipv4", ans)
data = fetch_traffic(statsip, statsport)
msg = create_msg("long.example.", "TXT")
update_expected(exp, "dns-tcp-requests-sizes-received-ipv4", msg)
- ans = tcp_query(statsip, port, msg)
+ ans = isctest.query.tcp(msg, statsip)
+ isctest.check.noerror(ans)
update_expected(exp, "dns-tcp-responses-sizes-sent-ipv4", ans)
data = fetch_traffic(statsip, statsport)