self._autocommit = value
def cursor(
- self, name: Optional[str] = None, binary: bool = False
+ self, name: Optional[str] = None, format: pq.Format = pq.Format.TEXT
) -> cursor.BaseCursor:
if name is not None:
raise NotImplementedError
- return self.cursor_factory(self, binary=binary)
+ return self.cursor_factory(self, format=format)
@property
def codec(self) -> codecs.CodecInfo:
self.pgconn.finish()
def cursor(
- self, name: Optional[str] = None, binary: bool = False
+ self, name: Optional[str] = None, format: pq.Format = pq.Format.TEXT
) -> cursor.Cursor:
- cur = super().cursor(name, binary)
+ cur = super().cursor(name, format=format)
return cast(cursor.Cursor, cur)
def _start_query(self) -> None:
self.pgconn.finish()
def cursor(
- self, name: Optional[str] = None, binary: bool = False
+ self, name: Optional[str] = None, format: pq.Format = pq.Format.TEXT
) -> cursor.AsyncCursor:
- cur = super().cursor(name, binary)
+ cur = super().cursor(name, format=format)
return cast(cursor.AsyncCursor, cur)
async def _start_query(self) -> None:
_transformer: proto.Transformer
- def __init__(self, connection: "BaseConnection", binary: bool = False):
+ def __init__(
+ self, connection: "BaseConnection", format: pq.Format = pq.Format.TEXT
+ ):
self.connection = connection
- self.binary = binary
+ self.format = format
self.dumpers: DumpersMap = {}
self.loaders: LoadersMap = {}
self._reset()
pgq.params,
param_formats=pgq.formats,
param_types=pgq.types,
- result_format=pq.Format(self.binary),
+ result_format=self.format,
)
else:
# if we don't have to, let's use exec_ as it can run more than
# one query in one go
- if self.binary:
+ if self.format == pq.Format.BINARY:
self.connection.pgconn.send_query_params(
- pgq.query, None, result_format=pq.Format(self.binary)
+ pgq.query, None, result_format=self.format
)
else:
self.connection.pgconn.send_query(pgq.query)
name,
pgq.params,
param_formats=pgq.formats,
- result_format=pq.Format(self.binary),
+ result_format=self.format,
)
def nextset(self) -> Optional[bool]:
class Cursor(BaseCursor):
connection: "Connection"
- def __init__(self, connection: "Connection", binary: bool = False):
- super().__init__(connection, binary)
+ def __init__(
+ self, connection: "Connection", format: pq.Format = pq.Format.TEXT
+ ):
+ super().__init__(connection, format=format)
def close(self) -> None:
self._closed = True
class AsyncCursor(BaseCursor):
connection: "AsyncConnection"
- def __init__(self, connection: "AsyncConnection", binary: bool = False):
- super().__init__(connection, binary)
+ def __init__(
+ self, connection: "AsyncConnection", format: pq.Format = pq.Format.TEXT
+ ):
+ super().__init__(connection, format=format)
async def close(self) -> None:
self._closed = True
from typing import Any, Callable, Generator, Sequence, Tuple
from typing import Optional, TYPE_CHECKING
-from . import array
+from .. import pq
from ..adapt import Format, Dumper, Loader, Transformer
from ..proto import AdaptContext
+from . import array
from .oids import builtins, TypeInfo
if TYPE_CHECKING:
def fetch_info(conn: "Connection", name: str) -> Optional[CompositeTypeInfo]:
- cur = conn.cursor(binary=True)
+ cur = conn.cursor(format=pq.Format.BINARY)
cur.execute(_type_info_query, {"name": name})
rec = cur.fetchone()
return CompositeTypeInfo._from_record(rec)
async def fetch_info_async(
conn: "AsyncConnection", name: str
) -> Optional[CompositeTypeInfo]:
- cur = conn.cursor(binary=True)
+ cur = conn.cursor(format=pq.Format.BINARY)
await cur.execute(_type_info_query, {"name": name})
rec = await cur.fetchone()
return CompositeTypeInfo._from_record(rec)
r = conn.cursor().execute("select 'hello'::text").fetchone()
assert r == ("hellot",)
- r = conn.cursor(binary=True).execute("select 'hello'::text").fetchone()
+ r = conn.cursor(format=1).execute("select 'hello'::text").fetchone()
assert r == ("hellob",)
r = cur.execute("select 'hello'::text").fetchone()
assert r == ("hellotc",)
- cur.binary = True
+ cur.format = Format.BINARY
r = cur.execute("select 'hello'::text").fetchone()
assert r == ("hellobc",)
cur = conn.cursor()
r = cur.execute("select 'hello'::text").fetchone()
assert r == ("hellot",)
- cur.binary = True
+ cur.format = Format.BINARY
r = cur.execute("select 'hello'::text").fetchone()
assert r == ("hellob",)
)
@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
def test_load_cursor_ctx_nested(conn, sql, obj, fmt_out):
- cur = conn.cursor(binary=fmt_out == Format.BINARY)
+ cur = conn.cursor(format=fmt_out)
Loader.register(
TEXT_OID, lambda b: b.decode("ascii") + "c", cur, format=fmt_out
)
def test_execute_binary_result(conn):
- cur = conn.cursor(binary=True)
+ cur = conn.cursor(format=psycopg3.pq.Format.BINARY)
cur.execute("select %s, %s", ["foo", None])
assert cur.pgresult.fformat(0) == 1
async def test_execute_binary_result(aconn):
- cur = aconn.cursor(binary=True)
+ cur = aconn.cursor(format=psycopg3.pq.Format.BINARY)
await cur.execute("select %s, %s", ["foo", None])
assert cur.pgresult.fformat(0) == 1
@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
@pytest.mark.parametrize("want, obj", tests_str)
def test_load_list_str(conn, obj, want, fmt_out):
- cur = conn.cursor(binary=fmt_out == Format.BINARY)
+ cur = conn.cursor(format=fmt_out)
cur.execute("select %s::text[]", (obj,))
assert cur.fetchone()[0] == want
@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY])
@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
def test_all_chars(conn, fmt_in, fmt_out):
- cur = conn.cursor(binary=fmt_out == Format.BINARY)
+ cur = conn.cursor(format=fmt_out)
ph = "%s" if fmt_in == Format.TEXT else "%b"
for i in range(1, 256):
c = chr(i)
@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
@pytest.mark.parametrize("want, obj", tests_int)
def test_load_list_int(conn, obj, want, fmt_out):
- cur = conn.cursor(binary=fmt_out == Format.BINARY)
+ cur = conn.cursor(format=fmt_out)
cur.execute("select %s::int[]", (obj,))
assert cur.fetchone()[0] == want
@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
def test_load_all_chars(conn, fmt_out):
- cur = conn.cursor(binary=fmt_out == Format.BINARY)
+ cur = conn.cursor(format=fmt_out)
for i in range(1, 256):
res = cur.execute("select row(chr(%s::int))", (i,)).fetchone()[0]
assert res == (chr(i),)
],
)
def test_load_record_binary(conn, want, rec):
- cur = conn.cursor(binary=True)
+ cur = conn.cursor(format=1)
res = cur.execute(f"select row({rec})").fetchone()[0]
assert res == want
for o1, o2 in zip(res, want):
@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
def test_load_composite(conn, testcomp, fmt_out):
- cur = conn.cursor(binary=fmt_out == Format.BINARY)
+ cur = conn.cursor(format=fmt_out)
info = composite.fetch_info(conn, "testcomp")
composite.register(info, conn)
@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
def test_load_composite_factory(conn, testcomp, fmt_out):
- cur = conn.cursor(binary=fmt_out == Format.BINARY)
+ cur = conn.cursor(format=fmt_out)
info = composite.fetch_info(conn, "testcomp")
class MyThing:
)
@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
def test_load_int(conn, val, pgtype, want, fmt_out):
- cur = conn.cursor(binary=fmt_out == Format.BINARY)
+ cur = conn.cursor(format=fmt_out)
cur.execute(f"select %s::{pgtype}", (val,))
assert cur.pgresult.fformat(0) == fmt_out
assert cur.pgresult.ftype(0) == builtins[pgtype].oid
)
@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
def test_load_float(conn, val, pgtype, want, fmt_out):
- cur = conn.cursor(binary=fmt_out == Format.BINARY)
+ cur = conn.cursor(format=fmt_out)
cur.execute(f"select %s::{pgtype}", (val,))
assert cur.pgresult.fformat(0) == fmt_out
assert cur.pgresult.ftype(0) == builtins[pgtype].oid
)
@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
def test_load_float_approx(conn, expr, pgtype, want, fmt_out):
- cur = conn.cursor(binary=fmt_out == Format.BINARY)
+ cur = conn.cursor(format=fmt_out)
cur.execute("select %s::%s" % (expr, pgtype))
assert cur.pgresult.fformat(0) == fmt_out
result = cur.fetchone()[0]
@pytest.mark.xfail
def test_load_numeric_binary(conn):
# TODO: numeric binary casting
- cur = conn.cursor(binary=True)
+ cur = conn.cursor(format=1)
res = cur.execute("select 1::numeric").fetchone()[0]
assert res == Decimal(1)
@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
@pytest.mark.parametrize("b", [True, False, None])
def test_roundtrip_bool(conn, b, fmt_in, fmt_out):
- cur = conn.cursor(binary=fmt_out == Format.BINARY)
+ cur = conn.cursor(format=fmt_out)
ph = "%s" if fmt_in == Format.TEXT else "%b"
result = cur.execute(f"select {ph}", (b,)).fetchone()[0]
assert cur.pgresult.fformat(0) == fmt_out
@pytest.mark.parametrize("typename", ["text", "varchar", "name", "bpchar"])
@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
def test_load_1char(conn, typename, fmt_out):
- cur = conn.cursor(binary=fmt_out == Format.BINARY)
+ cur = conn.cursor(format=fmt_out)
for i in range(1, 256):
cur.execute(f"select chr(%s::int)::{typename}", (i,))
res = cur.fetchone()[0]
@pytest.mark.parametrize("encoding", ["utf8", "latin9"])
@pytest.mark.parametrize("typename", ["text", "varchar", "name", "bpchar"])
def test_load_enc(conn, typename, encoding, fmt_out):
- cur = conn.cursor(binary=fmt_out == Format.BINARY)
+ cur = conn.cursor(format=fmt_out)
conn.set_client_encoding(encoding)
(res,) = cur.execute(
@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
@pytest.mark.parametrize("typename", ["text", "varchar", "name", "bpchar"])
def test_load_badenc(conn, typename, fmt_out):
- cur = conn.cursor(binary=fmt_out == Format.BINARY)
+ cur = conn.cursor(format=fmt_out)
conn.set_client_encoding("latin1")
with pytest.raises(psycopg3.DatabaseError):
@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
@pytest.mark.parametrize("typename", ["text", "varchar"])
def test_load_ascii(conn, typename, fmt_out):
- cur = conn.cursor(binary=fmt_out == Format.BINARY)
+ cur = conn.cursor(format=fmt_out)
conn.set_client_encoding("sql_ascii")
(res,) = cur.execute(
@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
@pytest.mark.parametrize("typename", ["name", "bpchar"])
def test_load_ascii_encanyway(conn, typename, fmt_out):
- cur = conn.cursor(binary=fmt_out == Format.BINARY)
+ cur = conn.cursor(format=fmt_out)
conn.set_client_encoding("sql_ascii")
(res,) = cur.execute(f"select 'aa'::{typename}").fetchone()
@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
@pytest.mark.parametrize("typename", ["text", "varchar", "name", "bpchar"])
def test_text_array(conn, typename, fmt_in, fmt_out):
- cur = conn.cursor(binary=fmt_out == Format.BINARY)
+ cur = conn.cursor(format=fmt_out)
ph = "%s" if fmt_in == Format.TEXT else "%b"
a = list(map(chr, range(1, 256))) + [eur]
@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
def test_text_array_ascii(conn, fmt_in, fmt_out):
conn.set_client_encoding("sql_ascii")
- cur = conn.cursor(binary=fmt_out == Format.BINARY)
+ cur = conn.cursor(format=fmt_out)
a = list(map(chr, range(1, 256))) + [eur]
exp = [s.encode("utf8") for s in a]
ph = "%s" if fmt_in == Format.TEXT else "%b"
@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
def test_load_1byte(conn, fmt_out):
- cur = conn.cursor(binary=fmt_out == Format.BINARY)
+ cur = conn.cursor(format=fmt_out)
for i in range(0, 256):
cur.execute("select %s::bytea", (fr"\x{i:02x}",))
assert cur.fetchone()[0] == bytes([i])
@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY])
@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
def test_bytea_array(conn, fmt_in, fmt_out):
- cur = conn.cursor(binary=fmt_out == Format.BINARY)
+ cur = conn.cursor(format=fmt_out)
a = [bytes(range(0, 256))]
ph = "%s" if fmt_in == Format.TEXT else "%b"
(res,) = cur.execute(f"select {ph}::bytea[]", (a,)).fetchone()