+++ /dev/null
-import testenv; testenv.simple_setup()
-from sqlalchemy import *
-from sqlalchemy.orm import *
-from timeit import Timer
-import sys
-
-
-meta = MetaData()
-
-orders = Table('orders', meta,
- Column('id', Integer, Sequence('order_id_seq'), primary_key = True),
-)
-items = Table('items', meta,
- Column('id', Integer, Sequence('item_id_seq'), primary_key = True),
- Column('order_id', Integer, ForeignKey(orders.c.id), nullable=False),
-)
-attributes = Table('attributes', meta,
- Column('id', Integer, Sequence('attribute_id_seq'), primary_key = True),
- Column('item_id', Integer, ForeignKey(items.c.id), nullable=False),
-)
-values = Table('values', meta,
- Column('id', Integer, Sequence('value_id_seq'), primary_key = True),
- Column('attribute_id', Integer, ForeignKey(attributes.c.id), nullable=False),
-)
-
-class Order(object): pass
-class Item(object): pass
-class Attribute(object): pass
-class Value(object): pass
-
-valueMapper = mapper(Value, values)
-attrMapper = mapper(Attribute, attributes, properties=dict(
- values=relationship(valueMapper, cascade="save-update", backref="attribute")
-))
-itemMapper = mapper(Item, items, properties=dict(
- attributes=relationship(attrMapper, cascade="save-update", backref="item")
-))
-orderMapper = mapper(Order, orders, properties=dict(
- items=relationship(itemMapper, cascade="save-update", backref="order")
-))
-
-
-
-class TimeTrial(object):
-
- def create_fwd_assoc(self):
- item = Item()
- self.order.items.append(item)
- for attrid in range(10):
- attr = Attribute()
- item.attributes.append(attr)
- for valueid in range(5):
- val = Value()
- attr.values.append(val)
-
- def create_back_assoc(self):
- item = Item()
- item.order = self.order
- for attrid in range(10):
- attr = Attribute()
- attr.item = item
- for valueid in range(5):
- val = Value()
- val.attribute = attr
-
- def run(self, number):
- s = create_session()
- self.order = order = Order()
- s.save(order)
-
- ctime = 0.0
- timer = Timer("create_it()", "from __main__ import create_it")
- for n in xrange(number):
- t = timer.timeit(1)
- print "Time to create item %i: %.5f sec" % (n, t)
- ctime += t
-
- assert len(order.items) == 10
- assert sum(len(item.attributes) for item in order.items) == 100
- assert sum(len(attr.values) for item in order.items for attr in item.attributes) == 500
- assert len(s.new) == 611
- print "Created 610 objects in %.5f sec" % ctime
-
-if __name__ == "__main__":
- tt = TimeTrial()
-
- print "\nCreate forward associations"
- create_it = tt.create_fwd_assoc
- tt.run(10)
-
- print "\nCreate backward associations"
- create_it = tt.create_back_assoc
- tt.run(10)
+++ /dev/null
-# times how long it takes to create 26000 objects
-import testenv; testenv.simple_setup()
-
-from sqlalchemy.orm import attributes
-import time
-
-manage_attributes = True
-init_attributes = manage_attributes and True
-
-class User(object):
- pass
-class Address(object):
- pass
-
-if manage_attributes:
- attributes.register_attribute(User, 'id', False, False)
- attributes.register_attribute(User, 'name', False, False)
- attributes.register_attribute(User, 'addresses', True, False, trackparent=True)
- attributes.register_attribute(Address, 'email', False, False)
-
-now = time.time()
-for i in range(0,130):
- u = User()
- if init_attributes:
- attributes.manage(u)
- u.id = i
- u.name = "user " + str(i)
- if not manage_attributes:
- u.addresses = []
- for j in range(0,200):
- a = Address()
- if init_attributes:
- attributes.manage(a)
- a.email = 'foo@bar.com'
- u.addresses.append(a)
-# print len(managed_attributes)
-# managed_attributes.clear()
-total = time.time() - now
-print "Total time", total
+++ /dev/null
-import testenv; testenv.simple_setup()
-
-import random, string
-
-from sqlalchemy.orm import attributes
-from test.lib.util import gc_collect
-
-# with this test, run top. make sure the Python process doenst grow in size arbitrarily.
-
-class User(object):
- pass
-
-class Address(object):
- pass
-
-attributes.register_attribute(User, 'id', False, False)
-attributes.register_attribute(User, 'name', False, False)
-attributes.register_attribute(User, 'addresses', True, False)
-attributes.register_attribute(Address, 'email', False, False)
-attributes.register_attribute(Address, 'user', False, False)
-
-
-for i in xrange(1000):
- for j in xrange(1000):
- u = User()
- attributes.manage(u)
- u.name = str(random.randint(0, 100000000))
- for k in xrange(10):
- a = Address()
- a.email_address = str(random.randint(0, 100000000))
- attributes.manage(a)
- u.addresses.append(a)
- a.user = u
- print "clearing"
- #managed_attributes.clear()
- gc_collect()
+++ /dev/null
-from sqlalchemy import *
-from sqlalchemy.orm import *
-from test.lib import profiling
-
-NUM = 500
-DIVISOR = 50
-
-engine = create_engine('sqlite://')
-meta = MetaData(engine)
-items = Table('items', meta,
- Column('item_id', Integer, primary_key=True),
- Column('value', String(100)))
-subitems = Table('subitems', meta,
- Column('sub_id', Integer, primary_key=True),
- Column('parent_id', Integer, ForeignKey('items.item_id')),
- Column('value', String(100)))
-
-class Item(object):pass
-class SubItem(object):pass
-mapper(Item, items, properties={'subs':relationship(SubItem, lazy='joined')})
-mapper(SubItem, subitems)
-
-def load():
- global l
- l = []
- for x in range(1,NUM/DIVISOR + 1):
- l.append({'item_id':x, 'value':'this is item #%d' % x})
- #print l
- items.insert().execute(*l)
- for x in range(1, NUM/DIVISOR + 1):
- l = []
- for y in range(1, DIVISOR + 1):
- z = ((x-1) * DIVISOR) + y
- l.append({'sub_id':z,'value':'this is item #%d' % z, 'parent_id':x})
- #print l
- subitems.insert().execute(*l)
-
-@profiling.profiled('massjoinedload', always=True, sort=['cumulative'])
-def massjoinedload(session):
- session.begin()
- query = session.query(Item)
- l = query.all()
- print "loaded ", len(l), " items each with ", len(l[0].subs), "subitems"
-
-def all():
- meta.create_all()
- try:
- load()
- massjoinedload(create_session())
- finally:
- meta.drop_all()
-
-if __name__ == '__main__':
- all()
+++ /dev/null
-import time
-#import sqlalchemy.orm.attributes as attributes
-from sqlalchemy import *
-from sqlalchemy.orm import *
-from test.lib import *
-
-"""
-
-we are testing session.expunge() here, also that the attributes and unitofwork
-packages dont keep dereferenced stuff hanging around.
-
-for best results, dont run with sqlite :memory: database, and keep an eye on
-top while it runs
-"""
-
-NUM = 2500
-
-class LoadTest(TestBase, AssertsExecutionResults):
- @classmethod
- def setup_class(cls):
- global items, meta
- meta = MetaData(testing.db)
- items = Table('items', meta,
- Column('item_id', Integer, primary_key=True),
- Column('value', String(100)))
- items.create()
- @classmethod
- def teardown_class(cls):
- items.drop()
- def setup(self):
- for x in range(1,NUM/500+1):
- l = []
- for y in range(x*500-500 + 1, x*500 + 1):
- l.append({'item_id':y, 'value':'this is item #%d' % y})
- items.insert().execute(*l)
-
- def testload(self):
- class Item(object):pass
-
- m = mapper(Item, items)
- sess = create_session()
- now = time.time()
- query = sess.query(Item)
- for x in range (1,NUM/100):
- # this is not needed with cpython which clears non-circular refs immediately
- #gc_collect()
- l = query.filter(items.c.item_id.between(x*100 - 100 + 1, x*100)).all()
- assert len(l) == 100
- print "loaded ", len(l), " items "
- # modifying each object will ensure that the objects get placed in the "dirty" list
- # and will hang around until expunged
- #for a in l:
- # a.value = 'changed...'
- #assert len(objectstore.get_session().dirty) == len(l)
- #assert len(objectstore.get_session().identity_map) == len(l)
- #assert len(attributes.managed_attributes) == len(l)
- #print len(objectstore.get_session().dirty)
- #print len(objectstore.get_session().identity_map)
- #objectstore.expunge(*l)
- total = time.time() -now
- print "total time ", total
-
-
+++ /dev/null
-import testenv; testenv.simple_setup()
-import time
-from sqlalchemy import *
-from sqlalchemy.orm import *
-
-metadata = MetaData(create_engine('sqlite://', echo=True))
-
-t1s = Table( 't1s', metadata,
- Column( 'id', Integer, primary_key=True),
- Column('data', String(100))
- )
-
-t2s = Table( 't2s', metadata,
- Column( 'id', Integer, primary_key=True),
- Column( 't1id', Integer, ForeignKey("t1s.id"), nullable=True ))
-
-t3s = Table( 't3s', metadata,
- Column( 'id', Integer, primary_key=True),
- Column( 't2id', Integer, ForeignKey("t2s.id"), nullable=True ))
-
-t4s = Table( 't4s', metadata,
- Column( 'id', Integer, primary_key=True),
- Column( 't3id', Integer, ForeignKey("t3s.id"), nullable=True ))
-
-[t.create() for t in [t1s,t2s,t3s,t4s]]
-
-class T1( object ): pass
-class T2( object ): pass
-class T3( object ): pass
-class T4( object ): pass
-
-mapper( T1, t1s )
-mapper( T2, t2s )
-mapper( T3, t3s )
-mapper( T4, t4s )
-
-cascade = "all, delete-orphan"
-use_backref = True
-
-if use_backref:
- class_mapper(T1).add_property( 't2s', relationship(T2, backref=backref("t1", cascade=cascade), cascade=cascade))
- class_mapper(T2).add_property ( 't3s', relationship(T3, backref=backref("t2",cascade=cascade), cascade=cascade) )
- class_mapper(T3).add_property( 't4s', relationship(T4, backref=backref("t3", cascade=cascade), cascade=cascade) )
-else:
- T1.mapper.add_property( 't2s', relationship(T2, cascade=cascade))
- T2.mapper.add_property ( 't3s', relationship(T3, cascade=cascade) )
- T3.mapper.add_property( 't4s', relationship(T4, cascade=cascade) )
-
-now = time.time()
-print "start"
-sess = create_session()
-o1 = T1()
-sess.save(o1)
-for i2 in range(10):
- o2 = T2()
- o1.t2s.append( o2 )
-
- for i3 in range( 10 ):
- o3 = T3()
- o2.t3s.append( o3 )
-
- for i4 in range( 10 ):
- o3.t4s.append ( T4() )
- print i2, i3, i4
-
-print len([s for s in sess])
-print "flushing"
-sess.flush()
-total = time.time() - now
-print "done,total time", total
+++ /dev/null
-import gc
-import types
-from sqlalchemy import *
-from sqlalchemy.orm import *
-from test.lib import *
-
-
-NUM = 2500
-
-class SaveTest(TestBase, AssertsExecutionResults):
- @classmethod
- def setup_class(cls):
- global items, metadata
- metadata = MetaData(testing.db)
- items = Table('items', metadata,
- Column('item_id', Integer, primary_key=True),
- Column('value', String(100)))
- items.create()
- @classmethod
- def teardown_class(cls):
- clear_mappers()
- metadata.drop_all()
-
- def testsave(self):
- class Item(object):pass
-
- m = mapper(Item, items)
-
- for x in range(0,NUM/50):
- sess = create_session()
- query = sess.query(Item)
- for y in range (0,50):
- # print "x,y", (x,y)
- sess.save(Item())
- sess.flush()
- #self._profile()
- print "ROWS:", x * 50
- def _profile(self):
- print "------------------------"
- d = {}
- for o in gc.get_objects():
- t = type(o)
- if t is types.InstanceType:
- t = o.__class__
- d.setdefault(t, 0)
- d[t] += 1
- rep = [(key, value) for key, value in d.iteritems()]
- def sorter(a, b):
- return cmp(b[1], a[1])
- rep.sort(sorter)
- for x in rep[0:30]:
- print x
-
-
+++ /dev/null
-# load test of connection pool
-import thread, time
-from sqlalchemy import *
-import sqlalchemy.pool as pool
-from test.lib import testing
-
-db = create_engine(testing.db.url, pool_timeout=30, echo_pool=True)
-metadata = MetaData(db)
-
-users_table = Table('users', metadata,
- Column('user_id', Integer, primary_key=True),
- Column('user_name', String(40)),
- Column('password', String(10)))
-metadata.drop_all()
-metadata.create_all()
-
-users_table.insert().execute([{'user_name':'user#%d' % i, 'password':'pw#%d' % i} for i in range(1000)])
-
-def runfast():
- while True:
- c = db.pool.connect()
- time.sleep(.5)
- c.close()
-# result = users_table.select(limit=100).execute()
-# d = {}
-# for row in result:
-# for col in row.keys():
-# d[col] = row[col]
-# time.sleep(.005)
-# result.close()
- print "runfast cycle complete"
-
-#thread.start_new_thread(runslow, ())
-for x in xrange(0,50):
- thread.start_new_thread(runfast, ())
-
-time.sleep(100)
+++ /dev/null
-#!/usr/bin/python
-"""Uses ``wsgiref``, standard in Python 2.5 and also in the cheeseshop."""
-
-from sqlalchemy import *
-from sqlalchemy.orm import *
-import thread
-from test.lib import *
-
-port = 8000
-
-import logging
-logging.basicConfig()
-logging.getLogger('sqlalchemy.pool').setLevel(logging.INFO)
-
-threadids = set()
-meta = MetaData(testing.db)
-foo = Table('foo', meta,
- Column('id', Integer, primary_key=True),
- Column('data', String(30)))
-class Foo(object):
- pass
-mapper(Foo, foo)
-
-def prep():
- meta.drop_all()
- meta.create_all()
-
- data = []
- for x in range(1,500):
- data.append({'id':x,'data':"this is x value %d" % x})
- foo.insert().execute(data)
-
-def serve(environ, start_response):
- start_response("200 OK", [('Content-type', 'text/plain')])
- sess = create_session()
- l = sess.query(Foo).select()
- threadids.add(thread.get_ident())
-
- print ("sending response on thread", thread.get_ident(),
- " total threads ", len(threadids))
- return [str("\n".join([x.data for x in l]))]
-
-
-if __name__ == '__main__':
- from wsgiref import simple_server
- try:
- prep()
- server = simple_server.make_server('localhost', port, serve)
- print "Server listening on port %d" % port
- server.serve_forever()
- finally:
- meta.drop_all()