Daniele Varrazzo [Sun, 27 Mar 2022 15:01:11 +0000 (17:01 +0200)]
refactor: move pipeline finalisation code to Pipeline.__exit__
This code relies on more internal knowledge of the Pipeline object than
of the Connection object.
For some reason I don't understand, the 'command_queue' and
'result_queue' types had to be declared explicitly in the class
definition. Leaving just the definitions in '__init__()' causes mypy
(0.940) to complain in 'cursor.py'.
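A minimal sketch of the resulting pattern (attribute names from the
commit, the rest assumed for illustration):

    from collections import deque
    from typing import Callable, Deque, Optional, Tuple

    class BasePipeline:
        # Class-level annotations: leaving only the assignments in
        # __init__() made mypy (0.940) unable to see the attribute
        # types when the class is used from other modules.
        command_queue: Deque[Callable[[], None]]
        result_queue: Deque[Optional[Tuple]]

        def __init__(self) -> None:
            self.command_queue = deque()
            self.result_queue = deque()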
Denis Laxalde [Mon, 11 Oct 2021 15:16:39 +0000 (17:16 +0200)]
Add support for pipeline mode in execute()/fetch*()
When activated on the connection, the active pipeline handles
a queue of commands to send and a queue of results to process.
The command queue simply contains Callable[[], None] items, built
from partial applications of pgconn.send_*() methods and the like.
The queue of results to process contains either None, when the
respective command returns no tuple, or a tuple with the respective
cursor and the query information needed to maintain automatic prepared
statements.
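For illustration, a hedged sketch of how a command and its result-queue
entry might be enqueued (names such as 'pipeline' and 'query_info' are
hypothetical here):

    from functools import partial

    def enqueue_command(pipeline, pgconn, cursor, query_info) -> None:
        # The command is a zero-argument callable wrapping a send_*()
        # call (libpq placeholders are $1, $2, ...).
        pipeline.command_queue.append(
            partial(pgconn.send_query_params, b"SELECT $1::int", [b"42"])
        )
        # The matching result entry: None for commands returning no
        # tuple, otherwise the cursor and the query info used to
        # maintain automatic prepared statements.
        pipeline.result_queue.append((cursor, query_info))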
Everywhere we run the execute() generator in non-pipeline mode, we now
enqueue items in the pipeline queues. Then we run
pipeline_communicate(), through the _communicate_gen() method of
BasePipeline, in BaseCursor._execute(many)_gen().
Since pipeline_communicate() may not fetch all results, we need a
dedicated fetch (forced) step upon call to cursor.fetch*(); this is done
by Cursor._fetch_pipeline() called in fetch*() methods. This calls
PQsendFlushRequest() in order to avoid blocking on PQgetResult().
At exit of pipeline mode, we unconditionally emit a PQpipelineSync()
call in order to restore the connection in a usable state in case of
error and we force results fetch after sending any pending commands
(e.g. commands not emitted through an execute() call).
The pipeline-demo.py test script is updated to include examples using
the high-level API. This only works with the 'python' implementation of
the libpq bindings because we monkeypatch the pgconn attribute of the
connection.
Denis Laxalde [Thu, 2 Dec 2021 14:13:08 +0000 (15:13 +0100)]
Avoid multiple commands in transaction code
In pipeline mode, command strings containing multiple SQL commands are
disallowed so we remove all such usages from transaction code.
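For example, a hypothetical sketch of the kind of rewrite involved (not
the exact statements used by the transaction code):

    def with_savepoint(conn) -> None:
        # Disallowed in pipeline mode (two statements in one command):
        #     conn.execute("SAVEPOINT s; RELEASE SAVEPOINT s")
        # Allowed (one statement per command):
        conn.execute("SAVEPOINT s")
        conn.execute("RELEASE SAVEPOINT s")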
Accordingly, generator methods in transaction classes no longer return
anything (the result was not used previously anyway).
In tests, the 'commands' list defined in patch_exec() is now filled by
appending instead of inserting so that we keep the natural order of
commands in assertions.
Denis Laxalde [Mon, 11 Oct 2021 15:16:39 +0000 (17:16 +0200)]
Handle pipeline-mode errors when checking results
In BaseCursor._check_result(), we now handle FATAL_ERROR and (the
pipeline-mode specific) PIPELINE_ABORTED result statuses, in
preparation for supporting pipeline mode, where we'd check results upon
fetch*() instead of upon execute() as we do currently.
Similarly, BaseCursor._raise_for_result() handles PIPELINE_ABORTED
result status.
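A hedged sketch of the added handling (heavily simplified; the real
logic lives in BaseCursor._check_result()):

    from psycopg import OperationalError
    from psycopg.pq import ExecStatus

    def check_result(result) -> None:
        if result.status == ExecStatus.FATAL_ERROR:
            # Turn the result's error fields into a raised exception.
            raise OperationalError(
                result.error_message.decode("utf-8", "replace"))
        if result.status == ExecStatus.PIPELINE_ABORTED:
            # Pipeline-specific: an earlier command failed, so the rest
            # of the pipeline was aborted by the server.
            raise OperationalError("pipeline aborted")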
Denis Laxalde [Mon, 11 Oct 2021 09:35:06 +0000 (11:35 +0200)]
Add pipeline_communicate() generator
This generator will be used for the send operations during the
execute() step when the connection is in pipeline mode. It can consume
results or send queries depending on the socket's read/write ready
state. Queries to be sent are held in a queue of Callable[[], None]
items, built from partial application of pgconn.send_query() and
similar functions.
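A hedged sketch of the generator's shape (heavily simplified; the
Wait/Ready yield protocol mirrors psycopg's other generators):

    from psycopg.pq import ExecStatus
    from psycopg.waiting import Ready, Wait

    def pipeline_communicate(pgconn, commands):
        results = []
        while True:
            ready = yield Wait.RW            # wait for socket readiness
            if ready & Ready.R:              # read-ready: consume results
                pgconn.consume_input()
                while not pgconn.is_busy():
                    res = pgconn.get_result()
                    if res is None:
                        break
                    results.append(res)
                    if res.status == ExecStatus.PIPELINE_SYNC:
                        return results       # sync point reached
            if ready & Ready.W:              # write-ready: flush, send more
                if pgconn.flush() == 0 and commands:
                    commands.popleft()()     # run the next partial send_*()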
We add a couple of unit tests in test_generators.py, along with a test
script taken from PostgreSQL sources. It demonstrates the use of
pipeline mode where query-send and results-fetch steps are interleaved
without any sync point emitted. (In this test, this works because the
output buffer gets full.) The test writes data to logs; typically we'd
get:
enter pipeline
sent BEGIN TRANSACTION
sent DROP TABLE IF EXISTS pq_pipeline_demo
sent CREATE UNLOGGED TABLE pq_pipeline_demo( id serial primary key, itemno integer, int8filler int8)
prepare INSERT INTO pq_pipeline_demo(itemno, int8filler) VALUES ($1, $2) as 'prepare'
sent prepared 'prepare' with [b'10000', b'4611686018427387904']
sent prepared 'prepare' with [b'9999', b'4611686018427387904']
sent prepared 'prepare' with [b'9998', b'4611686018427387904']
sent prepared 'prepare' with [b'9997', b'4611686018427387904']
sent prepared 'prepare' with [b'9996', b'4611686018427387904']
sent prepared 'prepare' with [b'9995', b'4611686018427387904']
sent prepared 'prepare' with [b'9994', b'4611686018427387904']
sent prepared 'prepare' with [b'9993', b'4611686018427387904']
sent prepared 'prepare' with [b'9992', b'4611686018427387904']
...
sent prepared 'prepare' with [b'9690', b'4611686018427387904']
sent prepared 'prepare' with [b'9689', b'4611686018427387904']
sent prepared 'prepare' with [b'9688', b'4611686018427387904']
got COMMAND_OK results
got COMMAND_OK results
got COMMAND_OK results
got COMMAND_OK results
got COMMAND_OK results
got COMMAND_OK results
got COMMAND_OK results
got COMMAND_OK results
got COMMAND_OK results
...
got COMMAND_OK results
got COMMAND_OK results
sent prepared 'prepare' with [b'9687', b'4611686018427387904']
sent prepared 'prepare' with [b'9686', b'4611686018427387904']
sent prepared 'prepare' with [b'9685', b'4611686018427387904']
sent prepared 'prepare' with [b'9684', b'4611686018427387904']
sent prepared 'prepare' with [b'9683', b'4611686018427387904']
...
sent prepared 'prepare' with [b'2', b'4611686018427387904']
sent prepared 'prepare' with [b'1', b'4611686018427387904']
sent COMMIT
pipeline sync sent
got COMMAND_OK results
got COMMAND_OK results
got COMMAND_OK results
got COMMAND_OK results
...
got COMMAND_OK results
got COMMAND_OK results
got PIPELINE_SYNC results
exit pipeline
We can see that commands are sent until the output buffer is full (the
connection is then read-ready only), then results are fetched until
more commands can be sent, and the cycle repeats.
Daniele Varrazzo [Mon, 28 Mar 2022 16:54:26 +0000 (18:54 +0200)]
fix: default executemany to not fetching results
This is a change from before, but the feature has not been released yet,
so we are still in time to change it.
The change addresses an uneasy feeling about unexpectedly increased
memory usage (see #164) and may prove a win in combination with the
optimization happening in pipeline mode.
So, if the user is not asking for it, let's not provide it.
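With this change, fetching results from executemany() becomes opt-in. A
sketch of the intended usage (assuming the opt-in flag this changeset
introduces is spelled `returning`):

    import psycopg

    with psycopg.connect() as conn:
        cur = conn.cursor()
        cur.executemany(
            "INSERT INTO t (x) VALUES (%s) RETURNING id",
            [(1,), (2,)],
            returning=True,  # without this, no results are stored
        )
        # Iterate the result sets, one per input tuple.
        while True:
            print(cur.fetchone())
            if not cur.nextset():
                break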
Daniele Varrazzo [Mon, 28 Mar 2022 17:45:26 +0000 (19:45 +0200)]
docs: use monotype style for parameter names
The convention of using italic is more of a math thing. In the
currently rendered docs, ``param`` also gets rendered in bold rather
than italic, so there is just no connection to anything else on the
page.
A :param:`name` role would be nice. Maybe rendered in
monospace+italic...
Daniele Varrazzo [Fri, 25 Mar 2022 15:57:28 +0000 (16:57 +0100)]
fix(copy): chunk large buffers before queuing, not after
This is conceptually a better place to do it, because the queue has the
function of applying backpressure on the data generator. Splitting large
buffers later would flood the libpq without effectively slowing down the
producer.
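A hedged sketch of the idea (hypothetical names; the real code lives in
the copy machinery):

    from queue import Queue

    MAX_BUFFER_SIZE = 128 * 1024  # size of the chunks passed to the libpq

    def enqueue(q: "Queue[bytes]", data: bytes) -> None:
        # Split before queuing: the bounded queue then applies
        # backpressure chunk by chunk, actually slowing the producer.
        for i in range(0, len(data), MAX_BUFFER_SIZE):
            q.put(data[i : i + MAX_BUFFER_SIZE])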
Also, reduce the size of the chunks appended to the libpq from 1MB to
128KB. This makes an *immense* difference: the overly large chunks
probably trigger some quadratic behaviour in the libpq. The test script
found in #255, piped into `ts -s`, shows that pushing a block of data
of about 1GB (which will fail in Postgres anyway) takes about 9s with
the smaller chunk size. With the larger size, it takes about 4 minutes
10 seconds to get to waiting for PQputCopyEnd, and almost another 6
minutes to receive the error message from the server.
00:00:47 putting 1048576 (or less) bytes in queue size 1023
00:00:47 writing copy end
00:00:47 got 1048576 bytes from queue size 1023
...
00:01:25 got 1048576 bytes from queue size 640
...
00:01:54 got 1048576 bytes from queue size 512
...
00:03:00 got 1048576 bytes from queue size 256
...
00:04:12 got 0 bytes from queue size 0
00:04:12 wait for copy end
00:09:59 Traceback (most recent call last):
...
Adding a few prints (see #255 for details) also shows that the time
spent in PQputCopyData increases, going from ~15 entries/sec processed
when the writer has just finished pushing data into the queue, down to
~4 entries/sec towards the end.
Considering that a reduction of 10-20% of the input size causes a
decrease of the processing time of about 50%, there is definitely
something quadratic going on there. It might be possible to improve the
libpq, but for the moment it's better to try and coexist nicely with the
current state.
Daniele Varrazzo [Sun, 20 Mar 2022 00:32:25 +0000 (01:32 +0100)]
fix: fix loading of text arrays with dimension information
The dimension information is a prefix such as ``[0:2]=`` in front of the
array. We just discard it when loading to lists, because Python lists
are always 0-based.
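For illustration, a stand-alone sketch of the parsing detail (not the
loader's actual code):

    import re

    # Dimension info such as b"[0:2]=" (possibly several bounds, e.g.
    # b"[1:1][0:2]=") may prefix a text array literal.
    DIMS = re.compile(rb"^(\[[-+]?\d+:[-+]?\d+\])+=")

    def strip_dims(data: bytes) -> bytes:
        # Python lists are always 0-based: the bounds carry no
        # information we can represent, so they are discarded.
        m = DIMS.match(data)
        return data[m.end():] if m else data

    assert strip_dims(b"[0:2]={1,2,3}") == b"{1,2,3}"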
fix: don't raise error accessing Cursor.description after COPY_OUT
A COPY_OUT result advertises the number of columns but not their names
(or types). Use surrogate names for the description (which is more
useful than returning `None`, because at least it tells how many
columns were emitted).
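A sketch of the idea (the surrogate naming scheme shown here is
illustrative, not necessarily the one implemented):

    # A COPY_OUT result still advertises result.nfields, so a surrogate
    # description can be built from the column count alone.
    def surrogate_names(result) -> list:
        return [f"column_{i + 1}" for i in range(result.nfields)]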
Daniele Varrazzo [Tue, 22 Feb 2022 03:02:13 +0000 (04:02 +0100)]
fix: Cancel query on Ctrl-C
On KeyboardInterrupt, send a cancel request to the server and keep
waiting for the cancelled query's result, which is expected to raise a
QueryCanceled; then re-raise KeyboardInterrupt.
Before this, the connection was left in ACTIVE state, so it couldn't be rolled
back.
Only fixed on sync connections. Left a failing test for async
connections; the test fails with an output from the script such as:
error ignored in rollback on <psycopg.AsyncConnection [ACTIVE] ...>:
sending query failed: another command is already in progress
Traceback (most recent call last):
File "<string>", line 27, in <module>
File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/lib/python3.8/asyncio/base_events.py", line 603, in run_until_complete
self.run_forever()
File "/usr/lib/python3.8/asyncio/base_events.py", line 570, in run_forever
self._run_once()
File "/usr/lib/python3.8/asyncio/base_events.py", line 1823, in _run_once
event_list = self._selector.select(timeout)
File "/usr/lib/python3.8/selectors.py", line 468, in select
fd_event_list = self._selector.poll(timeout, max_ev)
KeyboardInterrupt
And the except branch in `AsyncConnection.wait()` is not reached.
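The synchronous fix follows roughly this pattern (a simplified sketch
of the behaviour described above, not the exact code):

    from psycopg import errors, waiting

    def wait(self, gen):
        try:
            return waiting.wait(gen, self.pgconn.socket)
        except KeyboardInterrupt:
            # Ask the server to cancel, then keep waiting for the
            # (cancelled) query's result so the connection leaves the
            # ACTIVE state and can be rolled back.
            self.cancel()
            try:
                waiting.wait(gen, self.pgconn.socket)
            except errors.QueryCanceled:
                pass  # expected: the query we just cancelled
            raise  # re-raise the KeyboardInterrupt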
Daniele Varrazzo [Sat, 19 Feb 2022 17:19:31 +0000 (18:19 +0100)]
refactor(pool): update some debug logging
Drop debug logging in `MaintenanceTask.__init__()`, which is not
particularly useful. Add debug entries on tasks' `run()` early bailout,
which is a more interesting condition to follow (and would have made it
easier to spot #230).
Daniele Varrazzo [Sat, 19 Feb 2022 16:34:37 +0000 (17:34 +0100)]
fix(pool): set the open flag before starting the maintenance tasks
Failing to do so, we might trigger the check in `MaintenanceTask.run()`,
find the pool closed, and discard the operation. It usually doesn't
happen, but with a few combinations of IO operations it does: see
https://github.com/psycopg/psycopg/issues/230 for details.
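A minimal sketch of the ordering fix (attribute and method names
hypothetical):

    def open(self) -> None:
        with self._lock:
            self._opened = True     # set the flag first...
            self._start_workers()   # ...then start the maintenance
                                    # tasks, so run() doesn't see a
                                    # closed pool and discard its work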
Daniele Varrazzo [Sat, 19 Feb 2022 14:39:37 +0000 (15:39 +0100)]
test(pool): add test to show deadlock on logging
This deadlock seems reproducible at least on Python 3.8 and 3.10 on
Linux. It is caused by the logging statement in
``MaintenanceTask.__init__``: even just a ``print()`` there causes the deadlock.
See https://github.com/psycopg/psycopg/issues/230 for more details.