from sqlalchemy.test.testing import eq_
-from sqlalchemy.orm import mapper, relationship, create_session, clear_mappers, \
- sessionmaker, class_mapper
+from sqlalchemy.orm import mapper, relationship, create_session,\
+ clear_mappers, sessionmaker, class_mapper
from sqlalchemy.orm.mapper import _mapper_registry
from sqlalchemy.orm.session import _sessions
from sqlalchemy.util import jython
import operator
from sqlalchemy.test import testing, engines
-from sqlalchemy import MetaData, Integer, String, ForeignKey, PickleType, create_engine
+from sqlalchemy import MetaData, Integer, String, ForeignKey, \
+ PickleType, create_engine
from sqlalchemy.test.schema import Table, Column
import sqlalchemy as sa
from sqlalchemy.sql import column
else:
flatline = True
- if not flatline and samples[-1] > samples[0]: # object count is bigger than when it started
+ # object count is bigger than when it started
+ if not flatline and samples[-1] > samples[0]:
for x in samples[1:-2]:
- if x > samples[-1]: # see if a spike bigger than the endpoint exists
+ # see if a spike bigger than the endpoint exists
+ if x > samples[-1]:
break
else:
assert False, repr(samples) + " " + repr(flatline)
metadata = MetaData(testing.db)
table1 = Table("mytable", metadata,
- Column('col1', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('col1', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('col2', String(30)))
table2 = Table("mytable2", metadata,
- Column('col1', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('col1', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('col2', String(30)),
Column('col3', Integer, ForeignKey("mytable.col1")))
metadata.create_all()
m1 = mapper(A, table1, properties={
- "bs":relationship(B, cascade="all, delete", order_by=table2.c.col1)},
+ "bs":relationship(B, cascade="all, delete",
+ order_by=table2.c.col1)},
order_by=table1.c.col1)
m2 = mapper(B, table2)
metadata = MetaData(testing.db)
table1 = Table("mytable", metadata,
- Column('col1', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('col1', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('col2', String(30)))
table2 = Table("mytable2", metadata,
- Column('col1', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('col1', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('col2', String(30)),
Column('col3', Integer, ForeignKey("mytable.col1")))
metadata.create_all()
m1 = mapper(A, table1, properties={
- "bs":relationship(B, cascade="all, delete", order_by=table2.c.col1)},
- order_by=table1.c.col1)
- m2 = mapper(B, table2)
+ "bs":relationship(B, cascade="all, delete",
+ order_by=table2.c.col1)},
+ order_by=table1.c.col1,
+ _compiled_cache_size=10
+ )
+ m2 = mapper(B, table2,
+ _compiled_cache_size=10
+ )
m3 = mapper(A, table1, non_primary=True)
@profile_memory
def go():
- engine = engines.testing_engine(options={'logging_name':'FOO', 'pool_logging_name':'BAR'})
+ engine = engines.testing_engine(
+ options={'logging_name':'FOO',
+ 'pool_logging_name':'BAR'}
+ )
sess = create_session(bind=engine)
a1 = A(col2="a1")
del m1, m2, m3
assert_no_mappers()
+ def test_many_updates(self):
+ metadata = MetaData(testing.db)
+
+ wide_table = Table('t', metadata,
+ Column('id', Integer, primary_key=True),
+ *[Column('col%d' % i, Integer) for i in range(10)]
+ )
+
+ class Wide(object):
+ pass
+
+ mapper(Wide, wide_table, _compiled_cache_size=10)
+
+ metadata.create_all()
+ session = create_session()
+ w1 = Wide()
+ session.add(w1)
+ session.flush()
+ session.close()
+ del session
+ counter = [1]
+
+ @profile_memory
+ def go():
+ session = create_session()
+ w1 = session.query(Wide).first()
+ x = counter[0]
+ dec = 10
+ while dec > 0:
+ # trying to count in binary here,
+ # works enough to trip the test case
+ if pow(2, dec) < x:
+ setattr(w1, 'col%d' % dec, counter[0])
+ x -= pow(2, dec)
+ dec -= 1
+ session.flush()
+ session.close()
+ counter[0] += 1
+
+ try:
+ go()
+ finally:
+ metadata.drop_all()
+
def test_mapper_reset(self):
metadata = MetaData(testing.db)
table1 = Table("mytable", metadata,
- Column('col1', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('col1', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('col2', String(30)))
table2 = Table("mytable2", metadata,
- Column('col1', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('col1', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('col2', String(30)),
Column('col3', Integer, ForeignKey("mytable.col1")))
metadata = MetaData(testing.db)
table1 = Table("mytable", metadata,
- Column('col1', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('col1', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('col2', String(30))
)
metadata = MetaData(testing.db)
table1 = Table("mytable", metadata,
- Column('col1', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('col1', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('col2', String(30))
)
table2 = Table("mytable2", metadata,
- Column('col1', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('col1', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('col2', String(30)),
)
pass
mapper(A, table1, properties={
- 'bs':relationship(B, secondary=table3, backref='as', order_by=table3.c.t1)
+ 'bs':relationship(B, secondary=table3,
+ backref='as', order_by=table3.c.t1)
})
mapper(B, table2)
metadata = MetaData(testing.db)
table1 = Table("table1", metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('data', String(30))
)
table2 = Table("table2", metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('data', String(30)),
Column('t1id', Integer, ForeignKey('table1.id'))
)
metadata = MetaData(testing.db)
table1 = Table("mytable", metadata,
- Column('col1', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('col1', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('col2', PickleType(comparator=operator.eq))
)