This allows to run the test suite with -Werror.
self.failUnless(con.InternalError is drv.InternalError)
self.failUnless(con.ProgrammingError is drv.ProgrammingError)
self.failUnless(con.NotSupportedError is drv.NotSupportedError)
+ con.close()
def test_commit(self):
con.rollback()
except self.driver.NotSupportedError:
pass
+ con.close()
def test_cursor(self):
con = self._connect()
with pytest.raises(ValueError):
p.putconn(conn)
+ conn.close()
+
def test_putconn_wrong_pool(dsn):
with pool.ConnectionPool(dsn, min_size=1) as p1:
with pytest.raises(ValueError):
await p.putconn(conn)
+ await conn.close()
+
async def test_putconn_wrong_pool(dsn):
async with pool.AsyncConnectionPool(dsn, min_size=1) as p1:
assert cur.fetchone() == ("hellogb",)
cur = conn.execute("select %t", [MyStr("hello")])
assert cur.fetchone() == ("hellogt",)
+ conn.close()
def test_dump_connection_ctx(conn):
assert cur.fetchone() == ("hellogt",)
cur = conn.cursor(binary=True).execute("select 'hello'::text")
assert cur.fetchone() == ("hellogb",)
+ conn.close()
def test_load_connection_ctx(conn):
nconn.cursor().execute("notify foo, '1'")
time.sleep(0.25)
nconn.cursor().execute("notify foo, '2'")
+ nconn.close()
conn.autocommit = True
conn.cursor().execute("listen foo")
conn.execute("select 1")
t1 = time.time()
assert 0.3 < t1 - t0 < 0.6
+
+ conn.close()
+ conn2.close()
await aconn.execute("select 1")
t1 = time.time()
assert 0.3 < t1 - t0 < 0.6
+
+ await aconn.close()
+ await conn2.close()
conn = Connection.connect(dsn)
assert not conn.closed
assert conn.pgconn.status == conn.ConnStatus.OK
+ conn.close()
def test_connect_str_subclass(dsn):
conn = Connection.connect(MyString(dsn))
assert not conn.closed
assert conn.pgconn.status == conn.ConnStatus.OK
+ conn.close()
def test_connect_bad():
def test_autocommit_connect(dsn):
conn = Connection.connect(dsn, autocommit=True)
assert conn.autocommit
+ conn.close()
def test_autocommit_intrans(conn):
yield
monkeypatch.setattr(psycopg.connection, "connect", fake_connect)
- psycopg.Connection.connect(*args, **kwargs)
+ conn = psycopg.Connection.connect(*args, **kwargs)
assert conninfo_to_dict(the_conninfo) == conninfo_to_dict(want)
+ conn.close()
@pytest.mark.parametrize(
def test_row_factory(dsn):
defaultconn = Connection.connect(dsn)
assert defaultconn.row_factory is tuple_row # type: ignore[comparison-overlap]
+ defaultconn.close()
conn = Connection.connect(dsn, row_factory=my_row_factory)
assert conn.row_factory is my_row_factory # type: ignore[comparison-overlap]
cur3 = conn.execute("select 'vale'")
r = cur3.fetchone()
assert r and r == ("vale",) # type: ignore[comparison-overlap]
+ conn.close()
def test_str(conn):
assert cur.fetchone()[0] == "hellot" # type: ignore[index]
cur = conn.execute("select %b", ["hello"])
assert cur.fetchone()[0] == "hellob" # type: ignore[index]
+ conn.close()
def test_connect_context_copy(dsn, conn):
assert cur.fetchone()[0] == "hellot" # type: ignore[index]
cur = conn2.execute("select %b", ["hello"])
assert cur.fetchone()[0] == "hellob" # type: ignore[index]
+ conn2.close()
conn = await AsyncConnection.connect(dsn)
assert not conn.closed
assert conn.pgconn.status == conn.ConnStatus.OK
+ await conn.close()
async def test_connect_bad():
conn = await AsyncConnection.connect(MyString(dsn))
assert not conn.closed
assert conn.pgconn.status == conn.ConnStatus.OK
+ await conn.close()
@pytest.mark.slow
async def test_autocommit_connect(dsn):
aconn = await psycopg.AsyncConnection.connect(dsn, autocommit=True)
assert aconn.autocommit
+ await aconn.close()
async def test_autocommit_intrans(aconn):
yield
monkeypatch.setattr(psycopg.connection, "connect", fake_connect)
- await psycopg.AsyncConnection.connect(*args, **kwargs)
+ conn = await psycopg.AsyncConnection.connect(*args, **kwargs)
assert conninfo_to_dict(the_conninfo) == conninfo_to_dict(want)
+ await conn.close()
@pytest.mark.parametrize(
async def test_row_factory(dsn):
defaultconn = await AsyncConnection.connect(dsn)
assert defaultconn.row_factory is tuple_row # type: ignore[comparison-overlap]
+ await defaultconn.close()
conn = await AsyncConnection.connect(dsn, row_factory=my_row_factory)
assert conn.row_factory is my_row_factory # type: ignore[comparison-overlap]
cur3 = await conn.execute("select 'vale'")
r = await cur3.fetchone()
assert r and r == ("vale",) # type: ignore[comparison-overlap]
+ await conn.close()
async def test_str(aconn):
assert (await cur.fetchone())[0] == "hellot" # type: ignore[index]
cur = await conn.execute("select %b", ["hello"])
assert (await cur.fetchone())[0] == "hellob" # type: ignore[index]
+ await conn.close()
async def test_connect_context_copy(dsn, aconn):
assert (await cur.fetchone())[0] == "hellot" # type: ignore[index]
cur = await aconn2.execute("select %b", ["hello"])
assert (await cur.fetchone())[0] == "hellob" # type: ignore[index]
+ await aconn2.close()
@pytest.mark.skipif(sys.platform != "win32", reason="windows only test")
conn = psycopg.connect(dsn)
assert conn.info.parameter_status("client_encoding") == out
assert conn.info.encoding == codec
+ conn.close()
def test_set_encoding_unsupported(self, conn):
cur = conn.cursor()
monkeypatch.setattr(psycopg.connection, "connect", mock_connect)
- psycopg.connect(*args, **kwargs)
+ conn = psycopg.connect(*args, **kwargs)
assert got_conninfo == want_conninfo
+ conn.close()
def test_version(mypy):
yield
monkeypatch.setattr(psycopg.connection, "connect", fake_connect)
- psycopg.connect(*args, **kwargs)
+ conn = psycopg.connect(*args, **kwargs)
assert conninfo_to_dict(the_conninfo) == conninfo_to_dict(want)
+ conn.close()
@pytest.mark.parametrize(
cur.execute("select generate_series(1, 3) as bar")
assert cur.fetchall() == [(1,), (2,), (3,)]
assert cur.name == "1-2-3"
+ cur.close()
def test_repr(conn):
cur = conn.cursor("my-name")
assert "ServerCursor" in repr(cur)
assert "my-name" in repr(cur)
+ cur.close()
def test_connection(conn):
cur = conn.cursor("foo")
assert cur.connection is conn
+ cur.close()
def test_description(conn):
assert cur.description[0].name == "bar"
assert cur.description[0].type_code == cur.adapters.types["int4"].oid
assert cur.pgresult.ntuples == 0
+ cur.close()
def test_format(conn):
cur = conn.cursor("foo")
assert cur.format == Format.TEXT
+ cur.close()
cur = conn.cursor("foo", binary=True)
assert cur.format == Format.BINARY
+ cur.close()
def test_query_params(conn):
assert cur.fetchone() == (2,)
assert cur.pgresult.fformat(0) == 1
assert cur.pgresult.get_value(0, 0) == b"\x00\x00\x00\x02"
+ cur.close()
def test_execute_binary(conn):
assert cur.fetchone() == (1,)
assert cur.pgresult.fformat(0) == 0
assert cur.pgresult.get_value(0, 0) == b"1"
+ cur.close()
def test_binary_cursor_text_override(conn):
assert cur.fetchone() == (1,)
assert cur.pgresult.fformat(0) == 1
assert cur.pgresult.get_value(0, 0) == b"\x00\x00\x00\x01"
+ cur.close()
def test_close(conn, recwarn, retries):
cur = conn.cursor("foo")
with pytest.raises(e.ProgrammingError):
cur.execute(stmt)
+ cur.close()
def test_executemany(conn):
cur = conn.cursor("foo")
with pytest.raises(e.NotSupportedError):
cur.executemany("select %s", [(1,), (2,)])
+ cur.close()
def test_fetchone(conn):
cur.scroll(0, "absolute")
cur.row_factory = dict_row
assert cur.fetchone() == {"x": 1}
+ cur.close()
def test_rownumber(conn):
assert cur.rownumber == 12
cur.fetchall()
assert cur.rownumber == 42
+ cur.close()
def test_iter(conn):
assert cur.scrollable is None
with pytest.raises(e.ProgrammingError):
cur.scroll(0)
+ cur.close()
def test_scroll(conn):
with pytest.raises(ValueError):
cur.scroll(9, mode="wat")
+ cur.close()
def test_scrollable(conn):
curs.scroll(-1)
assert i == curs.fetchone()[0]
curs.scroll(-1)
+ curs.close()
def test_non_scrollable(conn):
curs.scroll(5)
with pytest.raises(e.OperationalError):
curs.scroll(-1)
+ curs.close()
@pytest.mark.parametrize("kwargs", [{}, {"withhold": False}])
assert cur2.fetchone() == (1,)
assert cur2.fetchmany(3) == [(2,), (3,), (4,)]
assert cur2.fetchall() == [(5,), (6,)]
+ cur2.close()
def test_stolen_cursor_close(conn):
await cur.execute("select generate_series(1, 3) as bar")
assert await cur.fetchall() == [(1,), (2,), (3,)]
assert cur.name == "1-2-3"
+ await cur.close()
async def test_repr(aconn):
cur = aconn.cursor("my-name")
assert "AsyncServerCursor" in repr(cur)
assert "my-name" in repr(cur)
+ await cur.close()
async def test_connection(aconn):
cur = aconn.cursor("foo")
assert cur.connection is aconn
+ await cur.close()
async def test_description(aconn):
assert cur.description[0].name == "bar"
assert cur.description[0].type_code == cur.adapters.types["int4"].oid
assert cur.pgresult.ntuples == 0
+ await cur.close()
async def test_format(aconn):
cur = aconn.cursor("foo")
assert cur.format == Format.TEXT
+ await cur.close()
cur = aconn.cursor("foo", binary=True)
assert cur.format == Format.BINARY
+ await cur.close()
async def test_query_params(aconn):
assert (await cur.fetchone()) == (2,)
assert cur.pgresult.fformat(0) == 1
assert cur.pgresult.get_value(0, 0) == b"\x00\x00\x00\x02"
+ await cur.close()
async def test_execute_binary(aconn):
assert (await cur.fetchone()) == (1,)
assert cur.pgresult.fformat(0) == 0
assert cur.pgresult.get_value(0, 0) == b"1"
+ await cur.close()
async def test_binary_cursor_text_override(aconn):
assert (await cur.fetchone()) == (1,)
assert cur.pgresult.fformat(0) == 1
assert cur.pgresult.get_value(0, 0) == b"\x00\x00\x00\x01"
+ await cur.close()
async def test_close(aconn, recwarn, retries):
cur = aconn.cursor("foo")
with pytest.raises(e.ProgrammingError):
await cur.execute(stmt)
+ await cur.close()
async def test_executemany(aconn):
cur = aconn.cursor("foo")
with pytest.raises(e.NotSupportedError):
await cur.executemany("select %s", [(1,), (2,)])
+ await cur.close()
async def test_fetchone(aconn):
await cur.scroll(0, "absolute")
cur.row_factory = dict_row
assert await cur.fetchone() == {"x": 1}
+ await cur.close()
async def test_rownumber(aconn):
assert cur.rownumber == 12
await cur.fetchall()
assert cur.rownumber == 42
+ await cur.close()
async def test_iter(aconn):
assert cur.scrollable is None
with pytest.raises(e.ProgrammingError):
await cur.scroll(0)
+ await cur.close()
async def test_scroll(aconn):
with pytest.raises(ValueError):
await cur.scroll(9, mode="wat")
+ await cur.close()
async def test_scrollable(aconn):
await curs.scroll(-1)
assert i == (await curs.fetchone())[0]
await curs.scroll(-1)
+ await curs.close()
async def test_non_scrollable(aconn):
await curs.scroll(5)
with pytest.raises(e.OperationalError):
await curs.scroll(-1)
+ await curs.close()
@pytest.mark.parametrize("kwargs", [{}, {"withhold": False}])
assert await cur2.fetchone() == (1,)
assert await cur2.fetchmany(3) == [(2,), (3,), (4,)]
assert await cur2.fetchall() == [(5,), (6,)]
+ await cur2.close()
async def test_stolen_cursor_close(aconn):
cur = conn.execute("select null::hstore, ''::hstore, 'a => b'::hstore")
assert cur.fetchone() == (None, {}, {"a": "b"})
+ conn.close()
ab = list(map(chr, range(32, 128)))