* `.cookies` - **Cookies**
* `.history` - **List[Response]**
* `.elapsed` - **[timedelta](https://docs.python.org/3/library/datetime.html)**
- * The amount of time elapsed between sending the first byte and parsing the headers (not including time spent reading
- the response). Use
+ * The amount of time elapsed between sending the request and calling `close()` on the corresponding response received for that request.
[total_seconds()](https://docs.python.org/3/library/datetime.html#datetime.timedelta.total_seconds) to correctly get
the total elapsed seconds.
* `def .raise_for_status()` - **None**
URLTypes,
)
from .status_codes import codes
-from .utils import ElapsedTimer, NetRCInfo, get_environment_proxies, get_logger
+from .utils import NetRCInfo, get_environment_proxies, get_logger
logger = get_logger(__name__)
dispatcher = self.dispatcher_for_url(request.url)
try:
- with ElapsedTimer() as timer:
- response = await dispatcher.send(request, timeout=timeout)
- response.elapsed = timer.elapsed
- response.request = request
+ response = await dispatcher.send(request, timeout=timeout)
except HTTPError as exc:
# Add the original request to any HTTPError unless
# there'a already a request attached in the case of
)
from .status_codes import StatusCode
from .utils import (
+ ElapsedTimer,
flatten_queryparams,
guess_json_utf,
is_known_encoding,
else:
self.stream = encode(data, files, json)
+ self.timer = ElapsedTimer()
self.prepare()
def prepare(self) -> None:
stream: ContentStream = None,
content: bytes = None,
history: typing.List["Response"] = None,
- elapsed: datetime.timedelta = None,
):
self.status_code = status_code
self.http_version = http_version
self.headers = Headers(headers)
self.request = request
- self.elapsed = datetime.timedelta(0) if elapsed is None else elapsed
self.call_next: typing.Optional[typing.Callable] = None
self.history = [] if history is None else list(history)
self.is_closed = True
self.is_stream_consumed = True
self._raw_content = content or b""
+ self._elapsed = request.timer.elapsed
else:
self.is_closed = False
self.is_stream_consumed = False
self._raw_stream = stream
+ @property
+ def elapsed(self) -> datetime.timedelta:
+ """
+ Returns the time taken for the complete request/response
+ cycle to complete.
+ """
+ if not hasattr(self, "_elapsed"):
+ raise RuntimeError(
+ "'.elapsed' may only be accessed after the response "
+ "has been read or closed."
+ )
+ return self._elapsed
+
@property
def reason_phrase(self) -> str:
return StatusCode.get_reason_phrase(self.status_code)
"""
if not self.is_closed:
self.is_closed = True
+ self._elapsed = self.request.timer.elapsed
if hasattr(self, "_raw_stream"):
await self._raw_stream.aclose()
yield b"world!"
-def test_response():
+@pytest.mark.asyncio
+async def test_response():
response = httpx.Response(200, content=b"Hello, world!", request=REQUEST)
+
assert response.status_code == 200
assert response.reason_phrase == "OK"
assert response.text == "Hello, world!"
assert response.request is REQUEST
- assert response.elapsed == datetime.timedelta(0)
+ assert response.elapsed >= datetime.timedelta(0)
assert not response.is_error