--- a/examples/performance/__init__.py
+++ b/examples/performance/__init__.py
import argparse
import time
-
class Profiler(object):
tests = []
- def __init__(self, setup, options):
+ def __init__(self, options, setup=None, setup_once=None):
self.setup = setup
+ self.setup_once = setup_once
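+        # 'setup' runs before each individual test; 'setup_once' runs a
+        # single time before the whole suite (see run())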
self.test = options.test
self.dburl = options.dburl
self.runsnake = options.runsnake
    def run(self):
        if self.test:
            tests = [fn for fn in self.tests if fn.__name__ == self.test]
        else:
            tests = self.tests
+ if self.setup_once:
+ print("Running setup once...")
+ self.setup_once(self.dburl, self.echo, self.num)
print("Tests to run: %s" % ", ".join([t.__name__ for t in tests]))
for test in tests:
self._run_test(test)
self.stats.append(TestResult(self, fn, total_time=total))
def _run_test(self, fn):
- self.setup(self.dburl, self.echo)
+ if self.setup:
+ self.setup(self.dburl, self.echo, self.num)
if self.profile or self.runsnake or self.dump:
self._run_with_profile(fn)
else:
self._run_with_time(fn)
@classmethod
- def main(cls, setup):
+ def main(cls, num, setup=None, setup_once=None):
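+        # 'num' supplies the module-specific default iteration count;
+        # it can be overridden on the command line with --num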
parser = argparse.ArgumentParser()
        parser.add_argument(
            '--dburl', type=str, default='sqlite:///profile.db',
            help="database URL, default sqlite:///profile.db"
        )
parser.add_argument(
- '--num', type=int, default=100000,
- help="Number of iterations/items/etc for tests, default 100000"
+ '--num', type=int, default=num,
+ help="Number of iterations/items/etc for tests; "
+            "default is %d (module-specific)" % num
)
        parser.add_argument(
            '--profile', action='store_true',
            help='run profiling and dump call counts')
        parser.add_argument(
            '--dump', action='store_true',
            help='dump full call profile (implies --profile)')
        parser.add_argument(
            '--runsnake', action='store_true',
            help='invoke runsnakerun (implies --profile)')
parser.add_argument(
'--echo', action='store_true',
- help="Echo SQL output"
- )
+ help="Echo SQL output")
args = parser.parse_args()
args.profile = args.profile or args.dump or args.runsnake
- Profiler(setup, args).run()
+ Profiler(args, setup=setup, setup_once=setup_once).run()
class TestResult(object):
--- a/examples/performance/bulk_inserts.py
+++ b/examples/performance/bulk_inserts.py
from . import Profiler
from sqlalchemy.ext.declarative import declarative_base
-from sqlalchemy import Column, Integer, String, create_engine
+from sqlalchemy import Column, Integer, String, create_engine, bindparam
from sqlalchemy.orm import Session
Base = declarative_base()


class Customer(Base):
    __tablename__ = "customer"
    id = Column(Integer, primary_key=True)
    name = Column(String(255))
    description = Column(String(255))
-def setup_database(dburl, echo):
+def setup_database(dburl, echo, num):
global engine
engine = create_engine(dburl, echo=echo)
Base.metadata.drop_all(engine)
@Profiler.profile
-def test_sqlite_raw(n):
- """pysqlite's pure C API inserting rows in bulk, no pure Python at all"""
- conn = engine.raw_connection()
+def test_dbapi_raw(n):
+ """The DBAPI's pure C API inserting rows in bulk, no pure Python at all"""
+
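+    # call the pool's connection factory directly (note that _creator is
+    # a private attribute); this returns a raw DBAPI connection with no
+    # SQLAlchemy wrapping around it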
+ conn = engine.pool._creator()
cursor = conn.cursor()
- cursor.executemany(
- "INSERT INTO customer (name, description) VALUES(:name, :description)",
- [
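+    # compile the Core insert construct up front to render the raw SQL
+    # string for this dialect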
+ compiled = Customer.__table__.insert().values(
+ name=bindparam('name'),
+ description=bindparam('description')).\
+ compile(dialect=engine.dialect)
+
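+    # a positional paramstyle (e.g. qmark) expects tuples; a named
+    # paramstyle expects dictionaries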
+ if compiled.positional:
+ args = (
+ ('customer name %d' % i, 'customer description %d' % i)
+ for i in range(n))
+ else:
+ args = (
dict(
name='customer name %d' % i,
description='customer description %d' % i
)
for i in range(n)
- ]
+ )
+
+ cursor.executemany(
+ str(compiled),
+ list(args)
)
conn.commit()
-
+ conn.close()
if __name__ == '__main__':
- Profiler.main(setup=setup_database)
+ Profiler.main(setup=setup_database, num=100000)
--- /dev/null
+++ b/examples/performance/large_resultsets.py
+"""In this series of tests, we are looking at time to load 1M very small
+and simple rows.
+
+"""
+from . import Profiler
+
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy import Column, Integer, String, create_engine, literal_column
+from sqlalchemy.orm import Session, Bundle
+
+Base = declarative_base()
+engine = None
+
+
+class Customer(Base):
+ __tablename__ = "customer"
+ id = Column(Integer, primary_key=True)
+ name = Column(String(255))
+ description = Column(String(255))
+
+
+def setup_database(dburl, echo, num):
+ global engine
+ engine = create_engine(dburl, echo=echo)
+ Base.metadata.drop_all(engine)
+ Base.metadata.create_all(engine)
+
+ s = Session(engine)
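+    # insert in chunks of 10,000 rows so that the full set of 1M
+    # parameter dictionaries is never held in memory at once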
+ for chunk in range(0, num, 10000):
+ s.bulk_insert_mappings(Customer, [
+ {
+ 'name': 'customer name %d' % i,
+ 'description': 'customer description %d' % i
+ } for i in range(chunk, chunk + 10000)
+ ])
+ s.commit()
+
+
+@Profiler.profile
+def test_orm_full_objects(n):
+ """Load fully tracked objects using the ORM."""
+
+ sess = Session(engine)
+ # avoid using all() so that we don't have the overhead of building
+ # a large list of full objects in memory
+ for obj in sess.query(Customer).yield_per(1000).limit(n):
+ pass
+
+
+@Profiler.profile
+def test_orm_bundles(n):
+ """Load lightweight "bundle" objects using the ORM."""
+
+ sess = Session(engine)
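+    # a Bundle delivers the selected columns as lightweight tuple-like
+    # objects, skipping full ORM object instrumentation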
+ bundle = Bundle('customer',
+ Customer.id, Customer.name, Customer.description)
+ for row in sess.query(bundle).yield_per(10000).limit(n):
+ pass
+
+
+@Profiler.profile
+def test_orm_columns(n):
+ """Load individual columns into named tuples using the ORM."""
+
+ sess = Session(engine)
+ for row in sess.query(
+ Customer.id, Customer.name,
+ Customer.description).yield_per(10000).limit(n):
+ pass
+
+
+@Profiler.profile
+def test_core_fetchall(n):
+ """Load Core result rows using Core / fetchall."""
+
+ with engine.connect() as conn:
+ result = conn.execute(Customer.__table__.select().limit(n)).fetchall()
+ for row in result:
+ data = row['id'], row['name'], row['description']
+
+
+@Profiler.profile
+def test_core_fetchchunks_w_streaming(n):
+ """Load Core result rows using Core with fetchmany and
+ streaming results."""
+
+ with engine.connect() as conn:
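+        # stream_results requests an unbuffered / server-side cursor
+        # where the DBAPI supports one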
+ result = conn.execution_options(stream_results=True).\
+ execute(Customer.__table__.select().limit(n))
+ while True:
+ chunk = result.fetchmany(10000)
+ if not chunk:
+ break
+ for row in chunk:
+ data = row['id'], row['name'], row['description']
+
+
+@Profiler.profile
+def test_core_fetchchunks(n):
+ """Load Core result rows using Core / fetchmany."""
+
+ with engine.connect() as conn:
+ result = conn.execute(Customer.__table__.select().limit(n))
+ while True:
+ chunk = result.fetchmany(10000)
+ if not chunk:
+ break
+ for row in chunk:
+ data = row['id'], row['name'], row['description']
+
+
+@Profiler.profile
+def test_dbapi_fetchall(n):
+ """Load DBAPI cursor rows using fetchall()"""
+
+ _test_dbapi_raw(n, True)
+
+
+@Profiler.profile
+def test_dbapi_fetchchunks(n):
+ """Load DBAPI cursor rows using fetchmany()
+ (usually doesn't limit memory)"""
+
+ _test_dbapi_raw(n, False)
+
+
+def _test_dbapi_raw(n, fetchall):
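+    # render bound parameters inline (literal_binds) so that the plain
+    # statement string can be passed to cursor.execute() with no
+    # parameters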
+ compiled = Customer.__table__.select().limit(n).\
+ compile(
+ dialect=engine.dialect,
+ compile_kwargs={"literal_binds": True})
+
+ sql = str(compiled)
+
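+    # execute on a raw DBAPI cursor, bypassing the Core result proxy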
+ conn = engine.raw_connection()
+ cursor = conn.cursor()
+ cursor.execute(sql)
+
+ if fetchall:
+ for row in cursor.fetchall():
+ # ensure that we fully fetch!
+ data = row[0], row[1], row[2]
+ else:
+ while True:
+ chunk = cursor.fetchmany(10000)
+ if not chunk:
+ break
+ for row in chunk:
+ data = row[0], row[1], row[2]
+ conn.close()
+
+if __name__ == '__main__':
+ Profiler.main(setup_once=setup_database, num=1000000)
--- /dev/null
+++ b/examples/performance/single_inserts.py
+"""In this series of tests, we're looking at a method that inserts a row
+within a distinct transaction, and afterwards returns to essentially a
+"closed" state. This would be analogous to an API call that starts up
+a database connection, inserts the row, commits and closes.
+
+"""
+from . import Profiler
+
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy import Column, Integer, String, create_engine, bindparam, pool
+from sqlalchemy.orm import Session
+
+Base = declarative_base()
+engine = None
+
+
+class Customer(Base):
+ __tablename__ = "customer"
+ id = Column(Integer, primary_key=True)
+ name = Column(String(255))
+ description = Column(String(255))
+
+
+def setup_database(dburl, echo, num):
+ global engine
+ engine = create_engine(dburl, echo=echo)
+ if engine.dialect.name == 'sqlite':
+ engine.pool = pool.StaticPool(creator=engine.pool._creator)
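+        # pin SQLite to a single fixed connection; the tests below
+        # reconnect constantly, which an in-memory SQLite database in
+        # particular would not survive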
+ Base.metadata.drop_all(engine)
+ Base.metadata.create_all(engine)
+
+
+@Profiler.profile
+def test_orm_commit(n):
+ """Individual INSERT/COMMIT pairs via the ORM"""
+
+ for i in range(n):
+ session = Session(bind=engine)
+ session.add(
+ Customer(
+ name='customer name %d' % i,
+ description='customer description %d' % i)
+ )
+ session.commit()
+
+
+@Profiler.profile
+def test_bulk_save(n):
+ """Individual INSERT/COMMIT pairs using the "bulk" API """
+
+ for i in range(n):
+ session = Session(bind=engine)
+ session.bulk_save_objects([
+ Customer(
+ name='customer name %d' % i,
+ description='customer description %d' % i
+ )])
+ session.commit()
+
+
+@Profiler.profile
+def test_bulk_insert_dictionaries(n):
+ """Individual INSERT/COMMIT pairs using the "bulk" API with dictionaries"""
+
+ for i in range(n):
+ session = Session(bind=engine)
+ session.bulk_insert_mappings(Customer, [
+ dict(
+ name='customer name %d' % i,
+ description='customer description %d' % i
+ )])
+ session.commit()
+
+
+@Profiler.profile
+def test_core(n):
+ """Individual INSERT/COMMIT pairs using Core."""
+
+ for i in range(n):
+ with engine.begin() as conn:
+ conn.execute(
+ Customer.__table__.insert(),
+ dict(
+ name='customer name %d' % i,
+ description='customer description %d' % i
+ )
+ )
+
+
+@Profiler.profile
+def test_dbapi_raw_w_connect(n):
+ """Individual INSERT/COMMIT pairs using a pure DBAPI connection,
+ connect each time."""
+
+ _test_dbapi_raw(n, True)
+
+
+@Profiler.profile
+def test_dbapi_raw_w_pool(n):
+ """Individual INSERT/COMMIT pairs using a pure DBAPI connection,
+ using a connection pool."""
+
+ _test_dbapi_raw(n, False)
+
+
+def _test_dbapi_raw(n, connect):
+ compiled = Customer.__table__.insert().values(
+ name=bindparam('name'),
+ description=bindparam('description')).\
+ compile(dialect=engine.dialect)
+
+ if compiled.positional:
+ args = (
+ ('customer name %d' % i, 'customer description %d' % i)
+ for i in range(n))
+ else:
+ args = (
+ dict(
+ name='customer name %d' % i,
+ description='customer description %d' % i
+ )
+ for i in range(n)
+ )
+ sql = str(compiled)
+
+ if connect:
+ for arg in args:
+ # there's no connection pool, so if these were distinct
+ # calls, we'd be connecting each time
+ conn = engine.pool._creator()
+ cursor = conn.cursor()
+ cursor.execute(sql, arg)
+ conn.commit()
+ conn.close()
+ else:
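+        # raw_connection() checks a DBAPI connection out of the engine's
+        # pool; close() returns it to the pool rather than closing it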
+ for arg in args:
+ conn = engine.raw_connection()
+ cursor = conn.cursor()
+ cursor.execute(sql, arg)
+ conn.commit()
+ conn.close()
+
+
+if __name__ == '__main__':
+ Profiler.main(setup=setup_database, num=10000)