According to some benchmarks, it seems that performance on Linux and Windows is worse.
See #746 for details.
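For a rough idea of the workload being measured, a micro-benchmark along these lines can be used (a sketch only: the connection string, table name, and payload size are illustrative and not taken from the benchmarks in #746):

import time

import psycopg

# Hypothetical micro-benchmark: time a COPY FROM STDIN of roughly 100 MB.
# Connection string, table name and sizes are illustrative only.
with psycopg.connect("dbname=test") as conn, conn.cursor() as cur:
    cur.execute("CREATE TEMP TABLE copy_perf (data text)")
    row = b"x" * 1024 + b"\n"  # one ~1 KB text row per line
    t0 = time.monotonic()
    with cur.copy("COPY copy_perf (data) FROM STDIN") as copy:
        for _ in range(100_000):  # ~100 MB in total
            copy.write(row)
    print(f"COPY took {time.monotonic() - t0:.2f} s")
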
- Fix excessive stripping of error message prefixes (:ticket:`#752`).
- Allow to specify the ``connect_timeout`` connection parameter as float
(:ticket:`#796`).
+- Improve COPY performance on macOS (:ticket:`#745`).
Current release
from . import pq
from . import errors as e
from ._compat import Self
-from ._copy_base import BaseCopy, MAX_BUFFER_SIZE, QUEUE_SIZE
+from ._copy_base import BaseCopy, MAX_BUFFER_SIZE, QUEUE_SIZE, PREFER_FLUSH
from .generators import copy_to, copy_end
from ._encodings import pgconn_encoding
from ._acompat import spawn, gather, Queue, Worker
if len(data) <= MAX_BUFFER_SIZE:
# Most used path: we don't need to split the buffer in smaller
# bits, so don't make a copy.
- self.connection.wait(copy_to(self._pgconn, data))
+ self.connection.wait(copy_to(self._pgconn, data, flush=PREFER_FLUSH))
else:
# Copy a buffer too large in chunks to avoid causing a memory
# error in the libpq, which may cause an infinite loop (#255).
for i in range(0, len(data), MAX_BUFFER_SIZE):
self.connection.wait(
- copy_to(self._pgconn, data[i : i + MAX_BUFFER_SIZE])
+ copy_to(
+ self._pgconn, data[i : i + MAX_BUFFER_SIZE], flush=PREFER_FLUSH
+ )
)
def finish(self, exc: BaseException | None = None) -> None:
data = self._queue.get()
if not data:
break
- self.connection.wait(copy_to(self._pgconn, data))
+ self.connection.wait(copy_to(self._pgconn, data, flush=PREFER_FLUSH))
except BaseException as ex:
# Propagate the error to the main thread.
self._worker_error = ex
from . import pq
from . import errors as e
from ._compat import Self
-from ._copy_base import BaseCopy, MAX_BUFFER_SIZE, QUEUE_SIZE
+from ._copy_base import BaseCopy, MAX_BUFFER_SIZE, QUEUE_SIZE, PREFER_FLUSH
from .generators import copy_to, copy_end
from ._encodings import pgconn_encoding
from ._acompat import aspawn, agather, AQueue, AWorker
if len(data) <= MAX_BUFFER_SIZE:
# Most used path: we don't need to split the buffer in smaller
# bits, so don't make a copy.
- await self.connection.wait(copy_to(self._pgconn, data))
+ await self.connection.wait(copy_to(self._pgconn, data, flush=PREFER_FLUSH))
else:
# Copy a buffer too large in chunks to avoid causing a memory
# error in the libpq, which may cause an infinite loop (#255).
for i in range(0, len(data), MAX_BUFFER_SIZE):
await self.connection.wait(
- copy_to(self._pgconn, data[i : i + MAX_BUFFER_SIZE])
+ copy_to(
+ self._pgconn, data[i : i + MAX_BUFFER_SIZE], flush=PREFER_FLUSH
+ )
)
async def finish(self, exc: BaseException | None = None) -> None:
data = await self._queue.get()
if not data:
break
- await self.connection.wait(copy_to(self._pgconn, data))
+ await self.connection.wait(
+ copy_to(self._pgconn, data, flush=PREFER_FLUSH)
+ )
except BaseException as ex:
# Propagate the error to the main thread.
self._worker_error = ex
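The async writer above is driven through the same public interface; for reference, an equivalent asynchronous COPY looks like this (connection string and table name are illustrative):

import asyncio

import psycopg


async def main() -> None:
    # Illustrative async COPY: the async writer changed above is what ends up
    # pushing these bytes to the server.
    async with await psycopg.AsyncConnection.connect("dbname=test") as aconn:
        async with aconn.cursor() as cur:
            await cur.execute("CREATE TEMP TABLE copy_perf (data text)")
            async with cur.copy("COPY copy_perf (data) FROM STDIN") as copy:
                await copy.write(b"hello\n")


asyncio.run(main())
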
from __future__ import annotations
import re
+import sys
import struct
from abc import ABC, abstractmethod
from typing import Any, Dict, Generic, List, Match
# Each buffer should be around BUFFER_SIZE size.
QUEUE_SIZE = 1024
+# On certain systems memmove seems particularly slow, and flushing often
+# performs better than accumulating a larger buffer. See #746 for details.
+PREFER_FLUSH = sys.platform == "darwin"
+
class BaseCopy(Generic[ConnectionType]):
"""
return result
-def copy_to(pgconn: PGconn, buffer: Buffer) -> PQGen[None]:
+def copy_to(pgconn: PGconn, buffer: Buffer, flush: bool = True) -> PQGen[None]:
# Retry enqueuing data until successful.
#
# WARNING! This can cause an infinite loop if the buffer is too large. (see
if ready:
break
- # Repeat until it the message is flushed to the server
- while True:
+ # Flushing often helps on macOS, where memcpy operations seem expensive,
+ # so accumulating a large buffer hurts performance (see #745).
+ if flush:
+ # Repeat until the message is flushed to the server
while True:
- ready = yield WAIT_W
- if ready:
+ while True:
+ ready = yield WAIT_W
+ if ready:
+ break
+ f = pgconn.flush()
+ if f == 0:
break
- f = pgconn.flush()
- if f == 0:
- break
def copy_end(pgconn: PGconn, error: Optional[bytes]) -> PQGen[PGresult]:
def test_worker_error_propagated(conn, monkeypatch):
- def copy_to_broken(pgconn, buffer):
+ def copy_to_broken(pgconn, buffer, flush=True):
raise ZeroDivisionError
yield
async def test_worker_error_propagated(aconn, monkeypatch):
- def copy_to_broken(pgconn, buffer):
+ def copy_to_broken(pgconn, buffer, flush=True):
raise ZeroDivisionError
yield