There is no need to force a results fetch when exiting a nested
pipeline, as it would naturally happen either later in the outer
pipeline or eventually at the end of the outermost one. Rather, just
emit a Sync, which allows the outer pipeline to resume normal execution
in case the inner one got into an aborted state.
During previous refactorings we made Pipeline.sync() also fetch results
from the server. But this breaks the semantics of the synchronization
point as defined by Postgres, because the user might want to emit a
Sync message solely to close the current series of queries in the
pipeline: i.e. flush queries from the client to the server and reset
the pipeline error state. In this respect, the 'fetch' step should be
explicit.
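For illustration, roughly the intended usage of the public pipeline API
after this change (a hedged sketch: the query is made up and the
connection details are omitted):

    import psycopg

    conn = psycopg.connect()  # assumes a reachable database
    with conn.pipeline() as p, conn.cursor() as cur:
        cur.execute("SELECT 1")
        p.sync()  # flush queued queries, reset any error state; no fetch
        cur.fetchone()  # fetching results remains an explicit step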
BasePipeline._sync_gen() is changed to only emit a Sync, and a new
_exit_gen() method is introduced, doing what _sync_gen() previously
did. Accordingly, the warning emitted when calling _exit_gen() at
pipeline exit now says "terminating" instead of "syncing".
The details previously shown in the traceback were only the
communication internals. So we go from:
Traceback (most recent call last):
File "trace.py", line 301, in <module>
conn.execute("select pg_sleep(0.2)")
File "/usr/lib/python3.8/contextlib.py", line 120, in __exit__
next(self.gen)
File "/home/piro/dev/psycopg3/psycopg/psycopg/connection.py", line 881, in pipeline
yield pipeline
File "/home/piro/dev/psycopg3/psycopg/psycopg/_pipeline.py", line 197, in __exit__
self.sync()
File "/home/piro/dev/psycopg3/psycopg/psycopg/_pipeline.py", line 183, in sync
self._conn.wait(self._sync_gen())
File "/home/piro/dev/psycopg3/psycopg/psycopg/connection.py", line 896, in wait
return waiting.wait(gen, self.pgconn.socket, timeout=timeout)
File "/home/piro/dev/psycopg3/psycopg/psycopg/waiting.py", line 237, in wait_epoll
s = gen.send(ready)
File "/home/piro/dev/psycopg3/psycopg/psycopg/_pipeline.py", line 92, in _sync_gen
yield from self._communicate_gen()
File "/home/piro/dev/psycopg3/psycopg/psycopg/_pipeline.py", line 105, in _communicate_gen
self._process_results(queued, results)
File "/home/piro/dev/psycopg3/psycopg/psycopg/_pipeline.py", line 149, in _process_results
raise e.error_from_result(result, encoding=pgconn_encoding(self.pgconn))
psycopg.errors.UndefinedColumn: column "foo" does not exist
LINE 1: select foo
^
to:
Traceback (most recent call last):
File "trace.py", line 301, in <module>
conn.execute("select pg_sleep(0.2)")
File "/usr/lib/python3.8/contextlib.py", line 120, in __exit__
next(self.gen)
File "/home/piro/dev/psycopg3/psycopg/psycopg/connection.py", line 881, in pipeline
yield pipeline
File "/home/piro/dev/psycopg3/psycopg/psycopg/_pipeline.py", line 205, in __exit__
raise exc2.with_traceback(None)
psycopg.errors.UndefinedColumn: column "foo" does not exist
LINE 1: select foo
^
which shows much more clearly that the exception was raised at pipeline
block exit.
perf: avoid unnecessary recvfrom() in cursor.stream()
Call PQisBusy() before PQconsumeInput() when fetching results. If not
busy, don't call PQconsumeInput() at all: just go on to fetching
results and notifications.
This is especially useful in single-row mode, because most of the time
the libpq can produce several results after a single network fetch.
Previously we were calling PQconsumeInput() even when results were
already on the client and there was nothing new to read, which forced
the libpq to run a select() to tell apart a lack of data from an EOF,
see `the grumble`_, and caused the overhead reported in #286.
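The resulting fetch loop looks roughly like the following (a
simplified, blocking sketch; the real code is a non-blocking generator,
and the function name here is illustrative):

    import select
    from psycopg import pq

    def fetch_results(pgconn: pq.PGconn) -> list:
        # Drain the current query's results, touching the socket only
        # when the libpq actually needs more data.
        results = []
        while True:
            if pgconn.is_busy():
                # Nothing complete on the client yet: wait and read.
                select.select([pgconn.socket], [], [])
                pgconn.consume_input()
                continue
            # Not busy: a result is already buffered client-side, so no
            # recvfrom()/select() round-trip is needed.
            res = pgconn.get_result()
            if res is None:  # end of the current query's results
                break
            results.append(res)
        return results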
refactor(enum): rename python_type -> enum, enum_labels -> labels on EnumInfo
The previous names were lifted from the composite adapters. However,
that class would have had a more ambiguous "type" attribute, and
"types" for the fields, hence the decision of using the "python_" and
"fields_" prefixes to disambiguate.
In the enum adapters context, "enum" is not ambiguous, so the more
natural name seems preferable.
This removes the need for enums to be str-based. It also removes the
asymmetry whereby automatically generated enums were *not*
string-based, but pure enums.
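For illustration, the renamed attributes in use (a hedged sketch,
assuming a database where a "mood" enum type exists; connection details
omitted):

    from enum import Enum

    import psycopg
    from psycopg.types.enum import EnumInfo, register_enum

    class Mood(Enum):  # a plain Enum: no str mixin required any more
        sad = "sad"
        ok = "ok"
        happy = "happy"

    conn = psycopg.connect()  # assumes a reachable database
    info = EnumInfo.fetch(conn, "mood")
    register_enum(info, conn, Mood)

    info.enum    # the Python type (was: python_type)
    info.labels  # the Postgres labels (was: enum_labels)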
- New version scheme is "starting from PG 10", not "after PG 10" (also,
  the chunky bit moved from the docstring to the docs, as for most
  other docs).
- The transaction status is of the session, not the server.
Daniele Varrazzo [Tue, 29 Mar 2022 23:57:06 +0000 (01:57 +0200)]
fix: consistent sync/exit and error management in pipeline contexts
Don't clobber an exception on exit of the nested block either. In order
to simplify the code, make the pipeline count the number of times it is
entered, and call _exit() only the last time it exits.
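The counting logic boils down to something like this (an illustrative
sketch: the attribute name and the _exit() body are assumptions, not
the actual implementation):

    class Pipeline:
        def __init__(self) -> None:
            self.level = 0  # how many nested 'with' blocks are active

        def __enter__(self) -> "Pipeline":
            self.level += 1
            return self

        def __exit__(self, exc_type, exc_val, exc_tb) -> None:
            self.level -= 1
            if self.level == 0:
                self._exit()  # finalize only at the outermost exit

        def _exit(self) -> None:
            ...  # sync the pipeline and leave pipeline mode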
Drop the assert checking that we have left pipeline mode when leaving
the block: if we got into an unrecoverable state, we will not have. At
this point we should probably just close the connection; however,
leaving it this way is a better indication that the connection is
broken because of something about pipeline mode. Closing it would hide
that, and even if we raised a warning, it would be much easier to miss
than the exceptions raised in broken state.
Daniele Varrazzo [Tue, 29 Mar 2022 23:54:56 +0000 (01:54 +0200)]
fix: forbid COPY in pipeline mode
COPY is not supported. Attempting it puts the connection in an
unrecoverable state, with pipeline sync failing and pipeline exit
complaining that there are still results. So let's try not to get into
that state.
Daniele Varrazzo [Tue, 29 Mar 2022 20:03:49 +0000 (22:03 +0200)]
fix: restore sending a Sync on block exit but not on executemany end
It was removed a few commits ago but, as Denis suggests, it makes sense
to keep it. Reintroduce it (and test for it), but make sure
executemany() doesn't send an extra Sync.
Daniele Varrazzo [Tue, 29 Mar 2022 12:29:11 +0000 (14:29 +0200)]
fix: make no-tuple results available after executemany returning
There could be useful info there, if the user asks for it. This doesn't
slow down the happy execute path, because returning defaults to False.
Daniele Varrazzo [Tue, 29 Mar 2022 12:26:26 +0000 (14:26 +0200)]
fix: only flush the pipeline after executemany if returning
This has the side effect of breaking rowcount, as before, but allows
sending many executemany() calls in the same pipeline.
`rowcount` is correct if `returning=True` is set instead, which is a
thing we can at least document, and makes sense: "if you want a result,
you flush the pipeline, dude".
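A hedged usage sketch (the table and data are made up):

    import psycopg

    with psycopg.connect() as conn:  # assumes a reachable database
        with conn.cursor() as cur:
            cur.execute(
                "CREATE TEMP TABLE t (id serial PRIMARY KEY, x int)")
            # returning=True flushes the pipeline, so rowcount is
            # reliable and the returned rows can be consumed.
            cur.executemany(
                "INSERT INTO t (x) VALUES (%s) RETURNING id",
                [(10,), (20,), (30,)],
                returning=True,
            )
            assert cur.rowcount == 3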
Daniele Varrazzo [Tue, 29 Mar 2022 12:21:03 +0000 (14:21 +0200)]
refactor: rename pipeline.communicate() -> sync()
This has the async/sync interface, so it is more apt to expose as a
public interface to say "call this to restore the state" (as the
changed tests do).
Expose the try/finally logic behind sync() as the _sync_gen() method,
which can be useful to call if the sync has to be performed inside a
lock.
Denis Laxalde [Tue, 29 Mar 2022 07:44:35 +0000 (09:44 +0200)]
fix: keep Cursor._execmany_returning set until reset
The cursor's _execmany_returning attribute is now initialized in
_reset() and set during _executemany_gen_pipeline(). This way the
attribute is kept for further result fetches that may occur outside the
executemany() context: namely, this is needed because
_execmany_returning is used by _set_results_from_pipeline(), which may
be called by the fetch*() methods.
As a consequence, in _fetch_pipeline(), the actual fetch is skipped
when coming from executemany(..., returning=False), as the pgresult
would never be set. This ensures a consistent behavior by raising "no
result available" when calling fetch*() from a non-returning
executemany().
Denis Laxalde [Tue, 29 Mar 2022 07:07:44 +0000 (09:07 +0200)]
fix: sync and fetch nested pipelines when they exit
When running a nested pipeline, typically in executemany(), we now call
Pipeline.communicate() at exit, similarly to a single (unnested)
pipeline, but we still keep the surrounding pipeline open (in contrast
with the unnested case).
This resolves the issue around rowcount with returning executemany().
Accordingly, we check that rowcount is correct in the
test_executemany() pipeline tests and drop the previously xfailed test.
Denis Laxalde [Tue, 29 Mar 2022 07:00:07 +0000 (09:00 +0200)]
test: add missing returning=True to executemany() in pipeline tests
The query in test_executemany() has a RETURNING clause and we fetch
results from the cursor after execution, so the returning option to
executemany() seems needed. These tests probably do not fail because of
the TODO in Cursor._set_results_from_pipeline().
Daniele Varrazzo [Mon, 28 Mar 2022 16:11:06 +0000 (18:11 +0200)]
fix: drop unneeded flush after executemany
As per @dlax's analysis, it is not needed. It was part of an attempt to
fix the problem of rowcount being broken when executemany() is called
from within a pipeline block, but that doesn't work anyway.
Daniele Varrazzo [Sun, 27 Mar 2022 03:55:51 +0000 (05:55 +0200)]
perf: base executemany on pipeline
This changeset also fixes several glitches of executemany() run in
pipeline mode, around the management of the returned values and
rowcount. These glitches still appear if executemany() is run in an
explicit pipeline() block, because certain events only happen at
(outermost) pipeline block exit.
Daniele Varrazzo [Sun, 27 Mar 2022 15:01:11 +0000 (17:01 +0200)]
refactor: move pipeline finalisation code to Pipeline.__exit__
This code has more internal knowledge of the Pipeline object than of
the Connection object.
For some reason I don't understand, I had to declare the
'command_queue' and 'result_queue' types explicitly in the class
definition. Leaving just the definitions in '__init__()' causes mypy
(0.940) to complain in 'cursor.py'.
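The workaround looks like the following (the attribute types shown here
are simplified assumptions):

    from collections import deque
    from typing import Callable, Deque, Optional, Tuple

    class BasePipeline:
        # Class-level declarations: without these, mypy 0.940 failed to
        # see the attribute types from the __init__() assignments alone.
        command_queue: Deque[Callable[[], None]]
        result_queue: Deque[Optional[Tuple]]

        def __init__(self) -> None:
            self.command_queue = deque()
            self.result_queue = deque()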
Denis Laxalde [Mon, 11 Oct 2021 15:16:39 +0000 (17:16 +0200)]
Add support for pipeline mode in execute()/fetch*()
When activated on the connection, an active pipeline handles a queue of
commands to send and a queue of results to process.
The command queue simply contains Callable[[], None] items, built from
partial applications of the pgconn.send_*() methods and the like.
The queue of results to process contains either None, when the
respective command returns no tuple, or a tuple with the respective
cursor and the query information needed to maintain automatic prepared
statements.
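A sketch of the two queues (simplified: the enqueue_query() helper is
hypothetical, the item types follow the description above):

    from collections import deque
    from functools import partial
    from typing import Any, Callable, Deque, Optional, Tuple

    command_queue: Deque[Callable[[], None]] = deque()
    result_queue: Deque[Optional[Tuple[Any, ...]]] = deque()

    def enqueue_query(pgconn, cursor, query: bytes, params=None) -> None:
        # The command is a zero-argument callable: a partial application
        # of a pgconn send_*() method.
        command_queue.append(
            partial(pgconn.send_query_params, query, params))
        # The matching entry carries what is needed to process the
        # result later (None would mean "no tuple expected").
        result_queue.append((cursor, query))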
Everywhere we previously ran the execute() generator in non-pipeline
mode, we now enqueue items in the pipeline queues. Then we run
pipeline_communicate(), through the _communicate_gen() method of
BasePipeline, in BaseCursor._execute(many)_gen().
Since pipeline_communicate() may not fetch all results, we need a
dedicated (forced) fetch step upon a call to cursor.fetch*(); this is
done by Cursor._fetch_pipeline(), called in the fetch*() methods. It
calls PQsendFlushRequest() in order to avoid blocking on PQgetResult().
At exit of pipeline mode, we unconditionally emit a PQpipelineSync()
call, in order to restore the connection to a usable state in case of
error, and we force a results fetch after sending any pending commands
(e.g. commands not emitted through an execute() call).
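The exit sequence boils down to something like this (a blocking sketch
assuming psycopg's pq wrapper; the real code is a non-blocking
generator and the function name is illustrative):

    import select
    from psycopg import pq

    def exit_pipeline(pgconn: pq.PGconn) -> None:
        pgconn.pipeline_sync()      # PQpipelineSync(): queue a sync point
        while pgconn.flush() == 1:  # send any pending commands
            select.select([], [pgconn.socket], [])
        while True:                 # force-fetch up to the Sync result
            if pgconn.is_busy():
                select.select([pgconn.socket], [], [])
                pgconn.consume_input()
                continue
            res = pgconn.get_result()
            if res is not None and res.status == pq.ExecStatus.PIPELINE_SYNC:
                break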
The pipeline-demo.py test script is updated to include examples using
the high-level API. This only works with the 'python' implementation of
the libpq bindings, because we monkeypatch the pgconn attribute of the
connection.
Denis Laxalde [Thu, 2 Dec 2021 14:13:08 +0000 (15:13 +0100)]
Avoid multiple commands in transaction code
In pipeline mode, command strings containing multiple SQL commands are
disallowed, so we remove all such usages from the transaction code.
Accordingly, the generator methods in the transaction classes now do
not return anything (the result was not used previously anyway).
In tests, the 'commands' list defined in patch_exec() is now filled by
appending instead of inserting, so that we keep the natural order of
commands in assertions.
Denis Laxalde [Mon, 11 Oct 2021 15:16:39 +0000 (17:16 +0200)]
Handle pipeline-mode errors when checking results
In BaseCursor._check_result() we now handle FATAL_ERROR and (the
pipeline-mode specific) PIPELINE_ABORTED, in preparation for supporting
pipeline mode, where we'd check results upon fetch*() instead of upon
execute() as we do currently.
Similarly, BaseCursor._raise_for_result() handles the PIPELINE_ABORTED
result status.
Denis Laxalde [Mon, 11 Oct 2021 09:35:06 +0000 (11:35 +0200)]
Add pipeline_communicate() generator
This generator will be used for the send operations during the
execute() step when the connection is in pipeline mode. It can consume
results or send queries depending on the socket read/write ready state.
Queries to be sent are in a queue of Callable[[], None] items, built
from partial application of pgconn.send_query() and similar functions.
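The core loop is roughly the following (a blocking sketch: the real
generator yields wait states instead of calling select() itself):

    import select
    from collections import deque
    from psycopg import pq

    def pipeline_communicate(pgconn: pq.PGconn, commands: deque) -> list:
        # 'commands' holds zero-argument callables, e.g. partials of
        # pgconn.send_query_params().
        results = []
        while commands:
            ready_r, ready_w, _ = select.select(
                [pgconn.socket], [pgconn.socket], [])
            if ready_r:
                pgconn.consume_input()
                # Collect every result already parsed client-side.
                while not pgconn.is_busy():
                    res = pgconn.get_result()
                    if res is None:
                        break
                    results.append(res)
            if ready_w:
                # Room in the output buffer: send one more command.
                commands.popleft()()
                pgconn.flush()
        return results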
We add a couple of unit tests in test_generators.py, along with a test
script taken from PostgreSQL sources. It demonstrates the use of
pipeline mode where query-send and results-fetch steps are interleaved
without any sync point emitted. (In this test, this works because the
output buffer gets full.) The test writes data to logs. Typically, we'd
get:
enter pipeline
sent BEGIN TRANSACTION
sent DROP TABLE IF EXISTS pq_pipeline_demo
sent CREATE UNLOGGED TABLE pq_pipeline_demo( id serial primary key, itemno integer, int8filler int8)
prepare INSERT INTO pq_pipeline_demo(itemno, int8filler) VALUES ($1, $2) as 'prepare'
sent prepared 'prepare' with [b'10000', b'4611686018427387904']
sent prepared 'prepare' with [b'9999', b'4611686018427387904']
sent prepared 'prepare' with [b'9998', b'4611686018427387904']
sent prepared 'prepare' with [b'9997', b'4611686018427387904']
sent prepared 'prepare' with [b'9996', b'4611686018427387904']
sent prepared 'prepare' with [b'9995', b'4611686018427387904']
sent prepared 'prepare' with [b'9994', b'4611686018427387904']
sent prepared 'prepare' with [b'9993', b'4611686018427387904']
sent prepared 'prepare' with [b'9992', b'4611686018427387904']
...
sent prepared 'prepare' with [b'9690', b'4611686018427387904']
sent prepared 'prepare' with [b'9689', b'4611686018427387904']
sent prepared 'prepare' with [b'9688', b'4611686018427387904']
got COMMAND_OK results
got COMMAND_OK results
got COMMAND_OK results
got COMMAND_OK results
got COMMAND_OK results
got COMMAND_OK results
got COMMAND_OK results
got COMMAND_OK results
got COMMAND_OK results
...
got COMMAND_OK results
got COMMAND_OK results
sent prepared 'prepare' with [b'9687', b'4611686018427387904']
sent prepared 'prepare' with [b'9686', b'4611686018427387904']
sent prepared 'prepare' with [b'9685', b'4611686018427387904']
sent prepared 'prepare' with [b'9684', b'4611686018427387904']
sent prepared 'prepare' with [b'9683', b'4611686018427387904']
...
sent prepared 'prepare' with [b'2', b'4611686018427387904']
sent prepared 'prepare' with [b'1', b'4611686018427387904']
sent COMMIT
pipeline sync sent
got COMMAND_OK results
got COMMAND_OK results
got COMMAND_OK results
got COMMAND_OK results
...
got COMMAND_OK results
got COMMAND_OK results
got PIPELINE_SYNC results
exit pipeline
We can see that commands are sent until the output buffer is full (the
connection is then read-ready only), then results are fetched until
more commands can be sent, and the cycle repeats.