We often find a "delete" record more than once in the changefeed, before
deleting it. Tolerate one.
cur.execute("cancel query %s", [qid])
assert cur.statusmessage == "CANCEL QUERIES 1"
- assert q.get(timeout=1) is None
+ # We often find the record with {"after": null} at least another time
+ # in the queue. Let's tolerate an extra one.
+ for i in range(2):
+ row = q.get(timeout=1)
+ if row is None:
+ break
+ assert json.loads(row.value)["after"] is None, json
+ else:
+ pytest.fail("keep on receiving messages")
+
t.join()
await cur.execute("cancel query %s", [qid])
assert cur.statusmessage == "CANCEL QUERIES 1"
- assert await asyncio.wait_for(q.get(), 1.0) is None
+ # We often find the record with {"after": null} at least another time
+ # in the queue. Let's tolerate an extra one.
+ for i in range(2):
+ row = await asyncio.wait_for(q.get(), 1.0)
+ if row is None:
+ break
+ assert json.loads(row.value)["after"] is None, json
+ else:
+ pytest.fail("keep on receiving messages")
+
await asyncio.gather(t)