import secrets
import struct
import time
-from typing import Optional
+from typing import Optional, Set, Tuple, Union
import aioquic
import aioquic.buffer
self.expecting = 0
async def receive(self, timeout: Optional[float] = None):
- context: trio.CancelScope | NullContext
+ context: Union[trio.CancelScope, NullContext]
if timeout is None:
context = NullContext(None)
else:
def __init__(self, listener, cid, peer, retry_cid=None):
self.original_cid: bytes = cid
self.listener = listener
- self.cids: set[bytes] = set()
+ self.cids: Set[bytes] = set()
self.cids.add(cid)
self.listener.connections[cid] = self
self.peer = peer
self.worker_scope = None
self.streams = {}
- def get_timer_values(self, now: float) -> tuple[float, float]:
+ def get_timer_values(self, now: float) -> Tuple[float, float]:
expiration = self.quic_connection.get_timer()
if expiration is None:
expiration = now + 3600 # arbitrary "big" value