from typing import Any, Dict, List, Generator
from argparse import ArgumentParser, Namespace
from contextlib import contextmanager
+from concurrent.futures import ThreadPoolExecutor
logger = logging.getLogger()
logging.basicConfig(
cursor.executemany(insert, data)
conn.commit()
- logger.info(f"running {args.ntests} queries")
- to_query = random.choices(ids, k=args.ntests)
- with psycopg2.connect(args.dsn) as conn:
- with time_log("psycopg2"):
- for id_ in to_query:
- with conn.cursor() as cursor:
- cursor.execute(select, {"id": id_})
- cursor.fetchall()
- # conn.rollback()
+ def run(i):
+ logger.info(f"thread {i} running {args.ntests} queries")
+ to_query = random.choices(ids, k=args.ntests)
+ with psycopg2.connect(args.dsn) as conn:
+ with time_log("psycopg2"):
+ for id_ in to_query:
+ with conn.cursor() as cursor:
+ cursor.execute(select, {"id": id_})
+ cursor.fetchall()
+ # conn.rollback()
+
+ if args.concurrency == 1:
+ run(0)
+ else:
+ with ThreadPoolExecutor(max_workers=args.concurrency) as executor:
+ list(executor.map(run, range(args.concurrency)))
if args.drop:
logger.info("dropping test records")
cursor.executemany(insert, data)
conn.commit()
- logger.info(f"running {args.ntests} queries")
- to_query = random.choices(ids, k=args.ntests)
- with psycopg.connect(args.dsn) as conn:
- with time_log("psycopg"):
- for id_ in to_query:
- with conn.cursor() as cursor:
- cursor.execute(select, {"id": id_})
- cursor.fetchall()
- # conn.rollback()
+ def run(i):
+ logger.info(f"thread {i} running {args.ntests} queries")
+ to_query = random.choices(ids, k=args.ntests)
+ with psycopg.connect(args.dsn) as conn:
+ with time_log("psycopg"):
+ for id_ in to_query:
+ with conn.cursor() as cursor:
+ cursor.execute(select, {"id": id_})
+ cursor.fetchall()
+ # conn.rollback()
+
+ if args.concurrency == 1:
+ run(0)
+ else:
+ with ThreadPoolExecutor(max_workers=args.concurrency) as executor:
+ list(executor.map(run, range(args.concurrency)))
if args.drop:
logger.info("dropping test records")
await cursor.executemany(insert, data)
await conn.commit()
- logger.info(f"running {args.ntests} queries")
- to_query = random.choices(ids, k=args.ntests)
- async with await psycopg.AsyncConnection.connect(args.dsn) as conn:
- with time_log("psycopg_async"):
- for id_ in to_query:
- cursor = await conn.execute(select, {"id": id_})
- await cursor.fetchall()
- await cursor.close()
- # await conn.rollback()
+ async def run(i):
+ logger.info(f"task {i} running {args.ntests} queries")
+ to_query = random.choices(ids, k=args.ntests)
+ async with await psycopg.AsyncConnection.connect(args.dsn) as conn:
+ with time_log("psycopg_async"):
+ for id_ in to_query:
+ cursor = await conn.execute(select, {"id": id_})
+ await cursor.fetchall()
+ await cursor.close()
+ # await conn.rollback()
+
+ if args.concurrency == 1:
+ await run(0)
+ else:
+ tasks = [run(i) for i in range(args.concurrency)]
+ await asyncio.gather(*tasks)
if args.drop:
logger.info("dropping test records")
await conn.executemany(a_insert, [tuple(d.values()) for d in data])
await conn.close()
- logger.info(f"running {args.ntests} queries")
- to_query = random.choices(ids, k=args.ntests)
- conn = await asyncpg.connect(args.dsn)
- with time_log("asyncpg"):
- for id_ in to_query:
- tr = conn.transaction()
- await tr.start()
- await conn.fetch(a_select, id_)
- # await tr.rollback()
- await conn.close()
+ async def run(i):
+ logger.info(f"task {i} running {args.ntests} queries")
+ to_query = random.choices(ids, k=args.ntests)
+ conn = await asyncpg.connect(args.dsn)
+ with time_log("asyncpg"):
+ for id_ in to_query:
+ # tr = conn.transaction()
+ # await tr.start()
+ await conn.fetch(a_select, id_)
+ # await tr.rollback()
+ await conn.close()
+
+ if args.concurrency == 1:
+ await run(0)
+ else:
+ tasks = [run(i) for i in range(args.concurrency)]
+ await asyncio.gather(*tasks)
if args.drop:
logger.info("dropping test records")
parser.add_argument(
"--ntests",
+ "-n",
type=int,
default=10_000,
help="number of tests to perform [default: %(default)s]",
)
+ parser.add_argument(
+ "--concurrency",
+ "-c",
+ type=int,
+ default=1,
+ help="number of parallel tasks [default: %(default)s]",
+ )
+
parser.add_argument(
"--dsn",
default=os.environ.get("PSYCOPG_TEST_DSN", ""),