.. code:: python
- from psycopg3.oids import builtins
-
with cur.copy("COPY (VALUES (10::int, current_date)) TO STDOUT") as copy:
- copy.set_types([builtins["int4"].oid, builtins["date"].oid])
+ copy.set_types(["int4", "date"])
for row in copy.rows():
print(row) # (10, datetime.date(2046, 12, 24))
.. admonition:: TODO
- Document the `!builtins` register... but more likely do something
- better such as allowing to pass type names, unifying `TypeRegistry` and
- `AdaptContext`, none of which I have documented, so you haven't seen
- anything... 👀
+ Currently only builtin names are recognised; custom types must be
+ specified by numeric oid. This will change after the `TypeRegistry` and
+ `AdaptContext` get integrated, neither of which I have documented, so you
+ haven't seen anything... 👀
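+
+ In the meantime the oid of a custom type can be looked up in the
+ ``pg_type`` catalog (here via a ``regtype`` cast) and passed numerically.
+ This is only a sketch: ``mytype`` and ``mytable`` are placeholders for
+ your own objects.
+
+ .. code:: python
+
+     # 'mytype' and 'mytable' are placeholder names
+     cur.execute("SELECT 'mytype'::regtype::oid")
+     (mytype_oid,) = cur.fetchone()
+
+     with cur.copy("COPY (SELECT value FROM mytable) TO STDOUT") as copy:
+         copy.set_types([mytype_oid])
+         for row in copy.rows():
+             print(row)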
Copying block-by-block
from . import pq
from . import errors as e
from .pq import ExecStatus
+from .oids import builtins
from .adapt import Format
from .proto import ConnectionType, PQGen, Transformer
from .generators import copy_from, copy_to, copy_end
if self._finished:
raise TypeError("copy blocks can be used only once")
- def set_types(self, types: Sequence[int]) -> None:
+ def set_types(self, types: Sequence[Union[int, str]]) -> None:
"""
Set the types expected out of a :sql:`COPY TO` operation.
Without setting the types, the data from :sql:`COPY TO` will be
returned as unparsed strings or bytes.
+
+ The types must be specified as a sequence of oids or PostgreSQL type
+ names (e.g. ``int4``, ``timestamptz[]``).
+
+ .. admonition:: TODO
+
+ Only builtin names are supported for the moment. In order to specify
+ custom data types you must use their oid.
+
"""
+ # TODO: should allow names of non-builtin types
+ # Must put a types map on the context.
+ # resolve any type name to its oid through the builtins registry
+ oids = [
+ t if isinstance(t, int) else builtins.get_oid(t) for t in types
+ ]
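+ # e.g. set_types(["int4", "date"]) resolves to the oids [23, 1082]
+ # (the standard catalog oids of int4 and date) before they are
+ # handed to the transformer below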
self.formatter.transformer.set_row_types(
- types, [self.formatter.format] * len(types)
+ oids, [self.formatter.format] * len(types)
)
# High level copy protocol generators (state change of the Copy object)
def __getitem__(self, key: Union[str, int]) -> TypeInfo:
if isinstance(key, str):
+ if key.endswith("[]"):
+ key = key[:-2]
return self._by_name[key]
elif isinstance(key, int):
return self._by_oid[key]
except KeyError:
return None
+ def get_oid(self, name: str) -> int:
+ """Return the oid of a type from its name ("name[]" returns the array oid)."""
+ t = self[name]
+ if name.endswith("[]"):
+ return t.array_oid
+ else:
+ return t.oid
+
builtins = TypesRegistry()
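+ # A sketch of the expected lookups once the builtin types are registered
+ # (the numbers are the standard PostgreSQL catalog oids):
+ #
+ # builtins["int4"].oid # 23
+ # builtins["float8[]"].oid # 701: "name[]" resolves to the base type info
+ # builtins.get_oid("float8[]") # 1022: get_oid() returns the array oid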
@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
-def test_read_rows(conn, format):
+@pytest.mark.parametrize("typetype", ["names", "oids"])
+def test_read_rows(conn, format, typetype):
cur = conn.cursor()
with cur.copy(
- f"copy ({sample_values}) to stdout (format {format.name})"
+ f"""copy (
+ select 10::int4, 'hello'::text, '{{0.0,1.0}}'::float8[]
+ ) to stdout (format {format.name})"""
) as copy:
- # TODO: should be passed by name
- # big refactoring to be had, to have builtins not global and merged
- # to adaptation context I guess...
- copy.set_types(
- [builtins["int4"].oid, builtins["int4"].oid, builtins["text"].oid]
- )
- rows = []
- while 1:
- row = copy.read_row()
- if not row:
- break
- rows.append(row)
-
- assert rows == sample_records
+ types = ["int4", "text", "float8[]"]
+ if typetype == "oids":
+ types = [builtins.get_oid(t) for t in types]
+ copy.set_types(types)
+ row = copy.read_row()
+ assert copy.read_row() is None
+
+ assert row == (10, "hello", [0.0, 1.0])
assert conn.pgconn.transaction_status == conn.TransactionStatus.INTRANS