]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Fix description for cursors returning no column
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Wed, 4 Aug 2021 13:28:51 +0000 (14:28 +0100)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Thu, 5 Aug 2021 20:32:04 +0000 (21:32 +0100)
psycopg/psycopg/cursor.py
tests/test_cursor.py
tests/test_server_cursor.py
tests/test_server_cursor_async.py

index df48cd3ac1fc82f201924889448eab2eb613fd23..7ecd894fad9fc0915b1ccfd23d8e33e202b89cf7 100644 (file)
@@ -123,9 +123,18 @@ class BaseCursor(Generic[ConnectionType, Row]):
         `!None` if the current resultset didn't return tuples.
         """
         res = self.pgresult
-        if not (res and res.nfields):
+
+        # We return columns if we have nfields, but also if we don't but
+        # the query said we got tuples (mostly to handle the super useful
+        # query "SELECT ;"
+        if res and (
+            res.nfields
+            or res.status == ExecStatus.TUPLES_OK
+            or res.status == ExecStatus.SINGLE_TUPLE
+        ):
+            return [Column(self, i) for i in range(res.nfields)]
+        else:
             return None
-        return [Column(self, i) for i in range(res.nfields)]
 
     @property
     def rowcount(self) -> int:
index c1117898759e60d4965aafd6b222c7d3b242c750..597e56a42af3920422ba3ccde4f9bde25c98af9d 100644 (file)
@@ -290,6 +290,11 @@ def test_iter_stop(conn):
 
 def test_row_factory(conn):
     cur = conn.cursor(row_factory=my_row_factory)
+
+    cur.execute("reset search_path")
+    with pytest.raises(psycopg.ProgrammingError):
+        cur.fetchone()
+
     cur.execute("select 'foo' as bar")
     (r,) = cur.fetchone()
     assert r == "FOObar"
@@ -442,6 +447,12 @@ def test_stream_row_factory(conn):
     assert next(it).a == 2
 
 
+def test_stream_no_col(conn):
+    cur = conn.cursor()
+    it = iter(cur.stream("select"))
+    assert list(it) == [()]
+
+
 @pytest.mark.parametrize(
     "query",
     [
@@ -548,6 +559,11 @@ class TestColumn:
         unpickled = pickle.loads(pickled)
         assert [tuple(d) for d in description] == [tuple(d) for d in unpickled]
 
+    def test_no_col_query(self, conn):
+        cur = conn.execute("select")
+        assert cur.description == []
+        assert cur.fetchall() == [()]
+
 
 def test_str(conn):
     cur = conn.cursor()
index ecd61e9e84e21013b906b4f0b1d0d5514eff1157..dfd13b19a1f7b2930dbe82cbb349145b0225a4e9 100644 (file)
@@ -189,6 +189,13 @@ def test_nextset(conn):
         assert not cur.nextset()
 
 
+def test_no_result(conn):
+    with conn.cursor("foo") as cur:
+        cur.execute("select generate_series(1, %s) as bar where false", (3,))
+        assert len(cur.description) == 1
+        assert cur.fetchall() == []
+
+
 def test_row_factory(conn):
     n = 0
 
index af85b68e9731e4f62d37830fa956bd1501d32c7f..4b724af32c19be1e7225f23f70db7beeaa3146c9 100644 (file)
@@ -195,6 +195,15 @@ async def test_nextset(aconn):
         assert not cur.nextset()
 
 
+async def test_no_result(aconn):
+    async with aconn.cursor("foo") as cur:
+        await cur.execute(
+            "select generate_series(1, %s) as bar where false", (3,)
+        )
+        assert len(cur.description) == 1
+        assert (await cur.fetchall()) == []
+
+
 async def test_row_factory(aconn):
     n = 0