.. autoattribute:: rowcount
.. autoattribute:: rownumber
- .. autoattribute:: query
+ .. attribute:: _query
- The query will be in PostgreSQL format (with ``$1``, ``$2``...
- parameters), the parameters will *not* be merged to the query: see
- `params`.
+ An helper object used to convert queries and parameters before sending
+ them to PostgreSQL.
- .. autoattribute:: params
+ .. note::
+ This attribute is exposed because it might be helpful to debug
+ problems when the communication between Python and PostgreSQL
+ doesn't work as expected. For this reason, the attribute is
+ available when a query fails too.
+
+ .. warning::
+ You shouldn't consider it part of the public interface of the
+ object: it might change without warnings.
+
+ Except this warning, I guess.
+
+ If you would like to build reliable features using this object,
+ please get in touch so we can try and design an useful interface
+ for it.
+
+ Among the properties currently exposed by this object:
+
+ - `!query` (`!bytes`): the query effectively sent to PostgreSQL. It
+ will have Python placeholders (``%s``\-style) replaced with
+ PostgreSQL ones (``$1``, ``$2``\-style).
+
+ - `!params` (sequence of `!bytes`): the parameters passed to
+ PostgreSQL, adapted to the database format.
+
+ - `!types` (sequence of `!int`): the OID of the parameters passed to
+ PostgreSQL.
- The parameters are adapted to PostgreSQL format.
+ - `!formats` (sequence of `pq.Format`): whether the parameter format
+ is text or binary.
The `!ServerCursor` class
"""
__slots__ = """
- params types formats
- _tx _want_formats _parts query _encoding _order
+ query params types formats
+ _tx _want_formats _parts _encoding _order
""".split()
def __init__(self, transformer: "Transformer"):
if sys.version_info >= (3, 7):
__slots__ = """
_conn format _adapters arraysize _closed _results pgresult _pos
- _iresult _rowcount _pgq _tx _last_query _row_factory _make_row
+ _iresult _rowcount _query _tx _last_query _row_factory _make_row
__weakref__
""".split()
self._pos = 0
self._iresult = 0
self._rowcount = -1
- self._pgq: Optional[PostgresQuery] = None
+ self._query: Optional[PostgresQuery] = None
def __repr__(self) -> str:
cls = f"{self.__class__.__module__}.{self.__class__.__qualname__}"
"""`True` if the cursor is closed."""
return self._closed
- @property
- def query(self) -> Optional[bytes]:
- """The last query sent to the server, if available."""
- return self._pgq.query if self._pgq else None
-
- @property
- def params(self) -> Optional[List[Optional[bytes]]]:
- """The last set of parameters sent to the server, if available."""
- return self._pgq.params if self._pgq else None
-
@property
def description(self) -> Optional[List[Column]]:
"""
for params in params_seq:
if first:
pgq = self._convert_query(query, params)
- self._pgq = pgq
+ self._query = pgq
first = False
else:
pgq.dump(params)
This is not a generator, but a normal non-blocking function.
"""
- self._pgq = query
+ self._query = query
if query.params or no_pqexec or self.format == Format.BINARY:
self._conn.pgconn.send_query_params(
query.query,
def test_copy_query(conn):
cur = conn.cursor()
with cur.copy("copy (select 1) to stdout") as copy:
- assert cur.query == b"copy (select 1) to stdout"
- assert cur.params is None
+ assert cur._query.query == b"copy (select 1) to stdout"
+ assert cur._query.params is None
list(copy)
async def test_copy_query(aconn):
cur = aconn.cursor()
async with cur.copy("copy (select 1) to stdout") as copy:
- assert cur.query == b"copy (select 1) to stdout"
- assert cur.params is None
+ assert cur._query.query == b"copy (select 1) to stdout"
+ assert cur._query.params is None
async for record in copy:
pass
def test_query_params_execute(conn):
cur = conn.cursor()
- assert cur.query is None
- assert cur.params is None
+ assert cur._query is None
cur.execute("select %t, %s::text", [1, None])
- assert cur.query == b"select $1, $2::text"
- assert cur.params == [b"1", None]
+ assert cur._query.query == b"select $1, $2::text"
+ assert cur._query.params == [b"1", None]
cur.execute("select 1")
- assert cur.query == b"select 1"
- assert cur.params is None
+ assert cur._query.query == b"select 1"
+ assert cur._query.params is None
with pytest.raises(psycopg.DataError):
cur.execute("select %t::int", ["wat"])
- assert cur.query == b"select $1::int"
- assert cur.params == [b"wat"]
+ assert cur._query.query == b"select $1::int"
+ assert cur._query.params == [b"wat"]
def test_query_params_executemany(conn):
cur = conn.cursor()
cur.executemany("select %t, %t", [[1, 2], [3, 4]])
- assert cur.query == b"select $1, $2"
- assert cur.params == [b"3", b"4"]
-
- with pytest.raises((psycopg.DataError, TypeError)):
- cur.executemany("select %t::int", [[1], ["x"], [2]])
- assert cur.query == b"select $1::int"
- # TODO: cannot really check this: after introduced row_dumpers, this
- # fails dumping, not query passing.
- # assert cur.params == [b"x"]
+ assert cur._query.query == b"select $1, $2"
+ assert cur._query.params == [b"3", b"4"]
def test_stream(conn):
async def test_query_params_execute(aconn):
cur = aconn.cursor()
- assert cur.query is None
- assert cur.params is None
+ assert cur._query is None
await cur.execute("select %t, %s::text", [1, None])
- assert cur.query == b"select $1, $2::text"
- assert cur.params == [b"1", None]
+ assert cur._query.query == b"select $1, $2::text"
+ assert cur._query.params == [b"1", None]
await cur.execute("select 1")
- assert cur.query == b"select 1"
- assert cur.params is None
+ assert cur._query.query == b"select 1"
+ assert cur._query.params is None
with pytest.raises(psycopg.DataError):
await cur.execute("select %t::int", ["wat"])
- assert cur.query == b"select $1::int"
- assert cur.params == [b"wat"]
+ assert cur._query.query == b"select $1::int"
+ assert cur._query.params == [b"wat"]
async def test_query_params_executemany(aconn):
cur = aconn.cursor()
await cur.executemany("select %t, %t", [[1, 2], [3, 4]])
- assert cur.query == b"select $1, $2"
- assert cur.params == [b"3", b"4"]
-
- with pytest.raises((psycopg.DataError, TypeError)):
- await cur.executemany("select %t::int", [[1], ["x"], [2]])
- assert cur.query == b"select $1::int"
- # TODO: cannot really check this: after introduced row_dumpers, this
- # fails dumping, not query passing.
- # assert cur.params == [b"x"]
+ assert cur._query.query == b"select $1, $2"
+ assert cur._query.params == [b"3", b"4"]
async def test_stream(aconn):
def test_query_params(conn):
with conn.cursor("foo") as cur:
- assert cur.query is None
- assert cur.params is None
+ assert cur._query is None
cur.execute("select generate_series(1, %s) as bar", (3,))
- assert b"declare" in cur.query.lower()
- assert b"(1, $1)" in cur.query.lower()
- assert cur.params == [bytes([0, 3])] # 3 as binary int2
+ assert b"declare" in cur._query.query.lower()
+ assert b"(1, $1)" in cur._query.query.lower()
+ assert cur._query.params == [bytes([0, 3])] # 3 as binary int2
def test_close(conn, recwarn, retries):
async def test_query_params(aconn):
async with aconn.cursor("foo") as cur:
- assert cur.query is None
- assert cur.params is None
+ assert cur._query is None
await cur.execute("select generate_series(1, %s) as bar", (3,))
- assert b"declare" in cur.query.lower()
- assert b"(1, $1)" in cur.query.lower()
- assert cur.params == [bytes([0, 3])] # 3 as binary int2
+ assert b"declare" in cur._query.query.lower()
+ assert b"(1, $1)" in cur._query.query.lower()
+ assert cur._query.params == [bytes([0, 3])] # 3 as binary int2
async def test_close(aconn, recwarn, retries):