]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- add this for testing
authorMike Bayer <mike_mp@zzzcomputing.com>
Sun, 8 Mar 2015 18:43:42 +0000 (14:43 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Sun, 8 Mar 2015 18:43:42 +0000 (14:43 -0400)
examples/performance/short_selects.py [new file with mode: 0644]

diff --git a/examples/performance/short_selects.py b/examples/performance/short_selects.py
new file mode 100644 (file)
index 0000000..d81dc0d
--- /dev/null
@@ -0,0 +1,109 @@
+"""This series of tests illustrates different ways to INSERT a large number
+of rows in bulk.
+
+
+"""
+from . import Profiler
+
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy import Column, Integer, String, create_engine, \
+    bindparam, select
+from sqlalchemy.orm import Session, deferred
+import random
+
+Base = declarative_base()
+engine = None
+
+ids = range(1, 11000)
+
+
+class Customer(Base):
+    __tablename__ = "customer"
+    id = Column(Integer, primary_key=True)
+    name = Column(String(255))
+    description = Column(String(255))
+    q = Column(Integer)
+    p = Column(Integer)
+    x = deferred(Column(Integer))
+    y = deferred(Column(Integer))
+    z = deferred(Column(Integer))
+
+Profiler.init("short_selects", num=10000)
+
+
+@Profiler.setup
+def setup_database(dburl, echo, num):
+    global engine
+    engine = create_engine(dburl, echo=echo)
+    Base.metadata.drop_all(engine)
+    Base.metadata.create_all(engine)
+    sess = Session(engine)
+    sess.add_all([
+        Customer(
+            id=i, name='c%d' % i, description="c%d" % i,
+            q="q%d" % i,
+            p="p%d" % i,
+            x="x%d" % i,
+            y="y%d" % i,
+        )
+        for i in ids
+    ])
+    sess.commit()
+
+
+@Profiler.profile
+def test_orm_query(n):
+    """test a straight ORM query of the full entity."""
+    session = Session(bind=engine)
+    for id_ in random.sample(ids, n):
+        session.query(Customer).filter(Customer.id == id_).one()
+
+
+@Profiler.profile
+def test_orm_query_cols_only(n):
+    """test an ORM query of only the entity columns."""
+    session = Session(bind=engine)
+    for id_ in random.sample(ids, n):
+        session.query(
+            Customer.id, Customer.name, Customer.description
+        ).filter(Customer.id == id_).one()
+
+
+@Profiler.profile
+def test_core_new_stmt_each_time(n):
+    """test core, creating a new statement each time."""
+
+    with engine.connect() as conn:
+        for id_ in random.sample(ids, n):
+            stmt = select([Customer.__table__]).where(Customer.id == id_)
+            row = conn.execute(stmt).first()
+            tuple(row)
+
+
+@Profiler.profile
+def test_core_reuse_stmt(n):
+    """test core, reusing the same statement (but recompiling each time)."""
+
+    stmt = select([Customer.__table__]).where(Customer.id == bindparam('id'))
+    with engine.connect() as conn:
+        for id_ in random.sample(ids, n):
+
+            row = conn.execute(stmt, id=id_).first()
+            tuple(row)
+
+
+@Profiler.profile
+def test_core_reuse_stmt_compiled_cache(n):
+    """test core, reusing the same statement + compiled cache."""
+
+    compiled_cache = {}
+    stmt = select([Customer.__table__]).where(Customer.id == bindparam('id'))
+    with engine.connect().\
+            execution_options(compiled_cache=compiled_cache) as conn:
+        for id_ in random.sample(ids, n):
+            row = conn.execute(stmt, id=id_).first()
+            tuple(row)
+
+
+if __name__ == '__main__':
+    Profiler.main()