It's not needed to force a results fetch when exiting a nested pipeline
as this would naturally happen either later in the outer pipeline or
eventually at the end of the outermost one. Rather, just do a Sync, as
it allows outer pipeline to resume normal execution in case the inner
one got into aborted state.
During previous refactorings, we made Pipeline.sync() also fetch results
from the server. But this somehow breaks the semantics of the
synchronization point as defined by Postgres because the user might be
interested in emitting Sync message as a way to solely close the current
series of queries in the pipeline: i.e., flush queries from client to
server and reset the pipeline error state. In this respect, the 'fetch'
step should be explicit.
BasePipeline._sync_gen() is changed to only emit a Sync and a new
_exit_gen() method is introduced doing what _sync_gen() previously did.
Accordingly, the warning emitted when calling this _exit_gen() at
pipeline exit is adjusted to say "terminating" instead of "syncing".
The details shown are only the communication bowels. So we go from:
Traceback (most recent call last):
File "trace.py", line 301, in <module>
conn.execute("select pg_sleep(0.2)")
File "/usr/lib/python3.8/contextlib.py", line 120, in __exit__
next(self.gen)
File "/home/piro/dev/psycopg3/psycopg/psycopg/connection.py", line 881, in pipeline
yield pipeline
File "/home/piro/dev/psycopg3/psycopg/psycopg/_pipeline.py", line 197, in __exit__
self.sync()
File "/home/piro/dev/psycopg3/psycopg/psycopg/_pipeline.py", line 183, in sync
self._conn.wait(self._sync_gen())
File "/home/piro/dev/psycopg3/psycopg/psycopg/connection.py", line 896, in wait
return waiting.wait(gen, self.pgconn.socket, timeout=timeout)
File "/home/piro/dev/psycopg3/psycopg/psycopg/waiting.py", line 237, in wait_epoll
s = gen.send(ready)
File "/home/piro/dev/psycopg3/psycopg/psycopg/_pipeline.py", line 92, in _sync_gen
yield from self._communicate_gen()
File "/home/piro/dev/psycopg3/psycopg/psycopg/_pipeline.py", line 105, in _communicate_gen
self._process_results(queued, results)
File "/home/piro/dev/psycopg3/psycopg/psycopg/_pipeline.py", line 149, in _process_results
raise e.error_from_result(result, encoding=pgconn_encoding(self.pgconn))
psycopg.errors.UndefinedColumn: column "foo" does not exist
LINE 1: select foo
^
to:
Traceback (most recent call last):
File "trace.py", line 301, in <module>
conn.execute("select pg_sleep(0.2)")
File "/usr/lib/python3.8/contextlib.py", line 120, in __exit__
next(self.gen)
File "/home/piro/dev/psycopg3/psycopg/psycopg/connection.py", line 881, in pipeline
yield pipeline
File "/home/piro/dev/psycopg3/psycopg/psycopg/_pipeline.py", line 205, in __exit__
raise exc2.with_traceback(None)
psycopg.errors.UndefinedColumn: column "foo" does not exist
LINE 1: select foo
^
which shows much more clearly that the traceback was raised at pipeline
block exit.
perf: avoid unnecessary recvfrom() in cursor.stream()
Call PQisBusy() before PQconsumeInput() on fetching results. If not
busy, don't call PQconsumeInput() at all but just go to fetching results
and notifications.
This is especially useful in single-row mode because most of the times
the libpq can produce several results after a single network fetch.
Previously we were calling PQconsumeInput() also when results were
already on the client and there was nothing new to read, which forced
the libpq to run a select() to tell apart a lack of data from an EOF,
see `the grumble`_, and caused the overhead reported in #286.
refactor(enum): rename python_type -> enum, enum_labels -> labels on EnumInfo
The previous name were lifted from the composite adapters. However, that
class would have had a more ambiguous "type" attribute, and "types" for
the fields, hence the decision of using "python_" and "fields_" prefix
to disambiguate.
In the enum adapters context, enum is not ambiguous, so a more natural
name seems preferred.
This removes the need for enums to be str-based. It also removes the
asymmetry whereby automatically generated enums were *not* string based,
but pure enums.
- New version scheme is "starting from PG 10", not "after PG 10" (also,
chunky bit moved from docstring to docs as for most other docs).
- The transaction status is of the session, not the server.
Daniele Varrazzo [Tue, 29 Mar 2022 23:57:06 +0000 (01:57 +0200)]
fix: consistent sync/exit and error management in pipeline contexts
Don't clobber an exception on exit of the nested block too. In order to
simplify the code, make the pipeline count the number of time it is
entered, and call _exit() only the last time it exits.
Drop assert that we have left pipeline mode leaving the block. If we get
in unrecoverable state, we will have not. By now we should probably just
close the connection; however, leaving it this way is a better
indication that the connection is broken because of something about
pipeline mode; closing it would hide it, and even if we raised a
warning, it would be much easier to miss it than to miss the exceptions
raised in broken state.
Daniele Varrazzo [Tue, 29 Mar 2022 23:54:56 +0000 (01:54 +0200)]
fix: forbid COPY in pipeline mode
COPY is not supported. Attempting it puts the connection in
unrecoverable state, with pipeline sync failing and pipeline exit
complaining that there are still results. So let's try to not get in
that state.
Daniele Varrazzo [Tue, 29 Mar 2022 20:03:49 +0000 (22:03 +0200)]
fix: restore sending a Sync on block exit but not on executemany end
It was removed a few commits ago, but as Denis suggests, it makes sense
to keep it. Reintroduce it (and test for it), but make sure executemany
doesn't send extra sync.
Daniele Varrazzo [Tue, 29 Mar 2022 12:29:11 +0000 (14:29 +0200)]
fix: make no-tuple results available after executemany returning
There could be useful info there, if the user asks for it. Now this
doesn't slow down the happy execute path, because the default of
returning is False.
Daniele Varrazzo [Tue, 29 Mar 2022 12:26:26 +0000 (14:26 +0200)]
fix: only flush the pipeline after executemany if returning
This has the side effect of breaking rowcount, as before, but can send
many executemany in the same pipeline.
`rowcount` is correct if `returning=True` is set instead, which is a
thing we can at least document, and makes sense: "if you want a result,
you flush the pipeline, dude".
Daniele Varrazzo [Tue, 29 Mar 2022 12:21:03 +0000 (14:21 +0200)]
refactor: rename pipeline.communicate() -> sync()
This has the async/sync interface, is is more apt to expose it as public
interface to say "call this to restore the state" (as the changed tests
does).
Expose the try/finally logic behind sync() as the _sync_gen() method,
which can be useful to call if sync has to be performed inside a lock.
Denis Laxalde [Tue, 29 Mar 2022 07:44:35 +0000 (09:44 +0200)]
fix: keep Cursor._execmany_returning set until reset
Cursor's _execmany_returning attribute is now initialized at _reset()
and set during _executemany_gen_pipeline(). This way, the attribute is
kept for further results fetch that may occur outside executemany()
context: namely, this is needed because _execmany_returning is used by
_set_results_from_pipeline() which would be called by fetch*() methods.
As a consequence, in _fetch_pipeline(), actual fetch is skipped when
coming from executemany(..., returning=False) as the pgresult would
never be set. This ensures a consistent behavior by raising "no result
available" when calling fetch*() from a non-returning executemany().
Denis Laxalde [Tue, 29 Mar 2022 07:07:44 +0000 (09:07 +0200)]
fix: sync and fetch nested pipelines when they exit
When running a nested pipeline, typically in executemany(), we now call
Pipeline.communicate() at exit similarly to a single (unnested) pipeline
but still keep the surrounding pipeline open (in contrast with unnested
one).
This resolves the issue around rowcount with returning executemany().
Accordingly, we check that rowcount is correct in test_executemany()
pipeline tests and drop the previous xfailed test.
Denis Laxalde [Tue, 29 Mar 2022 07:00:07 +0000 (09:00 +0200)]
test: add missing returning=True to executemany() in pipeline tests
Query in test_executemany() has a returning clause and we fetch results
from cursor after execution, so it seems the returning option to
executemany() is needed. These tests probably do not fail because of the
TODO in Cursor._set_results_from_pipeline().
Daniele Varrazzo [Mon, 28 Mar 2022 16:11:06 +0000 (18:11 +0200)]
fix: drop unneeded flush after executemany
As per @dlax analysis, it is not needed. It was part of an attempt to
fix the problem of rowcount broken when executemany is called from within
a pipeline block, but that doesn't work anyway.
Daniele Varrazzo [Sun, 27 Mar 2022 03:55:51 +0000 (05:55 +0200)]
perf: base executemany on pipeline
This changeset also fix several glitches of executemany() run in
pipeline mode, around the management of returned value and rowcount.
These glitches still appear if executemany() is run in an explicit
pipeline() block, because certain events only happen at (outermost)
pipeline block exit.
Daniele Varrazzo [Sun, 27 Mar 2022 15:01:11 +0000 (17:01 +0200)]
refactor: move pipeline finalisation code to Pipeline.__exit__
This code has more internal knowledge of the Pipeline object than the
Connection object.
For some reason I don't understand, had to declare 'command_queue' and
'result_queue' types explicitly to the class definition. Leaving just the
definitions in '__init__()' causes mypy (0.940) to complain in 'cursor.py'.