import h2.connection
import h2.events
-from ..concurrency.base import BaseStream, ConcurrencyBackend, TimeoutFlag, BaseEvent
+from ..concurrency.base import BaseEvent, BaseStream, ConcurrencyBackend, TimeoutFlag
from ..config import TimeoutConfig, TimeoutTypes
from ..models import AsyncRequest, AsyncResponse
from ..utils import get_logger
if url.origin != request.url.origin:
# Strip Authorization headers when responses are redirected away from
# the origin.
- del headers["Authorization"]
+ headers.pop("Authorization", None)
headers["Host"] = url.authority
if method != request.method and method == "GET":
# If we've switch to a 'GET' request, then strip any headers which
# are only relevant to the request body.
- del headers["Content-Length"]
- del headers["Transfer-Encoding"]
-
+ headers.pop("Content-Length", None)
+ headers.pop("Transfer-Encoding", None)
return headers
def redirect_content(self, request: AsyncRequest, method: str) -> bytes:
for idx, (item_key, _) in enumerate(self._list):
if item_key == del_key:
pop_indexes.append(idx)
+ if not pop_indexes:
+ raise KeyError(key)
for idx in reversed(pop_indexes):
del self._list[idx]
assert hasattr(data, "__aiter__")
self.is_streaming = True
self.content_aiter = data
-
self.prepare()
async def read(self) -> bytes:
import json
+import pytest
+
from httpx import (
AsyncDispatcher,
AsyncRequest,
TimeoutTypes,
VerifyTypes,
__version__,
+ models,
)
"user-agent": "python-myclient/0.2.1",
}
}
+
+
+def test_header_does_not_exist():
+ headers = models.Headers({"foo": "bar"})
+ with pytest.raises(KeyError):
+ del headers["baz"]