conn = psycopg.connect()
- conn.execute("select 123.45").fetchone()[0]
+ conn.execute("SELECT 123.45").fetchone()[0]
# Decimal('123.45')
conn.adapters.register_loader("numeric", psycopg.types.numeric.FloatLoader)
- conn.execute("select 123.45").fetchone()[0]
+ conn.execute("SELECT 123.45").fetchone()[0]
# 123.45
In this example the customised adaptation takes effect only on the connection
cur.execute("SELECT %s::text, %s::text", [date(2020, 12, 31), date.max]).fetchone()
# ('2020-12-31', 'infinity')
- cur.execute("select '2020-12-31'::date, 'infinity'::date").fetchone()
+ cur.execute("SELECT '2020-12-31'::date, 'infinity'::date").fetchone()
# (datetime.date(2020, 12, 31), datetime.date(9999, 12, 31))
# =# notify mychan, 'hey';
# NOTIFY
- print(conn.cursor().execute("select 1").fetchone())
+ print(conn.cursor().execute("SELECT 1").fetchone())
# got this: Notify(channel='mychan', payload='hey', pid=961823)
# (1,)
# Activity detected. Is the connection still ok?
try:
- conn.execute("select 1")
+ conn.execute("SELECT 1")
except psycopg.OperationalError:
# You were disconnected: do something useful such as panicking
logger.error("we lost our database!")
# Activity detected. Is the connection still ok?
try:
- await conn.execute("select 1")
+ await conn.execute("SELECT 1")
except psycopg.OperationalError:
# Guess what happened
...
.. code:: python
- conn.execute("select reffunc('curname')")
+ conn.execute("SELECT reffunc('curname')")
then create a server-side cursor with the same name and call directly the
fetch methods, omitting to call `~ServerCursor.execute()` before:
cur = conn.cursor(row_factory=PersonFactory)
cur.execute(
"""
- select id, first_name, last_name, dob
- from (values
+ SELECT id, first_name, last_name, dob
+ FROM (VALUES
(1, 'John', 'Doe', '2000-01-01'::date),
(2, 'Jane', 'White', NULL)
- ) as data (id, first_name, last_name, dob)
- where id = %(id)s;
+ ) AS data (id, first_name, last_name, dob)
+ WHERE id = %(id)s;
""",
{"id": id},
)
# This will not work
table_name = 'my_table'
- cur.execute("insert into %s values (%s, %s)", [table_name, 10, 20])
+ cur.execute("INSERT INTO %s VALUES (%s, %s)", [table_name, 10, 20])
The SQL query should be composed before the arguments are merged, for
instance::
# This works, but it is not optimal
table_name = 'my_table'
cur.execute(
- "insert into %s values (%%s, %%s)" % table_name,
+ "INSERT INTO %s VALUES (%%s, %%s)" % table_name,
[10, 20])
This sort of works, but it is an accident waiting to happen: the table name
# This works, but it is not optimal
table_name = 'my_table'
cur.execute(
- "insert into %s values (%%s, %%s)" % Escaping.escape_identifier(table_name),
+ "INSERT INTO %s VALUES (%%s, %%s)" % Escaping.escape_identifier(table_name),
[10, 20])
This is now safe, but it somewhat ad-hoc. In case, for some reason, it is
from psycopg import sql
cur.execute(
- sql.SQL("insert into {} values (%s, %s)")
+ sql.SQL("INSERT INTO {} VALUES (%s, %s)")
.format(sql.Identifier('my_table')),
[10, 20])
`~psycopg.Cursor.execute()`: such value placeholders will be untouched by
`!format()`::
- query = sql.SQL("select {field} from {table} where {pkey} = %s").format(
+ query = sql.SQL("SELECT {field} FROM {table} WHERE {pkey} = %s").format(
field=sql.Identifier('my_name'),
table=sql.Identifier('some_table'),
pkey=sql.Identifier('id'))
comma-separated list of field names, you can use the `SQL.join()` method to
pass them to the query::
- query = sql.SQL("select {fields} from {table}").format(
+ query = sql.SQL("SELECT {fields} FROM {table}").format(
fields=sql.SQL(',').join([
sql.Identifier('field1'),
sql.Identifier('field2'),
t = TypeInfo.fetch(conn, "mytype")
t.register(conn)
- for record in conn.execute("select mytypearray from mytable"):
+ for record in conn.execute("SELECT mytypearray FROM mytable"):
# records will return lists of "mytype" as string
class MyTypeLoader(Loader):
conn.adapters.register_loader("mytype", MyTypeLoader)
- for record in conn.execute("select mytypearray from mytable"):
+ for record in conn.execute("SELECT mytypearray FROM mytable"):
# records will return lists of MyType instances
expect an :sql:`integer` (aka :sql:`int4`): passing them a :sql:`bigint`
may cause an error::
- cur.execute("select current_date + %s", [1])
+ cur.execute("SELECT current_date + %s", [1])
# UndefinedFunction: operator does not exist: date + bigint
In this case you should add an :sql:`::int` cast to your query or use the
`~psycopg.types.numeric.Int4` wrapper::
- cur.execute("select current_date + %s::int", [1])
+ cur.execute("SELECT current_date + %s::int", [1])
- cur.execute("select current_date + %s", [Int4(1)])
+ cur.execute("SELECT current_date + %s", [Int4(1)])
.. admonition:: TODO
conn = psycopg.connect()
conn.execute(
- "insert into strtest (id, data) values (%s, %s)",
+ "INSERT INTO strtest (id, data) VALUES (%s, %s)",
(1, "Crème Brûlée at 4.99€"))
- conn.execute("select data from strtest where id = 1").fetchone()[0]
+ conn.execute("SELECT data FROM strtest WHERE id = 1").fetchone()[0]
'Crème Brûlée at 4.99€'
PostgreSQL databases `have an encoding`__, and `the session has an encoding`__
# The Latin-9 encoding can manage some European accented letters
# and the Euro symbol
conn.client_encoding = 'latin9'
- conn.execute("select data from strtest where id = 1").fetchone()[0]
+ conn.execute("SELECT data FROM strtest WHERE id = 1").fetchone()[0]
'Crème Brûlée at 4.99€'
# The Latin-1 encoding doesn't have a representation for the Euro symbol
conn.client_encoding = 'latin1'
- conn.execute("select data from strtest where id = 1").fetchone()[0]
+ conn.execute("SELECT data FROM strtest WHERE id = 1").fetchone()[0]
# Traceback (most recent call last)
# ...
# UntranslatableCharacter: character with byte sequence 0xe2 0x82 0xac
.. code:: python
conn.client_encoding = "ascii"
- conn.execute("select data from strtest where id = 1").fetchone()[0]
+ conn.execute("SELECT data FROM strtest WHERE id = 1").fetchone()[0]
b'Cr\xc3\xa8me Br\xc3\xbbl\xc3\xa9e at 4.99\xe2\x82\xac'
Alternatively you can cast the unknown encoding data to :sql:`bytea` to
from psycopg.types.json import Jsonb
thing = {"foo": ["bar", 42]}
- conn.execute("insert into mytable values (%s)", [Jsonb(thing)])
+ conn.execute("INSERT INTO mytable VALUES (%s)", [Jsonb(thing)])
By default Psycopg uses the standard library `json.dumps` and `json.loads`
functions to serialize and de-serialize Python objects to JSON. If you want to
# Return floating point values as Decimal, just in one connection
set_json_loads(partial(json.loads, parse_float=Decimal), conn)
- conn.execute("select %s", [Jsonb({"value": 123.45})]).fetchone()[0]
+ conn.execute("SELECT %s", [Jsonb({"value": 123.45})]).fetchone()[0]
# {'value': Decimal('123.45')}
If you need an even more specific dump customisation only for certain objects
uuid_dumps = partial(json.dumps, cls=UUIDEncoder)
obj = {"uuid": uuid4()}
- cnn.execute("insert into objs values %s", [Json(obj, dumps=uuid_dumps)])
+ cnn.execute("INSERT INTO objs VALUES %s", [Json(obj, dumps=uuid_dumps)])
# will insert: {'uuid': '0a40799d-3980-4c65-8315-2956b18ab0e1'}
.. code:: python
- cur.execute("select '[10,20,30]'::jsonb -> 1").fetchone()
+ cur.execute("SELECT '[10,20,30]'::jsonb -> 1").fetchone()
# returns (20,)
- cur.execute("select '[10,20,30]'::jsonb -> %s", [1]).fetchone()
+ cur.execute("SELECT '[10,20,30]'::jsonb -> %s", [1]).fetchone()
# raises an exception:
# UndefinedFunction: operator does not exist: jsonb -> numeric
- cur.execute("select '[10,20,30]'::jsonb -> %s::int", [1]).fetchone()
+ cur.execute("SELECT '[10,20,30]'::jsonb -> %s::int", [1]).fetchone()
# returns (20,)
PostgreSQL will also reject the execution of several queries at once
.. code::
- print(psycopg.connect(DSN).execute("select now()").fetchone()[0])
+ print(psycopg.connect(DSN).execute("SELECT now()").fetchone()[0])
# 2042-07-12 18:15:10.706497+01:00
register_adapters(self, context)
_info_query = """\
-select
- typname as name, oid, typarray as array_oid,
- oid::regtype as alt_name, typdelim as delimiter
-from pg_type t
-where t.oid = %(name)s::regtype
-order by t.oid
+SELECT
+ typname AS name, oid, typarray AS array_oid,
+ oid::regtype AS alt_name, typdelim AS delimiter
+FROM pg_type t
+WHERE t.oid = %(name)s::regtype
+ORDER BY t.oid
"""
register_adapters(self, context)
_info_query = """\
-select t.typname as name, t.oid as oid, t.typarray as array_oid,
- r.rngsubtype as subtype_oid
-from pg_type t
-join pg_range r on t.oid = r.rngtypid
-where t.oid = %(name)s::regtype
+SELECT t.typname AS name, t.oid AS oid, t.typarray AS array_oid,
+ r.rngsubtype AS subtype_oid
+FROM pg_type t
+JOIN pg_range r ON t.oid = r.rngtypid
+WHERE t.oid = %(name)s::regtype
"""
register_adapters(self, context, factory)
_info_query = """\
-select
- t.typname as name, t.oid as oid, t.typarray as array_oid,
- coalesce(a.fnames, '{}') as field_names,
- coalesce(a.ftypes, '{}') as field_types
-from pg_type t
-left join (
- select
+SELECT
+ t.typname AS name, t.oid AS oid, t.typarray AS array_oid,
+ coalesce(a.fnames, '{}') AS field_names,
+ coalesce(a.ftypes, '{}') AS field_types
+FROM pg_type t
+LEFT JOIN (
+ SELECT
attrelid,
- array_agg(attname) as fnames,
- array_agg(atttypid) as ftypes
- from (
- select a.attrelid, a.attname, a.atttypid
- from pg_attribute a
- join pg_type t on t.typrelid = a.attrelid
- where t.oid = %(name)s::regtype
- and a.attnum > 0
- and not a.attisdropped
- order by a.attnum
+ array_agg(attname) AS fnames,
+ array_agg(atttypid) AS ftypes
+ FROM (
+ SELECT a.attrelid, a.attname, a.atttypid
+ FROM pg_attribute a
+ JOIN pg_type t ON t.typrelid = a.attrelid
+ WHERE t.oid = %(name)s::regtype
+ AND a.attnum > 0
+ AND NOT a.attisdropped
+ ORDER BY a.attnum
) x
- group by attrelid
-) a on a.attrelid = t.typrelid
-where t.oid = %(name)s::regtype
+ GROUP BY attrelid
+) a ON a.attrelid = t.typrelid
+WHERE t.oid = %(name)s::regtype
"""
def _set_client_encoding_gen(self, name: str) -> PQGen[None]:
self.pgconn.send_query_params(
- b"select set_config('client_encoding', $1, false)",
+ b"SELECT set_config('client_encoding', $1, false)",
[encodings.py2pg(name)],
)
(result,) = yield from execute(self.pgconn)
if self._begin_statement:
return self._begin_statement
- parts = [b"begin"]
+ parts = [b"BEGIN"]
if self.isolation_level is not None:
val = IsolationLevel(self.isolation_level)
- parts.append(b"isolation level")
- parts.append(val.name.lower().replace("_", " ").encode("utf8"))
+ parts.append(b"ISOLATION LEVEL")
+ parts.append(val.name.replace("_", " ").encode("utf8"))
if self.read_only is not None:
- parts.append(b"read only" if self.read_only else b"read write")
+ parts.append(b"READ ONLY" if self.read_only else b"READ WRITE")
if self.deferrable is not None:
parts.append(
- b"deferrable" if self.deferrable else b"not deferrable"
+ b"DEFERRABLE" if self.deferrable else b"NOT DEFERRABLE"
)
self._begin_statement = b" ".join(parts)
if self.pgconn.transaction_status == TransactionStatus.IDLE:
return
- yield from self._exec_command(b"commit")
+ yield from self._exec_command(b"COMMIT")
def _rollback_gen(self) -> PQGen[None]:
"""Generator implementing `Connection.rollback()`."""
if self.pgconn.transaction_status == TransactionStatus.IDLE:
return
- yield from self._exec_command(b"rollback")
+ yield from self._exec_command(b"ROLLBACK")
class Connection(BaseConnection[Row]):
while conns:
conn = conns.pop()
try:
- await conn.execute("select 1")
+ await conn.execute("SELECT 1")
except Exception:
self._stats[self._CONNECTIONS_LOST] += 1
logger.warning("discarding broken connection: %s", conn)
while conns:
conn = conns.pop()
try:
- conn.execute("select 1")
+ conn.execute("SELECT 1")
except Exception:
self._stats[self._CONNECTIONS_LOST] += 1
logger.warning("discarding broken connection: %s", conn)
# but we must make sure it exists.
if not self.described:
query = sql.SQL(
- "select 1 from pg_catalog.pg_cursors where name = {}"
+ "SELECT 1 FROM pg_catalog.pg_cursors WHERE name = {}"
).format(sql.Literal(self.name))
res = yield from cur._conn._exec_command(query)
if res.ntuples == 0:
return
- query = sql.SQL("close {}").format(sql.Identifier(self.name))
+ query = sql.SQL("CLOSE {}").format(sql.Identifier(self.name))
yield from cur._conn._exec_command(query)
def _fetch_gen(
if num is not None:
howmuch: sql.Composable = sql.Literal(num)
else:
- howmuch = sql.SQL("all")
+ howmuch = sql.SQL("ALL")
- query = sql.SQL("fetch forward {} from {}").format(
+ query = sql.SQL("FETCH FORWARD {} FROM {}").format(
howmuch, sql.Identifier(self.name)
)
res = yield from cur._conn._exec_command(query)
raise ValueError(
f"bad mode: {mode}. It should be 'relative' or 'absolute'"
)
- query = sql.SQL("move{} {} from {}").format(
- sql.SQL(" absolute" if mode == "absolute" else ""),
+ query = sql.SQL("MOVE{} {} FROM {}").format(
+ sql.SQL(" ABSOLUTE" if mode == "absolute" else ""),
sql.Literal(value),
sql.Identifier(self.name),
)
query = sql.SQL(query)
parts = [
- sql.SQL("declare"),
+ sql.SQL("DECLARE"),
sql.Identifier(self.name),
]
if self.scrollable is not None:
- parts.append(sql.SQL("scroll" if self.scrollable else "no scroll"))
- parts.append(sql.SQL("cursor"))
+ parts.append(sql.SQL("SCROLL" if self.scrollable else "NO SCROLL"))
+ parts.append(sql.SQL("CURSOR"))
if self.withhold:
- parts.append(sql.SQL("with hold"))
- parts.append(sql.SQL("for"))
+ parts.append(sql.SQL("WITH HOLD"))
+ parts.append(sql.SQL("FOR"))
parts.append(query)
return sql.SQL(" ").join(parts)
Example::
>>> comp = sql.Composed(
- ... [sql.SQL("insert into "), sql.Identifier("table")])
+ ... [sql.SQL("INSERT INTO "), sql.Identifier("table")])
>>> print(comp.as_string(conn))
- insert into "table"
+ INSERT INTO "table"
`!Composed` objects are iterable (so they can be used in `SQL.join` for
instance).
Example::
- >>> query = sql.SQL("select {0} from {1}").format(
+ >>> query = sql.SQL("SELECT {0} FROM {1}").format(
... sql.SQL(', ').join([sql.Identifier('foo'), sql.Identifier('bar')]),
... sql.Identifier('table'))
>>> print(query.as_string(conn))
- select "foo", "bar" from "table"
+ SELECT "foo", "bar" FROM "table"
"""
_obj: str
Example::
- >>> print(sql.SQL("select * from {} where {} = %s")
+ >>> print(sql.SQL("SELECT * FROM {} WHERE {} = %s")
... .format(sql.Identifier('people'), sql.Identifier('id'))
... .as_string(conn))
- select * from "people" where "id" = %s
+ SELECT * FROM "people" WHERE "id" = %s
- >>> print(sql.SQL("select * from {tbl} where name = {name}")
+ >>> print(sql.SQL("SELECT * FROM {tbl} WHERE name = {name}")
... .format(tbl=sql.Identifier('people'), name="O'Rourke"))
... .as_string(conn))
- select * from "people" where name = 'O''Rourke'
+ SELECT * FROM "people" WHERE name = 'O''Rourke'
"""
rv: List[Composable] = []
Example::
- >>> query = sql.SQL("select {} from {}").format(
+ >>> query = sql.SQL("SELECT {} FROM {}").format(
... sql.Identifier("table", "field"),
... sql.Identifier("schema", "table"))
>>> print(query.as_string(conn))
- select "table"."field" from "schema"."table"
+ SELECT "table"."field" FROM "schema"."table"
"""
>>> names = ['foo', 'bar', 'baz']
- >>> q1 = sql.SQL("insert into table ({}) values ({})").format(
+ >>> q1 = sql.SQL("INSERT INTO my_table ({}) VALUES ({})").format(
... sql.SQL(', ').join(map(sql.Identifier, names)),
... sql.SQL(', ').join(sql.Placeholder() * len(names)))
>>> print(q1.as_string(conn))
- insert into table ("foo", "bar", "baz") values (%s, %s, %s)
+ INSERT INTO my_table ("foo", "bar", "baz") VALUES (%s, %s, %s)
- >>> q2 = sql.SQL("insert into table ({}) values ({})").format(
+ >>> q2 = sql.SQL("INSERT INTO my_table ({}) VALUES ({})").format(
... sql.SQL(', ').join(map(sql.Identifier, names)),
... sql.SQL(', ').join(map(sql.Placeholder, names)))
>>> print(q2.as_string(conn))
- insert into table ("foo", "bar", "baz") values (%(foo)s, %(bar)s, %(baz)s)
+ INSERT INTO my_table ("foo", "bar", "baz") VALUES (%(foo)s, %(bar)s, %(baz)s)
"""
if self._savepoint_name:
commands.append(
- sql.SQL("savepoint {}")
+ sql.SQL("SAVEPOINT {}")
.format(sql.Identifier(self._savepoint_name))
.as_bytes(self._conn)
)
commands = []
if self._savepoint_name and not self._outer_transaction:
commands.append(
- sql.SQL("release {}")
+ sql.SQL("RELEASE {}")
.format(sql.Identifier(self._savepoint_name))
.as_bytes(self._conn)
)
if self._outer_transaction:
assert not self._conn._savepoints
- commands.append(b"commit")
+ commands.append(b"COMMIT")
return self._conn._exec_command(b"; ".join(commands))
commands = []
if self._savepoint_name and not self._outer_transaction:
commands.append(
- sql.SQL("rollback to {n}; release {n}")
+ sql.SQL("ROLLBACK TO {n}; RELEASE {n}")
.format(n=sql.Identifier(self._savepoint_name))
.as_bytes(self._conn)
)
if self._outer_transaction:
assert not self._conn._savepoints
- commands.append(b"rollback")
+ commands.append(b"ROLLBACK")
yield from self._conn._exec_command(b"; ".join(commands))
# Using Transaction explicitly becase conn.transaction() enters the contetx
assert not commands
with conn.transaction() as tx:
- assert commands.popall() == ["begin"]
+ assert commands.popall() == ["BEGIN"]
assert not tx.savepoint_name
- assert commands.popall() == ["commit"]
+ assert commands.popall() == ["COMMIT"]
# Case 1 (with a transaction already started)
conn.cursor().execute("select 1")
- assert commands.popall() == ["begin"]
+ assert commands.popall() == ["BEGIN"]
with conn.transaction() as tx:
- assert commands.popall() == ['savepoint "_pg3_1"']
+ assert commands.popall() == ['SAVEPOINT "_pg3_1"']
assert tx.savepoint_name == "_pg3_1"
- assert commands.popall() == ['release "_pg3_1"']
+ assert commands.popall() == ['RELEASE "_pg3_1"']
conn.rollback()
- assert commands.popall() == ["rollback"]
+ assert commands.popall() == ["ROLLBACK"]
# Case 2
with conn.transaction(savepoint_name="foo") as tx:
- assert commands.popall() == ['begin; savepoint "foo"']
+ assert commands.popall() == ['BEGIN; SAVEPOINT "foo"']
assert tx.savepoint_name == "foo"
- assert commands.popall() == ["commit"]
+ assert commands.popall() == ["COMMIT"]
# Case 3 (with savepoint name provided)
with conn.transaction():
- assert commands.popall() == ["begin"]
+ assert commands.popall() == ["BEGIN"]
with conn.transaction(savepoint_name="bar") as tx:
- assert commands.popall() == ['savepoint "bar"']
+ assert commands.popall() == ['SAVEPOINT "bar"']
assert tx.savepoint_name == "bar"
- assert commands.popall() == ['release "bar"']
- assert commands.popall() == ["commit"]
+ assert commands.popall() == ['RELEASE "bar"']
+ assert commands.popall() == ["COMMIT"]
# Case 3 (with savepoint name auto-generated)
with conn.transaction():
- assert commands.popall() == ["begin"]
+ assert commands.popall() == ["BEGIN"]
with conn.transaction() as tx:
- assert commands.popall() == ['savepoint "_pg3_2"']
+ assert commands.popall() == ['SAVEPOINT "_pg3_2"']
assert tx.savepoint_name == "_pg3_2"
- assert commands.popall() == ['release "_pg3_2"']
- assert commands.popall() == ["commit"]
+ assert commands.popall() == ['RELEASE "_pg3_2"']
+ assert commands.popall() == ["COMMIT"]
def test_named_savepoints_exception_exit(conn, commands):
# Case 1
with pytest.raises(ExpectedException):
with conn.transaction() as tx:
- assert commands.popall() == ["begin"]
+ assert commands.popall() == ["BEGIN"]
assert not tx.savepoint_name
raise ExpectedException
- assert commands.popall() == ["rollback"]
+ assert commands.popall() == ["ROLLBACK"]
# Case 2
with pytest.raises(ExpectedException):
with conn.transaction(savepoint_name="foo") as tx:
- assert commands.popall() == ['begin; savepoint "foo"']
+ assert commands.popall() == ['BEGIN; SAVEPOINT "foo"']
assert tx.savepoint_name == "foo"
raise ExpectedException
- assert commands.popall() == ["rollback"]
+ assert commands.popall() == ["ROLLBACK"]
# Case 3 (with savepoint name provided)
with conn.transaction():
- assert commands.popall() == ["begin"]
+ assert commands.popall() == ["BEGIN"]
with pytest.raises(ExpectedException):
with conn.transaction(savepoint_name="bar") as tx:
- assert commands.popall() == ['savepoint "bar"']
+ assert commands.popall() == ['SAVEPOINT "bar"']
assert tx.savepoint_name == "bar"
raise ExpectedException
- assert commands.popall() == ['rollback to "bar"; release "bar"']
- assert commands.popall() == ["commit"]
+ assert commands.popall() == ['ROLLBACK TO "bar"; RELEASE "bar"']
+ assert commands.popall() == ["COMMIT"]
# Case 3 (with savepoint name auto-generated)
with conn.transaction():
- assert commands.popall() == ["begin"]
+ assert commands.popall() == ["BEGIN"]
with pytest.raises(ExpectedException):
with conn.transaction() as tx:
- assert commands.popall() == ['savepoint "_pg3_2"']
+ assert commands.popall() == ['SAVEPOINT "_pg3_2"']
assert tx.savepoint_name == "_pg3_2"
raise ExpectedException
- assert commands.popall() == ['rollback to "_pg3_2"; release "_pg3_2"']
- assert commands.popall() == ["commit"]
+ assert commands.popall() == ['ROLLBACK TO "_pg3_2"; RELEASE "_pg3_2"']
+ assert commands.popall() == ["COMMIT"]
def test_named_savepoints_with_repeated_names_works(conn):
# Case 1
# Using Transaction explicitly becase conn.transaction() enters the contetx
async with aconn.transaction() as tx:
- assert commands.popall() == ["begin"]
+ assert commands.popall() == ["BEGIN"]
assert not tx.savepoint_name
- assert commands.popall() == ["commit"]
+ assert commands.popall() == ["COMMIT"]
# Case 1 (with a transaction already started)
await aconn.cursor().execute("select 1")
- assert commands.popall() == ["begin"]
+ assert commands.popall() == ["BEGIN"]
async with aconn.transaction() as tx:
- assert commands.popall() == ['savepoint "_pg3_1"']
+ assert commands.popall() == ['SAVEPOINT "_pg3_1"']
assert tx.savepoint_name == "_pg3_1"
- assert commands.popall() == ['release "_pg3_1"']
+ assert commands.popall() == ['RELEASE "_pg3_1"']
await aconn.rollback()
- assert commands.popall() == ["rollback"]
+ assert commands.popall() == ["ROLLBACK"]
# Case 2
async with aconn.transaction(savepoint_name="foo") as tx:
- assert commands.popall() == ['begin; savepoint "foo"']
+ assert commands.popall() == ['BEGIN; SAVEPOINT "foo"']
assert tx.savepoint_name == "foo"
- assert commands.popall() == ["commit"]
+ assert commands.popall() == ["COMMIT"]
# Case 3 (with savepoint name provided)
async with aconn.transaction():
- assert commands.popall() == ["begin"]
+ assert commands.popall() == ["BEGIN"]
async with aconn.transaction(savepoint_name="bar") as tx:
- assert commands.popall() == ['savepoint "bar"']
+ assert commands.popall() == ['SAVEPOINT "bar"']
assert tx.savepoint_name == "bar"
- assert commands.popall() == ['release "bar"']
- assert commands.popall() == ["commit"]
+ assert commands.popall() == ['RELEASE "bar"']
+ assert commands.popall() == ["COMMIT"]
# Case 3 (with savepoint name auto-generated)
async with aconn.transaction():
- assert commands.popall() == ["begin"]
+ assert commands.popall() == ["BEGIN"]
async with aconn.transaction() as tx:
- assert commands.popall() == ['savepoint "_pg3_2"']
+ assert commands.popall() == ['SAVEPOINT "_pg3_2"']
assert tx.savepoint_name == "_pg3_2"
- assert commands.popall() == ['release "_pg3_2"']
- assert commands.popall() == ["commit"]
+ assert commands.popall() == ['RELEASE "_pg3_2"']
+ assert commands.popall() == ["COMMIT"]
async def test_named_savepoints_exception_exit(aconn, acommands):
# Case 1
with pytest.raises(ExpectedException):
async with aconn.transaction() as tx:
- assert commands.popall() == ["begin"]
+ assert commands.popall() == ["BEGIN"]
assert not tx.savepoint_name
raise ExpectedException
- assert commands.popall() == ["rollback"]
+ assert commands.popall() == ["ROLLBACK"]
# Case 2
with pytest.raises(ExpectedException):
async with aconn.transaction(savepoint_name="foo") as tx:
- assert commands.popall() == ['begin; savepoint "foo"']
+ assert commands.popall() == ['BEGIN; SAVEPOINT "foo"']
assert tx.savepoint_name == "foo"
raise ExpectedException
- assert commands.popall() == ["rollback"]
+ assert commands.popall() == ["ROLLBACK"]
# Case 3 (with savepoint name provided)
async with aconn.transaction():
- assert commands.popall() == ["begin"]
+ assert commands.popall() == ["BEGIN"]
with pytest.raises(ExpectedException):
async with aconn.transaction(savepoint_name="bar") as tx:
- assert commands.popall() == ['savepoint "bar"']
+ assert commands.popall() == ['SAVEPOINT "bar"']
assert tx.savepoint_name == "bar"
raise ExpectedException
- assert commands.popall() == ['rollback to "bar"; release "bar"']
- assert commands.popall() == ["commit"]
+ assert commands.popall() == ['ROLLBACK TO "bar"; RELEASE "bar"']
+ assert commands.popall() == ["COMMIT"]
# Case 3 (with savepoint name auto-generated)
async with aconn.transaction():
- assert commands.popall() == ["begin"]
+ assert commands.popall() == ["BEGIN"]
with pytest.raises(ExpectedException):
async with aconn.transaction() as tx:
- assert commands.popall() == ['savepoint "_pg3_2"']
+ assert commands.popall() == ['SAVEPOINT "_pg3_2"']
assert tx.savepoint_name == "_pg3_2"
raise ExpectedException
- assert commands.popall() == ['rollback to "_pg3_2"; release "_pg3_2"']
- assert commands.popall() == ["commit"]
+ assert commands.popall() == ['ROLLBACK TO "_pg3_2"; RELEASE "_pg3_2"']
+ assert commands.popall() == ["COMMIT"]
async def test_named_savepoints_with_repeated_names_works(aconn):