import datetime
import logging
import logging.handlers
+import re
from sqlalchemy import BigInteger
from sqlalchemy import bindparam
from sqlalchemy import testing
from sqlalchemy import text
from sqlalchemy import TypeDecorator
+from sqlalchemy.dialects.postgresql import asyncpg as asyncpg_dialect
from sqlalchemy.dialects.postgresql import base as postgresql
from sqlalchemy.dialects.postgresql import HSTORE
from sqlalchemy.dialects.postgresql import JSONB
eq_(cargs, [])
eq_(cparams, {"host": "somehost", "any_random_thing": "yes"})
+ def test_psycopg2_disconnect(self):
+ class Error(Exception):
+ pass
+
+ dbapi = mock.Mock()
+ dbapi.Error = Error
+
+ dialect = psycopg2_dialect.dialect(dbapi=dbapi)
+
+ for error in [
+ # these error messages from libpq: interfaces/libpq/fe-misc.c
+ # and interfaces/libpq/fe-secure.c.
+ "terminating connection",
+ "closed the connection",
+ "connection not open",
+ "could not receive data from server",
+ "could not send data to server",
+ # psycopg2 client errors, psycopg2/connection.h,
+ # psycopg2/cursor.h
+ "connection already closed",
+ "cursor already closed",
+ # not sure where this path is originally from, it may
+ # be obsolete. It really says "losed", not "closed".
+ "losed the connection unexpectedly",
+ # these can occur in newer SSL
+ "connection has been closed unexpectedly",
+ "SSL error: decryption failed or bad record mac",
+ "SSL SYSCALL error: Bad file descriptor",
+ "SSL SYSCALL error: EOF detected",
+ "SSL SYSCALL error: Operation timed out",
+ "SSL SYSCALL error: Bad address",
+ ]:
+ eq_(dialect.is_disconnect(Error(error), None, None), True)
+
+ eq_(dialect.is_disconnect("not an error", None, None), False)
+
+
+class MultiHostConnectTest(fixtures.TestBase):
+ def working_combinations():
+ psycopg_combinations = [
+ (
+ "postgresql+psycopg2://USER:PASS@/DB?host=hostA",
+ {
+ "dbname": "DB",
+ "user": "USER",
+ "password": "PASS",
+ "host": "hostA",
+ },
+ ),
+ (
+ "postgresql+psycopg2://USER:PASS@/DB?host=hostA:",
+ {
+ "dbname": "DB",
+ "user": "USER",
+ "password": "PASS",
+ "host": "hostA",
+ },
+ ),
+ (
+ "postgresql+psycopg2://USER:PASS@/DB?host=hostA:1234",
+ {
+ "dbname": "DB",
+ "user": "USER",
+ "password": "PASS",
+ "host": "hostA",
+ "port": "1234",
+ },
+ ),
+ (
+ "postgresql+psycopg2://USER:PASS@/DB"
+ "?host=hostA&host=hostB&host=hostC",
+ {
+ "dbname": "DB",
+ "user": "USER",
+ "password": "PASS",
+ "host": "hostA,hostB,hostC",
+ "port": ",,",
+ },
+ ),
+ (
+ "postgresql+psycopg2://USER:PASS@/DB"
+ "?host=hostA&host=hostB:222&host=hostC:333",
+ {
+ "dbname": "DB",
+ "user": "USER",
+ "password": "PASS",
+ "host": "hostA,hostB,hostC",
+ "port": ",222,333",
+ },
+ ),
+ (
+ "postgresql+psycopg2://USER:PASS@/DB?"
+ "host=hostA:111&host=hostB:222&host=hostC:333",
+ {
+ "dbname": "DB",
+ "user": "USER",
+ "password": "PASS",
+ "host": "hostA,hostB,hostC",
+ "port": "111,222,333",
+ },
+ ),
+ (
+ "postgresql+psycopg2:///"
+ "?host=hostA:111&host=hostB:222&host=hostC:333",
+ {"host": "hostA,hostB,hostC", "port": "111,222,333"},
+ ),
+ (
+ "postgresql+psycopg2:///"
+ "?host=hostA:111&host=hostB:222&host=hostC:333",
+ {"host": "hostA,hostB,hostC", "port": "111,222,333"},
+ ),
+ (
+ "postgresql+psycopg2:///"
+ "?host=hostA,hostB,hostC&port=111,222,333",
+ {"host": "hostA,hostB,hostC", "port": "111,222,333"},
+ ),
+ (
+ "postgresql+asyncpg://USER:PASS@/DB"
+ "?host=hostA,hostB,&port=111,222,333",
+ {
+ "host": "hostA,hostB,",
+ "port": "111,222,333",
+ "dbname": "DB",
+ "user": "USER",
+ "password": "PASS",
+ },
+ ),
+ ]
+ for url_string, expected_psycopg in psycopg_combinations:
+ expected_asyncpg = dict(expected_psycopg)
+ if "dbname" in expected_asyncpg:
+ expected_asyncpg["database"] = expected_asyncpg.pop("dbname")
+ if "host" in expected_asyncpg:
+ expected_asyncpg["host"] = expected_asyncpg["host"].split(",")
+ if "port" in expected_asyncpg:
+ expected_asyncpg["port"] = [
+ int(p) if re.match(r"^\d+$", p) else None
+ for p in expected_psycopg["port"].split(",")
+ ]
+ yield url_string, expected_psycopg, expected_asyncpg
+
+ @testing.combinations_list(
+ working_combinations(),
+ argnames="url_string,expected_psycopg,expected_asyncpg",
+ )
+ @testing.combinations(
+ psycopg2_dialect.dialect(),
+ psycopg_dialect.dialect(),
+ asyncpg_dialect.dialect(),
+ argnames="dialect",
+ )
+ def test_multi_hosts(
+ self, dialect, url_string, expected_psycopg, expected_asyncpg
+ ):
+ url_string = url_string.replace("psycopg2", dialect.driver)
+
+ u = url.make_url(url_string)
+
+ if dialect.driver == "asyncpg":
+ if (
+ "port" in expected_asyncpg
+ and not all(expected_asyncpg["port"])
+ or (
+ "host" in expected_asyncpg
+ and isinstance(expected_asyncpg["host"], list)
+ and "port" not in expected_asyncpg
+ )
+ ):
+ with expect_raises_message(
+ exc.ArgumentError,
+ "All ports are required to be present"
+ " for asyncpg multiple host URL",
+ ):
+ dialect.create_connect_args(u)
+ return
+ elif "host" in expected_asyncpg and not all(
+ expected_asyncpg["host"]
+ ):
+ with expect_raises_message(
+ exc.ArgumentError,
+ "All hosts are required to be present"
+ " for asyncpg multiple host URL",
+ ):
+ dialect.create_connect_args(u)
+ return
+ expected = expected_asyncpg
+ else:
+ expected = expected_psycopg
+
+ cargs, cparams = dialect.create_connect_args(u)
+ eq_(cparams, expected)
+ eq_(cargs, [])
+
@testing.combinations(
- (
- "postgresql+psycopg2://USER:PASS@/DB?host=hostA",
- {
- "dbname": "DB",
- "user": "USER",
- "password": "PASS",
- "host": "hostA",
- },
- ),
(
"postgresql+psycopg2://USER:PASS@/DB"
- "?host=hostA&host=hostB&host=hostC",
- {
- "dbname": "DB",
- "user": "USER",
- "password": "PASS",
- "host": "hostA,hostB,hostC",
- "port": ",,",
- },
+ "?host=hostA:111&host=hostB:vvv&host=hostC:333",
),
(
"postgresql+psycopg2://USER:PASS@/DB"
- "?host=hostA&host=hostB:portB&host=hostC:portC",
- {
- "dbname": "DB",
- "user": "USER",
- "password": "PASS",
- "host": "hostA,hostB,hostC",
- "port": ",portB,portC",
- },
+ "?host=hostA,hostB:,hostC&port=111,vvv,333",
),
(
- "postgresql+psycopg2://USER:PASS@/DB?"
- "host=hostA:portA&host=hostB:portB&host=hostC:portC",
- {
- "dbname": "DB",
- "user": "USER",
- "password": "PASS",
- "host": "hostA,hostB,hostC",
- "port": "portA,portB,portC",
- },
+ "postgresql+psycopg2://USER:PASS@/DB"
+ "?host=hostA:xyz&host=hostB:123",
),
+ ("postgresql+psycopg2://USER:PASS@/DB?host=hostA:xyz",),
+ ("postgresql+psycopg2://USER:PASS@/DB?host=hostA&port=xyz",),
+ argnames="url_string",
+ )
+ @testing.combinations(
+ psycopg2_dialect.dialect(),
+ psycopg_dialect.dialect(),
+ asyncpg_dialect.dialect(),
+ argnames="dialect",
+ )
+ def test_non_int_port_disallowed(self, dialect, url_string):
+ url_string = url_string.replace("psycopg2", dialect.driver)
+
+ u = url.make_url(url_string)
+
+ with expect_raises_message(
+ exc.ArgumentError,
+ r"Received non-integer port arguments: \((?:'.*?',?)+\)",
+ ):
+ dialect.create_connect_args(u)
+
+ @testing.combinations(
+ ("postgresql+psycopg2://USER:PASS@hostfixed/DB?port=111",),
+ ("postgresql+psycopg2://USER:PASS@hostfixed/DB?host=hostA:111",),
(
- "postgresql+psycopg2:///"
- "?host=hostA:portA&host=hostB:portB&host=hostC:portC",
- {"host": "hostA,hostB,hostC", "port": "portA,portB,portC"},
+ "postgresql+psycopg2://USER:PASS@hostfixed/DB"
+ "?host=hostA&port=111",
),
+ ("postgresql+psycopg2://USER:PASS@hostfixed/DB" "?host=hostA",),
+ argnames="url_string",
+ )
+ @testing.combinations(
+ psycopg2_dialect.dialect(),
+ psycopg_dialect.dialect(),
+ asyncpg_dialect.dialect(),
+ argnames="dialect",
+ )
+ def test_dont_use_fixed_host(self, dialect, url_string):
+ url_string = url_string.replace("psycopg2", dialect.driver)
+
+ u = url.make_url(url_string)
+ with expect_raises_message(
+ exc.ArgumentError,
+ "Can't combine fixed host and multihost URL formats",
+ ):
+ dialect.create_connect_args(u)
+
+ @testing.combinations(
(
- "postgresql+psycopg2:///"
- "?host=hostA:portA&host=hostB:portB&host=hostC:portC",
- {"host": "hostA,hostB,hostC", "port": "portA,portB,portC"},
+ "postgresql+psycopg2://USER:PASS@/DB"
+ "?host=hostA,hostC&port=111,222,333",
),
+ ("postgresql+psycopg2://USER:PASS@/DB" "?host=hostA&port=111,222",),
+ ("postgresql+psycopg2://USER:PASS@/DB?port=111",),
(
- "postgresql+psycopg2:///"
- "?host=hostA,hostB,hostC&port=portA,portB,portC",
- {"host": "hostA,hostB,hostC", "port": "portA,portB,portC"},
+ "postgresql+asyncpg://USER:PASS@/DB"
+ "?host=hostA,hostB,hostC&port=111,333",
),
- argnames="url_string,expected",
+ argnames="url_string",
)
@testing.combinations(
psycopg2_dialect.dialect(),
psycopg_dialect.dialect(),
+ asyncpg_dialect.dialect(),
argnames="dialect",
)
- def test_psycopg_multi_hosts(self, dialect, url_string, expected):
+ def test_num_host_port_doesnt_match(self, dialect, url_string):
+ url_string = url_string.replace("psycopg2", dialect.driver)
+
u = url.make_url(url_string)
- cargs, cparams = dialect.create_connect_args(u)
- eq_(cargs, [])
- eq_(cparams, expected)
+
+ with expect_raises_message(
+ exc.ArgumentError, "number of hosts and ports don't match"
+ ):
+ dialect.create_connect_args(u)
@testing.combinations(
"postgresql+psycopg2:///?host=H&host=H&port=5432,5432",
@testing.combinations(
psycopg2_dialect.dialect(),
psycopg_dialect.dialect(),
+ asyncpg_dialect.dialect(),
argnames="dialect",
)
- def test_psycopg_no_mix_hosts(self, dialect, url_string):
+ def test_dont_mix_multihost_formats(self, dialect, url_string):
+ url_string = url_string.replace("psycopg2", dialect.name)
+
+ u = url.make_url(url_string)
+
with expect_raises_message(
exc.ArgumentError, "Can't mix 'multihost' formats together"
):
- u = url.make_url(url_string)
dialect.create_connect_args(u)
- def test_psycopg2_disconnect(self):
- class Error(Exception):
- pass
-
- dbapi = mock.Mock()
- dbapi.Error = Error
-
- dialect = psycopg2_dialect.dialect(dbapi=dbapi)
-
- for error in [
- # these error messages from libpq: interfaces/libpq/fe-misc.c
- # and interfaces/libpq/fe-secure.c.
- "terminating connection",
- "closed the connection",
- "connection not open",
- "could not receive data from server",
- "could not send data to server",
- # psycopg2 client errors, psycopg2/connection.h,
- # psycopg2/cursor.h
- "connection already closed",
- "cursor already closed",
- # not sure where this path is originally from, it may
- # be obsolete. It really says "losed", not "closed".
- "losed the connection unexpectedly",
- # these can occur in newer SSL
- "connection has been closed unexpectedly",
- "SSL error: decryption failed or bad record mac",
- "SSL SYSCALL error: Bad file descriptor",
- "SSL SYSCALL error: EOF detected",
- "SSL SYSCALL error: Operation timed out",
- "SSL SYSCALL error: Bad address",
- ]:
- eq_(dialect.is_disconnect(Error(error), None, None), True)
-
- eq_(dialect.is_disconnect("not an error", None, None), False)
-
class BackendDialectTest(fixtures.TestBase):
__backend__ = True
- @testing.only_on(["+psycopg", "+psycopg2"])
+ @testing.only_on(["+psycopg", "+psycopg2", "+asyncpg"])
@testing.combinations(
- "host=H:P&host=H:P&host=H:P",
- "host=H:P&host=H&host=H",
- "host=H:P&host=H&host=H:P",
- "host=H&host=H:P&host=H",
- "host=H,H,H&port=P,P,P",
+ ("postgresql+D://U:PS@/DB?host=H:P&host=H:P&host=H:P", True),
+ ("postgresql+D://U:PS@/DB?host=H:P&host=H&host=H", False),
+ ("postgresql+D://U:PS@/DB?host=H:P&host=H&host=H:P", False),
+ ("postgresql+D://U:PS@/DB?host=H&host=H:P&host=H", False),
+ ("postgresql+D://U:PS@/DB?host=H,H,H&port=P,P,P", True),
+ ("postgresql+D://U:PS@H:P/DB", True),
+ argnames="pattern,has_all_ports",
)
- def test_connect_psycopg_multiple_hosts(self, pattern):
- """test the fix for #4392"""
+ def test_multiple_host_real_connect(
+ self, testing_engine, pattern, has_all_ports
+ ):
+ """test the fix for #4392.
+
+ Additionally add multiple host tests for #10004's additional
+ use cases
+
+ """
tdb_url = testing.db.url
host = "localhost"
port = str(tdb_url.port) if tdb_url.port else "5432"
- query_str = pattern.replace("H", host).replace("P", port)
url_string = (
- f"{tdb_url.drivername}://{tdb_url.username}:"
- f"{tdb_url.password}@/{tdb_url.database}?{query_str}"
+ pattern.replace("DB", tdb_url.database)
+ .replace("postgresql+D", tdb_url.drivername)
+ .replace("U", tdb_url.username)
+ .replace("PS", tdb_url.password)
+ .replace("H", host)
+ .replace("P", port)
)
- e = create_engine(url_string)
+ if testing.against("+asyncpg") and not has_all_ports:
+ with expect_raises_message(
+ exc.ArgumentError,
+ "All ports are required to be present "
+ "for asyncpg multiple host URL",
+ ):
+ testing_engine(url_string)
+ return
+
+ e = testing_engine(url_string)
with e.connect() as conn:
eq_(conn.exec_driver_sql("select 1").scalar(), 1)