+++ /dev/null
-This directory contains informal scripts that were used over the years
-to stress test various library subsections, including memory usage,
-function call count, and threading behavior.
-
-The scripts here are *not* part of the automated test suite; they were
-used in an ad-hoc fashion during development of particular features or
-performance enhancements. Ideally, the functionality they exercise
-would be brought under the umbrella of controlled, automated tests.
-Many of the scripts here are out of date and are likely superseded by
-formal performance tests elsewhere.
-
-Current automated stress and performance tests are in test/aaa_profiling/,
-which assert either an expected function call count or flat growth in
-memory usage over time. These tests are part of the automated test suite
-and are maintained at a 100% success rate across Python versions from 2.4
-through the current Python 3 series.
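-
-For illustration, a call-count test in test/aaa_profiling is written
-against the decorators in sqlalchemy.testing.profiling; a minimal
-sketch (hypothetical test case, illustrative only) might look like:
-
-    from sqlalchemy import Column, Integer, MetaData, Table, select
-    from sqlalchemy.testing import fixtures, profiling
-
-    t = Table('t', MetaData(), Column('x', Integer))
-
-    class SketchTest(fixtures.TestBase):
-        @profiling.function_call_count()
-        def test_select_construction(self):
-            # the decorator compares the Python function calls made
-            # here against a recorded per-platform baseline
-            select([t.c.x]).where(t.c.x == 5)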
\ No newline at end of file
+++ /dev/null
-import sys, time
-from sqlalchemy import *
-from sqlalchemy.orm import *
-from sqlalchemy.testing import profiling
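-
-# Compares INSERT overhead at three levels: raw pysqlite cursor calls,
-# unprofiled SQLAlchemy Core execute()/executemany(), and the same
-# Core statements run under the profiler, for both bulk and
-# row-at-a-time patterns.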
-
-db = create_engine('sqlite://')
-metadata = MetaData(db)
-Person_table = Table('Person', metadata,
- Column('name', String(40)),
- Column('sex', Integer),
- Column('age', Integer))
-
-
-def sa_unprofiled_insertmany(n):
- i = Person_table.insert()
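- # a list of parameter dicts routes through the DB-API executemany() path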
- i.execute([{'name':'John Doe','sex':1,'age':35} for j in range(n)])
-
-def sqlite_unprofiled_insertmany(n):
- conn = db.connect().connection
- c = conn.cursor()
- persons = [('john doe', 1, 35) for i in range(n)]
- c.executemany("insert into Person(name, sex, age) values (?,?,?)", persons)
-
-@profiling.profiled('sa_profiled_insert_many', always=True)
-def sa_profiled_insert_many(n):
- i = Person_table.insert()
- i.execute([{'name':'John Doe','sex':1,'age':35} for j in range(n)])
- s = Person_table.select()
- r = s.execute()
- res = [[value for value in row] for row in r.fetchall()]
-
-def sqlite_unprofiled_insert(n):
- conn = db.connect().connection
- c = conn.cursor()
- for j in range(n):
- c.execute("insert into Person(name, sex, age) values (?,?,?)",
- ('john doe', 1, 35))
-
-def sa_unprofiled_insert(n):
- # Another option is to build Person_table.insert() outside of the
- # loop. But it doesn't make much of a difference, so might as well
- # use the worst-case/naive version here.
- for j in range(n):
- Person_table.insert().execute({'name':'John Doe','sex':1,'age':35})
-
-@profiling.profiled('sa_profiled_insert', always=True)
-def sa_profiled_insert(n):
- i = Person_table.insert()
- for j in range(n):
- i.execute({'name':'John Doe','sex':1,'age':35})
- s = Person_table.select()
- r = s.execute()
- res = [[value for value in row] for row in r.fetchall()]
-
-def run_timed(fn, label, *args, **kw):
- metadata.drop_all()
- metadata.create_all()
-
- sys.stdout.write("%s (%s): " % (label, ', '.join([str(a) for a in args])))
- sys.stdout.flush()
-
- t = time.perf_counter()
- fn(*args, **kw)
- t2 = time.perf_counter()
-
- sys.stdout.write("%0.2f seconds\n" % (t2 - t))
-
-def run_profiled(fn, label, *args, **kw):
- metadata.drop_all()
- metadata.create_all()
-
- print("%s (%s)" % (label, ', '.join([str(a) for a in args])))
- fn(*args, **kw)
-
-def all():
- try:
- print("Bulk INSERTS via executemany():\n")
-
- run_timed(sqlite_unprofiled_insertmany,
- 'pysqlite bulk insert',
- 50000)
-
- run_timed(sa_unprofiled_insertmany,
- 'SQLAlchemy bulk insert',
- 50000)
-
- run_profiled(sa_profiled_insert_many,
- 'SQLAlchemy bulk insert/select, profiled',
- 50000)
-
- print("\nIndividual INSERTS via execute():\n")
-
- run_timed(sqlite_unprofiled_insert,
- "pysqlite individual insert",
- 50000)
-
- run_timed(sa_unprofiled_insert,
- "SQLAlchemy individual insert",
- 50000)
-
- run_profiled(sa_profiled_insert,
- 'SQLAlchemy individual insert/select, profiled',
- 50000)
-
- finally:
- metadata.drop_all()
-
-if __name__ == '__main__':
- all()
+++ /dev/null
-import sqlalchemy as sa
-from sqlalchemy import create_engine, MetaData, orm
-from sqlalchemy import Column, ForeignKey
-from sqlalchemy import Integer, String
-from sqlalchemy.orm import mapper
-from sqlalchemy.testing import profiling
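-
-# Profiles a single large flush over a joined-table inheritance
-# hierarchy; originally written to reproduce a recursion-depth error
-# raised during flush.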
-
-class Object(object):
- pass
-
-class Q(Object):
- pass
-
-class A(Object):
- pass
-
-class C(Object):
- pass
-
-class WC(C):
- pass
-
-engine = create_engine('sqlite:///:memory:', echo=True)
-
-sm = orm.sessionmaker(bind=engine)
-
-SA_Session = orm.scoped_session(sm)
-
-SA_Metadata = MetaData()
-
-object_table = sa.Table('Object',
- SA_Metadata,
- Column('ObjectID', Integer,primary_key=True),
- Column('Type', String(1), nullable=False))
-
-q_table = sa.Table('Q',
- SA_Metadata,
- Column('QID', Integer, ForeignKey('Object.ObjectID'),primary_key=True))
-
-c_table = sa.Table('C',
- SA_Metadata,
- Column('CID', Integer, ForeignKey('Object.ObjectID'),primary_key=True))
-
-wc_table = sa.Table('WC',
- SA_Metadata,
- Column('WCID', Integer, ForeignKey('C.CID'), primary_key=True))
-
-a_table = sa.Table('A',
- SA_Metadata,
- Column('AID', Integer, ForeignKey('Object.ObjectID'),primary_key=True),
- Column('QID', Integer, ForeignKey('Q.QID')),
- Column('CID', Integer, ForeignKey('C.CID')))
-
-mapper(Object, object_table, polymorphic_on=object_table.c.Type, polymorphic_identity='O')
-
-mapper(Q, q_table, inherits=Object, polymorphic_identity='Q')
-mapper(C, c_table, inherits=Object, polymorphic_identity='C')
-mapper(WC, wc_table, inherits=C, polymorphic_identity='W')
-
-mapper(A, a_table, inherits=Object, polymorphic_identity='A',
- properties = {
- 'Q' : orm.relationship(Q, primaryjoin=a_table.c.QID==q_table.c.QID,
- backref='As'
- ),
- 'C' : orm.relationship(C, primaryjoin=a_table.c.CID==c_table.c.CID,
- backref='A',
- uselist=False)
- }
- )
-
-SA_Metadata.create_all(engine)
-
-@profiling.profiled('large_flush', always=True, sort=['file'])
-def generate_error():
- q = Q()
- for j in range(100): # at ~306 iterations the error no longer reproduces (depends on recursion depth)
- a = A()
- a.Q = q
- a.C = WC()
-
- SA_Session.add(q)
- SA_Session.commit() # the error is raised during this flush
-
-generate_error()
\ No newline at end of file
+++ /dev/null
-import time, resource
-from sqlalchemy import *
-from sqlalchemy.orm import *
-from sqlalchemy.testing.util import gc_collect
-from sqlalchemy.testing import profiling
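-
-# Compares row-fetching overhead at several levels: raw pysqlite
-# cursor, SQLAlchemy Core select(), and ORM queries against both a
-# plain mapping and a joined-table inheritance mapping.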
-
-db = create_engine('sqlite://')
-metadata = MetaData(db)
-Person_table = Table('Person', metadata,
- Column('id', Integer, primary_key=True),
- Column('type', String(10)),
- Column('name', String(40)),
- Column('sex', Integer),
- Column('age', Integer))
-
-
-Employee_table = Table('Employee', metadata,
- Column('id', Integer, ForeignKey('Person.id'), primary_key=True),
- Column('foo', String(40)),
- Column('bar', Integer),
- Column('bat', Integer))
-
-class RawPerson(object): pass
-class Person(object): pass
-mapper(Person, Person_table)
-
-class JoinedPerson(object):pass
-class Employee(JoinedPerson):pass
-mapper(JoinedPerson, Person_table,
- polymorphic_on=Person_table.c.type, polymorphic_identity='person')
-mapper(Employee, Employee_table,
- inherits=JoinedPerson, polymorphic_identity='employee')
-configure_mappers()
-
-def setup():
- metadata.create_all()
- i = Person_table.insert()
- data = [{'name':'John Doe','sex':1,'age':35, 'type':'employee'}] * 100
- for j in range(500):
- i.execute(data)
-
- # note we aren't fetching from Employee_table, so we can leave it
- # empty even though that's "incorrect"
- #i = Employee_table.insert()
- #data = [{'foo':'foo', 'bar':'bar', 'bat':'bat'}] * 100
- #for j in range(500):
- # i.execute(data)
-
- print("Inserted 50,000 rows")
-
-def sqlite_select(entity_cls):
- conn = db.connect().connection
- cr = conn.cursor()
- cr.execute("SELECT id, name, sex, age FROM Person")
- people = []
- for row in cr.fetchall():
- person = entity_cls()
- person.id = row[0]
- person.name = row[1]
- person.sex = row[2]
- person.age = row[3]
- people.append(person)
- cr.close()
- conn.close()
-
-def sql_select(entity_cls):
- people = []
- for row in Person_table.select().execute().fetchall():
- person = entity_cls()
- person.id = row['id']
- person.name = row['name']
- person.sex = row['sex']
- person.age = row['age']
- people.append(person)
-
-#@profiling.profiled(report=True, always=True)
-def orm_select():
- session = create_session()
- people = session.query(Person).all()
-
-#@profiling.profiled(report=True, always=True)
-def joined_orm_select():
- session = create_session()
- people = session.query(JoinedPerson).all()
-
-def all():
- setup()
- try:
- t, t2 = 0, 0
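- # usage() prints the wall-clock delta t2 - t along with user/sys CPU
- # deltas since the last usage.snap(); the previous rusage snapshot is
- # stashed on the function itself as usage.last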
- def usage(label):
- now = resource.getrusage(resource.RUSAGE_SELF)
- print("%s: %0.3fs real, %0.3fs user, %0.3fs sys" % (
- label, t2 - t,
- now.ru_utime - usage.last.ru_utime,
- now.ru_stime - usage.last.ru_stime))
- usage.snap(now)
- usage.snap = lambda stats=None: setattr(
- usage, 'last', stats or resource.getrusage(resource.RUSAGE_SELF))
-
- gc_collect()
- usage.snap()
- t = time.perf_counter()
- sqlite_select(RawPerson)
- t2 = time.perf_counter()
- usage('sqlite select/native')
-
- gc_collect()
- usage.snap()
- t = time.perf_counter()
- sqlite_select(Person)
- t2 = time.perf_counter()
- usage('sqlite select/instrumented')
-
- gc_collect()
- usage.snap()
- t = time.perf_counter()
- sql_select(RawPerson)
- t2 = time.perf_counter()
- usage('sqlalchemy.sql select/native')
-
- gc_collect()
- usage.snap()
- t = time.perf_counter()
- sql_select(Person)
- t2 = time.perf_counter()
- usage('sqlalchemy.sql select/instrumented')
-
- gc_collect()
- usage.snap()
- t = time.perf_counter()
- orm_select()
- t2 = time.perf_counter()
- usage('sqlalchemy.orm fetch')
-
- gc_collect()
- usage.snap()
- t = time.perf_counter()
- joined_orm_select()
- t2 = time.perf_counter()
- usage('sqlalchemy.orm "joined" fetch')
- finally:
- metadata.drop_all()
-
-
-if __name__ == '__main__':
- all()
+++ /dev/null
-import time, resource
-from sqlalchemy import *
-from sqlalchemy.orm import *
-from sqlalchemy.testing import *
-from sqlalchemy.testing.util import gc_collect
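-
-# Loads NUM Person rows with joined-eager-loaded email collections,
-# mutates every object, and profiles the resulting UPDATE flush.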
-
-
-NUM = 100
-
-metadata = MetaData(testing.db)
-Person_table = Table('Person', metadata,
- Column('id', Integer, primary_key=True),
- Column('name', String(40)),
- Column('sex', Integer),
- Column('age', Integer))
-
-Email_table = Table('Email', metadata,
- Column('id', Integer, primary_key=True),
- Column('person_id', Integer, ForeignKey('Person.id')),
- Column('address', String(300)))
-
-class Person(object):
- pass
-class Email(object):
- def __repr__(self):
- return '<email %s %s>' % (getattr(self, 'id', None),
- getattr(self, 'address', None))
-
-mapper(Person, Person_table, properties={
- 'emails': relationship(Email, backref='owner', lazy='joined')
- })
-mapper(Email, Email_table)
-configure_mappers()
-
-def setup():
- metadata.create_all()
- i = Person_table.insert()
- data = [{'name':'John Doe','sex':1,'age':35}] * NUM
- i.execute(data)
-
- i = Email_table.insert()
- for j in range(1, NUM + 1):
- i.execute(address='foo@bar', person_id=j)
- if j % 2:
- i.execute(address='baz@quux', person_id=j)
-
- print("Inserted %d rows." % (NUM + NUM + (NUM // 2)))
-
-def orm_select(session):
- return session.query(Person).all()
-
-@profiling.profiled('update_and_flush')
-def update_and_flush(session, people):
- for p in people:
- p.name = 'Exene Cervenka'
- p.sex = 2
- p.emails[0].address = 'hoho@lala'
- session.flush()
-
-def all():
- setup()
- try:
- t, t2 = 0, 0
- def usage(label):
- now = resource.getrusage(resource.RUSAGE_SELF)
- print("%s: %0.3fs real, %0.3fs user, %0.3fs sys" % (
- label, t2 - t,
- now.ru_utime - usage.last.ru_utime,
- now.ru_stime - usage.last.ru_stime))
- usage.snap(now)
- usage.snap = lambda stats=None: setattr(
- usage, 'last', stats or resource.getrusage(resource.RUSAGE_SELF))
-
- session = create_session()
-
- gc_collect()
- usage.snap()
- t = time.perf_counter()
- people = orm_select(session)
- t2 = time.perf_counter()
- usage('load objects')
-
- gc_collect()
- usage.snap()
- t = time.perf_counter()
- update_and_flush(session, people)
- t2 = time.perf_counter()
- usage('update and flush')
- finally:
- metadata.drop_all()
-
-
-if __name__ == '__main__':
- all()
+++ /dev/null
-import time
-from datetime import datetime
-
-from sqlalchemy import *
-from sqlalchemy.orm import *
-from sqlalchemy.testing import *
-from sqlalchemy.testing.profiling import profiled
-
-class Item(object):
- def __repr__(self):
- return 'Item<#%s "%s">' % (self.id, self.name)
-class SubItem(object):
- def __repr__(self):
- return 'SubItem<#%s "%s">' % (self.id, self.name)
-class Customer(object):
- def __repr__(self):
- return 'Customer<#%s "%s">' % (self.id, self.name)
-class Purchase(object):
- def __repr__(self):
- return 'Purchase<#%s "%s">' % (self.id, self.purchase_date)
-
-items, subitems, customers, purchases, purchaseitems = \
- None, None, None, None, None
-
-metadata = MetaData()
-
-@profiled('table')
-def define_tables():
- global items, subitems, customers, purchases, purchaseitems
- items = Table('items', metadata,
- Column('id', Integer, primary_key=True),
- Column('name', String(100)),
- test_needs_acid=True)
- subitems = Table('subitems', metadata,
- Column('id', Integer, primary_key=True),
- Column('item_id', Integer, ForeignKey('items.id'),
- nullable=False),
- Column('name', String(100), server_default='no name'),
- test_needs_acid=True)
- customers = Table('customers', metadata,
- Column('id', Integer, primary_key=True),
- Column('name', String(100)),
- *[Column("col_%s" % chr(i), String(64), default=str(i))
- for i in range(97,117)],
- **dict(test_needs_acid=True))
- purchases = Table('purchases', metadata,
- Column('id', Integer, primary_key=True),
- Column('customer_id', Integer,
- ForeignKey('customers.id'), nullable=False),
- Column('purchase_date', DateTime,
- default=datetime.now),
- test_needs_acid=True)
- purchaseitems = Table('purchaseitems', metadata,
- Column('purchase_id', Integer,
- ForeignKey('purchases.id'),
- nullable=False, primary_key=True),
- Column('item_id', Integer, ForeignKey('items.id'),
- nullable=False, primary_key=True),
- test_needs_acid=True)
-
-@profiled('mapper')
-def setup_mappers():
- mapper(Item, items, properties={
- 'subitems': relationship(SubItem, backref='item', lazy='select')
- })
- mapper(SubItem, subitems)
- mapper(Customer, customers, properties={
- 'purchases': relationship(Purchase, lazy='select', backref='customer')
- })
- mapper(Purchase, purchases, properties={
- 'items': relationship(Item, lazy='select', secondary=purchaseitems)
- })
-
-@profiled('inserts')
-def insert_data():
- q_items = 1000
- q_sub_per_item = 10
- q_customers = 1000
-
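- # rows are accumulated as plain dicts and flushed to the database in
- # batches of 100 through executemany-style insert().execute(*data)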
- con = testing.db.connect()
-
- transaction = con.begin()
- data, subdata = [], []
- for item_id in range(1, q_items + 1):
- data.append({'name': "item number %s" % item_id})
- for subitem_id in range(1, (item_id % q_sub_per_item) + 1):
- subdata.append({'item_id': item_id,
- 'name': "subitem number %s" % subitem_id})
- if item_id % 100 == 0:
- items.insert().execute(*data)
- subitems.insert().execute(*subdata)
- del data[:]
- del subdata[:]
- if data:
- items.insert().execute(*data)
- if subdata:
- subitems.insert().execute(*subdata)
- transaction.commit()
-
- transaction = con.begin()
- data = []
- for customer_id in range(1, q_customers + 1):
- data.append({'name': "customer number %s" % customer_id})
- if customer_id % 100 == 0:
- customers.insert().execute(*data)
- del data[:]
- if data:
- customers.insert().execute(*data)
- transaction.commit()
-
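- # purchases are generated in waves: a customer stays in the rotation
- # while customer_id % 10 exceeds the wave counter `step`, so some
- # customers accumulate up to nine purchases; order timestamps advance
- # five minutes (300 seconds) per purchase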
- transaction = con.begin()
- data, subdata = [], []
- order_t = int(time.time()) - (5000 * 5 * 60)
- current = range(1, q_customers + 1)
- step, purchase_id = 1, 0
- while current:
- still_active = []
- for customer_id in current:
- order_t += 300
- data.append({'customer_id': customer_id,
- 'purchase_date': datetime.fromtimestamp(order_t)})
- purchase_id += 1
- for item_id in range(customer_id % 200, customer_id + 1, 200):
- if item_id != 0:
- subdata.append({'purchase_id': purchase_id,
- 'item_id': item_id})
- if customer_id % 10 > step:
- still_active.append(customer_id)
-
- if len(data) >= 100:
- purchases.insert().execute(*data)
- if subdata:
- purchaseitems.insert().execute(*subdata)
- del data[:]
- del subdata[:]
- step, current = step + 1, still_active
-
- if data:
- purchases.insert().execute(*data)
- if subdata:
- purchaseitems.insert().execute(*subdata)
- transaction.commit()
-
-@profiled('queries')
-def run_queries():
- session = create_session()
- # no explicit transaction here.
-
- # build a report of summarizing the last 50 purchases and
- # the top 20 items from all purchases
-
- q = session.query(Purchase). \
- order_by(desc(Purchase.purchase_date)). \
- limit(50).\
- options(joinedload('items'), joinedload('items.subitems'),
- joinedload('customer'))
-
- report = []
- # "write" the report. pretend it's going to a web template or something,
- # the point is to actually pull data through attributes and collections.
- for purchase in q:
- report.append(purchase.customer.name)
- report.append(purchase.customer.col_a)
- report.append(purchase.purchase_date)
- for item in purchase.items:
- report.append(item.name)
- report.extend([s.name for s in item.subitems])
-
- # mix a little low-level with orm
- # pull a report of the top 20 items of all time
- _item_id = purchaseitems.c.item_id
- top_20_q = select([func.distinct(_item_id).label('id')],
- group_by=[purchaseitems.c.purchase_id, _item_id],
- order_by=[desc(func.count(_item_id)), _item_id],
- limit=20)
- ids = [r.id for r in top_20_q.execute().fetchall()]
- q2 = session.query(Item).filter(Item.id.in_(ids))
-
- for num, item in enumerate(q2):
- report.append("number %s: %s" % (num + 1, item.name))
-
-@profiled('creating')
-def create_purchase():
- # commit a purchase
- customer_id = 100
- item_ids = (10,22,34,46,58)
-
- session = create_session()
- session.begin()
-
- customer = session.query(Customer).get(customer_id)
- items = session.query(Item).filter(Item.id.in_(item_ids))
-
- purchase = Purchase()
- purchase.customer = customer
- purchase.items.extend(items)
-
- session.flush()
- session.commit()
- session.expire(customer)
-
-def setup_db():
- metadata.drop_all()
- metadata.create_all()
-def cleanup_db():
- metadata.drop_all()
-
-@profiled('default')
-def default():
- run_queries()
- create_purchase()
-
-@profiled('all')
-def main():
- metadata.bind = testing.db
- try:
- define_tables()
- setup_mappers()
- setup_db()
- insert_data()
- default()
- finally:
- cleanup_db()
-
-main()
+++ /dev/null
-from sqlalchemy import *
-from sqlalchemy.orm import *
-
-from sqlalchemy.testing.util import gc_collect
-from sqlalchemy.testing import AssertsExecutionResults, fixtures, profiling, testing
-from test.orm import _fixtures
-
-# in this test we are specifically looking for time spent in the attributes.InstanceState.__cleanup() method.
-
-ITERATIONS = 100
-
-class SessionTest(fixtures.TestBase, AssertsExecutionResults):
- @classmethod
- def setup_class(cls):
- global t1, t2, metadata,T1, T2
- metadata = MetaData(testing.db)
- t1 = Table('t1', metadata,
- Column('c1', Integer, primary_key=True),
- Column('c2', String(30)))
-
- t2 = Table('t2', metadata,
- Column('c1', Integer, primary_key=True),
- Column('c2', String(30)),
- Column('t1id', Integer, ForeignKey('t1.c1'))
- )
-
- metadata.create_all()
-
- rows = []
- for x in range(1, 51):
- rows.append({'c2': 'this is t1 #%d' % x})
- t1.insert().execute(*rows)
- for x in range(1, 51):
- rows = []
- for y in range(1, 100):
- rows.append({'c2': 'this is t2 #%d' % y, 't1id': x})
- t2.insert().execute(*rows)
-
- class T1(fixtures.ComparableEntity):
- pass
- class T2(fixtures.ComparableEntity):
- pass
-
- mapper(T1, t1, properties={
- 't2s':relationship(T2, backref='t1')
- })
- mapper(T2, t2)
-
- @classmethod
- def teardown_class(cls):
- metadata.drop_all()
- clear_mappers()
-
- @profiling.profiled('clean', report=True)
- def test_session_clean(self):
- for x in range(0, ITERATIONS):
- sess = create_session()
- t1s = sess.query(T1).filter(T1.c1.between(15, 48)).all()
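- # touching .t2s fires a lazy load for each selected collection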
- for index in [2, 7, 12, 15, 18, 20]:
- t1s[index].t2s
-
- sess.close()
- del sess
- gc_collect()
-
- @profiling.profiled('dirty', report=True)
- def test_session_dirty(self):
- for x in range(0, ITERATIONS):
- sess = create_session()
- t1s = sess.query(T1).filter(T1.c1.between(15, 48)).all()
-
- for index in [2, 7, 12, 15, 18, 20]:
- t1s[index].c2 = 'this is some modified text'
- for t2 in t1s[index].t2s:
- t2.c2 = 'this is some modified text'
-
- del t1s
- gc_collect()
-
- sess.close()
- del sess
- gc_collect()
-
- @profiling.profiled('noclose', report=True)
- def test_session_noclose(self):
- for x in range(0, ITERATIONS):
- sess = create_session()
- t1s = sess.query(T1).filter(T1.c1.between(15, 48)).all()
- for index in [2, 7, 12, 15, 18, 20]:
- t1s[index].t2s
-
- del sess
- gc_collect()
-
-
+++ /dev/null
-# -*- coding: utf-8 -*-
-from datetime import *
-from decimal import Decimal
-#from fastdec import mpd as Decimal
-from pickle import dumps, loads
-
-#from sqlalchemy.dialects.postgresql.base import ARRAY
-
-from stresstest import *
-
-# ---
-test_types = False
-test_methods = True
-test_pickle = False
-test_orm = False
-# ---
-verbose = True
-
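-# each *_results function drains a result set through a different
-# RowProxy access pattern (tuple iteration, values(), string / integer /
-# Column-object __getitem__, slicing) so their relative costs can be
-# compared by the stresstest.py harness
-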
-def values_results(raw_results):
- return [tuple(r.values()) for r in raw_results]
-
-def getitem_str_results(raw_results):
- return [
- (r['id'],
- r['field0'], r['field1'], r['field2'], r['field3'], r['field4'],
- r['field5'], r['field6'], r['field7'], r['field8'], r['field9'])
- for r in raw_results]
-
-def getitem_fallback_results(raw_results):
- return [
- (r['ID'],
- r['FIELD0'], r['FIELD1'], r['FIELD2'], r['FIELD3'], r['FIELD4'],
- r['FIELD5'], r['FIELD6'], r['FIELD7'], r['FIELD8'], r['FIELD9'])
- for r in raw_results]
-
-def getitem_int_results(raw_results):
- return [
- (r[0],
- r[1], r[2], r[3], r[4], r[5],
- r[6], r[7], r[8], r[9], r[10])
- for r in raw_results]
-
-def getitem_long_results(raw_results):
- return [
- (r[0],
- r[1], r[2], r[3], r[4], r[5],
- r[6], r[7], r[8], r[9], r[10])
- for r in raw_results]
-
-def getitem_obj_results(raw_results):
- c = test_table.c
- fid, f0, f1, f2, f3, f4, f5, f6, f7, f8, f9 = (
- c.id, c.field0, c.field1, c.field2, c.field3, c.field4,
- c.field5, c.field6, c.field7, c.field8, c.field9)
- return [
- (r[fid],
- r[f0], r[f1], r[f2], r[f3], r[f4],
- r[f5], r[f6], r[f7], r[f8], r[f9])
- for r in raw_results]
-
-def slice_results(raw_results):
- return [row[0:6] + row[6:11] for row in raw_results]
-
-# ---------- #
-# Test types #
-# ---------- #
-
-# Array
-#def genarrayvalue(rnum, fnum):
-# return [fnum, fnum + 1, fnum + 2]
-#arraytest = (ARRAY(Integer), genarrayvalue,
-# dict(num_fields=100, num_records=1000,
-# engineurl='postgresql:///test'))
-
-# Boolean
-def genbooleanvalue(rnum, fnum):
- if rnum % 4:
- return bool(fnum % 2)
- else:
- return None
-booleantest = (Boolean, genbooleanvalue, dict(num_records=100000))
-
-# Datetime
-def gendatetimevalue(rnum, fnum):
- return datetime(2005, 3, 3) if rnum % 4 else None
-datetimetest = (DateTime, gendatetimevalue, dict(num_records=10000))
-
-# Decimal
-def gendecimalvalue(rnum, fnum):
- if rnum % 4:
- return Decimal(str(0.25 * fnum))
- else:
- return None
-decimaltest = (Numeric(10, 2), gendecimalvalue, dict(num_records=10000))
-
-# Interval
-
-# no microseconds because Postgres does not seem to support it
-from_epoch = timedelta(14643, 70235)
-def genintervalvalue(rnum, fnum):
- return from_epoch
-intervaltest = (Interval, genintervalvalue,
- dict(num_fields=2, num_records=100000))
-
-# PickleType
-def genpicklevalue(rnum, fnum):
- return {'str': "value%d" % fnum, 'int': rnum} if rnum % 4 else None
-pickletypetest = (PickleType, genpicklevalue,
- dict(num_fields=1, num_records=100000))
-
-# TypeDecorator
-class MyIntType(TypeDecorator):
- impl = Integer
-
- def process_bind_param(self, value, dialect):
- return value * 10
-
- def process_result_value(self, value, dialect):
- # floor division so the round trip returns the original integer
- return value // 10
-
- def copy(self):
- return MyIntType()
-
-def genmyintvalue(rnum, fnum):
- return rnum + fnum
-typedecoratortest = (MyIntType, genmyintvalue,
- dict(num_records=100000))
-
-# Unicode
-def genunicodevalue(rnum, fnum):
- return ("value%d" % fnum) if rnum % 4 else None
-unicodetest = (Unicode(20), genunicodevalue,
- dict(num_records=100000))
-# dict(engineurl='mysql:///test', freshdata=False))
-
-# do the tests
-if test_types:
- tests = [booleantest, datetimetest, decimaltest, intervaltest,
- pickletypetest, typedecoratortest, unicodetest]
- for engineurl in ('postgresql://scott:tiger@localhost/test',
- 'sqlite://', 'mysql://scott:tiger@localhost/test'):
- print("\n%s\n" % engineurl)
- for datatype, genvalue, kwargs in tests:
- print("%s:" % getattr(datatype, '__name__',
- datatype.__class__.__name__), end=' ')
- profile_and_time_dbfunc(iter_results, datatype, genvalue,
- profile=False, engineurl=engineurl,
- verbose=verbose, **kwargs)
-
-# ---------------------- #
-# test row proxy methods #
-# ---------------------- #
-
-if test_methods:
- methods = [iter_results, values_results, getattr_results,
- getitem_str_results, getitem_fallback_results,
- getitem_int_results, getitem_long_results, getitem_obj_results,
- slice_results]
- for engineurl in ('postgresql://scott:tiger@localhost/test',
- 'sqlite://', 'mysql://scott:tiger@localhost/test'):
- print("\n%s\n" % engineurl)
- test_table = prepare(Unicode(20),
- genunicodevalue,
- num_fields=10, num_records=100000,
- verbose=verbose, engineurl=engineurl)
- for method in methods:
- print("%s:" % method.__name__, end=' ')
- time_dbfunc(test_table, method, genunicodevalue,
- num_fields=10, num_records=100000, profile=False,
- verbose=verbose)
-
-# --------------------------------
-# test pickling Rowproxy instances
-# --------------------------------
-
-def pickletofile_results(raw_results):
- from pickle import dump
- for protocol in (0, 1, 2):
- print("dumping protocol %d..." % protocol)
- f = open('noext.pickle%d' % protocol, 'wb')
- dump(raw_results, f, protocol)
- f.close()
- return raw_results
-
-def pickle_results(raw_results):
- return loads(dumps(raw_results, 2))
-
-def pickle_meta(raw_results):
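- # RowProxy objects share one result-metadata object via _parent;
- # pickling it exercises that shared state separately from the rows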
- pickled = dumps(raw_results[0]._parent, 2)
- metadata = loads(pickled)
- return raw_results
-
-def pickle_rows(raw_results):
- return [loads(dumps(row, 2)) for row in raw_results]
-
-if test_pickle:
- test_table = prepare(Unicode, genunicodevalue,
- num_fields=10, num_records=10000)
- funcs = [pickle_rows, pickle_results]
- for func in funcs:
- print("%s:" % func.__name__, end=' ')
- time_dbfunc(test_table, func, genunicodevalue,
- num_records=10000, profile=False, verbose=verbose)
-
-# --------------------------------
-# test ORM
-# --------------------------------
-
-if test_orm:
- from sqlalchemy.orm import *
-
- class Test(object):
- pass
-
- Session = sessionmaker()
- session = Session()
-
- def get_results():
- return session.query(Test).all()
- print("ORM:", end=' ')
- for engineurl in ('postgresql:///test', 'sqlite://', 'mysql:///test'):
- print("\n%s\n" % engineurl)
- profile_and_time_dbfunc(getattr_results, Unicode(20), genunicodevalue,
- class_=Test, getresults_func=get_results,
- engineurl=engineurl, #freshdata=False,
- num_records=10000, verbose=verbose)
+++ /dev/null
-import gc
-import sys
-import timeit
-import cProfile
-
-from sqlalchemy import MetaData, Table, Column
-from sqlalchemy.types import *
-from sqlalchemy.orm import mapper, clear_mappers
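-
-# shared harness for the row/type speed scripts: builds a throwaway
-# table, bulk-inserts generated values, then times, profiles, and
-# optionally leak-checks a result-processing function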
-
-metadata = MetaData()
-
-def gen_table(num_fields, field_type, metadata):
- return Table('test', metadata,
- Column('id', Integer, primary_key=True),
- *[Column("field%d" % fnum, field_type)
- for fnum in range(num_fields)])
-
-def insert(test_table, num_fields, num_records, genvalue, verbose=True):
- if verbose:
- print("building insert values...", end=' ')
- sys.stdout.flush()
- values = [dict(("field%d" % fnum, genvalue(rnum, fnum))
- for fnum in range(num_fields))
- for rnum in range(num_records)]
- if verbose:
- print("inserting...", end=' ')
- sys.stdout.flush()
- def db_insert():
- test_table.insert().execute(values)
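- # timeit's setup statement imports from __main__, so inject the
- # closure there first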
- sys.modules['__main__'].db_insert = db_insert
- timing = timeit.timeit("db_insert()",
- "from __main__ import db_insert",
- number=1)
- if verbose:
- print("%s" % round(timing, 3))
-
-def check_result(results, num_fields, genvalue, verbose=True):
- if verbose:
- print("checking...", end=' ')
- sys.stdout.flush()
- for rnum, row in enumerate(results):
- expected = tuple([rnum + 1] +
- [genvalue(rnum, fnum) for fnum in range(num_fields)])
- assert row == expected, "got: %s\nexpected: %s" % (row, expected)
- return True
-
-def avgdev(values, comparison):
- return sum(value - comparison for value in values) / len(values)
-
-def nicer_res(values, printvalues=False):
- if printvalues:
- print(values)
- min_time = min(values)
- return round(min_time, 3), round(avgdev(values, min_time), 2)
-
-def profile_func(func_name, verbose=True):
- if verbose:
- print("profiling...", end=' ')
- sys.stdout.flush()
- cProfile.run('%s()' % func_name, 'prof')
-
-def time_func(func_name, num_tests=1, verbose=True):
- if verbose:
- print("timing...", end=' ')
- sys.stdout.flush()
- timings = timeit.repeat('%s()' % func_name,
- "from __main__ import %s" % func_name,
- number=num_tests, repeat=5)
- best, dev = nicer_res(timings)
- if verbose:
- print("%s (%s)" % (best, dev))
- else:
- print(best)
-
-def profile_and_time(func_name, num_tests=1):
- profile_func(func_name)
- time_func(func_name, num_tests)
-
-def iter_results(raw_results):
- return [tuple(row) for row in raw_results]
-
-def getattr_results(raw_results):
- return [
- (r.id,
- r.field0, r.field1, r.field2, r.field3, r.field4,
- r.field5, r.field6, r.field7, r.field8, r.field9)
- for r in raw_results]
-
-def fetchall(test_table):
- def results():
- return test_table.select().order_by(test_table.c.id).execute() \
- .fetchall()
- return results
-
-def hashable_set(items):
- hashables = []
- for o in items:
- try:
- hash(o)
- hashables.append(o)
- except TypeError:
- # unhashable objects (dicts, lists) can't participate in set diffs
- pass
- return set(hashables)
-
-def prepare(field_type, genvalue, engineurl='sqlite://',
- num_fields=10, num_records=1000, freshdata=True, verbose=True):
- global metadata
- metadata.clear()
- metadata.bind = engineurl
- test_table = gen_table(num_fields, field_type, metadata)
- if freshdata:
- metadata.drop_all()
- metadata.create_all()
- insert(test_table, num_fields, num_records, genvalue, verbose)
- return test_table
-
-def time_dbfunc(test_table, test_func, genvalue,
- class_=None,
- getresults_func=None,
- num_fields=10, num_records=1000, num_tests=1,
- check_results=check_result, profile=True,
- check_leaks=True, print_leaks=False, verbose=True):
- if verbose:
- print("testing '%s'..." % test_func.__name__, end=' ')
- sys.stdout.flush()
- if class_ is not None:
- clear_mappers()
- mapper(class_, test_table)
- if getresults_func is None:
- getresults_func = fetchall(test_table)
- def test():
- return test_func(getresults_func())
- sys.modules['__main__'].test = test
- if check_leaks:
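- # snapshot live objects before the run; a leak shows up below as
- # growth in gc.get_objects() after the test completes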
- gc.collect()
- objects_before = gc.get_objects()
- num_objects_before = len(objects_before)
- hashable_objects_before = hashable_set(objects_before)
-# gc.set_debug(gc.DEBUG_LEAK)
- if check_results:
- check_results(test(), num_fields, genvalue, verbose)
- if check_leaks:
- gc.collect()
- objects_after = gc.get_objects()
- num_objects_after = len(objects_after)
- num_leaks = num_objects_after - num_objects_before
- hashable_objects_after = hashable_set(objects_after)
- diff = hashable_objects_after - hashable_objects_before
- ldiff = len(diff)
- if print_leaks and ldiff < num_records:
- print("\n*** hashable objects leaked (%d) ***" % ldiff)
- print('\n'.join(map(str, diff)))
- print("***\n")
-
- if num_leaks > num_records:
- print("(leaked: %d !)" % num_leaks, end=' ')
- if profile:
- profile_func('test', verbose)
- time_func('test', num_tests, verbose)
-
-def profile_and_time_dbfunc(test_func, field_type, genvalue,
- class_=None,
- getresults_func=None,
- engineurl='sqlite://', freshdata=True,
- num_fields=10, num_records=1000, num_tests=1,
- check_results=check_result, profile=True,
- check_leaks=True, print_leaks=False, verbose=True):
- test_table = prepare(field_type, genvalue, engineurl,
- num_fields, num_records, freshdata, verbose)
- time_dbfunc(test_table, test_func, genvalue, class_,
- getresults_func,
- num_fields, num_records, num_tests,
- check_results, profile,
- check_leaks, print_leaks, verbose)
+++ /dev/null
-"""test that mapper compilation is threadsafe, including
-when additional mappers are created while the existing
-collection is being compiled."""
-
-from sqlalchemy import *
-from sqlalchemy.orm import *
-import _thread, time
-from sqlalchemy.orm import mapperlib
-
-
-meta = MetaData('sqlite:///foo.db')
-
-t1 = Table('t1', meta,
- Column('c1', Integer, primary_key=True),
- Column('c2', String(30))
- )
-
-t2 = Table('t2', meta,
- Column('c1', Integer, primary_key=True),
- Column('c2', String(30)),
- Column('t1c1', None, ForeignKey('t1.c1'))
-)
-t3 = Table('t3', meta,
- Column('c1', Integer, primary_key=True),
- Column('c2', String(30)),
-)
-meta.create_all()
-
-class T1(object):
- pass
-
-class T2(object):
- pass
-
-class FakeLock(object):
- def acquire(self): pass
- def release(self): pass
-
-# uncomment this to disable the mutex in mapper compilation;
-# should produce thread collisions
-#mapperlib._COMPILE_MUTEX = FakeLock()
-
-def run1():
- for i in range(50):
- print("T1", _thread.get_ident())
- class_mapper(T1)
- time.sleep(.05)
-
-def run2():
- for i in range(50):
- print("T2", _thread.get_ident())
- class_mapper(T2)
- time.sleep(.057)
-
-def run3():
- for i in range(50):
- def foo():
- print("FOO", _thread.get_ident())
- class Foo(object):pass
- mapper(Foo, t3)
- class_mapper(Foo).compile()
- foo()
- time.sleep(.05)
-
-mapper(T1, t1, properties={'t2':relationship(T2, backref="t1")})
-mapper(T2, t2)
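-
-# five waves of threads: run1/run2 hammer class_mapper() on existing
-# mappers while three run3 threads create and compile new mappers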
-print("START")
-for j in range(0, 5):
- _thread.start_new_thread(run1, ())
- _thread.start_new_thread(run2, ())
- _thread.start_new_thread(run3, ())
- _thread.start_new_thread(run3, ())
- _thread.start_new_thread(run3, ())
-print("WAIT")
-time.sleep(5)