#!/usr/bin/env python
-# -*- coding: utf-8 -*-
import os
import re
import functools
import inspect
import re
-import sys
import traceback
import types
import typing
import warnings
+from contextlib import asynccontextmanager
from enum import Enum
from starlette.concurrency import run_in_threadpool
from starlette.types import ASGIApp, Receive, Scope, Send
from starlette.websockets import WebSocket, WebSocketClose
-if sys.version_info >= (3, 7):
- from contextlib import asynccontextmanager # pragma: no cover
-else:
- from contextlib2 import asynccontextmanager # pragma: no cover
-
class NoMatchFound(Exception):
"""
And RFC 2324 - https://tools.ietf.org/html/rfc2324
"""
-import sys
import warnings
from typing import List
}
deprecated = __deprecated__.get(name)
if deprecated:
- stacklevel = 3 if sys.version_info >= (3, 7) else 4
warnings.warn(
f"'{name}' is deprecated. Use '{deprecation_changes[name]}' instead.",
category=DeprecationWarning,
- stacklevel=stacklevel,
+ stacklevel=3,
)
return deprecated
raise AttributeError(f"module '{__name__}' has no attribute '{name}'")
import os
-import sys
+from contextlib import asynccontextmanager
import pytest
from starlette.routing import Host, Mount, Route, Router, WebSocketRoute
from starlette.staticfiles import StaticFiles
-if sys.version_info >= (3, 7):
- from contextlib import asynccontextmanager # pragma: no cover
-else:
- from contextlib2 import asynccontextmanager # pragma: no cover
-
async def error_500(request, exc):
return JSONResponse({"detail": "Server Error"}, status_code=500)
-import asyncio
import itertools
-import sys
+from asyncio import current_task as asyncio_current_task
+from contextlib import asynccontextmanager
import anyio
import pytest
from starlette.routing import Route
from starlette.websockets import WebSocket, WebSocketDisconnect
-if sys.version_info >= (3, 7): # pragma: no cover
- from asyncio import current_task as asyncio_current_task
- from contextlib import asynccontextmanager
-else: # pragma: no cover
- asyncio_current_task = asyncio.Task.current_task
- from contextlib2 import asynccontextmanager
-
def mock_service_endpoint(request):
return JSONResponse({"mock": "example"})