self.cursor._rowcount = nrows if nrows is not None else -1
-class QueueWriter(LibpqWriter):
+class QueuedLibpqDriver(LibpqWriter):
"""
A writer using a buffer to queue data to write to a Postgres database.
self.cursor._rowcount = nrows if nrows is not None else -1
-class AsyncQueueWriter(AsyncLibpqWriter):
+class AsyncQueuedLibpqWriter(AsyncLibpqWriter):
"""
An `AsyncWriter` using a buffer to queue data to write.
from psycopg import sql
from psycopg import errors as e
from psycopg.pq import Format
-from psycopg.copy import Copy, Writer, LibpqWriter, QueueWriter
+from psycopg.copy import Copy, Writer, LibpqWriter, QueuedLibpqDriver
from psycopg.adapt import PyFormat
from psycopg.types import TypeInfo
from psycopg.types.hstore import register_hstore
cur = conn.cursor()
ensure_table(cur, sample_tabledef)
with cur.copy(
- f"copy copy_in from stdin (format {format.name})", writer=QueueWriter(cur)
+ f"copy copy_in from stdin (format {format.name})", writer=QueuedLibpqDriver(cur)
) as copy:
assert not copy.writer._worker
copy.write(globals()[buffer])
cur = conn.cursor()
cur.execute("create temp table wat (a text, b text)")
with pytest.raises(ZeroDivisionError):
- with cur.copy("copy wat from stdin", writer=QueueWriter(cur)) as copy:
+ with cur.copy("copy wat from stdin", writer=QueuedLibpqDriver(cur)) as copy:
copy.write("a,b")
from psycopg import sql
from psycopg import errors as e
from psycopg.pq import Format
-from psycopg.copy import AsyncCopy, AsyncWriter, AsyncLibpqWriter, AsyncQueueWriter
+from psycopg.copy import AsyncCopy
+from psycopg.copy import AsyncWriter, AsyncLibpqWriter, AsyncQueuedLibpqWriter
from psycopg.types import TypeInfo
from psycopg.adapt import PyFormat
from psycopg.types.hstore import register_hstore
cur = aconn.cursor()
await ensure_table(cur, sample_tabledef)
async with cur.copy(
- f"copy copy_in from stdin (format {format.name})", writer=AsyncQueueWriter(cur)
+ f"copy copy_in from stdin (format {format.name})",
+ writer=AsyncQueuedLibpqWriter(cur),
) as copy:
assert not copy.writer._worker
await copy.write(globals()[buffer])
await cur.execute("create temp table wat (a text, b text)")
with pytest.raises(ZeroDivisionError):
async with cur.copy(
- "copy wat from stdin", writer=AsyncQueueWriter(cur)
+ "copy wat from stdin", writer=AsyncQueuedLibpqWriter(cur)
) as copy:
await copy.write("a,b")