- large resultsets
author    Mike Bayer <mike_mp@zzzcomputing.com>
          Wed, 3 Sep 2014 23:30:38 +0000 (19:30 -0400)
committer Mike Bayer <mike_mp@zzzcomputing.com>
          Wed, 3 Sep 2014 23:30:38 +0000 (19:30 -0400)
examples/performance/__init__.py
examples/performance/bulk_inserts.py
examples/performance/large_resultsets.py [new file with mode: 0644]
examples/performance/single_inserts.py [new file with mode: 0644]

diff --git a/examples/performance/__init__.py b/examples/performance/__init__.py
index ae914db96b799ef1091adc93f461ad517ea104e1..b57f25b9474947b9b384111df7fd91a2962a77d5 100644
--- a/examples/performance/__init__.py
+++ b/examples/performance/__init__.py
@@ -44,12 +44,12 @@ import os
 import time
 
 
-
 class Profiler(object):
     tests = []
 
-    def __init__(self, setup, options):
+    def __init__(self, options, setup=None, setup_once=None):
         self.setup = setup
+        self.setup_once = setup_once
         self.test = options.test
         self.dburl = options.dburl
         self.runsnake = options.runsnake
@@ -72,6 +72,9 @@ class Profiler(object):
         else:
             tests = self.tests
 
+        if self.setup_once:
+            print("Running setup once...")
+            self.setup_once(self.dburl, self.echo, self.num)
         print("Tests to run: %s" % ", ".join([t.__name__ for t in tests]))
         for test in tests:
             self._run_test(test)
@@ -100,14 +103,15 @@ class Profiler(object):
             self.stats.append(TestResult(self, fn, total_time=total))
 
     def _run_test(self, fn):
-        self.setup(self.dburl, self.echo)
+        if self.setup:
+            self.setup(self.dburl, self.echo, self.num)
         if self.profile or self.runsnake or self.dump:
             self._run_with_profile(fn)
         else:
             self._run_with_time(fn)
 
     @classmethod
-    def main(cls, setup):
+    def main(cls, num, setup=None, setup_once=None):
         parser = argparse.ArgumentParser()
 
         parser.add_argument(
@@ -119,8 +123,9 @@ class Profiler(object):
             help="database URL, default sqlite:///profile.db"
         )
         parser.add_argument(
-            '--num', type=int, default=100000,
-            help="Number of iterations/items/etc for tests, default 100000"
+            '--num', type=int, default=num,
+            help="Number of iterations/items/etc for tests; "
+                 "default is module-specific (%d)" % num
         )
         parser.add_argument(
             '--profile', action='store_true',
@@ -133,13 +138,12 @@ class Profiler(object):
             help='invoke runsnakerun (implies --profile)')
         parser.add_argument(
             '--echo', action='store_true',
-            help="Echo SQL output"
-            )
+            help="Echo SQL output")
         args = parser.parse_args()
 
         args.profile = args.profile or args.dump or args.runsnake
 
-        Profiler(setup, args).run()
+        Profiler(args, setup=setup, setup_once=setup_once).run()
 
 
 class TestResult(object):
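
Note: a minimal sketch of how a suite module would drive the revised entry point. The test body here is hypothetical; the Profiler.profile decorator, the (dburl, echo, num) setup signature, and the main() keywords follow the diff above.

from . import Profiler

def setup_database(dburl, echo, num):
    # runs before each individual test; pass the same callable as
    # setup_once= instead to run it a single time before the whole suite
    pass

@Profiler.profile
def test_example(n):
    """A hypothetical test; n is taken from --num."""
    for i in range(n):
        pass

if __name__ == '__main__':
    Profiler.main(setup=setup_database, num=10000)
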
diff --git a/examples/performance/bulk_inserts.py b/examples/performance/bulk_inserts.py
index 42ab920a64dfd55605ba9508423c3f47d858ea5f..648d5f2aad6df44ebad1642abf865129ccd7abd4 100644
--- a/examples/performance/bulk_inserts.py
+++ b/examples/performance/bulk_inserts.py
@@ -1,7 +1,7 @@
 from . import Profiler
 
 from sqlalchemy.ext.declarative import declarative_base
-from sqlalchemy import Column, Integer, String, create_engine
+from sqlalchemy import Column, Integer, String, create_engine, bindparam
 from sqlalchemy.orm import Session
 
 Base = declarative_base()
@@ -15,7 +15,7 @@ class Customer(Base):
     description = Column(String(255))
 
 
-def setup_database(dburl, echo):
+def setup_database(dburl, echo, num):
     global engine
     engine = create_engine(dburl, echo=echo)
     Base.metadata.drop_all(engine)
@@ -111,22 +111,35 @@ def test_core_insert(n):
 
 
 @Profiler.profile
-def test_sqlite_raw(n):
-    """pysqlite's pure C API inserting rows in bulk, no pure Python at all"""
-    conn = engine.raw_connection()
+def test_dbapi_raw(n):
+    """The DBAPI's pure C API inserting rows in bulk, no pure Python at all"""
+
+    conn = engine.pool._creator()
     cursor = conn.cursor()
-    cursor.executemany(
-        "INSERT INTO customer (name, description) VALUES(:name, :description)",
-        [
+    compiled = Customer.__table__.insert().values(
+        name=bindparam('name'),
+        description=bindparam('description')).\
+        compile(dialect=engine.dialect)
+
+    if compiled.positional:
+        args = (
+            ('customer name %d' % i, 'customer description %d' % i)
+            for i in range(n))
+    else:
+        args = (
             dict(
                 name='customer name %d' % i,
                 description='customer description %d' % i
             )
             for i in range(n)
-        ]
+        )
+
+    cursor.executemany(
+        str(compiled),
+        list(args)
     )
     conn.commit()
-
+    conn.close()
 
 if __name__ == '__main__':
-    Profiler.main(setup=setup_database)
+    Profiler.main(setup=setup_database, num=100000)
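
Note: the compiled.positional branch in test_dbapi_raw exists because DBAPIs disagree on paramstyle. A small sketch of the distinction, standing alone against the same table, assuming the stock sqlite and oracle dialects (the SQL shown in comments is approximate):

from sqlalchemy import MetaData, Table, Column, Integer, String, bindparam
from sqlalchemy.dialects import oracle, sqlite

metadata = MetaData()
customer = Table(
    'customer', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(255)),
    Column('description', String(255)))

stmt = customer.insert().values(
    name=bindparam('name'),
    description=bindparam('description'))

# pysqlite uses the positional "qmark" style, so executemany()
# receives a list of tuples:
print(stmt.compile(dialect=sqlite.dialect()))
# INSERT INTO customer (name, description) VALUES (?, ?)

# cx_Oracle uses the "named" style, so executemany() receives
# a list of dictionaries:
print(stmt.compile(dialect=oracle.dialect()))
# INSERT INTO customer (name, description) VALUES (:name, :description)
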
diff --git a/examples/performance/large_resultsets.py b/examples/performance/large_resultsets.py
new file mode 100644
index 0000000..268c6dc
--- /dev/null
+++ b/examples/performance/large_resultsets.py
@@ -0,0 +1,153 @@
+"""In this series of tests, we are looking at time to load 1M very small
+and simple rows.
+
+"""
+from . import Profiler
+
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy import Column, Integer, String, create_engine, literal_column
+from sqlalchemy.orm import Session, Bundle
+
+Base = declarative_base()
+engine = None
+
+
+class Customer(Base):
+    __tablename__ = "customer"
+    id = Column(Integer, primary_key=True)
+    name = Column(String(255))
+    description = Column(String(255))
+
+
+def setup_database(dburl, echo, num):
+    global engine
+    engine = create_engine(dburl, echo=echo)
+    Base.metadata.drop_all(engine)
+    Base.metadata.create_all(engine)
+
+    s = Session(engine)
+    for chunk in range(0, num, 10000):
+        s.bulk_insert_mappings(Customer, [
+            {
+                'name': 'customer name %d' % i,
+                'description': 'customer description %d' % i
+            } for i in range(chunk, chunk + 10000)
+        ])
+    s.commit()
+
+
+@Profiler.profile
+def test_orm_full_objects(n):
+    """Load fully tracked objects using the ORM."""
+
+    sess = Session(engine)
+    # avoid using all() so that we don't have the overhead of building
+    # a large list of full objects in memory
+    for obj in sess.query(Customer).yield_per(1000).limit(n):
+        pass
+
+
+@Profiler.profile
+def test_orm_bundles(n):
+    """Load lightweight "bundle" objects using the ORM."""
+
+    sess = Session(engine)
+    bundle = Bundle('customer',
+                    Customer.id, Customer.name, Customer.description)
+    for row in sess.query(bundle).yield_per(10000).limit(n):
+        pass
+
+
+@Profiler.profile
+def test_orm_columns(n):
+    """Load individual columns into named tuples using the ORM."""
+
+    sess = Session(engine)
+    for row in sess.query(
+        Customer.id, Customer.name,
+            Customer.description).yield_per(10000).limit(n):
+        pass
+
+
+@Profiler.profile
+def test_core_fetchall(n):
+    """Load Core result rows using Core / fetchall."""
+
+    with engine.connect() as conn:
+        result = conn.execute(Customer.__table__.select().limit(n)).fetchall()
+        for row in result:
+            data = row['id'], row['name'], row['description']
+
+
+@Profiler.profile
+def test_core_fetchchunks_w_streaming(n):
+    """Load Core result rows using Core with fetchmany and
+    streaming results."""
+
+    with engine.connect() as conn:
+        result = conn.execution_options(stream_results=True).\
+            execute(Customer.__table__.select().limit(n))
+        while True:
+            chunk = result.fetchmany(10000)
+            if not chunk:
+                break
+            for row in chunk:
+                data = row['id'], row['name'], row['description']
+
+
+@Profiler.profile
+def test_core_fetchchunks(n):
+    """Load Core result rows using Core / fetchmany."""
+
+    with engine.connect() as conn:
+        result = conn.execute(Customer.__table__.select().limit(n))
+        while True:
+            chunk = result.fetchmany(10000)
+            if not chunk:
+                break
+            for row in chunk:
+                data = row['id'], row['name'], row['description']
+
+
+@Profiler.profile
+def test_dbapi_fetchall(n):
+    """Load DBAPI cursor rows using fetchall()"""
+
+    _test_dbapi_raw(n, True)
+
+
+@Profiler.profile
+def test_dbapi_fetchchunks(n):
+    """Load DBAPI cursor rows using fetchmany()
+    (usually doesn't limit memory)"""
+
+    _test_dbapi_raw(n, False)
+
+
+def _test_dbapi_raw(n, fetchall):
+    compiled = Customer.__table__.select().limit(n).\
+        compile(
+            dialect=engine.dialect,
+            compile_kwargs={"literal_binds": True})
+
+    sql = str(compiled)
+
+    conn = engine.raw_connection()
+    cursor = conn.cursor()
+    cursor.execute(sql)
+
+    if fetchall:
+        for row in cursor.fetchall():
+            # ensure that we fully fetch!
+            data = row[0], row[1], row[2]
+    else:
+        while True:
+            chunk = cursor.fetchmany(10000)
+            if not chunk:
+                break
+            for row in chunk:
+                data = row[0], row[1], row[2]
+    conn.close()
+
+if __name__ == '__main__':
+    Profiler.main(setup_once=setup_database, num=1000000)
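
Note: the two fetchmany() loops above share one pattern; a sketch of it as a reusable generator (chunked_rows is a name invented here, not part of the example). stream_results=True only avoids client-side buffering on drivers with server-side cursor support, such as psycopg2; most DBAPIs fetch the entire result on execute, which is why plain fetchmany() usually saves little memory.

def chunked_rows(result, size=10000):
    # drain a Core result in fixed-size batches rather than fetchall()
    while True:
        chunk = result.fetchmany(size)
        if not chunk:
            break
        for row in chunk:
            yield row

# usage sketch against the Customer table above:
# with engine.connect() as conn:
#     result = conn.execution_options(stream_results=True).\
#         execute(Customer.__table__.select())
#     for row in chunked_rows(result):
#         data = row['id'], row['name'], row['description']
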
diff --git a/examples/performance/single_inserts.py b/examples/performance/single_inserts.py
new file mode 100644
index 0000000..671bbbe
--- /dev/null
+++ b/examples/performance/single_inserts.py
@@ -0,0 +1,145 @@
+"""In this series of tests, we're looking at a method that inserts a row
+within a distinct transaction, and afterwards returns to essentially a
+"closed" state.   This would be analogous to an API call that starts up
+a database connection, inserts the row, commits and closes.
+
+"""
+from . import Profiler
+
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy import Column, Integer, String, create_engine, bindparam, pool
+from sqlalchemy.orm import Session
+
+Base = declarative_base()
+engine = None
+
+
+class Customer(Base):
+    __tablename__ = "customer"
+    id = Column(Integer, primary_key=True)
+    name = Column(String(255))
+    description = Column(String(255))
+
+
+def setup_database(dburl, echo, num):
+    global engine
+    engine = create_engine(dburl, echo=echo)
+    if engine.dialect.name == 'sqlite':
+        engine.pool = pool.StaticPool(creator=engine.pool._creator)
+    Base.metadata.drop_all(engine)
+    Base.metadata.create_all(engine)
+
+
+@Profiler.profile
+def test_orm_commit(n):
+    """Individual INSERT/COMMIT pairs via the ORM"""
+
+    for i in range(n):
+        session = Session(bind=engine)
+        session.add(
+            Customer(
+                name='customer name %d' % i,
+                description='customer description %d' % i)
+        )
+        session.commit()
+
+
+@Profiler.profile
+def test_bulk_save(n):
+    """Individual INSERT/COMMIT pairs using the "bulk" API """
+
+    for i in range(n):
+        session = Session(bind=engine)
+        session.bulk_save_objects([
+            Customer(
+                name='customer name %d' % i,
+                description='customer description %d' % i
+            )])
+        session.commit()
+
+
+@Profiler.profile
+def test_bulk_insert_dictionaries(n):
+    """Individual INSERT/COMMIT pairs using the "bulk" API with dictionaries"""
+
+    for i in range(n):
+        session = Session(bind=engine)
+        session.bulk_insert_mappings(Customer, [
+            dict(
+                name='customer name %d' % i,
+                description='customer description %d' % i
+            )])
+        session.commit()
+
+
+@Profiler.profile
+def test_core(n):
+    """Individual INSERT/COMMIT pairs using Core."""
+
+    for i in range(n):
+        with engine.begin() as conn:
+            conn.execute(
+                Customer.__table__.insert(),
+                dict(
+                    name='customer name %d' % i,
+                    description='customer description %d' % i
+                )
+            )
+
+
+@Profiler.profile
+def test_dbapi_raw_w_connect(n):
+    """Individual INSERT/COMMIT pairs using a pure DBAPI connection,
+    connect each time."""
+
+    _test_dbapi_raw(n, True)
+
+
+@Profiler.profile
+def test_dbapi_raw_w_pool(n):
+    """Individual INSERT/COMMIT pairs using a pure DBAPI connection,
+    using a connection pool."""
+
+    _test_dbapi_raw(n, False)
+
+
+def _test_dbapi_raw(n, connect):
+    compiled = Customer.__table__.insert().values(
+        name=bindparam('name'),
+        description=bindparam('description')).\
+        compile(dialect=engine.dialect)
+
+    if compiled.positional:
+        args = (
+            ('customer name %d' % i, 'customer description %d' % i)
+            for i in range(n))
+    else:
+        args = (
+            dict(
+                name='customer name %d' % i,
+                description='customer description %d' % i
+            )
+            for i in range(n)
+        )
+    sql = str(compiled)
+
+    if connect:
+        for arg in args:
+            # there's no connection pool, so if these were distinct
+            # calls, we'd be connecting each time
+            conn = engine.pool._creator()
+            cursor = conn.cursor()
+            cursor.execute(sql, arg)
+            conn.commit()
+            conn.close()
+    else:
+        for arg in args:
+            conn = engine.raw_connection()
+            cursor = conn.cursor()
+            cursor.execute(sql, arg)
+            conn.commit()
+            conn.close()
+
+
+if __name__ == '__main__':
+    Profiler.main(setup=setup_database, num=10000)
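
Note: the two raw-DBAPI tests above differ only in where the connection comes from. A sketch of that contrast, with sqlite3 standing in for the DBAPI (function names and the database path are illustrative only):

import sqlite3

def insert_with_fresh_connection(sql, arg):
    # as in test_dbapi_raw_w_connect: a brand-new DBAPI connection per
    # call, paying the full connect cost every time
    conn = sqlite3.connect("profile.db")
    cursor = conn.cursor()
    cursor.execute(sql, arg)
    conn.commit()
    conn.close()

def insert_with_pooled_connection(engine, sql, arg):
    # as in test_dbapi_raw_w_pool: engine.raw_connection() checks a
    # DBAPI connection out of SQLAlchemy's pool; close() returns it to
    # the pool rather than closing it, so the connect cost is paid once
    conn = engine.raw_connection()
    cursor = conn.cursor()
    cursor.execute(sql, arg)
    conn.commit()
    conn.close()
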