def __init__(self, cursor: "Cursor[Any]", *, writer: Optional["Writer"] = None):
super().__init__(cursor)
if not writer:
- writer = QueueWriter(cursor)
+ writer = LibpqWriter(cursor)
self.writer = writer
self._write = writer.write
super().__init__(cursor)
if not writer:
- writer = AsyncQueueWriter(cursor)
+ writer = AsyncLibpqWriter(cursor)
self.writer = writer
self._write = writer.write
from psycopg import sql
from psycopg import errors as e
from psycopg.pq import Format
+from psycopg.copy import Copy, Writer, LibpqWriter, QueueWriter
from psycopg.adapt import PyFormat
from psycopg.types import TypeInfo
from psycopg.types.hstore import register_hstore
writer = BytesWriter()
conn.execute("set client_encoding to utf8")
cur = conn.cursor()
- with psycopg.copy.Copy(cur, writer=writer) as copy:
+ with Copy(cur, writer=writer) as copy:
for i in range(1, 256):
copy.write_row((i, chr(i)))
def test_worker_life(conn, format, buffer):
cur = conn.cursor()
ensure_table(cur, sample_tabledef)
- with cur.copy(f"copy copy_in from stdin (format {format.name})") as copy:
+ with cur.copy(
+ f"copy copy_in from stdin (format {format.name})", writer=QueueWriter(cur)
+ ) as copy:
assert not copy.writer._worker
copy.write(globals()[buffer])
assert copy.writer._worker
cur = conn.cursor()
cur.execute("create temp table wat (a text, b text)")
with pytest.raises(ZeroDivisionError):
- with cur.copy("copy wat from stdin") as copy:
+ with cur.copy("copy wat from stdin", writer=QueueWriter(cur)) as copy:
copy.write("a,b")
)
def test_connection_writer(conn, format, buffer):
cur = conn.cursor()
- writer = psycopg.copy.LibpqWriter(cur)
+ writer = LibpqWriter(cur)
ensure_table(cur, sample_tabledef)
with cur.copy(
return m.hexdigest()
-class BytesWriter(psycopg.copy.Writer):
+class BytesWriter(Writer):
def __init__(self):
self.file = BytesIO()
from psycopg import sql
from psycopg import errors as e
from psycopg.pq import Format
+from psycopg.copy import AsyncCopy, AsyncWriter, AsyncLibpqWriter, AsyncQueueWriter
from psycopg.types import TypeInfo
from psycopg.adapt import PyFormat
from psycopg.types.hstore import register_hstore
writer = AsyncBytesWriter()
await aconn.execute("set client_encoding to utf8")
cur = aconn.cursor()
- async with psycopg.copy.AsyncCopy(cur, writer=writer) as copy:
+ async with AsyncCopy(cur, writer=writer) as copy:
for i in range(1, 256):
await copy.write_row((i, chr(i)))
async def test_worker_life(aconn, format, buffer):
cur = aconn.cursor()
await ensure_table(cur, sample_tabledef)
- async with cur.copy(f"copy copy_in from stdin (format {format.name})") as copy:
+ async with cur.copy(
+ f"copy copy_in from stdin (format {format.name})", writer=AsyncQueueWriter(cur)
+ ) as copy:
assert not copy.writer._worker
await copy.write(globals()[buffer])
assert copy.writer._worker
cur = aconn.cursor()
await cur.execute("create temp table wat (a text, b text)")
with pytest.raises(ZeroDivisionError):
- async with cur.copy("copy wat from stdin") as copy:
+ async with cur.copy(
+ "copy wat from stdin", writer=AsyncQueueWriter(cur)
+ ) as copy:
await copy.write("a,b")
)
async def test_connection_writer(aconn, format, buffer):
cur = aconn.cursor()
- writer = psycopg.copy.AsyncLibpqWriter(cur)
+ writer = AsyncLibpqWriter(cur)
await ensure_table(cur, sample_tabledef)
async with cur.copy(
return m.hexdigest()
-class AsyncBytesWriter(psycopg.copy.AsyncWriter):
+class AsyncBytesWriter(AsyncWriter):
def __init__(self):
self.file = BytesIO()