]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
test: add simple test to compare performances in refactoring
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Mon, 17 Oct 2022 09:42:33 +0000 (10:42 +0100)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 10 Dec 2022 13:53:23 +0000 (13:53 +0000)
This test is inspired to the problem open in #411, and is pretty much a
simple operation in a tight loop. It is used, at the moment, to compare
different ways to test I/O completion strategies with simple and fast
queries under no concurrency.

tests/scripts/bench-411.py [new file with mode: 0644]

diff --git a/tests/scripts/bench-411.py b/tests/scripts/bench-411.py
new file mode 100644 (file)
index 0000000..82ea451
--- /dev/null
@@ -0,0 +1,300 @@
+import os
+import sys
+import time
+import random
+import asyncio
+import logging
+from enum import Enum
+from typing import Any, Dict, List, Generator
+from argparse import ArgumentParser, Namespace
+from contextlib import contextmanager
+
+logger = logging.getLogger()
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s %(levelname)s %(message)s",
+)
+
+
+class Driver(str, Enum):
+    psycopg2 = "psycopg2"
+    psycopg = "psycopg"
+    psycopg_async = "psycopg_async"
+    asyncpg = "asyncpg"
+
+
+ids: List[int] = []
+data: List[Dict[str, Any]] = []
+
+
+def main() -> None:
+
+    args = parse_cmdline()
+
+    ids[:] = range(args.ntests)
+    data[:] = [
+        dict(
+            id=i,
+            name="c%d" % i,
+            description="c%d" % i,
+            q=i * 10,
+            p=i * 20,
+            x=i * 30,
+            y=i * 40,
+        )
+        for i in ids
+    ]
+
+    # Must be done just on end
+    drop_at_the_end = args.drop
+    args.drop = False
+
+    for i, name in enumerate(args.drivers):
+        if i == len(args.drivers) - 1:
+            args.drop = drop_at_the_end
+
+        if name == Driver.psycopg2:
+            import psycopg2  # type: ignore
+
+            run_psycopg2(psycopg2, args)
+
+        elif name == Driver.psycopg:
+            import psycopg
+
+            run_psycopg(psycopg, args)
+
+        elif name == Driver.psycopg_async:
+            import psycopg
+
+            if sys.platform == "win32":
+                if hasattr(asyncio, "WindowsSelectorEventLoopPolicy"):
+                    asyncio.set_event_loop_policy(
+                        asyncio.WindowsSelectorEventLoopPolicy()
+                    )
+
+            asyncio.run(run_psycopg_async(psycopg, args))
+
+        elif name == Driver.asyncpg:
+            import asyncpg  # type: ignore
+
+            asyncio.run(run_asyncpg(asyncpg, args))
+
+        else:
+            raise AssertionError(f"unknown driver: {name!r}")
+
+        # Must be done just on start
+        args.create = False
+
+
+table = """
+CREATE TABLE customer (
+        id SERIAL NOT NULL,
+        name VARCHAR(255),
+        description VARCHAR(255),
+        q INTEGER,
+        p INTEGER,
+        x INTEGER,
+        y INTEGER,
+        z INTEGER,
+        PRIMARY KEY (id)
+)
+"""
+drop = "DROP TABLE IF EXISTS customer"
+
+insert = """
+INSERT INTO customer (id, name, description, q, p, x, y) VALUES
+(%(id)s, %(name)s, %(description)s, %(q)s, %(p)s, %(x)s, %(y)s)
+"""
+
+select = """
+SELECT customer.id, customer.name, customer.description, customer.q,
+    customer.p, customer.x, customer.y, customer.z
+FROM customer
+WHERE customer.id = %(id)s
+"""
+
+
+@contextmanager
+def time_log(message: str) -> Generator[None, None, None]:
+    start = time.monotonic()
+    yield
+    end = time.monotonic()
+    logger.info(f"Run {message} in {end-start} s")
+
+
+def run_psycopg2(psycopg2: Any, args: Namespace) -> None:
+    logger.info("Running psycopg2")
+
+    if args.create:
+        logger.info(f"inserting {args.ntests} test records")
+        with psycopg2.connect(args.dsn) as conn:
+            with conn.cursor() as cursor:
+                cursor.execute(drop)
+                cursor.execute(table)
+                cursor.executemany(insert, data)
+            conn.commit()
+
+    logger.info(f"running {args.ntests} queries")
+    to_query = random.choices(ids, k=args.ntests)
+    with psycopg2.connect(args.dsn) as conn:
+        with time_log("psycopg2"):
+            for id_ in to_query:
+                with conn.cursor() as cursor:
+                    cursor.execute(select, {"id": id_})
+                    cursor.fetchall()
+                # conn.rollback()
+
+    if args.drop:
+        logger.info("dropping test records")
+        with psycopg2.connect(args.dsn) as conn:
+            with conn.cursor() as cursor:
+                cursor.execute(drop)
+            conn.commit()
+
+
+def run_psycopg(psycopg: Any, args: Namespace) -> None:
+    logger.info("Running psycopg sync")
+
+    if args.create:
+        logger.info(f"inserting {args.ntests} test records")
+        with psycopg.connect(args.dsn) as conn:
+            with conn.cursor() as cursor:
+                cursor.execute(drop)
+                cursor.execute(table)
+                cursor.executemany(insert, data)
+            conn.commit()
+
+    logger.info(f"running {args.ntests} queries")
+    to_query = random.choices(ids, k=args.ntests)
+    with psycopg.connect(args.dsn) as conn:
+        with time_log("psycopg"):
+            for id_ in to_query:
+                with conn.cursor() as cursor:
+                    cursor.execute(select, {"id": id_})
+                    cursor.fetchall()
+                # conn.rollback()
+
+    if args.drop:
+        logger.info("dropping test records")
+        with psycopg.connect(args.dsn) as conn:
+            with conn.cursor() as cursor:
+                cursor.execute(drop)
+            conn.commit()
+
+
+async def run_psycopg_async(psycopg: Any, args: Namespace) -> None:
+    logger.info("Running psycopg async")
+
+    conn: Any
+
+    if args.create:
+        logger.info(f"inserting {args.ntests} test records")
+        async with await psycopg.AsyncConnection.connect(args.dsn) as conn:
+            async with conn.cursor() as cursor:
+                await cursor.execute(drop)
+                await cursor.execute(table)
+                await cursor.executemany(insert, data)
+            await conn.commit()
+
+    logger.info(f"running {args.ntests} queries")
+    to_query = random.choices(ids, k=args.ntests)
+    async with await psycopg.AsyncConnection.connect(args.dsn) as conn:
+        with time_log("psycopg_async"):
+            for id_ in to_query:
+                cursor = await conn.execute(select, {"id": id_})
+                await cursor.fetchall()
+                await cursor.close()
+                # await conn.rollback()
+
+    if args.drop:
+        logger.info("dropping test records")
+        async with await psycopg.AsyncConnection.connect(args.dsn) as conn:
+            async with conn.cursor() as cursor:
+                await cursor.execute(drop)
+            await conn.commit()
+
+
+async def run_asyncpg(asyncpg: Any, args: Namespace) -> None:
+    logger.info("Running asyncpg")
+
+    places = dict(id="$1", name="$2", description="$3", q="$4", p="$5", x="$6", y="$7")
+    a_insert = insert % places
+    a_select = select % {"id": "$1"}
+
+    conn: Any
+
+    if args.create:
+        logger.info(f"inserting {args.ntests} test records")
+        conn = await asyncpg.connect(args.dsn)
+        async with conn.transaction():
+            await conn.execute(drop)
+            await conn.execute(table)
+            await conn.executemany(a_insert, [tuple(d.values()) for d in data])
+        await conn.close()
+
+    logger.info(f"running {args.ntests} queries")
+    to_query = random.choices(ids, k=args.ntests)
+    conn = await asyncpg.connect(args.dsn)
+    with time_log("asyncpg"):
+        for id_ in to_query:
+            tr = conn.transaction()
+            await tr.start()
+            await conn.fetch(a_select, id_)
+            # await tr.rollback()
+    await conn.close()
+
+    if args.drop:
+        logger.info("dropping test records")
+        conn = await asyncpg.connect(args.dsn)
+        async with conn.transaction():
+            await conn.execute(drop)
+        await conn.close()
+
+
+def parse_cmdline() -> Namespace:
+    parser = ArgumentParser(description=__doc__)
+    parser.add_argument(
+        "drivers",
+        nargs="+",
+        metavar="DRIVER",
+        type=Driver,
+        help=f"the drivers to test [choices: {', '.join(d.value for d in Driver)}]",
+    )
+
+    parser.add_argument(
+        "--ntests",
+        type=int,
+        default=10_000,
+        help="number of tests to perform [default: %(default)s]",
+    )
+
+    parser.add_argument(
+        "--dsn",
+        default=os.environ.get("PSYCOPG_TEST_DSN", ""),
+        help="database connection string"
+        " [default: %(default)r (from PSYCOPG_TEST_DSN env var)]",
+    )
+
+    parser.add_argument(
+        "--no-create",
+        dest="create",
+        action="store_false",
+        default="True",
+        help="skip data creation before tests (it must exist already)",
+    )
+
+    parser.add_argument(
+        "--no-drop",
+        dest="drop",
+        action="store_false",
+        default="True",
+        help="skip data drop after tests",
+    )
+
+    opt = parser.parse_args()
+
+    return opt
+
+
+if __name__ == "__main__":
+    main()