import time
import pytest
-import asyncio
import logging
-import sys
import weakref
import psycopg
from psycopg import AsyncConnection, Notify
from psycopg.rows import tuple_row
-from psycopg.errors import InterfaceError, UndefinedTable
+from psycopg.errors import UndefinedTable
from psycopg.conninfo import conninfo_to_dict, make_conninfo
from .utils import gc_collect
cur = await aconn2.execute("select %b", ["hello"])
assert (await cur.fetchone())[0] == "hellob" # type: ignore[index]
await aconn2.close()
-
-
-@pytest.mark.skipif(sys.platform != "win32", reason="windows only test")
-def test_windows_error(dsn):
- loop = asyncio.ProactorEventLoop() # type: ignore[attr-defined]
-
- async def go():
- with pytest.raises(
- InterfaceError,
- match="Psycopg cannot use the 'ProactorEventLoop'",
- ):
- await psycopg.AsyncConnection.connect(dsn)
-
- try:
- loop.run_until_complete(go())
- finally:
- loop.run_until_complete(loop.shutdown_asyncgens())
- loop.close()
--- /dev/null
+import pytest
+import asyncio
+import sys
+
+import psycopg
+from psycopg.errors import InterfaceError
+
+
+@pytest.mark.skipif(sys.platform != "win32", reason="windows only test")
+def test_windows_error(dsn):
+ loop = asyncio.ProactorEventLoop() # type: ignore[attr-defined]
+
+ async def go():
+ with pytest.raises(
+ InterfaceError,
+ match="Psycopg cannot use the 'ProactorEventLoop'",
+ ):
+ await psycopg.AsyncConnection.connect(dsn)
+
+ try:
+ loop.run_until_complete(go())
+ finally:
+ loop.run_until_complete(loop.shutdown_asyncgens())
+ loop.close()