# Copyright (C) 2020-2021 The Psycopg Team
from enum import IntEnum, auto
-from typing import Optional, Sequence, Tuple, TYPE_CHECKING
+from typing import Iterator, Optional, Sequence, Tuple, TYPE_CHECKING
from collections import OrderedDict
from .pq import ExecStatus
+from ._compat import Deque
from ._queries import PostgresQuery
if TYPE_CHECKING:
# Counter to generate prepared statements names
self._prepared_idx = 0
+ self._maint_commands = Deque[bytes]()
+
@staticmethod
def key(query: PostgresQuery) -> Key:
return (query.query, query.types)
def _should_discard(
self, prep: Prepare, results: Sequence["PGresult"]
- ) -> Optional[bytes]:
+ ) -> bool:
"""Check if we need to discard our entire state: it should happen on
rollback or on dropping objects, because the same object may get
recreated and postgres would fail internal lookups.
cmdstat.startswith(b"DROP ") or cmdstat == b"ROLLBACK"
):
return self.clear()
- return None
+ return False
@staticmethod
def _check_results(results: Sequence["PGresult"]) -> bool:
return True
- def _rotate(self) -> Optional[bytes]:
+ def _rotate(self) -> None:
"""Evict an old value from the cache.
If it was prepared, deallocate it. Do it only once: if the cache was
if len(self._names) > self.prepared_max:
name = self._names.popitem(last=False)[1]
- return b"DEALLOCATE " + name
- else:
- return None
+ self._maint_commands.append(b"DEALLOCATE " + name)
def maybe_add_to_cache(
self, query: PostgresQuery, prep: Prepare, name: bytes
prep: Prepare,
name: bytes,
results: Sequence["PGresult"],
- ) -> Optional[bytes]:
+ ) -> None:
"""Validate cached entry with 'key' by checking query 'results'.
Possibly return a command to perform maintainance on database side.
Note: this method is only called in pipeline mode.
"""
- cmd = self._should_discard(prep, results)
- if cmd:
- return cmd
+ if self._should_discard(prep, results):
+ return
if not self._check_results(results):
self._names.pop(key, None)
self._counts.pop(key, None)
- return None
+ else:
+ self._rotate()
- return self._rotate()
+ def clear(self) -> bool:
+ """Clear the cache of the maintenance commands.
- def clear(self) -> Optional[bytes]:
+ Clear the internal state and prepare a command to clear the state of
+ the server.
+ """
if self._names:
self._names.clear()
- return b"DEALLOCATE ALL"
+ self._maint_commands.clear()
+ self._maint_commands.append(b"DEALLOCATE ALL")
+ return True
else:
- return None
+ return False
+
+ def get_maintenance_commands(self) -> Iterator[bytes]:
+ """
+ Iterate over the commands needed to align the server state to our state
+ """
+ while self._maint_commands:
+ yield self._maint_commands.popleft()
self._execute_results(results)
self._last_query = query
+ for cmd in self._conn._prepared.get_maintenance_commands():
+ yield from self._conn._exec_command(cmd)
+
def _executemany_gen(
self, query: Query, params_seq: Iterable[Params]
) -> PQGen[None]:
self._last_query = query
+ for cmd in self._conn._prepared.get_maintenance_commands():
+ yield from self._conn._exec_command(cmd)
+
def _maybe_prepare_gen(
self,
pgq: PostgresQuery,
results = yield from execute(self._pgconn)
# Update the prepare state of the query.
- # If an operation requires to flush our prepared statements cache, do it.
+ # If an operation requires to flush our prepared statements cache,
+ # it will be added to the maintenance commands to execute later.
key = self._conn._prepared.maybe_add_to_cache(pgq, prep, name)
if key is not None:
- cmd = self._conn._prepared.validate(key, prep, name, results)
- if cmd:
- yield from self._conn._exec_command(cmd)
+ self._conn._prepared.validate(key, prep, name, results)
return results