import threading
from abc import ABC, abstractmethod
from types import TracebackType
-from typing import Any, AsyncIterator, Dict, Generic, Iterator, List, Match
+from typing import Any, AsyncIterator, Dict, Generic, Iterator, List, Match, IO
from typing import Optional, Sequence, Tuple, Type, TypeVar, Union, TYPE_CHECKING
from . import pq
super().finish(exc)
+class FileWriter(Writer):
+ """
+ A `Writer` to write copy data to a file-like object.
+
+ The file must be open for writing in binary mode.
+ """
+
+ def __init__(self, file: IO[bytes]):
+ self.file = file
+
+ def write(self, data: Buffer) -> None:
+ self.file.write(data)
+
+
class AsyncCopy(BaseCopy["AsyncConnection[Any]"]):
"""Manage an asynchronous :sql:`COPY` operation."""
from psycopg import sql
from psycopg import errors as e
from psycopg.pq import Format
-from psycopg.copy import Copy, Writer, LibpqWriter, QueuedLibpqDriver
+from psycopg.copy import Copy, LibpqWriter, QueuedLibpqDriver, FileWriter
from psycopg.adapt import PyFormat
from psycopg.types import TypeInfo
from psycopg.types.hstore import register_hstore
def test_copy_in_format(conn):
- writer = BytesWriter()
+ file = BytesIO()
conn.execute("set client_encoding to utf8")
cur = conn.cursor()
- with Copy(cur, writer=writer) as copy:
+ with Copy(cur, writer=FileWriter(file)) as copy:
for i in range(1, 256):
copy.write_row((i, chr(i)))
- writer.file.seek(0)
- rows = writer.file.read().split(b"\n")
+ file.seek(0)
+ rows = file.read().split(b"\n")
assert not rows[-1]
del rows[-1]
block = block.encode()
m.update(block)
return m.hexdigest()
-
-
-class BytesWriter(Writer):
- def __init__(self):
- self.file = BytesIO()
-
- def write(self, data):
- self.file.write(data)