* Make test fail when WSGI iterable is not closed
* Close WSGI iterable when WSGIByteStream is closed
class WSGIByteStream(SyncByteStream):
def __init__(self, result: typing.Iterable[bytes]) -> None:
+ self._close = getattr(result, "close", None)
self._result = _skip_leading_empty_chunks(result)
def __iter__(self) -> typing.Iterator[bytes]:
for part in self._result:
yield part
+ def close(self) -> None:
+ if self._close is not None:
+ self._close()
+
class WSGITransport(BaseTransport):
"""
import sys
+import wsgiref.validate
from functools import partial
import pytest
for item in output:
yield item
- return application
+ return wsgiref.validate.validator(application)
def echo_body(environ, start_response):