]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/doqclient.py
5 from typing
import Any
, Optional
, cast
9 from aioquic
.quic
.configuration
import QuicConfiguration
10 from aioquic
.asyncio
.client
import connect
11 from aioquic
.asyncio
.protocol
import QuicConnectionProtocol
12 from aioquic
.quic
.configuration
import QuicConfiguration
13 from aioquic
.quic
.events
import QuicEvent
, StreamDataReceived
, StreamReset
14 from aioquic
.quic
.logger
import QuicFileLogger
class DnsClientProtocol(QuicConnectionProtocol):
    """QUIC client protocol that sends one DNS query and waits for its outcome.

    Messages are framed on the stream with a 2-byte big-endian length prefix
    (see pack() and quic_event_received()).
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to QuicConnectionProtocol and reset client state."""
        super().__init__(*args, **kwargs)
        # Future resolved by quic_event_received() with the parsed answer or
        # with the StreamReset event; None while no query is outstanding.
        self._ack_waiter: Any = None
        # Bytes received so far for responses that are not complete yet,
        # keyed by stream id.
        self._pending: dict = {}

    def pack(self, data):
        """Return *data* prefixed with its length as a 2-byte big-endian integer."""
        data = bytes(data)
        return struct.pack("!H", len(data)) + data

    async def query(self, query: dns.message.Message) -> Any:
        """Send *query* on a new stream and wait for the outcome.

        Returns whatever quic_event_received() resolves the waiter with: the
        parsed DNS answer, or the StreamReset event if the peer reset the
        stream.
        """
        data = self.pack(query.to_wire())
        # send query and wait for answer
        stream_id = self._quic.get_next_available_stream_id()
        self._quic.send_stream_data(stream_id, data, end_stream=True)
        waiter = self._loop.create_future()
        self._ack_waiter = waiter
        # send_stream_data() only queues the bytes on the connection;
        # transmit() is what actually sends the pending datagrams.
        self.transmit()

        # shield() keeps the waiter itself from being cancelled when the
        # awaiting caller is cancelled (e.g. by a timeout).
        return await asyncio.shield(waiter)

    def quic_event_received(self, event: QuicEvent) -> None:
        """Resolve the outstanding waiter from stream events.

        A StreamReset resolves the waiter with the event itself. Stream data
        is accumulated per stream until the length-prefixed message is
        complete (or the stream ends), then parsed and used as the result.
        Events are ignored while no query is outstanding.
        """
        waiter = self._ack_waiter
        if waiter is None:
            return

        if isinstance(event, StreamReset):
            self._pending.pop(event.stream_id, None)
            self._ack_waiter = None
            waiter.set_result(event)
            return

        if not isinstance(event, StreamDataReceived):
            return

        # Stream data may be delivered in several events, so a response is
        # only parsed once the bytes announced by its prefix have arrived.
        received = self._pending.pop(event.stream_id, b"") + bytes(event.data)
        if len(received) >= 2:
            announced = struct.unpack_from("!H", received)[0]
            complete = len(received) >= 2 + announced
        else:
            complete = False

        if not complete and not event.end_stream:
            self._pending[event.stream_id] = received
            return

        # If the stream ended early, parse what is there; a short or
        # malformed message raises here and leaves the waiter pending.
        length = struct.unpack("!H", received[:2])[0]
        answer = dns.message.from_wire(received[2 : 2 + length], ignore_trailing=True)

        self._ack_waiter = None
        waiter.set_result(answer)
class BogusDnsClientProtocol(DnsClientProtocol):
    """Variant of DnsClientProtocol that frames queries with a wrong length prefix."""

    def pack(self, data):
        """Return *data* prefixed with twice its real length (deliberately invalid)."""
        data = bytes(data)
        # The prefix announces double the number of bytes that follow.
        return struct.pack("!H", len(data) * 2) + data
60 async def async_quic_query(
61 configuration
: QuicConfiguration
,
66 create_protocol
=DnsClientProtocol
68 print("Connecting to {}:{}".format(host
, port
))
72 configuration
=configuration
,
73 create_protocol
=create_protocol
,
75 client
= cast(DnsClientProtocol
, client
)
76 print("Sending DNS query")
78 async with async_timeout
.timeout(timeout
):
79 answer
= await client
.query(query
)
81 except asyncio
.TimeoutError
as e
:
class StreamResetError(Exception):
    """Raised when the peer reset the stream instead of sending an answer.

    Attributes:
        error: the value passed as *error* (callers in this file pass the
            reset event's error_code).
    """

    def __init__(self, error, message="Stream reset by peer"):
        # Keep the reset code so callers can inspect why the stream was reset.
        self.error = error
        super().__init__(message)
89 def quic_query(query
, host
='127.0.0.1', timeout
=2, port
=853, verify
=None, server_hostname
=None):
90 configuration
= QuicConfiguration(alpn_protocols
=["doq"], is_client
=True)
92 configuration
.load_verify_locations(verify
)
95 configuration
=configuration
,
100 create_protocol
=DnsClientProtocol
103 if (isinstance(result
, StreamReset
)):
104 raise StreamResetError(result
.error_code
)
105 if (isinstance(result
, asyncio
.TimeoutError
)):
109 def quic_bogus_query(query
, host
='127.0.0.1', timeout
=2, port
=853, verify
=None, server_hostname
=None):
110 configuration
= QuicConfiguration(alpn_protocols
=["doq"], is_client
=True)
112 configuration
.load_verify_locations(verify
)
113 result
= asyncio
.run(
115 configuration
=configuration
,
120 create_protocol
=BogusDnsClientProtocol
123 if (isinstance(result
, StreamReset
)):
124 raise StreamResetError(result
.error_code
)
125 if (isinstance(result
, asyncio
.TimeoutError
)):