From: Mike Bayer
Date: Wed, 3 Sep 2014 23:30:38 +0000 (-0400)
Subject: - large resultsets
X-Git-Tag: rel_1_0_0b1~201
X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=2c081f9a4af8928505ce4ea6ca2747ccb2e649c7;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git

- large resultsets
---

diff --git a/examples/performance/__init__.py b/examples/performance/__init__.py
index ae914db96b..b57f25b947 100644
--- a/examples/performance/__init__.py
+++ b/examples/performance/__init__.py
@@ -44,12 +44,12 @@ import os
 import time
 
-
 
 class Profiler(object):
     tests = []
 
-    def __init__(self, setup, options):
+    def __init__(self, options, setup=None, setup_once=None):
         self.setup = setup
+        self.setup_once = setup_once
         self.test = options.test
         self.dburl = options.dburl
         self.runsnake = options.runsnake
@@ -72,6 +72,9 @@ class Profiler(object):
         else:
             tests = self.tests
 
+        if self.setup_once:
+            print("Running setup once...")
+            self.setup_once(self.dburl, self.echo, self.num)
         print("Tests to run: %s" % ", ".join([t.__name__ for t in tests]))
         for test in tests:
             self._run_test(test)
@@ -100,14 +103,15 @@ class Profiler(object):
         self.stats.append(TestResult(self, fn, total_time=total))
 
     def _run_test(self, fn):
-        self.setup(self.dburl, self.echo)
+        if self.setup:
+            self.setup(self.dburl, self.echo, self.num)
         if self.profile or self.runsnake or self.dump:
             self._run_with_profile(fn)
         else:
             self._run_with_time(fn)
 
     @classmethod
-    def main(cls, setup):
+    def main(cls, num, setup=None, setup_once=None):
         parser = argparse.ArgumentParser()
 
         parser.add_argument(
@@ -119,8 +123,9 @@ class Profiler(object):
             help="database URL, default sqlite:///profile.db"
         )
         parser.add_argument(
-            '--num', type=int, default=100000,
-            help="Number of iterations/items/etc for tests, default 100000"
+            '--num', type=int, default=num,
+            help="Number of iterations/items/etc for tests; "
+            "default is module-specific (%d)" % num
        )
         parser.add_argument(
             '--profile', action='store_true',
@@ -133,13 +138,12 @@ class Profiler(object):
             help='invoke runsnakerun (implies --profile)')
         parser.add_argument(
             '--echo', action='store_true',
-            help="Echo SQL output"
-        )
+            help="Echo SQL output")
         args = parser.parse_args()
 
         args.profile = args.profile or args.dump or args.runsnake
 
-        Profiler(setup, args).run()
+        Profiler(args, setup=setup, setup_once=setup_once).run()
 
 
 class TestResult(object):
diff --git a/examples/performance/bulk_inserts.py b/examples/performance/bulk_inserts.py
index 42ab920a64..648d5f2aad 100644
--- a/examples/performance/bulk_inserts.py
+++ b/examples/performance/bulk_inserts.py
@@ -1,7 +1,7 @@
 from . import Profiler
 
 from sqlalchemy.ext.declarative import declarative_base
-from sqlalchemy import Column, Integer, String, create_engine
+from sqlalchemy import Column, Integer, String, create_engine, bindparam
 from sqlalchemy.orm import Session
 
 Base = declarative_base()
@@ -15,7 +15,7 @@ class Customer(Base):
     description = Column(String(255))
 
 
-def setup_database(dburl, echo):
+def setup_database(dburl, echo, num):
     global engine
     engine = create_engine(dburl, echo=echo)
     Base.metadata.drop_all(engine)
@@ -111,22 +111,35 @@ def test_core_insert(n):
 
 
 @Profiler.profile
-def test_sqlite_raw(n):
-    """pysqlite's pure C API inserting rows in bulk, no pure Python at all"""
-    conn = engine.raw_connection()
+def test_dbapi_raw(n):
+    """The DBAPI's executemany() inserting rows in bulk, bypassing SQLAlchemy entirely"""
+
+    conn = engine.pool._creator()
     cursor = conn.cursor()
-    cursor.executemany(
-        "INSERT INTO customer (name, description) VALUES(:name, :description)",
-        [
+    compiled = Customer.__table__.insert().values(
+        name=bindparam('name'),
+        description=bindparam('description')).\
+        compile(dialect=engine.dialect)
+
+    if compiled.positional:
+        args = (
+            ('customer name %d' % i, 'customer description %d' % i)
+            for i in range(n))
+    else:
+        args = (
             dict(
                 name='customer name %d' % i,
                 description='customer description %d' % i
             )
             for i in range(n)
-        ]
+        )
+
+    cursor.executemany(
+        str(compiled),
+        list(args)
     )
     conn.commit()
-
+    conn.close()
 
 if __name__ == '__main__':
-    Profiler.main(setup=setup_database)
+    Profiler.main(setup=setup_database, num=100000)
diff --git a/examples/performance/large_resultsets.py b/examples/performance/large_resultsets.py
new file mode 100644
index 0000000000..268c6dc871
--- /dev/null
+++ b/examples/performance/large_resultsets.py
@@ -0,0 +1,155 @@
+"""In this series of tests, we are looking at time to load 1M very small
+and simple rows.
+
+"""
+from . import Profiler
+
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy import Column, Integer, String, create_engine
+from sqlalchemy.orm import Session, Bundle
+
+Base = declarative_base()
+engine = None
+
+
+class Customer(Base):
+    __tablename__ = "customer"
+    id = Column(Integer, primary_key=True)
+    name = Column(String(255))
+    description = Column(String(255))
+
+
+def setup_database(dburl, echo, num):
+    global engine
+    engine = create_engine(dburl, echo=echo)
+    Base.metadata.drop_all(engine)
+    Base.metadata.create_all(engine)
+
+    s = Session(engine)
+    for chunk in range(0, num, 10000):
+        s.bulk_insert_mappings(Customer, [
+            {
+                'name': 'customer name %d' % i,
+                'description': 'customer description %d' % i
+            } for i in range(chunk, chunk + 10000)
+        ])
+    s.commit()
+
+
+@Profiler.profile
+def test_orm_full_objects(n):
+    """Load fully tracked objects using the ORM."""
+
+    sess = Session(engine)
+    # avoid using all() so that we don't have the overhead of building
+    # a large list of full objects in memory
+    for obj in sess.query(Customer).yield_per(1000).limit(n):
+        pass
+
+
+@Profiler.profile
+def test_orm_bundles(n):
+    """Load lightweight "bundle" objects using the ORM."""
+
+    sess = Session(engine)
+    bundle = Bundle('customer',
+                    Customer.id, Customer.name, Customer.description)
+    for row in sess.query(bundle).yield_per(10000).limit(n):
+        pass
+
+
+@Profiler.profile
+def test_orm_columns(n):
+    """Load individual columns into named tuples using the ORM."""
+
+    sess = Session(engine)
+    for row in sess.query(
+            Customer.id, Customer.name,
+            Customer.description).yield_per(10000).limit(n):
+        pass
+
+
+@Profiler.profile
+def test_core_fetchall(n):
+    """Load Core result rows using Core / fetchall."""
+
+    with engine.connect() as conn:
+        result = conn.execute(Customer.__table__.select().limit(n)).fetchall()
+        for row in result:
+            data = row['id'], row['name'], row['description']
+
+
+@Profiler.profile
+def test_core_fetchchunks_w_streaming(n):
+    """Load Core result rows using Core with fetchmany and
+    streaming results."""
+
+    with engine.connect() as conn:
+        result = conn.execution_options(stream_results=True).\
+            execute(Customer.__table__.select().limit(n))
+        while True:
+            chunk = result.fetchmany(10000)
+            if not chunk:
+                break
+            for row in chunk:
+                data = row['id'], row['name'], row['description']
+
+
+@Profiler.profile
+def test_core_fetchchunks(n):
+    """Load Core result rows using Core / fetchmany."""
+
+    with engine.connect() as conn:
+        result = conn.execute(Customer.__table__.select().limit(n))
+        while True:
+            chunk = result.fetchmany(10000)
+            if not chunk:
+                break
+            for row in chunk:
+                data = row['id'], row['name'], row['description']
+
+
+@Profiler.profile
+def test_dbapi_fetchall(n):
+    """Load DBAPI cursor rows using fetchall()"""
+
+    _test_dbapi_raw(n, True)
+
+
+@Profiler.profile
+def test_dbapi_fetchchunks(n):
+    """Load DBAPI cursor rows using fetchmany()
+    (usually doesn't limit memory, as most DBAPIs pre-buffer results)"""
+
+    _test_dbapi_raw(n, False)
+
+
+def _test_dbapi_raw(n, fetchall):
+    compiled = Customer.__table__.select().limit(n).\
+        compile(
+            dialect=engine.dialect,
+            compile_kwargs={"literal_binds": True})
+
+    sql = str(compiled)
+
+    # use a raw DBAPI connection, bypassing any SQLAlchemy
+    # result-processing overhead
+    conn = engine.raw_connection()
+    cursor = conn.cursor()
+    cursor.execute(sql)
+
+    if fetchall:
+        for row in cursor.fetchall():
+            # ensure that we fully fetch!
+            data = row[0], row[1], row[2]
+    else:
+        while True:
+            chunk = cursor.fetchmany(10000)
+            if not chunk:
+                break
+            for row in chunk:
+                data = row[0], row[1], row[2]
+    conn.close()
+
+if __name__ == '__main__':
+    Profiler.main(setup_once=setup_database, num=1000000)
diff --git a/examples/performance/single_inserts.py b/examples/performance/single_inserts.py
new file mode 100644
index 0000000000..671bbbe9ca
--- /dev/null
+++ b/examples/performance/single_inserts.py
@@ -0,0 +1,145 @@
+"""In this series of tests, we're looking at a method that inserts a row
+within a distinct transaction, and afterwards returns to essentially a
+"closed" state. This would be analogous to an API call that starts up
+a database connection, inserts the row, commits and closes.
+
+"""
+from . import Profiler
+
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy import Column, Integer, String, create_engine, bindparam, pool
+from sqlalchemy.orm import Session
+
+Base = declarative_base()
+engine = None
+
+
+class Customer(Base):
+    __tablename__ = "customer"
+    id = Column(Integer, primary_key=True)
+    name = Column(String(255))
+    description = Column(String(255))
+
+
+def setup_database(dburl, echo, num):
+    global engine
+    engine = create_engine(dburl, echo=echo)
+    if engine.dialect.name == 'sqlite':
+        engine.pool = pool.StaticPool(creator=engine.pool._creator)
+    Base.metadata.drop_all(engine)
+    Base.metadata.create_all(engine)
+
+
+@Profiler.profile
+def test_orm_commit(n):
+    """Individual INSERT/COMMIT pairs via the ORM"""
+
+    for i in range(n):
+        session = Session(bind=engine)
+        session.add(
+            Customer(
+                name='customer name %d' % i,
+                description='customer description %d' % i)
+        )
+        session.commit()
+
+
+@Profiler.profile
+def test_bulk_save(n):
+    """Individual INSERT/COMMIT pairs using the "bulk" API"""
+
+    for i in range(n):
+        session = Session(bind=engine)
+        session.bulk_save_objects([
+            Customer(
+                name='customer name %d' % i,
+                description='customer description %d' % i
+            )])
+        session.commit()
+
+
+@Profiler.profile
+def test_bulk_insert_dictionaries(n):
+    """Individual INSERT/COMMIT pairs using the "bulk" API with dictionaries"""
+
+    for i in range(n):
+        session = Session(bind=engine)
+        session.bulk_insert_mappings(Customer, [
+            dict(
+                name='customer name %d' % i,
+                description='customer description %d' % i
+            )])
+        session.commit()
+
+
+@Profiler.profile
+def test_core(n):
+    """Individual INSERT/COMMIT pairs using Core."""
+
+    for i in range(n):
+        with engine.begin() as conn:
+            conn.execute(
+                Customer.__table__.insert(),
+                dict(
+                    name='customer name %d' % i,
+                    description='customer description %d' % i
+                )
+            )
+
+
+@Profiler.profile
+def test_dbapi_raw_w_connect(n):
+    """Individual INSERT/COMMIT pairs using a pure DBAPI connection,
+    connect each time."""
+
+    _test_dbapi_raw(n, True)
+
+
+@Profiler.profile
+def test_dbapi_raw_w_pool(n):
+    """Individual INSERT/COMMIT pairs using a pure DBAPI connection,
+    using a connection pool."""
+
+    _test_dbapi_raw(n, False)
+
+
+def _test_dbapi_raw(n, connect):
+    compiled = Customer.__table__.insert().values(
+        name=bindparam('name'),
+        description=bindparam('description')).\
+        compile(dialect=engine.dialect)
+
+    if compiled.positional:
+        args = (
+            ('customer name %d' % i, 'customer description %d' % i)
+            for i in range(n))
+    else:
+        args = (
+            dict(
+                name='customer name %d' % i,
+                description='customer description %d' % i
+            )
+            for i in range(n)
+        )
+    sql = str(compiled)
+
+    if connect:
+        for arg in args:
+            # bypass the connection pool entirely: call the engine's
+            # creator function so that we connect anew for each INSERT
+            conn = engine.pool._creator()
+            cursor = conn.cursor()
+            cursor.execute(sql, arg)
+            conn.commit()
+            conn.close()
+    else:
+        for arg in args:
+            conn = engine.raw_connection()
+            cursor = conn.cursor()
+            cursor.execute(sql, arg)
+            conn.commit()
+            conn.close()
+
+
+if __name__ == '__main__':
+    Profiler.main(setup=setup_database, num=10000)
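
A note on wiring a new suite into the revised Profiler API: the sketch below
is illustrative only (the Widget model, the module itself and the test body
are hypothetical), but the hooks it uses are exactly those introduced in
examples/performance/__init__.py above: the @Profiler.profile decorator, a
setup_once callable receiving (dburl, echo, num), and a module-specific num
default passed to Profiler.main().

    from . import Profiler

    from sqlalchemy import Column, Integer, String, create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import Session

    Base = declarative_base()
    engine = None


    class Widget(Base):
        __tablename__ = "widget"
        id = Column(Integer, primary_key=True)
        name = Column(String(255))


    def setup_once(dburl, echo, num):
        # invoked a single time before any test runs, unlike "setup",
        # which re-runs before each individual test; build the schema
        # and fixture rows here so every test sees identical data
        global engine
        engine = create_engine(dburl, echo=echo)
        Base.metadata.drop_all(engine)
        Base.metadata.create_all(engine)
        s = Session(engine)
        s.bulk_insert_mappings(Widget, [
            {'name': 'widget %d' % i} for i in range(num)
        ])
        s.commit()


    @Profiler.profile
    def test_load_columns(n):
        """Load n rows of individual columns via the ORM."""
        sess = Session(engine)
        for row in sess.query(Widget.id, Widget.name).limit(n):
            pass


    if __name__ == '__main__':
        # the module-specific default for --num, overridable on the
        # command line
        Profiler.main(setup_once=setup_once, num=10000)

Because these modules use relative imports, a suite is run with -m from the
checkout root, e.g. "python -m examples.performance.bulk_inserts --dburl
sqlite:///profile.db"; --num overrides the per-module default, while
--profile, --dump and --runsnake select the profiled code path seen in
_run_test() above.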