# TODO: loaders don't need to be refreshed
cur.pgresult = res
- nrows = res.ntuples
- cur._pos += nrows
- return cur._tx.load_rows(0, nrows)
+ return cur._tx.load_rows(0, res.ntuples)
def _make_declare_statement(
self, query: Query, scrollable: bool, hold: bool
def fetchone(self) -> Optional[Sequence[Any]]:
with self._conn.lock:
recs = self._conn.wait(self._helper._fetch_gen(1))
- return recs[0] if recs else None
+ if recs:
+ self._pos += 1
+ return recs[0]
+ else:
+ return None
def fetchmany(self, size: int = 0) -> Sequence[Sequence[Any]]:
if not size:
size = self.arraysize
with self._conn.lock:
recs = self._conn.wait(self._helper._fetch_gen(size))
+ self._pos += len(recs)
return recs
def fetchall(self) -> Sequence[Sequence[Any]]:
with self._conn.lock:
recs = self._conn.wait(self._helper._fetch_gen(None))
+ self._pos += len(recs)
return recs
def __iter__(self) -> Iterator[Sequence[Any]]:
with self._conn.lock:
recs = self._conn.wait(self._helper._fetch_gen(self.itersize))
for rec in recs:
+ self._pos += 1
yield rec
if len(recs) < self.itersize:
break
async def fetchone(self) -> Optional[Sequence[Any]]:
async with self._conn.lock:
recs = await self._conn.wait(self._helper._fetch_gen(1))
- return recs[0] if recs else None
+ if recs:
+ self._pos += 1
+ return recs[0]
+ else:
+ return None
async def fetchmany(self, size: int = 0) -> Sequence[Sequence[Any]]:
if not size:
size = self.arraysize
async with self._conn.lock:
recs = await self._conn.wait(self._helper._fetch_gen(size))
+ self._pos += len(recs)
return recs
async def fetchall(self) -> Sequence[Sequence[Any]]:
async with self._conn.lock:
recs = await self._conn.wait(self._helper._fetch_gen(None))
+ self._pos += len(recs)
return recs
async def __aiter__(self) -> AsyncIterator[Sequence[Any]]:
self._helper._fetch_gen(self.itersize)
)
for rec in recs:
+ self._pos += 1
yield rec
if len(recs) < self.itersize:
break
+def test_funny_name(conn):
+ cur = conn.cursor("1-2-3")
+ cur.execute("select generate_series(1, 3) as bar")
+ assert cur.fetchall() == [(1,), (2,), (3,)]
+ assert cur.name == "1-2-3"
+
+
def test_description(conn):
cur = conn.cursor("foo")
assert cur.name == "foo"
assert recs == [(2,), (3,)]
+def test_iter_rownumber(conn):
+ with conn.cursor("foo") as cur:
+ cur.execute("select generate_series(1, %s) as bar", (3,))
+ for row in cur:
+ assert cur.rownumber == row[0]
+
+
def test_itersize(conn, commands):
with conn.cursor("foo") as cur:
assert cur.itersize == 100
pytestmark = pytest.mark.asyncio
+async def test_funny_name(aconn):
+ cur = await aconn.cursor("1-2-3")
+ await cur.execute("select generate_series(1, 3) as bar")
+ assert await cur.fetchall() == [(1,), (2,), (3,)]
+ assert cur.name == "1-2-3"
+
+
async def test_description(aconn):
cur = await aconn.cursor("foo")
assert cur.name == "foo"
assert recs == [(2,), (3,)]
+async def test_iter_rownumber(aconn):
+ async with await aconn.cursor("foo") as cur:
+ await cur.execute("select generate_series(1, %s) as bar", (3,))
+ async for row in cur:
+ assert cur.rownumber == row[0]
+
+
async def test_itersize(aconn, acommands):
async with await aconn.cursor("foo") as cur:
assert cur.itersize == 100