"""TCP Connection Management tests"""
+import struct
+import time
import dns
import dns.message
msg_answer = utils.receive_parse_answer(kresd_sock)
assert msg_answer.id == MSG_ID_SECOND
+
+
+def test_prefix_shorter_than_header(kresd_sock):
+ """
+ Test prefixes message by the value, which is less then the length of the DNS
+ message header and sequentially sends it over TCP connection. (RFC1035 4.2.2)
+
+ Expected: TCP connection must be closed after `net.tcp_in_idle` milliseconds.
+ (by default, after about 10s after connection is established)
+ """
+ msg = dns.message.make_query('localhost.', dns.rdatatype.A, dns.rdataclass.IN)
+ data = msg.to_wire()
+ datalen = 11 # DNS Header size minus 1
+ buf = struct.pack("!H", datalen) + data
+
+ for _ in range(15):
+ try:
+ kresd_sock.sendall(buf)
+ except BrokenPipeError:
+ break
+ else:
+ time.sleep(1)
+ else:
+ assert False, "kresd didn't close connection"