]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Add script to test the pool performance
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sun, 9 May 2021 23:04:37 +0000 (01:04 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sun, 9 May 2021 23:04:37 +0000 (01:04 +0200)
tests/scripts/spiketest.py [new file with mode: 0644]

diff --git a/tests/scripts/spiketest.py b/tests/scripts/spiketest.py
new file mode 100644 (file)
index 0000000..00030b2
--- /dev/null
@@ -0,0 +1,157 @@
+#!/usr/bin/env python
+"""
+Run a connection pool spike test.
+
+The test is inspired to the `spike analysis`__ illustrated by HikariCP
+
+.. __: https://github.com/brettwooldridge/HikariCP/blob/dev/documents/
+       Welcome-To-The-Jungle.md
+
+"""
+
+import sys
+import time
+import threading
+
+import psycopg3.pool
+
+import logging
+
+
+def main():
+    opt = parse_cmdline()
+    if opt.loglevel:
+        loglevel = getattr(logging, opt.loglevel.upper())
+        logging.basicConfig(
+            level=loglevel, format="%(asctime)s %(levelname)s %(message)s"
+        )
+
+        logging.getLogger("psycopg2.pool").setLevel(loglevel)
+
+    with psycopg3.pool.ConnectionPool(
+        opt.dsn,
+        min_size=opt.min_size,
+        max_size=opt.max_size,
+        connection_class=DelayedConnection,
+        kwargs={"conn_delay": 0.150},
+    ) as pool:
+        pool.wait()
+        measurer = Measurer(pool)
+
+        # Create and start all the thread: they will get stuck on the event
+        ev = threading.Event()
+        threads = [
+            threading.Thread(
+                target=worker, args=(pool, 0.002, ev), daemon=True
+            )
+            for i in range(opt.num_clients)
+        ]
+        [t.start() for t in threads]
+        time.sleep(0.2)
+
+        # Release the threads!
+        measurer.start(0.00025)
+        t0 = time.time()
+        ev.set()
+
+        # Wait for the threads to finish
+        [t.join() for t in threads]
+        t1 = time.time()
+        measurer.stop()
+
+    print(f"time: {(t1 - t0) * 1000} msec")
+    print("active,idle,total,waiting")
+    recs = [
+        f'{m["pool_size"] - m["pool_available"]}'
+        f',{m["pool_available"]}'
+        f',{m["pool_size"]}'
+        f',{m["requests_waiting"]}'
+        for m in measurer.measures
+    ]
+    print("\n".join(recs))
+
+
+def worker(p, t, ev):
+    ev.wait()
+    with p.connection():
+        time.sleep(t)
+
+
+class Measurer:
+    def __init__(self, pool):
+        self.pool = pool
+        self.worker = None
+        self.stopped = False
+        self.measures = []
+
+    def start(self, interval):
+        self.worker = threading.Thread(
+            target=self._run, args=(interval,), daemon=True
+        )
+        self.worker.start()
+
+    def stop(self):
+        self.stopped = True
+        if self.worker:
+            self.worker.join()
+            self.worker = None
+
+    def _run(self, interval):
+        while not self.stopped:
+            self.measures.append(self.pool.get_stats())
+            time.sleep(interval)
+
+
+class DelayedConnection(psycopg3.Connection):
+    """A connection adding a delay to the connection time."""
+
+    @classmethod
+    def connect(cls, conninfo, conn_delay=0, **kwargs):
+        t0 = time.time()
+        conn = super().connect(conninfo, **kwargs)
+        t1 = time.time()
+        wait = max(0.0, conn_delay - (t1 - t0))
+        if wait:
+            time.sleep(wait)
+        return conn
+
+
+def parse_cmdline():
+    from argparse import ArgumentParser
+
+    parser = ArgumentParser(description=__doc__)
+    parser.add_argument(
+        "--dsn", default="", help="connection string to the database"
+    )
+    parser.add_argument(
+        "--min_size",
+        default=5,
+        type=int,
+        help="minimum number of connections in the pool",
+    )
+    parser.add_argument(
+        "--max_size",
+        default=20,
+        type=int,
+        help="maximum number of connections in the pool",
+    )
+    parser.add_argument(
+        "--num-clients",
+        default=50,
+        type=int,
+        help="number of threads making a request",
+    )
+    parser.add_argument(
+        "--loglevel",
+        default=None,
+        choices=("DEBUG", "INFO", "WARNING", "ERROR"),
+        help="level to log at [default: no log]",
+    )
+
+    opt = parser.parse_args()
+
+    return opt
+
+
+if __name__ == "__main__":
+    sys.exit(main())