def __iter__(self) -> Iterator[Buffer]:
"""Implement block-by-block iteration on :sql:`COPY TO`."""
- while True:
- if not (data := self.read()):
- break
+ while data := self.read():
yield data
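
For orientation, a minimal usage sketch of block-by-block iteration, assuming psycopg's public API with a hypothetical connection string and table name:

import psycopg

# Stream a COPY TO block by block and write it to a local file.
with psycopg.connect("dbname=test") as conn:          # connection string is an assumption
    with conn.cursor() as cur:
        with cur.copy("COPY data TO STDOUT (FORMAT BINARY)") as copy:
            with open("data.bin", "wb") as f:
                for block in copy:                     # each block is a bytes-like Buffer
                    f.write(block)
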
def read(self) -> Buffer:
Note that the records returned will be tuples of unparsed strings or
bytes, unless data types are specified using `set_types()`.
"""
- while True:
- if (record := self.read_row()) is None:
- break
+ while (record := self.read_row()) is not None:
yield record
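
A sketch of row-by-row reading with explicit types, assuming an open psycopg connection `conn`; without `set_types()` each value comes back as an unparsed string (text format) or bytes (binary format):

with conn.cursor() as cur:
    with cur.copy("COPY (SELECT generate_series(1, 3) AS n, 'hello'::text AS t) TO STDOUT") as copy:
        copy.set_types(["int4", "text"])   # parse the columns instead of returning raw strings
        for row in copy.rows():            # equivalent to looping read_row() until it returns None
            print(row)                     # e.g. (1, 'hello')
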
def read_row(self) -> tuple[Any, ...] | None:
The function is designed to be run in a separate task.
"""
try:
- while True:
- if not (data := self._queue.get()):
- break
+ while data := self._queue.get():
self.connection.wait(copy_to(self._pgconn, data, flush=PREFER_FLUSH))
except BaseException as ex:
# Propagate the error to the main thread.
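
The worker drains a queue until a falsy sentinel arrives; a self-contained toy sketch of that pattern (illustrative names, not psycopg internals):

import io
import queue
import threading

def writer_worker(q: queue.Queue, sink: io.BytesIO) -> None:
    # Drain the queue until a falsy item (empty bytes, the sentinel) arrives,
    # mirroring the loop above.
    while data := q.get():
        sink.write(data)

sink = io.BytesIO()
q: queue.Queue = queue.Queue()
worker = threading.Thread(target=writer_worker, args=(q, sink))
worker.start()
q.put(b"block 1")
q.put(b"block 2")
q.put(b"")  # sentinel: stop the worker
worker.join()
assert sink.getvalue() == b"block 1block 2"
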
async def __aiter__(self) -> AsyncIterator[Buffer]:
"""Implement block-by-block iteration on :sql:`COPY TO`."""
- while True:
- if not (data := (await self.read())):
- break
+ while data := (await self.read()):
yield data
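
An async counterpart of the block-by-block read, assuming psycopg's AsyncConnection API with a hypothetical connection string and table name:

import asyncio
import psycopg

async def dump_table() -> bytes:
    blocks = []
    async with await psycopg.AsyncConnection.connect("dbname=test") as conn:
        async with conn.cursor() as cur:
            async with cur.copy("COPY data TO STDOUT") as copy:
                async for block in copy:        # same protocol as the sync loop above
                    blocks.append(bytes(block))
    return b"".join(blocks)

# asyncio.run(dump_table())
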
async def read(self) -> Buffer:
Note that the records returned will be tuples of unparsed strings or
bytes, unless data types are specified using `set_types()`.
"""
- while True:
- if (record := (await self.read_row())) is None:
- break
+ while (record := (await self.read_row())) is not None:
yield record
async def read_row(self) -> tuple[Any, ...] | None:
The function is designed to be run in a separate task.
"""
try:
- while True:
- if not (data := (await self._queue.get())):
- break
+ while data := (await self._queue.get()):
await self.connection.wait(
copy_to(self._pgconn, data, flush=PREFER_FLUSH)
)
def load(pos: int) -> Row | None:
return self._tx.load_row(pos, self._make_row)
- while True:
- if (row := load(self._pos)) is None:
- break
+ while (row := load(self._pos)) is not None:
self._pos += 1
yield row
def load(pos: int) -> Row | None:
return self._tx.load_row(pos, self._make_row)
- while True:
- if (row := load(self._pos)) is None:
- break
+ while (row := load(self._pos)) is not None:
self._pos += 1
yield row
After this generator has finished you may want to cycle using `fetch()`
to retrieve the results available.
"""
- while True:
- if pgconn.flush() == 0:
- break
+ while pgconn.flush() != 0:
- while True:
- if ready := (yield WAIT_RW):
- break
+ while not (ready := (yield WAIT_RW)):
+ continue
if ready & READY_R:
# This call may read notifies: they will be saved in the
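
These wait loops follow a yield/send protocol: the generator yields what it wants to wait for and is resumed with the ready state. A self-contained toy sketch of that protocol (the constants and driver are hypothetical, not the library's wait machinery):

WAIT_R, WAIT_W, WAIT_RW = 1, 2, 3   # illustrative values only

def toy_gen():
    # Keep requesting a wait until the driver reports the socket ready.
    while not (ready := (yield WAIT_RW)):
        continue
    return ready

def drive(gen, ready_value):
    # Resume the generator with the ready state until it returns.
    try:
        request = next(gen)                     # first wait request
        while True:
            request = gen.send(ready_value)     # report readiness, get the next request
    except StopIteration as ex:
        return ex.value

assert drive(toy_gen(), ready_value=1) == 1
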
Return a result from the database (whether success or error).
"""
if pgconn.is_busy():
- while True:
- if (yield WAIT_R):
- break
+ while not (yield WAIT_R):
+ continue
while True:
pgconn.consume_input()
if not pgconn.is_busy():
break
- while True:
- if (yield WAIT_R):
- break
+ while not (yield WAIT_R):
+ continue
_consume_notifies(pgconn)
results = []
while True:
- while True:
- if ready := (yield WAIT_RW):
- break
+ while not (ready := (yield WAIT_RW)):
+ continue
if ready & READY_R:
pgconn.consume_input()
def _consume_notifies(pgconn: PGconn) -> None:
# Consume notifies
- while True:
- if not (n := pgconn.notifies()):
- break
+ while n := pgconn.notifies():
if pgconn.notify_handler:
pgconn.notify_handler(n)
pgconn.consume_input()
ns = []
- while True:
- if n := pgconn.notifies():
- ns.append(n)
- if pgconn.notify_handler:
- pgconn.notify_handler(n)
- else:
- break
+ while n := pgconn.notifies():
+ ns.append(n)
+ if pgconn.notify_handler:
+ pgconn.notify_handler(n)
return ns
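
At the user level, the notifications collected here end up in callbacks registered on the connection; a minimal sketch, assuming an open psycopg connection `conn` in autocommit mode and a hypothetical channel name:

def on_notify(notify):
    # Notify objects carry .channel, .payload and .pid
    print(f"got {notify.channel!r}: {notify.payload!r}")

conn.add_notify_handler(on_notify)
conn.execute("LISTEN my_channel")
conn.execute("NOTIFY my_channel, 'hello'")   # the handler runs as input is consumed
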
break
# would block
- while True:
- if (yield WAIT_R):
- break
+ while not (yield WAIT_R):
+ continue
pgconn.consume_input()
if nbytes > 0:
# into smaller ones. We prefer to do it there instead of here in order to
# do it upstream the queue decoupling the writer task from the producer one.
while pgconn.put_copy_data(buffer) == 0:
- while True:
- if (yield WAIT_W):
- break
+ while not (yield WAIT_W):
+ continue
# Flushing often has a good effect on macOS because memcpy operations
# seem expensive on this platform so accumulating a large buffer has a
if flush:
# Repeat until the message is flushed to the server
while True:
- while True:
- if (yield WAIT_W):
- break
+ while not (yield WAIT_W):
+ continue
if pgconn.flush() == 0:
break
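
Splitting an oversized buffer into block-sized chunks before it is queued can be sketched as follows (the block size is an arbitrary choice for the example):

BLOCK_SIZE = 64 * 1024   # hypothetical chunk size

def split_blocks(buffer: bytes, size: int = BLOCK_SIZE):
    # Yield the buffer in chunks no larger than `size`, so the writer task
    # never has to push a single huge block to the server at once.
    for i in range(0, len(buffer), size):
        yield buffer[i : i + size]

chunks = list(split_blocks(b"x" * 150_000))
assert [len(c) for c in chunks] == [65536, 65536, 18928]
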
def copy_end(pgconn: PGconn, error: bytes | None) -> PQGen[PGresult]:
# Retry enqueuing end copy message until successful
while pgconn.put_copy_end(error) == 0:
- while True:
- if (yield WAIT_W):
- break
+ while not (yield WAIT_W):
+ continue
# Repeat until the message is flushed to the server
while True:
- while True:
- if (yield WAIT_W):
- break
+ while not (yield WAIT_W):
+ continue
if pgconn.flush() == 0:
break
# send loop
waited_on_send = 0
- while True:
- if pgconn.flush() == 0:
- break
+ while pgconn.flush() != 0:
waited_on_send += 1
)
with cur.copy(query) as copy:
copy.set_types(["text"])
- while True:
- if not (row := copy.read_row()):
- break
+ while row := copy.read_row():
assert len(row) == 1
rows.append(row[0])
cur = conn.cursor()
with cur.copy(f"copy ({sample_values}) to stdout (format {format.name})") as copy:
rows = []
- while True:
- if not (row := copy.read_row()):
- break
+ while row := copy.read_row():
rows.append(row)
ref = [tuple((py_to_raw(i, format) for i in record)) for record in sample_records]
copy.set_types(faker.types_names)
if method == "read":
- while True:
- if not copy.read():
- break
+ while copy.read():
+ pass
elif method == "iter":
list(copy)
elif method == "row":
- while True:
- if copy.read_row() is None:
- break
+ while copy.read_row() is not None:
+ pass
elif method == "rows":
list(copy.rows())
def blocks(self):
f = self.file()
- while True:
- if not (block := f.read(self.block_size)):
- break
+ while block := f.read(self.block_size):
yield block
def assert_data(self):
def sha(self, f):
m = hashlib.sha256()
- while True:
- if not (block := f.read()):
- break
+ while block := f.read():
if isinstance(block, str):
block = block.encode()
m.update(block)
)
async with cur.copy(query) as copy:
copy.set_types(["text"])
- while True:
- if not (row := (await copy.read_row())):
- break
+ while row := (await copy.read_row()):
assert len(row) == 1
rows.append(row[0])
f"copy ({sample_values}) to stdout (format {format.name})"
) as copy:
rows = []
- while True:
- if not (row := (await copy.read_row())):
- break
+ while row := (await copy.read_row()):
rows.append(row)
ref = [tuple(py_to_raw(i, format) for i in record) for record in sample_records]
copy.set_types(faker.types_names)
if method == "read":
- while True:
- if not (await copy.read()):
- break
+ while await copy.read():
+ pass
elif method == "iter":
await alist(copy)
elif method == "row":
- while True:
- if (await copy.read_row()) is None:
- break
+ while (await copy.read_row()) is not None:
+ pass
elif method == "rows":
await alist(copy.rows())
def blocks(self):
f = self.file()
- while True:
- if not (block := f.read(self.block_size)):
- break
+ while block := f.read(self.block_size):
yield block
async def assert_data(self):
def sha(self, f):
m = hashlib.sha256()
- while True:
- if not (block := f.read()):
- break
+ while block := f.read():
if isinstance(block, str):
block = block.encode()
m.update(block)
cur.execute(faker.select_stmt)
if fetch == "one":
- while True:
- if cur.fetchone() is None:
- break
+ while cur.fetchone() is not None:
+ pass
elif fetch == "many":
- while True:
- if not cur.fetchmany(3):
- break
+ while cur.fetchmany(3):
+ pass
elif fetch == "all":
cur.fetchall()
elif fetch == "iter":
await cur.execute(faker.select_stmt)
if fetch == "one":
- while True:
- if (await cur.fetchone()) is None:
- break
+ while (await cur.fetchone()) is not None:
+ pass
elif fetch == "many":
- while True:
- if not (await cur.fetchmany(3)):
- break
+ while await cur.fetchmany(3):
+ pass
elif fetch == "all":
await cur.fetchall()
elif fetch == "iter":
cur.execute(faker.select_stmt)
if fetch == "one":
- while True:
- if cur.fetchone() is None:
- break
+ while cur.fetchone() is not None:
+ pass
elif fetch == "many":
- while True:
- if not cur.fetchmany(3):
- break
+ while cur.fetchmany(3):
+ pass
elif fetch == "all":
cur.fetchall()
elif fetch == "iter":
await cur.execute(faker.select_stmt)
if fetch == "one":
- while True:
- if (await cur.fetchone()) is None:
- break
+ while (await cur.fetchone()) is not None:
+ pass
elif fetch == "many":
- while True:
- if not (await cur.fetchmany(3)):
- break
+ while await cur.fetchmany(3):
+ pass
elif fetch == "all":
await cur.fetchall()
elif fetch == "iter":
cur.execute(ph(cur, faker.select_stmt))
if fetch == "one":
- while True:
- if cur.fetchone() is None:
- break
+ while cur.fetchone() is not None:
+ pass
elif fetch == "many":
- while True:
- if not cur.fetchmany(3):
- break
+ while cur.fetchmany(3):
+ pass
elif fetch == "all":
cur.fetchall()
elif fetch == "iter":
await cur.execute(ph(cur, faker.select_stmt))
if fetch == "one":
- while True:
- if (await cur.fetchone()) is None:
- break
+ while (await cur.fetchone()) is not None:
+ pass
elif fetch == "many":
- while True:
- if not (await cur.fetchmany(3)):
- break
+ while await cur.fetchmany(3):
+ pass
elif fetch == "all":
await cur.fetchall()
elif fetch == "iter":
cur.execute("select generate_series(1, 3) as x")
recs = cur.fetchall()
cur.scroll(0, "absolute")
- while True:
- if not (rec := cur.fetchone()):
- break
+ while rec := cur.fetchone():
recs.append(rec)
assert recs == [[1, -1], [1, -2], [1, -3]] * 2
await cur.execute("select generate_series(1, 3) as x")
recs = await cur.fetchall()
await cur.scroll(0, "absolute")
- while True:
- if not (rec := (await cur.fetchone())):
- break
+ while rec := (await cur.fetchone()):
recs.append(rec)
assert recs == [[1, -1], [1, -2], [1, -3]] * 2