self.recv_state = State.DONE
return body
- async def complete(self):
+ async def reset(self) -> bool:
is_fully_complete = self.send_state == State.DONE and self.recv_state == State.DONE
is_keepalive = self.send_keep_alive and self.recv_keep_alive
if not (is_fully_complete and is_keepalive):
await self.close()
- return
+ return False
if self.mode == Mode.CLIENT:
self.send_state = State.SEND_METHOD_LINE
self.send_keep_alive = True
self.recv_keep_alive = True
self.processing_1xx = False
+ return True
async def close(self):
if self.send_state != State.CLOSED:
self.recv_state = State.CLOSED
await self.stream.close()
+ def is_keepalive(self) -> bool:
+ return (
+ self.send_keep_alive and
+ self.recv_keep_alive and
+ self.send_state != State.CLOSED
+ )
+
def is_idle(self) -> bool:
return (
self.send_state == State.SEND_METHOD_LINE or
await self._send_head(request)
await self._send_body(request)
code, headers = await self._recv_head()
- stream = HTTPStream(self._recv_body, self._complete)
+ stream = HTTPStream(self._recv_body, self._reset)
# TODO...
return Response(code, headers=headers, content=stream)
# finally:
async def _recv_body(self) -> bytes:
return await self._parser.recv_body()
- # Request/response cycle complete...
- async def _complete(self) -> None:
- await self._parser.complete()
+ # Request/response cycle reset...
+ async def _reset(self) -> None:
+ await self._parser.reset()
self._idle_expiry = time.monotonic() + self._keepalive_duration
async def _close(self) -> None:
try:
while not self._parser.is_closed():
method, url, headers = await self._recv_head()
- stream = HTTPStream(self._recv_body, self._complete)
+ stream = HTTPStream(self._recv_body, self._reset)
# TODO: Handle endpoint exceptions
async with Request(method, url, headers=headers, content=stream) as request:
try:
except Exception:
logger.error("Internal Server Error", exc_info=True)
content = Text("Internal Server Error")
- err = Response(code=500, content=content)
+ err = Response(500, content=content)
await self._send_head(err)
await self._send_body(err)
else:
await self._send_head(response)
await self._send_body(response)
+ if self._parser.is_keepalive():
+ await stream.read()
+ await self._reset()
except Exception:
logger.error("Internal Server Error", exc_info=True)
await self._parser.send_body(b'')
# Start it all over again...
- async def _complete(self):
- await self._parser.complete
+ async def _reset(self):
+ await self._parser.reset()
self._idle_expiry = time.monotonic() + self._keepalive_duration
self._is_closed = True
self._socket.close()
+ def is_closed(self) -> bool:
+ return self._is_closed
+
def __repr__(self):
description = ""
description += " TLS" if self._is_tls else ""
self._max_workers = 5
self._executor = None
self._thread = None
- self._streams = list[NetworkStream]
+ self._streams: list[NetworkStream] = []
@property
def host(self):
def __exit__(self, exc_type, exc_val, exc_tb):
self.listener.close()
+ for stream in self._streams:
+ stream.close()
self._executor.shutdown(wait=True)
def _serve(self):
while stream := self.listener.accept():
self._executor.submit(self._handler, stream)
+ self._streams = [
+ stream for stream in self._streams
+ if not stream.is_closed()
+ ]
+ self._streams.append(stream)
def _handler(self, stream):
try:
self.recv_state = State.DONE
return body
- def complete(self):
+ def reset(self) -> bool:
is_fully_complete = self.send_state == State.DONE and self.recv_state == State.DONE
is_keepalive = self.send_keep_alive and self.recv_keep_alive
if not (is_fully_complete and is_keepalive):
self.close()
- return
+ return False
if self.mode == Mode.CLIENT:
self.send_state = State.SEND_METHOD_LINE
self.send_keep_alive = True
self.recv_keep_alive = True
self.processing_1xx = False
+ return True
def close(self):
if self.send_state != State.CLOSED:
self.recv_state = State.CLOSED
self.stream.close()
+ def is_keepalive(self) -> bool:
+ return (
+ self.send_keep_alive and
+ self.recv_keep_alive and
+ self.send_state != State.CLOSED
+ )
+
def is_idle(self) -> bool:
return (
self.send_state == State.SEND_METHOD_LINE or
self._send_head(request)
self._send_body(request)
code, headers = self._recv_head()
- stream = HTTPStream(self._recv_body, self._complete)
+ stream = HTTPStream(self._recv_body, self._reset)
# TODO...
return Response(code, headers=headers, content=stream)
# finally:
def _recv_body(self) -> bytes:
return self._parser.recv_body()
- # Request/response cycle complete...
- def _complete(self) -> None:
- self._parser.complete()
+ # Request/response cycle reset...
+ def _reset(self) -> None:
+ self._parser.reset()
self._idle_expiry = time.monotonic() + self._keepalive_duration
def _close(self) -> None:
try:
while not self._parser.is_closed():
method, url, headers = self._recv_head()
- stream = HTTPStream(self._recv_body, self._complete)
+ stream = HTTPStream(self._recv_body, self._reset)
# TODO: Handle endpoint exceptions
with Request(method, url, headers=headers, content=stream) as request:
try:
except Exception:
logger.error("Internal Server Error", exc_info=True)
content = Text("Internal Server Error")
- err = Response(code=500, content=content)
+ err = Response(500, content=content)
self._send_head(err)
self._send_body(err)
else:
self._send_head(response)
self._send_body(response)
+ if self._parser.is_keepalive():
+ stream.read()
+ self._reset()
except Exception:
logger.error("Internal Server Error", exc_info=True)
self._parser.send_body(b'')
# Start it all over again...
- def _complete(self):
- self._parser.complete
+ def _reset(self):
+ self._parser.reset()
self._idle_expiry = time.monotonic() + self._keepalive_duration
def wait(self):
while(True):
- sleep(1)
+ try:
+ sleep(1)
+ except KeyboardInterrupt:
+ break
@contextlib.contextmanager
assert terminator == b''
assert not p.is_idle()
- p.complete()
+ p.reset()
assert p.is_idle()
)
assert not p.is_idle()
- p.complete()
+ p.reset()
assert p.is_idle()
p.recv_body()
assert repr(p) == "<HTTPParser [client DONE, server DONE]>"
- p.complete()
+ p.reset()
assert repr(p) == "<HTTPParser [client SEND_METHOD_LINE, server WAIT]>"
assert repr(p) == "<HTTPParser [client DONE, server DONE]>"
- p.complete()
+ p.reset()
assert repr(p) == "<HTTPParser [client CLOSED, server CLOSED]>"
assert p.is_closed()
assert terminator == b""
assert repr(p) == "<HTTPParser [client DONE, server DONE]>"
- p.complete()
+ p.reset()
assert repr(p) == "<HTTPParser [client CLOSED, server CLOSED]>"