`close()` is good enough.
# Copyright (C) 2020 The Psycopg Team
from . import pq
-from .connection import AsyncConnection, Connection
+from .connection import AsyncConnection, Connection, Notify
from .errors import (
Warning,
["Warning", "Error", "InterfaceError", "DatabaseError", "DataError"]
+ ["OperationalError", "IntegrityError", "InternalError"]
+ ["ProgrammingError", "NotSupportedError"]
- + ["AsyncConnection", "Connection", "connect"]
+ + ["AsyncConnection", "Connection", "Notify"]
+ ["BINARY", "DATETIME", "NUMBER", "ROWID", "STRING"]
+ ["Binary", "Date", "DateFromTicks", "Time", "TimeFromTicks"]
+ ["Timestamp", "TimestampFromTicks"]
import asyncio
import threading
from types import TracebackType
-from typing import Any, AsyncGenerator, Callable, Generator, List, NamedTuple
+from typing import Any, AsyncIterator, Callable, Iterator, List, NamedTuple
from typing import Optional, Type, cast
from weakref import ref, ReferenceType
from functools import partial
if result.status != ExecStatus.TUPLES_OK:
raise e.error_from_result(result, encoding=self._pyenc)
- def notifies(self) -> Generator[Optional[Notify], bool, None]:
+ def notifies(self) -> Iterator[Notify]:
+ """Generate a stream of `Notify`"""
while 1:
with self.lock:
ns = self.wait(notifies(self.pgconn))
pgn.extra.decode(self._pyenc),
pgn.be_pid,
)
- if (yield n):
- yield None # for the send who stopped us
- return
+ yield n
def _set_autocommit(self, value: bool) -> None:
with self.lock:
if result.status != ExecStatus.TUPLES_OK:
raise e.error_from_result(result, encoding=self._pyenc)
- async def notifies(self) -> AsyncGenerator[Optional[Notify], bool]:
+ async def notifies(self) -> AsyncIterator[Notify]:
while 1:
async with self.lock:
ns = await self.wait(notifies(self.pgconn))
pgn.extra.decode(self._pyenc),
pgn.be_pid,
)
- if (yield n):
- yield None
- return
+ yield n
def _set_autocommit(self, value: bool) -> None:
raise AttributeError(
@pytest.mark.slow
def test_notifies(conn, dsn):
- nconn = psycopg3.connect(dsn)
+ nconn = psycopg3.connect(dsn, autocommit=True)
npid = nconn.pgconn.backend_pid
def notifier():
time.sleep(0.25)
- nconn.pgconn.exec_(b"notify foo, '1'")
+ nconn.cursor().execute("notify foo, '1'")
time.sleep(0.25)
- nconn.pgconn.exec_(b"notify foo, '2'")
- nconn.close()
+ nconn.cursor().execute("notify foo, '2'")
+
+ conn.autocommit = True
+ conn.cursor().execute("listen foo")
- conn.pgconn.exec_(b"listen foo")
t0 = time.time()
t = threading.Thread(target=notifier)
t.start()
+
ns = []
gen = conn.notifies()
for n in gen:
ns.append((n, time.time()))
if len(ns) >= 2:
- gen.send(True)
+ gen.close()
+
assert len(ns) == 2
n, t1 = ns[0]
+ assert isinstance(n, psycopg3.Notify)
assert n.pid == npid
assert n.channel == "foo"
assert n.payload == "1"
@pytest.mark.slow
async def test_notifies(aconn, dsn):
- nconn = await psycopg3.AsyncConnection.connect(dsn)
+ nconn = await psycopg3.AsyncConnection.connect(dsn, autocommit=True)
npid = nconn.pgconn.backend_pid
async def notifier():
+ cur = await nconn.cursor()
await asyncio.sleep(0.25)
- nconn.pgconn.exec_(b"notify foo, '1'")
+ await cur.execute("notify foo, '1'")
await asyncio.sleep(0.25)
- nconn.pgconn.exec_(b"notify foo, '2'")
+ await cur.execute("notify foo, '2'")
await nconn.close()
async def receiver():
- aconn.pgconn.exec_(b"listen foo")
+ await aconn.set_autocommit(True)
+ cur = await aconn.cursor()
+ await cur.execute("listen foo")
gen = aconn.notifies()
async for n in gen:
ns.append((n, time.time()))
if len(ns) >= 2:
- gen.send(True)
+ gen.close()
ns = []
t0 = time.time()
import weakref
import psycopg3
-from psycopg3 import Connection
+from psycopg3 import Connection, Notify
from psycopg3.errors import UndefinedTable
from psycopg3.conninfo import conninfo_to_dict
assert len(nots1) == 1
assert len(nots2) == 2
n = nots2[1]
+ assert isinstance(n, Notify)
assert n.channel == "foo"
assert n.payload == "n2"
assert n.pid == conn.pgconn.backend_pid