"""TCP Connection Management tests"""
+import socket
import struct
import time
assert answer.id == msgid2
+@pytest.mark.parametrize('sock_func_name', [
+ 'ip_tcp_socket',
+ 'ip6_tcp_socket',
+])
+def test_oob(kresd, sock_func_name):
+ """TCP out-of-band (urgent) data must not crash resolver."""
+ make_sock = getattr(kresd, sock_func_name)
+ sock = make_sock()
+ msg_buff, msgid = utils.get_msgbuff()
+ sock.sendall(msg_buff, socket.MSG_OOB)
+
+ try:
+ msg_answer = utils.receive_parse_answer(sock)
+ assert msg_answer.id == msgid
+ except ConnectionError:
+ pass # TODO kresd responds with TCP RST, this should be fixed
+
+ # check kresd is alive
+ sock2 = make_sock()
+ utils.ping_alive(sock2)
+
+
def flood_buffer(msgcount):
flood_buff = bytes()
msgbuff, _ = utils.get_msgbuff()