They are stable under the current black version and look like an improvement.
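The pattern throughout is the same: implicitly concatenated message fragments are merged when they fit on one line, and the f prefix is dropped from fragments that contain no placeholder, since adjacent string literals are joined at compile time and only the fragment with a {...} substitution needs to be an f-string. A minimal sketch of the mechanism (the value below is made up for illustration, standing in for self.pgconn.status):

    status = 2  # hypothetical stand-in for self.pgconn.status
    msg = (
        "cannot execute operations: the connection is"
        f" in status {status}"
    )
    assert msg == "cannot execute operations: the connection is in status 2"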
# ProgrammingError would have been more appropriate, but this is
# what is raised if the libpq fails to connect in the same case.
raise e.OperationalError(
- f"cannot match {len(hosts_in)} hosts with {len(ports_in)}"
- " port numbers"
+ f"cannot match {len(hosts_in)} hosts with {len(ports_in)} port numbers"
)
ports_out = []
# ProgrammingError would have been more appropriate, but this is
# what is raised if the libpq fails to connect in the same case.
raise e.OperationalError(
- f"cannot match {len(hosts_in)} hosts with {len(ports_in)}"
- " port numbers"
+ f"cannot match {len(hosts_in)} hosts with {len(ports_in)} port numbers"
)
out = []
sequence = False
else:
raise TypeError(
- f"query parameters should be a sequence or a mapping,"
+ "query parameters should be a sequence or a mapping,"
f" got {type(vars).__name__}"
)
return [vars[item] for item in order or ()] # type: ignore[call-overload]
except KeyError:
raise e.ProgrammingError(
- f"query parameter missing:"
+ "query parameter missing:"
f" {', '.join(sorted(i for i in order or () if i not in vars))}"
)
if ph == b"%(":
raise e.ProgrammingError(
- f"incomplete placeholder:"
+ "incomplete placeholder:"
f" '{query[m.span(0)[0]:].split()[0].decode(encoding)}'"
)
elif ph == b"% ":
)
elif ph[-1:] not in b"sbt":
raise e.ProgrammingError(
- f"only '%s', '%b', '%t' are allowed as placeholders, got"
+ "only '%s', '%b', '%t' are allowed as placeholders, got"
f" '{m.group(0).decode(encoding)}'"
)
warn(
f"connection {self} was deleted while still open."
- f" Please use 'with' or '.close()' to close the connection",
+ " Please use 'with' or '.close()' to close the connection",
ResourceWarning,
)
if self.pgconn.status == ConnStatus.BAD:
raise e.OperationalError("the connection is closed")
raise e.InterfaceError(
- f"cannot execute operations: the connection is"
+ "cannot execute operations: the connection is"
f" in status {self.pgconn.status}"
)
if self.pgconn.transaction_status != TransactionStatus.IDLE:
raise e.ProgrammingError(
- f"can't start two-phase transaction: connection in status"
+ "can't start two-phase transaction: connection in status"
f" {TransactionStatus(self.pgconn.transaction_status).name}"
)
)
if self._tpc[1]:
raise e.ProgrammingError(
- "'tpc_prepare()' cannot be used during a prepared"
- " two-phase transaction"
+ "'tpc_prepare()' cannot be used during a prepared two-phase transaction"
)
xid = self._tpc[0]
self._tpc = (xid, True)
_unpack_int4 = struct.Struct("!i").unpack_from
_binary_signature = (
- # Signature, flags, extra length
- b"PGCOPY\n\xff\r\n\0"
- b"\x00\x00\x00\x00"
- b"\x00\x00\x00\x00"
+ b"PGCOPY\n\xff\r\n\0" # Signature
+ b"\x00\x00\x00\x00" # flags
+ b"\x00\x00\x00\x00" # extra length
)
_binary_trailer = b"\xff\xff"
_binary_null = b"\xff\xff\xff\xff"
)
else:
raise e.InternalError(
- f"unexpected result status from query:"
- f" {ExecStatus(result.status).name}"
+ f"unexpected result status from query: {ExecStatus(result.status).name}"
)
def _set_current_result(self, i: int, format: Optional[Format] = None) -> None:
def PQhostaddr(pgconn: PGconn_struct) -> bytes:
if not _PQhostaddr:
raise NotSupportedError(
- f"PQhostaddr requires libpq from PostgreSQL 12,"
+ "PQhostaddr requires libpq from PostgreSQL 12,"
f" {libpq_version} available instead"
)
def PQsetTraceFlags(pgconn: PGconn_struct, flags: int) -> None:
if not _PQsetTraceFlags:
raise NotSupportedError(
- f"PQsetTraceFlags requires libpq from PostgreSQL 14,"
+ "PQsetTraceFlags requires libpq from PostgreSQL 14,"
f" {libpq_version} available instead"
)
) -> Optional[bytes]:
if not _PQencryptPasswordConn:
raise NotSupportedError(
- f"PQencryptPasswordConn requires libpq from PostgreSQL 10,"
+ "PQencryptPasswordConn requires libpq from PostgreSQL 10,"
f" {libpq_version} available instead"
)
def PQpipelineStatus(pgconn: PGconn_struct) -> int:
if not _PQpipelineStatus:
raise NotSupportedError(
- f"PQpipelineStatus requires libpq from PostgreSQL 14,"
+ "PQpipelineStatus requires libpq from PostgreSQL 14,"
f" {libpq_version} available instead"
)
return _PQpipelineStatus(pgconn)
def PQenterPipelineMode(pgconn: PGconn_struct) -> int:
if not _PQenterPipelineMode:
raise NotSupportedError(
- f"PQenterPipelineMode requires libpq from PostgreSQL 14,"
+ "PQenterPipelineMode requires libpq from PostgreSQL 14,"
f" {libpq_version} available instead"
)
return _PQenterPipelineMode(pgconn)
def PQexitPipelineMode(pgconn: PGconn_struct) -> int:
if not _PQexitPipelineMode:
raise NotSupportedError(
- f"PQexitPipelineMode requires libpq from PostgreSQL 14,"
+ "PQexitPipelineMode requires libpq from PostgreSQL 14,"
f" {libpq_version} available instead"
)
return _PQexitPipelineMode(pgconn)
def PQpipelineSync(pgconn: PGconn_struct) -> int:
if not _PQpipelineSync:
raise NotSupportedError(
- f"PQpipelineSync requires libpq from PostgreSQL 14,"
+ "PQpipelineSync requires libpq from PostgreSQL 14,"
f" {libpq_version} available instead"
)
return _PQpipelineSync(pgconn)
def PQsendFlushRequest(pgconn: PGconn_struct) -> int:
if not _PQsendFlushRequest:
raise NotSupportedError(
- f"PQsendFlushRequest requires libpq from PostgreSQL 14,"
+ "PQsendFlushRequest requires libpq from PostgreSQL 14,"
f" {libpq_version} available instead"
)
return _PQsendFlushRequest(pgconn)
if not self.closed:
warn(
f"the server-side cursor {self} was deleted while still open."
- f" Please use 'with' or '.close()' to close the cursor properly",
+ " Please use 'with' or '.close()' to close the cursor properly",
ResourceWarning,
)
if not self.closed:
warn(
f"the server-side cursor {self} was deleted while still open."
- f" Please use 'with' or '.close()' to close the cursor properly",
+ " Please use 'with' or '.close()' to close the cursor properly",
ResourceWarning,
)
joiner = SQL(joiner)
elif not isinstance(joiner, SQL):
raise TypeError(
- f"Composed.join() argument must be strings or SQL,"
+ "Composed.join() argument must be strings or SQL,"
f" got {joiner!r} instead"
)
def load(self, data: Buffer) -> Multirange[T]:
if not data or data[0] != _START_INT:
raise e.DataError(
- f"malformed multirange starting with"
+ "malformed multirange starting with"
f" {bytes(data[:1]).decode('utf8', 'replace')}"
)
parser.addoption(
"--no-collect-ok",
action="store_true",
- help="If no test is collected, exit with 0 instead of 5"
- " (useful with --lfnf=none).",
+ help=(
+ "If no test collected, exit with 0 instead of 5 (useful with --lfnf=none)."
+ ),
)
parser.addoption(
"--test-dsn",
metavar="DSN",
default=os.environ.get("PSYCOPG_TEST_DSN"),
- help="Connection string to run database tests requiring a connection"
- " [you can also use the PSYCOPG_TEST_DSN env var].",
+ help=(
+ "Connection string to run database tests requiring a connection"
+ " [you can also use the PSYCOPG_TEST_DSN env var]."
+ ),
)
parser.addoption(
"--pq-tracefile",
if len(r) > 200:
r = f"{r[:200]}... ({len(r)} chars)"
raise Exception(
- f"value {r!r} at record {i} column0 {j}"
- f" failed insert: {e}"
+ f"value {r!r} at record {i} column0 {j} failed insert: {e}"
) from None
# just in case, but hopefully we should have triggered the problem
if len(r) > 200:
r = f"{r[:200]}... ({len(r)} chars)"
raise Exception(
- f"value {r!r} at record {i} column0 {j}"
- f" failed insert: {e}"
+ f"value {r!r} at record {i} column0 {j} failed insert: {e}"
) from None
# just in case, but hopefully we should have triggered the problem
esc.escape_string(data)
-@pytest.mark.parametrize("data", [(b"hello\00world"), (b"\00\00\00\00")])
+@pytest.mark.parametrize("data", [b"hello\00world", b"\00\00\00\00"])
def test_escape_bytea(pgconn, data):
exp = rb"\x" + b"".join(b"%02x" % c for c in data)
esc = pq.Escaping(pgconn)
assert rv == exp
-@pytest.mark.parametrize("data", [(b"hello\00world"), (b"\00\00\00\00")])
+@pytest.mark.parametrize("data", [b"hello\00world", b"\00\00\00\00"])
def test_unescape_bytea(pgconn, data):
enc = rb"\x" + b"".join(b"%02x" % c for c in data)
esc = pq.Escaping(pgconn)
),
(
"host=_pg._tcp.bar.com",
- (
- "host=db1.example.com,db4.example.com,db3.example.com,db2.example.com"
- " port=5432,5432,5433,5432"
- ),
+ "host=db1.example.com,db4.example.com,db3.example.com,db2.example.com"
+ " port=5432,5432,5433,5432",
None,
),
(
"host=service.foo.com port=srv",
- ("host=service.example.com port=15432"),
+ "host=service.example.com port=15432",
None,
),
# No resolution
conn.execute("CREATE TYPE prepenum AS ENUM ('foo', 'bar', 'baz')")
conn.execute("CREATE TABLE preptable(id integer, bar prepenum[])")
conn.cursor().execute(
- "INSERT INTO preptable (bar) "
- "VALUES (%(enum_col)s::prepenum[])",
+ "INSERT INTO preptable (bar) VALUES (%(enum_col)s::prepenum[])",
{"enum_col": ["foo"]},
)
raise ZeroDivisionError()
cmds = commands.popall()
assert len(cmds) == 2
for cmd in cmds:
- assert ("fetch forward 2") in cmd.lower()
+ assert "fetch forward 2" in cmd.lower()
def test_cant_scroll_by_default(conn):
cmds = acommands.popall()
assert len(cmds) == 2
for cmd in cmds:
- assert ("fetch forward 2") in cmd.lower()
+ assert "fetch forward 2" in cmd.lower()
async def test_cant_scroll_by_default(aconn):