# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
-from typing import List, Tuple
+from typing import Any, Dict, List, Tuple
import dns._features
import dns.asyncbackend
AsyncioQuicManager,
AsyncioQuicStream,
)
- from dns.quic._common import AsyncQuicConnection, AsyncQuicManager
- from dns.quic._sync import SyncQuicConnection, SyncQuicManager, SyncQuicStream
+ from dns.quic._common import AsyncQuicConnection # pyright: ignore
+ from dns.quic._common import AsyncQuicManager
+ from dns.quic._sync import SyncQuicConnection # pyright: ignore
+ from dns.quic._sync import SyncQuicStream # pyright: ignore
+ from dns.quic._sync import SyncQuicManager
have_quic = True
# We have a context factory and a manager factory as for trio we need to have
# a nursery.
- _async_factories = {"asyncio": (null_factory, _asyncio_manager_factory)}
+ _async_factories: Dict[str, Tuple[Any, Any]] = {
+ "asyncio": (null_factory, _asyncio_manager_factory)
+ }
if dns._features.have("trio"):
import trio
else: # pragma: no cover
have_quic = False
- from typing import Any
-
class AsyncQuicStream: # type: ignore
pass
import struct
import time
+import aioquic.h3.connection # type: ignore
+import aioquic.h3.events # type: ignore
import aioquic.quic.configuration # type: ignore
import aioquic.quic.connection # type: ignore
import aioquic.quic.events # type: ignore
datagrams = self._connection.datagrams_to_send(time.time())
for datagram, address in datagrams:
assert address == self._peer
+ assert self._socket is not None
await self._socket.sendto(datagram, self._peer, None)
(expiration, interval) = self._get_timer_values()
try:
return
if isinstance(event, aioquic.quic.events.StreamDataReceived):
if self.is_h3():
+ assert self._h3_conn is not None
h3_events = self._h3_conn.handle_event(event)
for h3_event in h3_events:
if isinstance(h3_event, aioquic.h3.events.HeadersReceived):
self._handshake_complete.set()
elif isinstance(event, aioquic.quic.events.ConnectionTerminated):
self._done = True
- self._receiver_task.cancel()
+ if self._receiver_task is not None:
+ self._receiver_task.cancel()
elif isinstance(event, aioquic.quic.events.StreamReset):
stream = self._streams.get(event.stream_id)
if stream:
async def close(self):
if not self._closed:
- self._manager.closed(self._peer[0], self._peer[1])
+ if self._manager is not None:
+ self._manager.closed(self._peer[0], self._peer[1])
self._closed = True
self._connection.close()
# sender might be blocked on this, so set it
self._socket_created.set()
await self._wakeup()
try:
- await self._receiver_task
+ if self._receiver_task is not None:
+ await self._receiver_task
except asyncio.CancelledError:
pass
try:
- await self._sender_task
+ if self._sender_task is not None:
+ await self._sender_task
except asyncio.CancelledError:
pass
- await self._socket.close()
+ if self._socket is not None:
+ await self._socket.close()
class AsyncioQuicManager(AsyncQuicManager):
import socket
import struct
import time
-import urllib
+import urllib.parse
from typing import Any, Optional
import aioquic.h3.connection # type: ignore
self._closed = False
self._manager = manager
self._streams = {}
- if manager.is_h3():
+ if manager is not None and manager.is_h3():
self._h3_conn = aioquic.h3.connection.H3Connection(connection, False)
else:
self._h3_conn = None
del self._streams[stream_id]
def send_headers(self, stream_id, headers, is_end=False):
+ assert self._h3_conn is not None
self._h3_conn.send_headers(stream_id, headers, is_end)
def send_data(self, stream_id, data, is_end=False):
+ assert self._h3_conn is not None
self._h3_conn.send_data(stream_id, data, is_end)
def _get_timer_values(self, closed_is_special=True):
import threading
import time
+import aioquic.h3.connection # type: ignore
+import aioquic.h3.events # type: ignore
import aioquic.quic.configuration # type: ignore
import aioquic.quic.connection # type: ignore
import aioquic.quic.events # type: ignore
return
if isinstance(event, aioquic.quic.events.StreamDataReceived):
if self.is_h3():
+ assert self._h3_conn is not None
h3_events = self._h3_conn.handle_event(event)
for h3_event in h3_events:
if isinstance(h3_event, aioquic.h3.events.HeadersReceived):
with self._lock:
if self._closed:
return
- self._manager.closed(self._peer[0], self._peer[1])
+ if self._manager is not None:
+ self._manager.closed(self._peer[0], self._peer[1])
self._closed = True
self._connection.close()
self._send_wakeup.send(b"\x01")
- self._worker_thread.join()
+ if self._worker_thread is not None:
+ self._worker_thread.join()
class SyncQuicManager(BaseQuicManager):
import struct
import time
+import aioquic.h3.connection # type: ignore
+import aioquic.h3.events # type: ignore
import aioquic.quic.configuration # type: ignore
import aioquic.quic.connection # type: ignore
import aioquic.quic.events # type: ignore
return
if isinstance(event, aioquic.quic.events.StreamDataReceived):
if self.is_h3():
+ assert self._h3_conn is not None
h3_events = self._h3_conn.handle_event(event)
for h3_event in h3_events:
if isinstance(h3_event, aioquic.h3.events.HeadersReceived):
async def close(self):
if not self._closed:
- self._manager.closed(self._peer[0], self._peer[1])
+ if self._manager is not None:
+ self._manager.closed(self._peer[0], self._peer[1])
self._closed = True
self._connection.close()
self._send_pending = True
[tool.pyright]
reportUnsupportedDunderAll = false
-exclude = ["dns/quic/*.py", "examples/*.py", "tests/*.py"] # (mostly) temporary!
+exclude = ["examples/*.py", "tests/*.py"]