nreceived = 0
+ if self._notify_handlers:
+ warnings.warn(
+ "using 'notifies()' together with notifies handlers on the same connection is not reliable. Please use only one of thees methods",
+ RuntimeWarning,
+ )
+
with self.lock:
enc = self.pgconn._encoding
nreceived = 0
+ if self._notify_handlers:
+ warnings.warn(
+ "using 'notifies()' together with notifies handlers on the"
+ " same connection is not reliable."
+ " Please use only one of thees methods",
+ RuntimeWarning,
+ )
+
async with self.lock:
enc = self.pgconn._encoding
@pytest.mark.slow
-def test_generator_and_handler(conn, conn_cls, dsn):
+def test_generator_and_handler(conn, conn_cls, dsn, recwarn):
+ # NOTE: we don't support generator+handlers anymore. So, if in the future
+ # this behaviour will change, we will not consider it a regression. However
+ # we will want to keep the warning check.
+
+ recwarn.clear()
+
conn.set_autocommit(True)
conn.execute("listen foo")
assert n1
assert n2
+ msg = str(recwarn.pop(RuntimeWarning).message)
+ assert "notifies()" in msg
+
@pytest.mark.parametrize("query_between", [True, False])
def test_first_notify_not_lost(conn, conn_cls, dsn, query_between):
conn.execute("select pg_notify('counter', %s)", (str(i),))
sleep(0.2)
- def listener():
- with conn_cls.connect(dsn, autocommit=True) as conn:
- if listen_by == "callback":
+ def nap(conn):
+ if sleep_on == "server":
+ conn.execute("select pg_sleep(0.2)")
+ else:
+ assert sleep_on == "client"
+ sleep(0.2)
+
+ if listen_by == "callback":
+
+ def listener():
+ with conn_cls.connect(dsn, autocommit=True) as conn:
conn.add_notify_handler(lambda n: notifies.append(int(n.payload)))
- conn.execute("listen counter")
- e.set()
- for n in conn.notifies(timeout=0.2):
- if listen_by == "generator":
+ conn.execute("listen counter")
+ e.set()
+
+ nap(conn)
+ conn.execute("")
+ nap(conn)
+ conn.execute("")
+ nap(conn)
+ conn.execute("")
+
+ else:
+
+ def listener():
+ with conn_cls.connect(dsn, autocommit=True) as conn:
+ conn.execute("listen counter")
+ e.set()
+ for n in conn.notifies(timeout=0.2):
notifies.append(int(n.payload))
- if sleep_on == "server":
- conn.execute("select pg_sleep(0.2)")
- else:
- assert sleep_on == "client"
- sleep(0.2)
+ nap(conn)
- for n in conn.notifies(timeout=0.2):
- if listen_by == "generator":
+ for n in conn.notifies(timeout=0.2):
notifies.append(int(n.payload))
workers.append(spawn(listener))
@pytest.mark.slow
-async def test_generator_and_handler(aconn, aconn_cls, dsn):
+async def test_generator_and_handler(aconn, aconn_cls, dsn, recwarn):
+ # NOTE: we don't support generator+handlers anymore. So, if in the future
+ # this behaviour will change, we will not consider it a regression. However
+ # we will want to keep the warning check.
+
+ recwarn.clear()
+
await aconn.set_autocommit(True)
await aconn.execute("listen foo")
assert n1
assert n2
+ msg = str(recwarn.pop(RuntimeWarning).message)
+ assert "notifies()" in msg
+
@pytest.mark.parametrize("query_between", [True, False])
async def test_first_notify_not_lost(aconn, aconn_cls, dsn, query_between):
await aconn.execute("select pg_notify('counter', %s)", (str(i),))
await asleep(0.2)
- async def listener():
- async with await aconn_cls.connect(dsn, autocommit=True) as aconn:
- if listen_by == "callback":
+ async def nap(aconn):
+ if sleep_on == "server":
+ await aconn.execute("select pg_sleep(0.2)")
+ else:
+ assert sleep_on == "client"
+ await asleep(0.2)
+
+ if listen_by == "callback":
+
+ async def listener():
+ async with await aconn_cls.connect(dsn, autocommit=True) as aconn:
aconn.add_notify_handler(lambda n: notifies.append(int(n.payload)))
- await aconn.execute("listen counter")
- e.set()
- async for n in aconn.notifies(timeout=0.2):
- if listen_by == "generator":
+ await aconn.execute("listen counter")
+ e.set()
+
+ await nap(aconn)
+ await aconn.execute("")
+ await nap(aconn)
+ await aconn.execute("")
+ await nap(aconn)
+ await aconn.execute("")
+
+ else:
+
+ async def listener():
+ async with await aconn_cls.connect(dsn, autocommit=True) as aconn:
+ await aconn.execute("listen counter")
+ e.set()
+ async for n in aconn.notifies(timeout=0.2):
notifies.append(int(n.payload))
- if sleep_on == "server":
- await aconn.execute("select pg_sleep(0.2)")
- else:
- assert sleep_on == "client"
- await asleep(0.2)
+ await nap(aconn)
- async for n in aconn.notifies(timeout=0.2):
- if listen_by == "generator":
+ async for n in aconn.notifies(timeout=0.2):
notifies.append(int(n.payload))
workers.append(spawn(listener))