+# WARNING: this file is auto-generated by 'async_to_sync.py'
+# from the original file 'pool_async.py'
+# DO NOT CHANGE! Change the original file instead.
"""
-psycopg synchronous connection pool
+Psycopg connection pool module.
"""
# Copyright (C) 2021 The Psycopg Team
from abc import ABC, abstractmethod
from time import monotonic
from types import TracebackType
-from typing import Any, cast, Dict, Generic, Iterator, List
+from typing import Any, Iterator, cast, Dict, Generic, List
from typing import Optional, overload, Sequence, Type, TypeVar
from weakref import ref
from contextlib import contextmanager
from .abc import CT, ConnectionCB, ConnectFailedCB
from .base import ConnectionAttempt, BasePool
-from .sched import Scheduler
from .errors import PoolClosed, PoolTimeout, TooManyRequests
from ._compat import Deque
from ._acompat import Condition, Event, Lock, Queue, Worker, spawn, gather
from ._acompat import current_thread_name
+from .sched import Scheduler
+
logger = logging.getLogger("psycopg.pool")
conninfo: str = "",
*,
open: bool = True,
- connection_class: Type[CT] = cast(Type[CT], Connection[TupleRow]),
+ connection_class: Type[CT] = cast(Type[CT], Connection),
configure: Optional[ConnectionCB[CT]] = None,
reset: Optional[ConnectionCB[CT]] = None,
kwargs: Optional[Dict[str, Any]] = None,
self._reconnect_failed = reconnect_failed
+ # If these are asyncio objects, make sure to create them on open
+ # to attach them to the right loop.
self._lock: Lock
self._sched: Scheduler
self._tasks: Queue[MaintenanceTask]
)
if open:
- self.open()
+ self._open()
def __del__(self) -> None:
# If the '_closed' property is not set we probably failed in __init__.
logger.info("waiting for pool %r initialization", self.name)
if not self._pool_full_event.wait(timeout):
- self.close() # stop all the threads
- raise PoolTimeout(
- f"pool initialization incomplete after {timeout} sec"
- ) from None
+ self.close() # stop all the tasks
+ raise PoolTimeout(f"pool initialization incomplete after {timeout} sec")
with self._lock:
assert self._pool_full_event
self._stats[self._REQUESTS_ERRORS] += 1
raise TooManyRequests(
f"the pool {self.name!r} has already"
- f" {len(self._waiting)} requests waiting"
+ + f" {len(self._waiting)} requests waiting"
)
return conn
def _maybe_grow_pool(self) -> None:
- # Allow only one thread at time to grow the pool (or returning
+ # Allow only one task at time to grow the pool (or returning
# connections might be starved).
if self._nconns >= self._max_size or self._growing:
return
if self._maybe_close_connection(conn):
return
- # Use a worker to perform eventual maintenance work in a separate thread
+ # Use a worker to perform eventual maintenance work in a separate task
if self._reset:
self.run_task(ReturnConnection(self, conn))
else:
because the pool was initialized with *open* = `!True`) but you cannot
currently re-open a closed pool.
"""
+ # Make sure the lock is created after there is an event loop
self._ensure_lock()
with self._lock:
# A lock has been most likely, but not necessarily, created in `open()`.
self._ensure_lock()
+ # Create these objects now to attach them to the right loop.
+ # See #219
self._tasks = Queue()
self._sched = Scheduler()
self._start_initial_tasks()
def _ensure_lock(self) -> None:
+ """Make sure the pool lock is created.
+
+ In async code, also make sure that the loop is running.
+ """
+
try:
self._lock
except AttributeError:
# Schedule a task to shrink the pool if connections over min_size have
# remained unused.
- self.schedule_task(ShrinkPool(self), self.max_idle)
+ self.run_task(Schedule(self, ShrinkPool(self), self.max_idle))
def close(self, timeout: float = 5.0) -> None:
"""Close the pool and make it unavailable to new clients.
# Stop the scheduler
self._sched.enter(0, None)
- # Stop the worker threads
- workers, self._workers = self._workers[:], []
- for i in range(len(workers)):
+ # Stop the worker tasks
+ (workers, self._workers) = (self._workers[:], [])
+ for _ in workers:
self.run_task(StopWorker(self))
# Signal to eventual clients in the queue that business is closed.
for conn in connections:
conn.close()
- # Wait for the worker threads to terminate
+ # Wait for the worker tasks to terminate
assert self._sched_runner is not None
- sched_runner, self._sched_runner = self._sched_runner, None
+ (sched_runner, self._sched_runner) = (self._sched_runner, None)
gather(sched_runner, *workers, timeout=timeout)
def __enter__(self: _Self) -> _Self:
def resize(self, min_size: int, max_size: Optional[int] = None) -> None:
"""Change the size of the pool during runtime."""
- min_size, max_size = self._check_size(min_size, max_size)
+ (min_size, max_size) = self._check_size(min_size, max_size)
ngrow = max(0, min_size - self._min_size)
logger.info(
- "resizing %r to min_size=%s max_size=%s",
- self.name,
- min_size,
- max_size,
+ "resizing %r to min_size=%s max_size=%s", self.name, min_size, max_size
)
with self._lock:
self._min_size = min_size
self._reconnect_failed(self)
def run_task(self, task: MaintenanceTask) -> None:
- """Run a maintenance task in a worker thread."""
+ """Run a maintenance task in a worker."""
self._tasks.put_nowait(task)
def schedule_task(self, task: MaintenanceTask, delay: float) -> None:
- """Run a maintenance task in a worker thread in the future."""
+ """Run a maintenance task in a worker in the future."""
self._sched.enter(delay, task.tick)
@classmethod
def worker(cls, q: Queue[MaintenanceTask]) -> None:
"""Runner to execute pending maintenance task.
- The function is designed to run as a separate thread.
+ The function is designed to run as a task.
Block on the queue *q*, run a task received. Finish running if a
StopWorker is received.
task = q.get()
if isinstance(task, StopWorker):
- logger.debug("terminating working thread %s", current_thread_name())
+ logger.debug("terminating working task %s", current_thread_name())
return
# Run the task. Make sure don't die in the attempt.
task.run()
except Exception as ex:
logger.warning(
- "task run %s failed: %s: %s",
- task,
- ex.__class__.__name__,
- ex,
+ "task run %s failed: %s: %s", task, ex.__class__.__name__, ex
)
def _connect(self, timeout: Optional[float] = None) -> CT:
sname = TransactionStatus(status).name
raise e.ProgrammingError(
f"connection left in status {sname} by configure function"
- f" {self._configure}: discarded"
+ + f" {self._configure}: discarded"
)
# Set an expiry date, with some randomness to avoid mass reconnection
else:
attempt.update_delay(now)
self.schedule_task(
- AddConnection(self, attempt, growing=growing),
- attempt.delay,
+ AddConnection(self, attempt, growing=growing), attempt.delay
)
return
else:
# No client waiting for a connection: put it back into the pool
self._pool.append(conn)
-
# If we have been asked to wait for pool init, notify the
# waiter if the pool is full.
if self._pool_full_event and len(self._pool) >= self._min_size:
status = conn.pgconn.transaction_status
if status == TransactionStatus.IDLE:
pass
-
elif status in (TransactionStatus.INTRANS, TransactionStatus.INERROR):
# Connection returned with an active transaction
logger.warning("rolling back returned connection: %s", conn)
conn,
)
conn.close()
-
elif status == TransactionStatus.ACTIVE:
# Connection returned during an operation. Bad... just close it.
logger.warning("closing returned connection: %s", conn)
sname = TransactionStatus(status).name
raise e.ProgrammingError(
f"connection left in status {sname} by reset function"
- f" {self._reset}: discarded"
+ + f" {self._reset}: discarded"
)
except Exception as ex:
logger.warning(f"error resetting connection: {ex}")
if to_close:
logger.info(
"shrinking pool %r to %s because %s unused connections"
- " in the last %s sec",
+ + " in the last %s sec",
self.name,
self._nconns,
nconns_min,
def run(self) -> None:
"""Run the task.
- This usually happens in a worker thread. Call the concrete _run()
+ This usually happens in a worker. Call the concrete _run()
implementation, if the pool is still alive.
"""
pool = self.pool()
def tick(self) -> None:
"""Run the scheduled task
- This function is called by the scheduler thread. Use a worker to
+ This function is called by the scheduler task. Use a worker to
run the task for real in order to free the scheduler immediately.
"""
pool = self.pool()
class StopWorker(MaintenanceTask):
- """Signal the maintenance thread to terminate."""
+ """Signal the maintenance worker to terminate."""
def _run(self, pool: ConnectionPool[Any]) -> None:
pass
# the periodic run.
pool.schedule_task(self, pool.max_idle)
pool._shrink_pool()
+
+
+class Schedule(MaintenanceTask):
+ """Schedule a task in the pool scheduler.
+
+ This task is a trampoline to allow to use a sync call (pool.run_task)
+ to execute an async one (pool.schedule_task). It is pretty much no-op
+ in sync code.
+ """
+
+ def __init__(self, pool: ConnectionPool[Any], task: MaintenanceTask, delay: float):
+ super().__init__(pool)
+ self.task = task
+ self.delay = delay
+
+ def _run(self, pool: ConnectionPool[Any]) -> None:
+ pool.schedule_task(self.task, self.delay)
"""
-psycopg asynchronous connection pool
+Psycopg connection pool module.
"""
# Copyright (C) 2021 The Psycopg Team
from __future__ import annotations
-import asyncio
import logging
from abc import ABC, abstractmethod
from time import monotonic
from types import TracebackType
-from typing import Any, AsyncIterator, cast, Generic
-from typing import Dict, List, Optional, overload, Sequence, Type, TypeVar
+from typing import Any, AsyncIterator, cast, Dict, Generic, List
+from typing import Optional, overload, Sequence, Type, TypeVar
from weakref import ref
from contextlib import asynccontextmanager
from ._acompat import current_task_name
from .sched_async import AsyncScheduler
+if True: # ASYNC
+ import asyncio
+
logger = logging.getLogger("psycopg.pool")
self._reconnect_failed = reconnect_failed
- # asyncio objects, created on open to attach them to the right loop.
+ # If these are asyncio objects, make sure to create them on open
+ # to attach them to the right loop.
self._lock: ALock
self._sched: AsyncScheduler
self._tasks: AQueue[MaintenanceTask]
- self._waiting = Deque[AsyncClient[ACT]]()
+ self._waiting = Deque[WaitingClient[ACT]]()
# to notify that the pool is full
self._pool_full_event: Optional[AEvent] = None
if open:
self._open()
+ if False: # ASYNC
+
+ def __del__(self) -> None:
+ # If the '_closed' property is not set we probably failed in __init__.
+ # Don't try anything complicated as probably it won't work.
+ if getattr(self, "_closed", True):
+ return
+
+ self._stop_workers()
+
async def wait(self, timeout: float = 30.0) -> None:
"""
Wait for the pool to be full (with `min_size` connections) after creation.
if not conn:
# No connection available: put the client in the waiting queue
t0 = monotonic()
- pos: AsyncClient[ACT] = AsyncClient()
+ pos: WaitingClient[ACT] = WaitingClient()
self._waiting.append(pos)
self._stats[self._REQUESTS_QUEUED] += 1
self._stats[self._REQUESTS_ERRORS] += 1
raise TooManyRequests(
f"the pool {self.name!r} has already"
- f" {len(self._waiting)} requests waiting"
+ + f" {len(self._waiting)} requests waiting"
)
return conn
In async code, also make sure that the loop is running.
"""
-
- # Throw a RuntimeError if the pool is open outside a running loop.
- asyncio.get_running_loop()
+ if True: # ASYNC
+ # Throw a RuntimeError if the pool is open outside a running loop.
+ asyncio.get_running_loop()
try:
self._lock
def _start_workers(self) -> None:
self._sched_runner = aspawn(self._sched.run, name=f"{self.name}-scheduler")
+ assert not self._workers
for i in range(self.num_workers):
t = aspawn(self.worker, args=(self._tasks,), name=f"{self.name}-worker-{i}")
self._workers.append(t)
async def _stop_workers(
self,
- waiting_clients: Sequence[AsyncClient[ACT]] = (),
+ waiting_clients: Sequence[WaitingClient[ACT]] = (),
connections: Sequence[ACT] = (),
timeout: float | None = None,
) -> None:
# Stop the worker tasks
workers, self._workers = self._workers[:], []
- for w in workers:
+ for _ in workers:
self.run_task(StopWorker(self))
# Signal to eventual clients in the queue that business is closed.
ngrow = max(0, min_size - self._min_size)
logger.info(
- "resizing %r to min_size=%s max_size=%s",
- self.name,
- min_size,
- max_size,
+ "resizing %r to min_size=%s max_size=%s", self.name, min_size, max_size
)
async with self._lock:
self._min_size = min_size
if not self._reconnect_failed:
return
- if asyncio.iscoroutinefunction(self._reconnect_failed):
- await self._reconnect_failed(self)
+ if True: # ASYNC
+ if asyncio.iscoroutinefunction(self._reconnect_failed):
+ await self._reconnect_failed(self)
+ else:
+ self._reconnect_failed(self)
else:
self._reconnect_failed(self)
await task.run()
except Exception as ex:
logger.warning(
- "task run %s failed: %s: %s",
- task,
- ex.__class__.__name__,
- ex,
+ "task run %s failed: %s: %s", task, ex.__class__.__name__, ex
)
async def _connect(self, timeout: Optional[float] = None) -> ACT:
sname = TransactionStatus(status).name
raise e.ProgrammingError(
f"connection left in status {sname} by configure function"
- f" {self._configure}: discarded"
+ + f" {self._configure}: discarded"
)
# Set an expiry date, with some randomness to avoid mass reconnection
else:
attempt.update_delay(now)
await self.schedule_task(
- AddConnection(self, attempt, growing=growing),
- attempt.delay,
+ AddConnection(self, attempt, growing=growing), attempt.delay
)
return
sname = TransactionStatus(status).name
raise e.ProgrammingError(
f"connection left in status {sname} by reset function"
- f" {self._reset}: discarded"
+ + f" {self._reset}: discarded"
)
except Exception as ex:
logger.warning(f"error resetting connection: {ex}")
if to_close:
logger.info(
"shrinking pool %r to %s because %s unused connections"
- " in the last %s sec",
+ + " in the last %s sec",
self.name,
self._nconns,
nconns_min,
return rv
-class AsyncClient(Generic[ACT]):
+class WaitingClient(Generic[ACT]):
"""A position in a queue for a client waiting for a connection."""
__slots__ = ("conn", "error", "_cond")
self.conn: Optional[ACT] = None
self.error: Optional[BaseException] = None
- # The AsyncClient behaves in a way similar to an Event, but we need
+ # The WaitingClient behaves in a way similar to an Event, but we need
# to notify reliably the flagger that the waiter has "accepted" the
# message and it hasn't timed out yet, otherwise the pool may give a
# connection to a client that has already timed out getconn(), which
"""Schedule a task in the pool scheduler.
This task is a trampoline to allow to use a sync call (pool.run_task)
- to execute an async one (pool.schedule_task).
+ to execute an async one (pool.schedule_task). It is pretty much no-op
+ in sync code.
"""
def __init__(