# Copyright (C) 2020 The Psycopg Team
import re
+import sys
import queue
import struct
import asyncio
# Each buffer should be around BUFFER_SIZE size.
QUEUE_SIZE = 1024
+# On certain systems, memmove seems particularly slow, so flushing often
+# performs better than accumulating a larger buffer. See #746 for details.
+PREFER_FLUSH = sys.platform == "darwin"
+
class BaseCopy(Generic[ConnectionType]):
"""
if len(data) <= MAX_BUFFER_SIZE:
# Most used path: we don't need to split the buffer in smaller
# bits, so don't make a copy.
- self.connection.wait(copy_to(self._pgconn, data))
+ self.connection.wait(copy_to(self._pgconn, data, flush=PREFER_FLUSH))
else:
# Copy a buffer too large in chunks to avoid causing a memory
# error in the libpq, which may cause an infinite loop (#255).
for i in range(0, len(data), MAX_BUFFER_SIZE):
self.connection.wait(
- copy_to(self._pgconn, data[i : i + MAX_BUFFER_SIZE])
+ copy_to(
+ self._pgconn, data[i : i + MAX_BUFFER_SIZE], flush=PREFER_FLUSH
+ )
)
def finish(self, exc: Optional[BaseException] = None) -> None:
data = self._queue.get(block=True, timeout=24 * 60 * 60)
if not data:
break
- self.connection.wait(copy_to(self._pgconn, data))
+ self.connection.wait(copy_to(self._pgconn, data, flush=PREFER_FLUSH))
except BaseException as ex:
# Propagate the error to the main thread.
self._worker_error = ex
if len(data) <= MAX_BUFFER_SIZE:
# Most used path: we don't need to split the buffer in smaller
# bits, so don't make a copy.
- await self.connection.wait(copy_to(self._pgconn, data))
+ await self.connection.wait(copy_to(self._pgconn, data, flush=PREFER_FLUSH))
else:
# Copy a buffer too large in chunks to avoid causing a memory
# error in the libpq, which may cause an infinite loop (#255).
for i in range(0, len(data), MAX_BUFFER_SIZE):
await self.connection.wait(
- copy_to(self._pgconn, data[i : i + MAX_BUFFER_SIZE])
+ copy_to(
+ self._pgconn, data[i : i + MAX_BUFFER_SIZE], flush=PREFER_FLUSH
+ )
)
async def finish(self, exc: Optional[BaseException] = None) -> None:
return result
-def copy_to(pgconn: PGconn, buffer: Buffer) -> PQGen[None]:
+def copy_to(pgconn: PGconn, buffer: Buffer, flush: bool = True) -> PQGen[None]:
# Retry enqueuing data until successful.
#
# WARNING! This can cause an infinite loop if the buffer is too large. (see
while pgconn.put_copy_data(buffer) == 0:
yield WAIT_W
- # Repeat until it the message is flushed to the server
- while True:
+ # Flushing often has a good effect on macOS because memcpy operations
+ # seem expensive on this platform so accumulating a large buffer has a
+ # bad effect (see #745).
+ if flush:
+    # Repeat until the message is flushed to the server
while True:
- ready = yield WAIT_W
- if ready:
+ while True:
+ ready = yield WAIT_W
+ if ready:
+ break
+ f = pgconn.flush()
+ if f == 0:
break
- f = pgconn.flush()
- if f == 0:
- break
def copy_end(pgconn: PGconn, error: Optional[bytes]) -> PQGen[PGresult]: