text = response.text
else:
text = response.text
+
syntax = rich.syntax.Syntax(text, lexer_name, theme="ansi_dark", word_wrap=True)
console.print(syntax)
- else: # pragma: nocover
- console.print(rich.markup.escape(response.text))
+ else:
+ console.print(f"<{len(response.content)} bytes of binary data>")
def format_certificate(cert: dict) -> str: # pragma: nocover
await status_code(scope, receive, send)
elif scope["path"].startswith("/echo_body"):
await echo_body(scope, receive, send)
+ elif scope["path"].startswith("/echo_binary"):
+ await echo_binary(scope, receive, send)
elif scope["path"].startswith("/echo_headers"):
await echo_headers(scope, receive, send)
elif scope["path"].startswith("/redirect_301"):
await send({"type": "http.response.body", "body": body})
+async def echo_binary(scope, receive, send):
+ body = b""
+ more_body = True
+
+ while more_body:
+ message = await receive()
+ body += message.get("body", b"")
+ more_body = message.get("more_body", False)
+
+ await send(
+ {
+ "type": "http.response.start",
+ "status": 200,
+ "headers": [[b"content-type", b"application/octet-stream"]],
+ }
+ )
+ await send({"type": "http.response.body", "body": body})
+
+
async def echo_headers(scope, receive, send):
body = {
name.capitalize().decode(): value.decode()
]
+def test_binary(server):
+ url = str(server.url.copy_with(path="/echo_binary"))
+ runner = CliRunner()
+ content = "Hello, world!"
+ result = runner.invoke(httpx.main, [url, "-c", content])
+ assert result.exit_code == 0
+ assert remove_date_header(splitlines(result.output)) == [
+ "HTTP/1.1 200 OK",
+ "server: uvicorn",
+ "content-type: application/octet-stream",
+ "Transfer-Encoding: chunked",
+ "",
+ f"<{len(content)} bytes of binary data>",
+ ]
+
+
def test_redirects(server):
url = str(server.url.copy_with(path="/redirect_301"))
runner = CliRunner()