class Driver(str, Enum):
psycopg2 = "psycopg2"
+ psycopg2_green = "psycopg2_green"
psycopg = "psycopg"
psycopg_async = "psycopg_async"
asyncpg = "asyncpg"
run_psycopg2(psycopg2, args)
+ elif name == Driver.psycopg2_green:
+ import psycopg2
+ import psycopg2.extras # type: ignore
+
+ run_psycopg2_green(psycopg2, args)
+
elif name == Driver.psycopg:
import psycopg
conn.commit()
+def run_psycopg2_green(psycopg2: Any, args: Namespace) -> None:
+ logger.info("Running psycopg2_green")
+
+ psycopg2.extensions.set_wait_callback(psycopg2.extras.wait_select)
+
+ if args.create:
+ logger.info(f"inserting {args.ntests} test records")
+ with psycopg2.connect(args.dsn) as conn:
+ with conn.cursor() as cursor:
+ cursor.execute(drop)
+ cursor.execute(table)
+ cursor.executemany(insert, data)
+ conn.commit()
+
+ def run(i):
+ logger.info(f"thread {i} running {args.ntests} queries")
+ to_query = random.choices(ids, k=args.ntests)
+ with psycopg2.connect(args.dsn) as conn:
+ with time_log("psycopg2"):
+ for id_ in to_query:
+ with conn.cursor() as cursor:
+ cursor.execute(select, {"id": id_})
+ cursor.fetchall()
+ # conn.rollback()
+
+ if args.concurrency == 1:
+ run(0)
+ else:
+ with ThreadPoolExecutor(max_workers=args.concurrency) as executor:
+ list(executor.map(run, range(args.concurrency)))
+
+ if args.drop:
+ logger.info("dropping test records")
+ with psycopg2.connect(args.dsn) as conn:
+ with conn.cursor() as cursor:
+ cursor.execute(drop)
+ conn.commit()
+
+ psycopg2.extensions.set_wait_callback(None)
+
+
def run_psycopg(psycopg: Any, args: Namespace) -> None:
logger.info("Running psycopg sync")