The following test suite focuses on edge cases for the prefix - when it
is either too short or too long, instead of matching the length of DNS
message exactly.
-
-The tests with invalid prefix attempt to sequentially send the invalid
-message. After a certain period of time (affected by net.tcp_in_idle,
-TCP_DEFER_ACCEPT, ...), kresd should close the connection. There are
-three variants of these tests - either no valid query is sent, or one
-valid query is sent along with the invalid buffer at once, or one valid
-query is sent and afterwards the invalid buffer is sent.
"""
import time
import utils
-def send_invalid_repeatedly(sock, buff, delay=1):
- """Utility function to keep sending the buffer until MAX_TIMEOUT is reached.
-
- It is expected kresd will close the connection, since the buffer
- contains invalid prefix of the message.
-
- If the connection remains open, test is failed.
- """
- end_time = time.time() + utils.MAX_TIMEOUT
-
- with utils.expect_kresd_close():
- while time.time() < end_time:
- sock.sendall(buff)
- time.sleep(delay)
-
-
@pytest.fixture(params=[
'no_query_before',
- 'send_query_before_invalid',
- 'send_query_before_invalid_single_buffer',
+ 'query_before',
+ 'query_before_in_single_buffer',
])
-def send_query_before(request):
- """This either performs no query, or sends a query along with invalid buffer at once, or
- sends a query and then the invalid buffer."""
+def send_query(request):
+ """Function sends a buffer, either by itself, or with a valid query before.
+ If a valid query is sent before, it can be sent either in a separate buffer, or
+ along with the provided buffer."""
# pylint: disable=possibly-unused-variable
- def no_query_before(*args, **kwargs): # pylint: disable=unused-argument
- pass
+ def no_query_before(sock, buff): # pylint: disable=unused-argument
+ sock.sendall(buff)
- def send_query_before_invalid(sock, invalid_buff, single_buffer=False):
+ def query_before(sock, buff, single_buffer=False):
"""Send an initial query and expect a response."""
msg_buff, msgid = utils.get_msgbuff()
if single_buffer:
- sock.sendall(msg_buff + invalid_buff)
+ sock.sendall(msg_buff + buff)
else:
sock.sendall(msg_buff)
- sock.sendall(invalid_buff)
+ sock.sendall(buff)
answer = utils.receive_parse_answer(sock)
assert answer.id == msgid
- def send_query_before_invalid_single_buffer(sock, invalid_buff):
- return send_query_before_invalid(sock, invalid_buff, single_buffer=True)
+ def query_before_in_single_buffer(sock, buff):
+ return query_before(sock, buff, single_buffer=True)
return locals()[request.param]
-def test_prefix_less_than_header(kresd_sock, send_query_before):
- """Prefix is less than the length of the DNS message header."""
+@pytest.mark.parametrize('datalen', [
+ 1, # just one byte of DNS header
+ 11, # DNS header size minus 1
+ 14, # DNS Header size plus 2
+])
+def test_prefix_cuts_message(kresd_sock, datalen, send_query):
+ """Prefix is shorter than the DNS message."""
wire, _ = utils.prepare_wire()
- datalen = 11 # DNS header size minus 1
+ assert datalen < len(wire)
invalid_buff = utils.prepare_buffer(wire, datalen)
- send_query_before(kresd_sock, invalid_buff)
- send_invalid_repeatedly(kresd_sock, invalid_buff)
+ send_query(kresd_sock, invalid_buff) # buffer breaks parsing of TCP stream
+ with utils.expect_kresd_close():
+ utils.ping_alive(kresd_sock)
-def test_prefix_greater_than_message(kresd_sock, send_query_before):
+
+def test_prefix_greater_than_message(kresd_sock, send_query):
"""Prefix is greater than the length of the entire DNS message."""
- wire, _ = utils.prepare_wire()
+ wire, invalid_msgid = utils.prepare_wire()
datalen = len(wire) + 16
invalid_buff = utils.prepare_buffer(wire, datalen)
- send_query_before(kresd_sock, invalid_buff)
- send_invalid_repeatedly(kresd_sock, invalid_buff)
+ send_query(kresd_sock, invalid_buff)
+ valid_buff, _ = utils.get_msgbuff()
+ kresd_sock.sendall(valid_buff)
-def test_prefix_cuts_message(kresd_sock, send_query_before):
- """Prefix is greater than the length of the DNS message header, but shorter than
- the entire DNS message."""
- wire, _ = utils.prepare_wire()
- datalen = 14 # DNS Header size plus 2
- assert datalen < len(wire)
- invalid_buff = utils.prepare_buffer(wire, datalen)
+ # invalid_buff is answered (treats additional data as trailing garbage)
+ answer = utils.receive_parse_answer(kresd_sock)
+ assert answer.id == invalid_msgid
- send_query_before(kresd_sock, invalid_buff)
- send_invalid_repeatedly(kresd_sock, invalid_buff)
+ # parsing stream is broken by the invalid_buff, valid query is never answered
+ with utils.expect_kresd_close():
+ utils.receive_parse_answer(kresd_sock)
@pytest.mark.parametrize('glength', [