hp = HostPort(host=host, port=port)
out.append(hp)
- if srv_found:
- return out
- else:
- return []
+ return out if srv_found else []
def _resolve_srv(self, hp: HostPort) -> List[HostPort]:
try:
async def _resolve_srv_async(self, hp: HostPort) -> List[HostPort]:
try:
- ans = resolver.resolve(hp.host, "SRV")
+ ans = await async_resolver.resolve(hp.host, "SRV")
except DNSException:
ans = ()
return self._get_solved_entries(hp, ans)
@pytest.mark.asyncio
@pytest.mark.parametrize("conninfo, want, env", samples_ok)
-async def test_srv_async(conninfo, want, env, fake_srv, retries, monkeypatch):
+async def test_srv_async(conninfo, want, env, afake_srv, retries, monkeypatch):
if env:
for k, v in env.items():
monkeypatch.setenv(k, v)
@pytest.mark.asyncio
@pytest.mark.parametrize("conninfo, env", samples_bad)
-async def test_srv_bad_async(conninfo, env, fake_srv, monkeypatch):
+async def test_srv_bad_async(conninfo, env, afake_srv, monkeypatch):
if env:
for k, v in env.items():
monkeypatch.setenv(k, v)
@pytest.fixture
def fake_srv(monkeypatch):
+ f = get_fake_srv_function(monkeypatch)
+ monkeypatch.setattr(psycopg._dns.resolver, "resolve", f)
+
+
+@pytest.fixture
+def afake_srv(monkeypatch):
+ f = get_fake_srv_function(monkeypatch)
+
+ async def af(qname, rdtype):
+ return f(qname, rdtype)
+
+ monkeypatch.setattr(psycopg._dns.async_resolver, "resolve", af)
+
+
+def get_fake_srv_function(monkeypatch):
import_dnspython()
from dns.rdtypes.IN.A import A
return rv
- async def afake_srv_(qname, rdtype):
- return fake_srv(qname, rdtype)
-
- monkeypatch.setattr(psycopg._dns.resolver, "resolve", fake_srv_)
- monkeypatch.setattr(psycopg._dns.async_resolver, "resolve", afake_srv_)
+ return fake_srv_