# should have a lock and hold it before calling and consuming them.
@classmethod
- def _connect_gen(cls, conninfo: str = "") -> PQGenConn[Self]:
+ def _connect_gen(
+ cls, conninfo: str = "", *, timeout: float = 0.0
+ ) -> PQGenConn[Self]:
"""Generator to connect to the database and create a new instance."""
- pgconn = yield from generators.connect(conninfo)
+ pgconn = yield from generators.connect(conninfo, timeout=timeout)
conn = cls(pgconn)
return conn
from . import pq
from . import errors as e
from . import waiting
-from .abc import AdaptContext, ConnDict, ConnParam, Params, PQGen, PQGenConn, Query, RV
+from .abc import AdaptContext, ConnDict, ConnParam, Params, PQGen, Query, RV
from ._tpc import Xid
from .rows import Row, RowFactory, tuple_row, args_row
from .adapt import AdaptersMap
for attempt in attempts:
try:
conninfo = make_conninfo("", **attempt)
- rv = cls._wait_conn(cls._connect_gen(conninfo), timeout=timeout)
- break
+ gen = cls._connect_gen(conninfo, timeout=timeout)
+ rv = waiting.wait_conn(gen, timeout=_WAIT_INTERVAL)
except e._NO_TRACEBACK as ex:
if len(attempts) > 1:
logger.debug(
str(ex),
)
last_ex = ex
+ else:
+ break
if not rv:
assert last_ex
pass # as expected
raise
- @classmethod
- def _wait_conn(cls, gen: PQGenConn[RV], timeout: Optional[int]) -> RV:
- """Consume a connection generator."""
- return waiting.wait_conn(gen, timeout)
-
def _set_autocommit(self, value: bool) -> None:
self.set_autocommit(value)
from . import pq
from . import errors as e
from . import waiting
-from .abc import AdaptContext, ConnDict, ConnParam, Params, PQGen, PQGenConn, Query, RV
+from .abc import AdaptContext, ConnDict, ConnParam, Params, PQGen, Query, RV
from ._tpc import Xid
from .rows import Row, AsyncRowFactory, tuple_row, args_row
from .adapt import AdaptersMap
for attempt in attempts:
try:
conninfo = make_conninfo("", **attempt)
- rv = await cls._wait_conn(cls._connect_gen(conninfo), timeout=timeout)
- break
+ gen = cls._connect_gen(conninfo, timeout=timeout)
+ rv = await waiting.wait_conn_async(gen, timeout=_WAIT_INTERVAL)
except e._NO_TRACEBACK as ex:
if len(attempts) > 1:
logger.debug(
str(ex),
)
last_ex = ex
+ else:
+ break
if not rv:
assert last_ex
pass # as expected
raise
- @classmethod
- async def _wait_conn(cls, gen: PQGenConn[RV], timeout: Optional[int]) -> RV:
- """Consume a connection generator."""
- return await waiting.wait_conn_async(gen, timeout)
-
def _set_autocommit(self, value: bool) -> None:
if True: # ASYNC
self._no_set_async("autocommit")
# Copyright (C) 2020 The Psycopg Team
import logging
+from time import monotonic
from typing import List, Optional, Union
from . import pq
logger = logging.getLogger(__name__)
-def _connect(conninfo: str) -> PQGenConn[PGconn]:
+def _connect(conninfo: str, *, timeout: float = 0.0) -> PQGenConn[PGconn]:
"""
Generator to create a database connection without blocking.
-
"""
+ deadline = monotonic() + timeout if timeout else 0.0
+
conn = pq.PGconn.connect_start(conninfo.encode())
while True:
if conn.status == BAD:
)
status = conn.connect_poll()
- if status == POLL_OK:
+
+ if status == POLL_READING or status == POLL_WRITING:
+ wait = WAIT_R if status == POLL_READING else WAIT_W
+ while True:
+ ready = yield conn.socket, wait
+ if deadline and monotonic() > deadline:
+ raise e.ConnectionTimeout("connection timeout expired")
+ if ready:
+ break
+
+ elif status == POLL_OK:
break
- elif status == POLL_READING:
- yield conn.socket, WAIT_R
- elif status == POLL_WRITING:
- yield conn.socket, WAIT_W
elif status == POLL_FAILED:
encoding = conninfo_encoding(conninfo)
raise e.OperationalError(
if not timeout:
timeout = None
with DefaultSelector() as sel:
+ sel.register(fileno, s)
while True:
- sel.register(fileno, s)
rlist = sel.select(timeout=timeout)
- sel.unregister(fileno)
if not rlist:
- raise e.ConnectionTimeout("connection timeout expired")
+ gen.send(READY_NONE)
+ continue
+
+ sel.unregister(fileno)
ready = rlist[0][1]
fileno, s = gen.send(ready)
+ sel.register(fileno, s)
except StopIteration as ex:
rv: RV = ex.args[0] if ex.args else None
loop.add_writer(fileno, wakeup, READY_W)
try:
if timeout:
- await wait_for(ev.wait(), timeout)
+ try:
+ await wait_for(ev.wait(), timeout)
+ except TimeoutError:
+ pass
else:
await ev.wait()
finally:
loop.remove_writer(fileno)
fileno, s = gen.send(ready)
- except TimeoutError:
- raise e.ConnectionTimeout("connection timeout expired")
-
except StopIteration as ex:
rv: RV = ex.args[0] if ex.args else None
return rv
def get_loader(self, oid: int, format: pq.Format) -> abc.Loader: ...
# Generators
-def connect(conninfo: str) -> abc.PQGenConn[PGconn]: ...
+def connect(conninfo: str, *, timeout: float = 0.0) -> abc.PQGenConn[PGconn]: ...
def execute(pgconn: PGconn) -> abc.PQGen[List[PGresult]]: ...
def send(pgconn: PGconn) -> abc.PQGen[None]: ...
def fetch_many(pgconn: PGconn) -> abc.PQGen[List[PGresult]]: ...
from cpython.object cimport PyObject_CallFunctionObjArgs
from typing import List
+from time import monotonic
from psycopg import errors as e
from psycopg.pq import abc, error_message
cdef int READY_W = Ready.W
cdef int READY_RW = Ready.RW
-def connect(conninfo: str) -> PQGenConn[abc.PGconn]:
+def connect(conninfo: str, *, timeout: float = 0.0) -> PQGenConn[abc.PGconn]:
"""
Generator to create a database connection without blocking.
-
"""
+ cdef int deadline = monotonic() + timeout if timeout else 0.0
+
cdef pq.PGconn conn = pq.PGconn.connect_start(conninfo.encode())
cdef libpq.PGconn *pgconn_ptr = conn._pgconn_ptr
cdef int conn_status = libpq.PQstatus(pgconn_ptr)
cdef int poll_status
+ cdef object wait, ready
while True:
if conn_status == libpq.CONNECTION_BAD:
with nogil:
poll_status = libpq.PQconnectPoll(pgconn_ptr)
- if poll_status == libpq.PGRES_POLLING_OK:
+ if poll_status == libpq.PGRES_POLLING_READING \
+ or poll_status == libpq.PGRES_POLLING_WRITING:
+ wait = WAIT_R if poll_status == libpq.PGRES_POLLING_READING else WAIT_W
+ while True:
+ ready = yield (libpq.PQsocket(pgconn_ptr), wait)
+ if deadline and monotonic() > deadline:
+ raise e.ConnectionTimeout("connection timeout expired")
+ if ready:
+ break
+
+ elif poll_status == libpq.PGRES_POLLING_OK:
break
- elif poll_status == libpq.PGRES_POLLING_READING:
- yield (libpq.PQsocket(pgconn_ptr), WAIT_R)
- elif poll_status == libpq.PGRES_POLLING_WRITING:
- yield (libpq.PQsocket(pgconn_ptr), WAIT_W)
elif poll_status == libpq.PGRES_POLLING_FAILED:
encoding = conninfo_encoding(conninfo)
raise e.OperationalError(
):
got_conninfo: str
- def fake_connect(conninfo):
+ def fake_connect(conninfo, *, timeout=0.0):
nonlocal got_conninfo
got_conninfo = conninfo
return pgconn
],
)
def test_connect_badargs(conn_cls, monkeypatch, pgconn, args, kwargs, exctype):
-
- def fake_connect(conninfo):
- return pgconn
- yield
-
- monkeypatch.setattr(psycopg.generators, "connect", fake_connect)
with pytest.raises(exctype):
conn_cls.connect(*args, **kwargs)
):
got_conninfo: str
- def fake_connect(conninfo):
+ def fake_connect(conninfo, *, timeout=0.0):
nonlocal got_conninfo
got_conninfo = conninfo
return pgconn
],
)
async def test_connect_badargs(aconn_cls, monkeypatch, pgconn, args, kwargs, exctype):
- def fake_connect(conninfo):
- return pgconn
- yield
-
- monkeypatch.setattr(psycopg.generators, "connect", fake_connect)
with pytest.raises(exctype):
await aconn_cls.connect(*args, **kwargs)
got_conninfo: str
- def mock_connect(conninfo):
+ def mock_connect(conninfo, *, timeout):
nonlocal got_conninfo
got_conninfo = conninfo
- return orig_connect(dsn_env)
+ return orig_connect(dsn_env, timeout=timeout)
setpgenv({})
monkeypatch.setattr(psycopg.generators, "connect", mock_connect)
def test_connect_args(monkeypatch, pgconn, args, kwargs, want, setpgenv, fake_resolve):
got_conninfo: str
- def fake_connect(conninfo):
+ def fake_connect(conninfo, *, timeout=0.0):
nonlocal got_conninfo
got_conninfo = conninfo
return pgconn
],
)
def test_connect_badargs(monkeypatch, pgconn, args, kwargs, exctype):
- def fake_connect(conninfo):
- return pgconn
- yield
-
with pytest.raises(exctype):
psycopg.connect(*args, **kwargs)