From: Daniele Varrazzo Date: Wed, 19 Oct 2022 15:55:33 +0000 (+0100) Subject: perf: avoid lookups of Wait and Ready enum values X-Git-Tag: pool-3.1.4~11 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=43159ace3131832383cb8969d625ca58a1795e16;p=thirdparty%2Fpsycopg.git perf: avoid lookups of Wait and Ready enum values --- diff --git a/psycopg/psycopg/generators.py b/psycopg/psycopg/generators.py index 1f5eed2ad..96a27d56f 100644 --- a/psycopg/psycopg/generators.py +++ b/psycopg/psycopg/generators.py @@ -41,6 +41,13 @@ COPY_IN = pq.ExecStatus.COPY_IN COPY_BOTH = pq.ExecStatus.COPY_BOTH PIPELINE_SYNC = pq.ExecStatus.PIPELINE_SYNC +WAIT_R = Wait.R +WAIT_W = Wait.W +WAIT_RW = Wait.RW +READY_R = Ready.R +READY_W = Ready.W +READY_RW = Ready.RW + logger = logging.getLogger(__name__) @@ -62,9 +69,9 @@ def _connect(conninfo: str) -> PQGenConn[PGconn]: if status == POLL_OK: break elif status == POLL_READING: - yield conn.socket, Wait.R + yield conn.socket, WAIT_R elif status == POLL_WRITING: - yield conn.socket, Wait.W + yield conn.socket, WAIT_W elif status == POLL_FAILED: encoding = conninfo_encoding(conninfo) raise e.OperationalError( @@ -110,8 +117,8 @@ def _send(pgconn: PGconn) -> PQGen[None]: if f == 0: break - ready = yield Wait.RW - if ready & Ready.R: + ready = yield WAIT_RW + if ready & READY_R: # This call may read notifies: they will be saved in the # PGconn buffer and passed to Python later, in `fetch()`. pgconn.consume_input() @@ -159,12 +166,12 @@ def _fetch(pgconn: PGconn) -> PQGen[Optional[PGresult]]: Return a result from the database (whether success or error). """ if pgconn.is_busy(): - yield Wait.R + yield WAIT_R while True: pgconn.consume_input() if not pgconn.is_busy(): break - yield Wait.R + yield WAIT_R # Consume notifies while True: @@ -188,9 +195,9 @@ def _pipeline_communicate( results = [] while True: - ready = yield Wait.RW + ready = yield WAIT_RW - if ready & Ready.R: + if ready & READY_R: pgconn.consume_input() while True: n = pgconn.notifies() @@ -213,7 +220,7 @@ def _pipeline_communicate( else: res.append(r) - if ready & Ready.W: + if ready & READY_W: pgconn.flush() if not commands: break @@ -223,7 +230,7 @@ def _pipeline_communicate( def notifies(pgconn: PGconn) -> PQGen[List[pq.PGnotify]]: - yield Wait.R + yield WAIT_R pgconn.consume_input() ns = [] @@ -244,7 +251,7 @@ def copy_from(pgconn: PGconn) -> PQGen[Union[memoryview, PGresult]]: break # would block - yield Wait.R + yield WAIT_R pgconn.consume_input() if nbytes > 0: @@ -272,17 +279,17 @@ def copy_to(pgconn: PGconn, buffer: bytes) -> PQGen[None]: # into smaller ones. We prefer to do it there instead of here in order to # do it upstream the queue decoupling the writer task from the producer one. while pgconn.put_copy_data(buffer) == 0: - yield Wait.W + yield WAIT_W def copy_end(pgconn: PGconn, error: Optional[bytes]) -> PQGen[PGresult]: # Retry enqueuing end copy message until successful while pgconn.put_copy_end(error) == 0: - yield Wait.W + yield WAIT_W # Repeat until it the message is flushed to the server while True: - yield Wait.W + yield WAIT_W f = pgconn.flush() if f == 0: break diff --git a/psycopg/psycopg/waiting.py b/psycopg/psycopg/waiting.py index 25e828fc4..def14c6aa 100644 --- a/psycopg/psycopg/waiting.py +++ b/psycopg/psycopg/waiting.py @@ -32,6 +32,14 @@ class Ready(IntEnum): RW = EVENT_READ | EVENT_WRITE +WAIT_R = Wait.R +WAIT_W = Wait.W +WAIT_RW = Wait.RW +READY_R = Ready.R +READY_W = Ready.W +READY_RW = Ready.RW + + def wait_selector(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> RV: """ Wait for a generator using the best strategy available. @@ -125,16 +133,16 @@ async def wait_async(gen: PQGen[RV], fileno: int) -> RV: try: s = next(gen) while True: - reader = s & Wait.R - writer = s & Wait.W + reader = s & WAIT_R + writer = s & WAIT_W if not reader and not writer: raise e.InternalError(f"bad poll status: {s}") ev.clear() ready = 0 # type: ignore[assignment] if reader: - loop.add_reader(fileno, wakeup, Ready.R) + loop.add_reader(fileno, wakeup, READY_R) if writer: - loop.add_writer(fileno, wakeup, Ready.W) + loop.add_writer(fileno, wakeup, READY_W) try: await ev.wait() finally: @@ -179,16 +187,16 @@ async def wait_conn_async(gen: PQGenConn[RV], timeout: Optional[float] = None) - if not timeout: timeout = None while True: - reader = s & Wait.R - writer = s & Wait.W + reader = s & WAIT_R + writer = s & WAIT_W if not reader and not writer: raise e.InternalError(f"bad poll status: {s}") ev.clear() ready = 0 # type: ignore[assignment] if reader: - loop.add_reader(fileno, wakeup, Ready.R) + loop.add_reader(fileno, wakeup, READY_R) if writer: - loop.add_writer(fileno, wakeup, Ready.W) + loop.add_writer(fileno, wakeup, READY_W) try: await wait_for(ev.wait(), timeout) finally: @@ -233,9 +241,9 @@ def wait_epoll(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> ev = fileevs[0][1] ready = 0 if ev & ~select.EPOLLOUT: - ready = Ready.R + ready = READY_R if ev & ~select.EPOLLIN: - ready |= Ready.W + ready |= READY_W assert s & ready s = gen.send(ready) evmask = poll_evmasks[s] diff --git a/psycopg_c/psycopg_c/_psycopg/generators.pyx b/psycopg_c/psycopg_c/_psycopg/generators.pyx index b1fdfd1a1..5a9215282 100644 --- a/psycopg_c/psycopg_c/_psycopg/generators.pyx +++ b/psycopg_c/psycopg_c/_psycopg/generators.pyx @@ -19,6 +19,7 @@ cdef object WAIT_W = Wait.W cdef object WAIT_R = Wait.R cdef object WAIT_RW = Wait.RW cdef int READY_R = Ready.R +cdef int READY_W = Ready.W def connect(conninfo: str) -> PQGenConn[abc.PGconn]: """ @@ -219,7 +220,7 @@ def pipeline_communicate( while True: ready = yield WAIT_RW - if ready & Ready.R: + if ready & READY_R: pgconn.consume_input() with nogil: cires = libpq.PQconsumeInput(pgconn_ptr) @@ -261,7 +262,7 @@ def pipeline_communicate( res.append(r) - if ready & Ready.W: + if ready & READY_W: pgconn.flush() if not commands: break