)
from ._utils import (
NetRCInfo,
+ Timer,
URLPattern,
get_environment_proxies,
get_logger,
Sends a single request, without handling any redirections.
"""
transport = self._transport_for_url(request.url)
+ timer = Timer()
+ timer.sync_start()
with map_exceptions(HTTPCORE_EXC_MAP, request=request):
(
headers=headers,
stream=stream, # type: ignore
request=request,
+ elapsed_func=timer.sync_elapsed,
)
self.cookies.extract_cookies(response)
Sends a single request, without handling any redirections.
"""
transport = self._transport_for_url(request.url)
+ timer = Timer()
+ await timer.async_start()
with map_exceptions(HTTPCORE_EXC_MAP, request=request):
(
headers=headers,
stream=stream, # type: ignore
request=request,
+ elapsed_func=timer.async_elapsed,
)
self.cookies.extract_cookies(response)
URLTypes,
)
from ._utils import (
- ElapsedTimer,
flatten_queryparams,
guess_json_utf,
is_known_encoding,
else:
self.stream = encode(data, files, json)
- self.timer = ElapsedTimer()
self.prepare()
def prepare(self) -> None:
stream: ContentStream = None,
content: bytes = None,
history: typing.List["Response"] = None,
+ elapsed_func: typing.Callable = None,
):
self.status_code = status_code
self.http_version = http_version
self.call_next: typing.Optional[typing.Callable] = None
self.history = [] if history is None else list(history)
+ self._elapsed_func = elapsed_func
self.is_closed = False
self.is_stream_consumed = False
"'.elapsed' may only be accessed after the response "
"has been read or closed."
)
- return self._elapsed
+ return datetime.timedelta(seconds=self._elapsed)
@property
def request(self) -> Request:
"""
if not self.is_closed:
self.is_closed = True
- if self._request is not None:
- self._elapsed = self.request.timer.elapsed
+ if self._elapsed_func is not None:
+ self._elapsed = self._elapsed_func()
self._raw_stream.close()
async def aread(self) -> bytes:
"""
if not self.is_closed:
self.is_closed = True
- if self._request is not None:
- self._elapsed = self.request.timer.elapsed
+ if self._elapsed_func is not None:
+ self._elapsed = await self._elapsed_func()
await self._raw_stream.aclose()
import os
import re
import sys
+import time
import typing
import warnings
-from datetime import timedelta
from pathlib import Path
-from time import perf_counter
-from types import TracebackType
from urllib.request import getproxies
+import sniffio
+
from ._types import PrimitiveData
if typing.TYPE_CHECKING: # pragma: no cover
return items
-class ElapsedTimer:
- def __init__(self) -> None:
- self.start: float = perf_counter()
- self.end: typing.Optional[float] = None
+class Timer:
+ async def _get_time(self) -> float:
+ library = sniffio.current_async_library()
+ if library == "trio":
+ import trio
- def __enter__(self) -> "ElapsedTimer":
- self.start = perf_counter()
- return self
+ return trio.current_time()
+ elif library == "curio": # pragma: nocover
+ import curio
- def __exit__(
- self,
- exc_type: typing.Type[BaseException] = None,
- exc_value: BaseException = None,
- traceback: TracebackType = None,
- ) -> None:
- self.end = perf_counter()
+ return await curio.clock()
- @property
- def elapsed(self) -> timedelta:
- if self.end is None:
- return timedelta(seconds=perf_counter() - self.start)
- return timedelta(seconds=self.end - self.start)
+ import asyncio
+
+ return asyncio.get_event_loop().time()
+
+ def sync_start(self) -> None:
+ self.started = time.perf_counter()
+
+ async def async_start(self) -> None:
+ self.started = await self._get_time()
+
+ def sync_elapsed(self) -> float:
+ now = time.perf_counter()
+ return now - self.started
+
+ async def async_elapsed(self) -> float:
+ now = await self._get_time()
+ return now - self.started
class URLPattern:
-import datetime
import json
from unittest import mock
assert response.text == "Hello, world!"
assert response.request.method == "GET"
assert response.request.url == "https://example.org"
- assert response.elapsed >= datetime.timedelta(0)
assert not response.is_error
-import asyncio
import os
import random
import httpx
from httpx._utils import (
- ElapsedTimer,
NetRCInfo,
URLPattern,
get_ca_bundle_from_env,
assert get_ca_bundle_from_env() is None
-@pytest.mark.asyncio
-async def test_elapsed_timer():
- with ElapsedTimer() as timer:
- assert timer.elapsed.total_seconds() == pytest.approx(0, abs=0.05)
- await asyncio.sleep(0.1)
- await asyncio.sleep(
- 0.1
- ) # test to ensure time spent after timer exits isn't accounted for.
- assert timer.elapsed.total_seconds() == pytest.approx(0.1, abs=0.05)
-
-
@pytest.mark.parametrize(
["environment", "proxies"],
[