from sqlalchemy import util, exceptions
import types
from sqlalchemy.orm import mapper, Query
-
+
def _monkeypatch_query_method(name, ctx, class_):
def do(self, *args, **kwargs):
query = Query(class_, session=ctx.current)
do.__name__ = name
except:
pass
- if not hasattr(class_, name):
+ if not hasattr(class_, name):
setattr(class_, name, classmethod(do))
def _monkeypatch_session_method(name, ctx, class_):
do.__name__ = name
except:
pass
- if not hasattr(class_, name):
+ if not hasattr(class_, name):
setattr(class_, name, do)
-
+
def assign_mapper(ctx, class_, *args, **kwargs):
- util.warn_deprecated("assign_mapper is deprecated. Use scoped_session() instead.")
extension = kwargs.pop('extension', None)
if extension is not None:
extension = util.to_list(extension)
extension = ctx.mapper_extension
validate = kwargs.pop('validate', False)
-
+
if not isinstance(getattr(class_, '__init__'), types.MethodType):
def __init__(self, **kwargs):
for key, value in kwargs.items():
if validate:
- if not self.mapper.get_property(key, resolve_synonyms=False, raiseerr=False):
- raise exceptions.ArgumentError("Invalid __init__ argument: '%s'" % key)
+ if not self.mapper.get_property(key,
+ resolve_synonyms=False,
+ raiseerr=False):
+ raise exceptions.ArgumentError(
+ "Invalid __init__ argument: '%s'" % key)
setattr(self, key, value)
class_.__init__ = __init__
-
+
class query(object):
def __getattr__(self, key):
return getattr(ctx.current.query(class_), key)
def __call__(self):
return ctx.current.query(class_)
- if not hasattr(class_, 'query'):
+ if not hasattr(class_, 'query'):
class_.query = query()
-
- for name in ('get', 'filter', 'filter_by', 'select', 'select_by', 'selectfirst', 'selectfirst_by', 'selectone', 'selectone_by', 'get_by', 'join_to', 'join_via', 'count', 'count_by', 'options', 'instances'):
+
+ for name in ('get', 'filter', 'filter_by', 'select', 'select_by',
+ 'selectfirst', 'selectfirst_by', 'selectone', 'selectone_by',
+ 'get_by', 'join_to', 'join_via', 'count', 'count_by',
+ 'options', 'instances'):
_monkeypatch_query_method(name, ctx, class_)
for name in ('refresh', 'expire', 'delete', 'expunge', 'update'):
_monkeypatch_session_method(name, ctx, class_)
class_.mapper = m
return m
+assign_mapper = util.deprecated(
+ assign_mapper, "assign_mapper is deprecated. Use scoped_session() instead.")
import sqlalchemy.orm.query
return sqlalchemy.orm.Query(self, session).instances(cursor, *mappers, **kwargs)
- instances = util.deprecated(instances, False)
+ instances = util.deprecated(instances, add_deprecation_to_docstring=False)
def identity_key_from_row(self, row):
"""Return an identity-map key for use in storing/retrieving an
return self._legacy_filter_by(*args, **params).one()
-for deprecated_method in ['list', 'scalar', 'count_by',
- 'select_whereclause', 'get_by', 'select_by', 'join_by', 'selectfirst',
- 'selectone', 'select', 'execute', 'select_statement', 'select_text',
- 'join_to', 'join_via', 'selectfirst_by', 'selectone_by']:
- setattr(Query, deprecated_method, util.deprecated(getattr(Query, deprecated_method), False))
+
+for deprecated_method in ('list', 'scalar', 'count_by',
+ 'select_whereclause', 'get_by', 'select_by',
+ 'join_by', 'selectfirst', 'selectone', 'select',
+ 'execute', 'select_statement', 'select_text',
+ 'join_to', 'join_via', 'selectfirst_by',
+ 'selectone_by'):
+ setattr(Query, deprecated_method,
+ util.deprecated(getattr(Query, deprecated_method),
+ add_deprecation_to_docstring=False))
Query.logger = logging.class_logger(Query)
def warn_deprecated(msg):
warnings.warn(logging.SADeprecationWarning(msg), stacklevel=3)
-def deprecated(func, add_deprecation_to_docstring=True):
+def deprecated(func, message=None, add_deprecation_to_docstring=True):
+ if message is not None:
+ warning = message % dict(func=func.__name__)
+ else:
+ warning = "Call to deprecated function %s" % func.__name__
+
def func_with_warning(*args, **kwargs):
- warnings.warn(logging.SADeprecationWarning("Call to deprecated function %s" % func.__name__),
- stacklevel=2)
+ if self.warn:
+ warnings.warn(logging.SADeprecationWarning(warning),
+ stacklevel=2)
return func(*args, **kwargs)
- func_with_warning.__doc__ = (add_deprecation_to_docstring and 'Deprecated.\n' or '') + str(func.__doc__)
+ func_with_warning.warn = True
+ self = func_with_warning
+
+ doc = func.__doc__ is not None and func.__doc__ or ''
+
+ if add_deprecation_to_docstring:
+ header = message is not None and warning or 'Deprecated.'
+ doc = '\n'.join((header.rstrip(), doc))
+
+ func_with_warning.__doc__ = doc
func_with_warning.__dict__.update(func.__dict__)
try:
func_with_warning.__name__ = func.__name__
dialect = postgres.dialect()
table1 = table('mytable',
column('myid', Integer),
- column('name', String),
- column('description', String),
+ column('name', String(128)),
+ column('description', String(128)),
)
u = update(table1, values=dict(name='foo'), postgres_returning=[table1.c.myid, table1.c.name])
dialect = postgres.dialect()
table1 = table('mytable',
column('myid', Integer),
- column('name', String),
- column('description', String),
+ column('name', String(128)),
+ column('description', String(128)),
)
i = insert(table1, values=dict(name='foo'), postgres_returning=[table1.c.myid, table1.c.name])
including the deprecated versions of these arguments"""
import testbase
+import warnings
from sqlalchemy import *
from sqlalchemy import engine, exceptions
from testlib import *
class BindTest(PersistTest):
def test_create_drop_explicit(self):
metadata = MetaData()
- table = Table('test_table', metadata,
+ table = Table('test_table', metadata,
Column('foo', Integer))
for bind in (
testbase.db,
table.create(*args[0], **args[1])
table.drop(*args[0], **args[1])
assert not table.exists(*args[0], **args[1])
-
+
def test_create_drop_err(self):
metadata = MetaData()
- table = Table('test_table', metadata,
+ table = Table('test_table', metadata,
Column('foo', Integer))
for meth in [
assert False
except exceptions.InvalidRequestError, e:
assert str(e) == "This SchemaItem is not connected to any Engine or Connection."
-
+
def test_create_drop_bound(self):
-
+
for meta in (MetaData,ThreadLocalMetaData):
for bind in (
testbase.db,
testbase.db.connect()
):
metadata = meta()
- table = Table('test_table', metadata,
+ table = Table('test_table', metadata,
Column('foo', Integer))
metadata.bind = bind
assert metadata.bind is table.bind is bind
assert not table.exists()
metadata = meta()
- table = Table('test_table', metadata,
+ table = Table('test_table', metadata,
Column('foo', Integer))
+ testing.squelch_deprecation(metadata.connect)
metadata.connect(bind)
+
assert metadata.bind is table.bind is bind
metadata.create_all()
assert table.exists()
([], {'bind':bind}),
):
metadata = MetaData(*args[0], **args[1])
- table = Table('test_table', metadata,
+ table = Table('test_table', metadata,
Column('foo', Integer))
assert metadata.bind is table.bind is bind
metadata.create_all()
def test_implicit_execution(self):
metadata = MetaData()
- table = Table('test_table', metadata,
+ table = Table('test_table', metadata,
Column('foo', Integer),
test_needs_acid=True,
)
assert conn.execute("select count(1) from test_table").scalar() == 0
finally:
metadata.drop_all(bind=conn)
-
+
def test_clauseelement(self):
metadata = MetaData()
- table = Table('test_table', metadata,
+ table = Table('test_table', metadata,
Column('foo', Integer))
metadata.create_all(bind=testbase.db)
try:
if isinstance(bind, engine.Connection):
bind.close()
metadata.drop_all(bind=testbase.db)
-
+
def test_session(self):
from sqlalchemy.orm import create_session, mapper
metadata = MetaData()
- table = Table('test_table', metadata,
+ table = Table('test_table', metadata,
Column('foo', Integer, Sequence('foo_seq', optional=True), primary_key=True),
Column('data', String(30)))
class Foo(object):
mapper(Foo, table)
metadata.create_all(bind=testbase.db)
try:
- for bind in (testbase.db,
+ for bind in (testbase.db,
testbase.db.connect()
):
try:
if isinstance(bind, engine.Connection):
bind.close()
-
+
sess = create_session()
f = Foo()
sess.save(f)
if isinstance(bind, engine.Connection):
bind.close()
metadata.drop_all(bind=testbase.db)
-
-
+
+
if __name__ == '__main__':
testbase.main()
import testbase
-import pickle, StringIO, unicodedata
+import pickle, StringIO, unicodedata, warnings
from sqlalchemy import *
from sqlalchemy import exceptions
assert len(a4.constraints) == 2
finally:
meta.drop_all()
-
+
def test_unknown_types(self):
meta = MetaData(testbase.db)
- t = Table("test", meta,
+ t = Table("test", meta,
Column('foo', DateTime))
-
+
import sys
dialect_module = sys.modules[testbase.db.dialect.__module__]
-
- # we're relying on the presence of "ischema_names" in the
+
+ # we're relying on the presence of "ischema_names" in the
# dialect module, else we can't test this. we need to be able
# to get the dialect to not be aware of some type so we temporarily
# monkeypatch. not sure what a better way for this could be,
# except for an established dialect hook or dialect-specific tests
if not hasattr(dialect_module, 'ischema_names'):
return
-
+
ischema_names = dialect_module.ischema_names
t.create()
dialect_module.ischema_names = {}
try:
- m2 = MetaData(testbase.db)
- t2 = Table("test", m2, autoload=True)
- assert t2.c.foo.type.__class__ == sqltypes.NullType
+ try:
+ warnings.filterwarnings('error', 'Did not recognize type')
+ m2 = MetaData(testbase.db)
+ t2 = Table("test", m2, autoload=True)
+ assert False
+ except RuntimeWarning:
+ assert True
+
+ warnings.filterwarnings('ignore', 'Did not recognize type')
+ m3 = MetaData(testbase.db)
+ t3 = Table("test", m3, autoload=True)
+ assert t3.c.foo.type.__class__ == sqltypes.NullType
+
finally:
dialect_module.ischema_names = ischema_names
+ warnings.filterwarnings('always', 'Did not recognize type')
t.drop()
-
+
def test_override_fkandpkcol(self):
"""test that you can override columns which contain foreign keys to other reflected tables,
where the foreign key column is also a primary key column"""
slots_table = Table('slots', metadata,
Column('slot_id', Integer, primary_key=True),
Column('pkg_id', Integer, ForeignKey('pkgs.pkg_id')),
- Column('slot', String),
+ Column('slot', String(128)),
)
try:
metadata.create_all()
def test_basic(self):
try:
- # the 'convert_unicode' should not get in the way of the reflection
+ # the 'convert_unicode' should not get in the way of the reflection
# process. reflecttable for oracle, postgres (others?) expect non-unicode
# strings in result sets/bind params
bind = engines.utf8_engine(options={'convert_unicode':True})
class SchemaTest(PersistTest):
-
+
# this test should really be in the sql tests somewhere, not engine
def test_iteration(self):
metadata = MetaData()
from sqlalchemy import *
from sqlalchemy import exceptions
from sqlalchemy.orm import create_session, clear_mappers, relation, class_mapper
-import sqlalchemy.ext.assignmapper
+from sqlalchemy.ext.assignmapper import assign_mapper
from sqlalchemy.ext.sessioncontext import SessionContext
from testlib import *
-def assign_mapper(*args, **kw):
- try:
- warnings.filterwarnings('ignore', 'assign_mapper is deprecated')
- sqlalchemy.ext.assignmapper.assign_mapper(*args, **kw)
- finally:
- warnings.filterwarnings('always', 'assign_mapper is deprecated')
+
+testing.squelch_deprecation(assign_mapper)
class AssignMapperTest(PersistTest):
def setUpAll(self):
def setUpAll(self):
global items, item_keywords, keywords, metadata, Item, Keyword, KeywordAssociation
metadata = MetaData(testbase.db)
- items = Table('items', metadata,
+ items = Table('items', metadata,
Column('item_id', Integer, primary_key=True),
Column('name', String(40)),
)
Column('name', String(40))
)
metadata.create_all()
-
+
class Item(object):
def __init__(self, name):
self.name = name
self.data = data
def __repr__(self):
return "KeywordAssociation itemid=%d keyword=%s data=%s" % (self.item_id, repr(self.keyword), self.data)
-
+
mapper(Keyword, keywords)
mapper(KeywordAssociation, item_keywords, properties={
'keyword':relation(Keyword, lazy=False)
mapper(Item, items, properties={
'keywords' : relation(KeywordAssociation, association=Keyword)
})
-
+
def tearDown(self):
for t in metadata.table_iterator(reverse=True):
t.delete().execute()
def tearDownAll(self):
clear_mappers()
metadata.drop_all()
-
+
def testinsert(self):
sess = create_session()
item1 = Item('item1')
sess.flush()
saved = repr([item1, item2])
sess.clear()
- l = sess.query(Item).select()
+ l = sess.query(Item).all()
loaded = repr(l)
print saved
print loaded
item1.keywords.append(KeywordAssociation(Keyword('red'), 'red_assoc'))
sess.save(item1)
sess.flush()
-
+
red_keyword = item1.keywords[1].keyword
del item1.keywords[1]
item1.keywords.append(KeywordAssociation(red_keyword, 'new_red_assoc'))
sess.flush()
saved = repr([item1])
sess.clear()
- l = sess.query(Item).select()
+ l = sess.query(Item).all()
loaded = repr(l)
print saved
print loaded
sess.save(item1)
sess.save(item2)
sess.flush()
-
+
red_keyword = item1.keywords[1].keyword
del item1.keywords[0]
del item1.keywords[0]
item2.keywords.append(KeywordAssociation(purple_keyword, 'purple_item2_assoc'))
item1.keywords.append(KeywordAssociation(purple_keyword, 'purple_item1_assoc'))
item1.keywords.append(KeywordAssociation(Keyword('yellow'), 'yellow_assoc'))
-
+
sess.flush()
saved = repr([item1, item2])
sess.clear()
- l = sess.query(Item).select()
+ l = sess.query(Item).all()
loaded = repr(l)
print saved
print loaded
Column('Country', CHAR(2), default='es'),
)
table_isauthor = Table('IsAuthor', metadata,
- Column('OriginalsID', Integer, ForeignKey('Originals.ID'),
+ Column('OriginalsID', Integer, ForeignKey('Originals.ID'),
default=None),
- Column('PeopleID', Integer, ForeignKey('People.ID'),
+ Column('PeopleID', Integer, ForeignKey('People.ID'),
default=None),
Column('Kind', CHAR(1), default='A'),
)
for k,v in kw.iteritems():
setattr(self, k, v)
def display(self):
- c = [ "%s=%s" % (col.key, repr(getattr(self, col.key))) for col
+ c = [ "%s=%s" % (col.key, repr(getattr(self, col.key))) for col
in self.c ]
return "%s(%s)" % (self.__class__.__name__, ', '.join(c))
def __repr__(self):
properties={
'people': relation(IsAuthor, association=People),
'authors': relation(People, secondary=table_isauthor, backref='written',
- primaryjoin=and_(table_originals.c.ID==table_isauthor.c.OriginalsID,
+ primaryjoin=and_(table_originals.c.ID==table_isauthor.c.OriginalsID,
table_isauthor.c.Kind=='A')),
'title': table_originals.c.Title,
'date': table_originals.c.Date,
'name': table_people.c.Name,
'country': table_people.c.Country,
})
- mapper(IsAuthor, table_isauthor,
- primary_key=[table_isauthor.c.OriginalsID, table_isauthor.c.PeopleID,
-table_isauthor.c.Kind],
+ mapper(IsAuthor, table_isauthor,
+ primary_key=[table_isauthor.c.OriginalsID, table_isauthor.c.PeopleID,
+table_isauthor.c.Kind],
properties={
'original': relation(Originals, lazy=False),
'person': relation(People, lazy=False),
sess.flush()
-
+
if __name__ == "__main__":
- testbase.main()
+ testbase.main()
parents = Table('parents', metadata,
Column('id', Integer, primary_key=True),
- Column('label', String))
+ Column('label', String(128)))
children = Table('children', metadata,
Column('id', Integer, primary_key=True),
Column('parent_id', Integer, ForeignKey('parents.id'),
nullable=False),
- Column('a', String),
- Column('b', String),
- Column('c', String))
+ Column('a', String(128)),
+ Column('b', String(128)),
+ Column('c', String(128)))
class Parent(object):
def __init__(self, label=None):
metadata = MetaData(testbase.db)
a_table = Table("a", metadata,
Column("id", Integer(), primary_key=True),
- Column("fui", String()),
+ Column("fui", String(128)),
Column("b", Integer(), ForeignKey("a.id")),
)
a_table.create()
def define_tables(self, meta):
global ta, tb, tc
ta = ["a", meta]
- ta.append(Column('id', Integer, primary_key=True)),
+ ta.append(Column('id', Integer, primary_key=True)),
ta.append(Column('a_data', String(30)))
if "a"== parent and direction == MANYTOONE:
ta.append(Column('child_id', Integer, ForeignKey("%s.id" % child, use_alter=True, name="foo")))
elif "a" == child and direction == ONETOMANY:
ta.append(Column('parent_id', Integer, ForeignKey("%s.id" % parent, use_alter=True, name="foo")))
ta = Table(*ta)
-
+
tb = ["b", meta]
tb.append(Column('id', Integer, ForeignKey("a.id"), primary_key=True, ))
-
+
tb.append(Column('b_data', String(30)))
-
+
if "b"== parent and direction == MANYTOONE:
tb.append(Column('child_id', Integer, ForeignKey("%s.id" % child, use_alter=True, name="foo")))
elif "b" == child and direction == ONETOMANY:
tb.append(Column('parent_id', Integer, ForeignKey("%s.id" % parent, use_alter=True, name="foo")))
tb = Table(*tb)
-
+
tc = ["c", meta]
tc.append(Column('id', Integer, ForeignKey("b.id"), primary_key=True, ))
-
+
tc.append(Column('c_data', String(30)))
-
+
if "c"== parent and direction == MANYTOONE:
tc.append(Column('child_id', Integer, ForeignKey("%s.id" % child, use_alter=True, name="foo")))
elif "c" == child and direction == ONETOMANY:
child_table = {"a":ta, "b":tb, "c": tc}[child]
child_table.update(values={child_table.c.parent_id:None}).execute()
super(ABCTest, self).tearDown()
-
+
def test_roundtrip(self):
parent_table = {"a":ta, "b":tb, "c": tc}[parent]
child_table = {"a":ta, "b":tb, "c": tc}[child]
remote_side = None
-
+
if direction == MANYTOONE:
foreign_keys = [parent_table.c.child_id]
elif direction == ONETOMANY:
somea = A('somea')
someb = B('someb')
somec = C('somec')
-
+
#print "APPENDING", parent.__class__.__name__ , "TO", child.__class__.__name__
-
+
sess.save(parent_obj)
parent_obj.collection.append(child_obj)
if direction == ONETOMANY:
result2 = sess.query(parent_class).get(parent2.id)
assert result2.id == parent2.id
assert result2.collection[0].id == child_obj.id
-
+
sess.clear()
# assert result via polymorphic load of parent object
- result = sess.query(A).get_by(id=parent_obj.id)
+ result = sess.query(A).filter_by(id=parent_obj.id).one()
assert result.id == parent_obj.id
assert result.collection[0].id == child_obj.id
if direction == ONETOMANY:
assert result.collection[1].id == child2.id
elif direction == MANYTOONE:
- result2 = sess.query(A).get_by(id=parent2.id)
+ result2 = sess.query(A).filter_by(id=parent2.id).one()
assert result2.id == parent2.id
assert result2.collection[0].id == child_obj.id
-
+
ABCTest.__name__ = "Test%sTo%s%s" % (parent, child, (direction is ONETOMANY and "O2M" or "M2O"))
return ABCTest
sess.flush()
compare = ','.join([repr(b1), repr(b2), repr(b1.parent_foo), repr(b2.parent_foo)])
sess.clear()
- l = sess.query(Blub).select()
+ l = sess.query(Blub).all()
result = ','.join([repr(l[0]), repr(l[1]), repr(l[0].parent_foo), repr(l[1].parent_foo)])
print compare
print result
sess = create_session()
q = sess.query(Bar)
- self.assert_(len(q.selectfirst().lazy) == 1)
- self.assert_(len(q.selectfirst().eager) == 1)
+ self.assert_(len(q.first().lazy) == 1)
+ self.assert_(len(q.first().eager) == 1)
class FlushTest(ORMTest):
_a_table = Table('a', metadata,
Column('id', Integer, primary_key=True),
- Column('data1', String)
+ Column('data1', String(128))
)
_b_table = Table('b', metadata,
Column('a_id', Integer, ForeignKey('a.id'), primary_key=True),
- Column('data2', String)
+ Column('data2', String(128))
)
_c_table = Table('c', metadata,
# Column('a_id', Integer, ForeignKey('b.a_id'), primary_key=True), #works
Column('b_a_id', Integer, ForeignKey('b.a_id'), primary_key=True),
- Column('data3', String)
+ Column('data3', String(128))
)
def test_joins(self):
def define_tables(self, metadata):
global managers_table, engineers_table, companies
- companies = Table('companies', metadata,
+ companies = Table('companies', metadata,
Column('id', Integer, primary_key=True),
Column('name', String(50)))
-
- managers_table = Table('managers', metadata,
+
+ managers_table = Table('managers', metadata,
Column('employee_id', Integer, primary_key=True),
Column('name', String(50)),
Column('manager_data', String(50)),
Column('company_id', Integer, ForeignKey('companies.id'))
)
- engineers_table = Table('engineers', metadata,
+ engineers_table = Table('engineers', metadata,
Column('employee_id', Integer, primary_key=True),
Column('name', String(50)),
Column('engineer_info', String(50)),
session.flush()
session.clear()
- print set([repr(x) for x in session.query(Employee).select()])
- assert set([repr(x) for x in session.query(Employee).select()]) == set(["Engineer Kurt knows how to hack", "Manager Tom knows how to manage things"])
- assert set([repr(x) for x in session.query(Manager).select()]) == set(["Manager Tom knows how to manage things"])
- assert set([repr(x) for x in session.query(Engineer).select()]) == set(["Engineer Kurt knows how to hack"])
+ print set([repr(x) for x in session.query(Employee).all()])
+ assert set([repr(x) for x in session.query(Employee).all()]) == set(["Engineer Kurt knows how to hack", "Manager Tom knows how to manage things"])
+ assert set([repr(x) for x in session.query(Manager).all()]) == set(["Manager Tom knows how to manage things"])
+ assert set([repr(x) for x in session.query(Engineer).all()]) == set(["Engineer Kurt knows how to hack"])
def test_relation(self):
class Employee(object):
c2 = session.query(Company).get(c.id)
assert set([repr(x) for x in c2.employees]) == set(["Engineer Kurt knows how to hack", "Manager Tom knows how to manage things"])
self.assert_sql_count(testbase.db, go, 1)
-
+
if __name__ == '__main__':
# clear and query forwards
sess.clear()
- node = sess.query(Table1).selectfirst(Table1.c.id==t.id)
+ node = sess.query(Table1).filter(Table1.c.id==t.id).first()
assertlist = []
while (node):
assertlist.append(node)
# clear and query backwards
sess.clear()
- node = sess.query(Table1).selectfirst(Table1.c.id==obj.id)
+ node = sess.query(Table1).filter(Table1.c.id==obj.id).first()
assertlist = []
while (node):
assertlist.insert(0, node)
class Boss(Manager):
def __repr__(self):
return "Boss %s, status %s, manager_name %s golf swing %s" % (self.get_name(), self.status, self.manager_name, self.golf_swing)
-
+
class Company(object):
def __init__(self, **kwargs):
for key, value in kwargs.iteritems():
class PolymorphTest(ORMTest):
def define_tables(self, metadata):
global companies, people, engineers, managers, boss
-
+
# a table to store companies
- companies = Table('companies', metadata,
+ companies = Table('companies', metadata,
Column('company_id', Integer, Sequence('company_id_seq', optional=True), primary_key=True),
Column('name', String(50)))
# we will define an inheritance relationship between the table "people" and "engineers",
# and a second inheritance relationship between the table "people" and "managers"
- people = Table('people', metadata,
+ people = Table('people', metadata,
Column('person_id', Integer, Sequence('person_id_seq', optional=True), primary_key=True),
Column('company_id', Integer, ForeignKey('companies.company_id')),
Column('name', String(50)),
Column('type', String(30)))
- engineers = Table('engineers', metadata,
+ engineers = Table('engineers', metadata,
Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
Column('status', String(30)),
Column('engineer_name', String(50)),
Column('primary_language', String(50)),
)
- managers = Table('managers', metadata,
+ managers = Table('managers', metadata,
Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
Column('status', String(30)),
Column('manager_name', String(50))
)
- boss = Table('boss', metadata,
+ boss = Table('boss', metadata,
Column('boss_id', Integer, ForeignKey('managers.person_id'), primary_key=True),
Column('golf_swing', String(30)),
)
-
+
metadata.create_all()
class CompileTest(PolymorphTest):
session.save(Manager(name='Tom', status='knows how to manage things'))
session.save(Engineer(name='Kurt', status='knows how to hack'))
session.flush()
- print session.query(Engineer).select()
+ print session.query(Engineer).all()
+
+ print session.query(Person).all()
- print session.query(Person).select()
-
def testcompile2(self):
"""test that a mapper can reference a property whose mapper inherits from this one."""
person_join = polymorphic_union( {
person_mapper = mapper(Person, people, select_table=person_join, polymorphic_on=person_join.c.type,
- polymorphic_identity='person',
+ polymorphic_identity='person',
properties = dict(managers = relation(Manager, lazy=True))
)
class InsertOrderTest(PolymorphTest):
def test_insert_order(self):
- """test that classes of multiple types mix up mapper inserts
+ """test that classes of multiple types mix up mapper inserts
so that insert order of individual tables is maintained"""
person_join = polymorphic_union(
{
mapper(Engineer, engineers, inherits=person_mapper, polymorphic_identity='engineer')
mapper(Manager, managers, inherits=person_mapper, polymorphic_identity='manager')
mapper(Company, companies, properties={
- 'employees': relation(Person, private=True, backref='company', order_by=person_join.c.person_id)
+ 'employees': relation(Person,
+ cascade="all, delete-orphan",
+ backref='company',
+ order_by=person_join.c.person_id)
})
session = create_session()
class RelationToSubclassTest(PolymorphTest):
def testrelationtosubclass(self):
"""test a relation to an inheriting mapper where the relation is to a subclass
- but the join condition is expressed by the parent table.
-
+ but the join condition is expressed by the parent table.
+
also test that backrefs work in this case.
-
+
this test touches upon a lot of the join/foreign key determination code in properties.py
- and creates the need for properties.py to search for conditions individually within
+ and creates the need for properties.py to search for conditions individually within
the mapper's local table as well as the mapper's 'mapped' table, so that relations
requiring lots of specificity (like self-referential joins) as well as relations requiring
more generalization (like the example here) both come up with proper results."""
-
+
mapper(Person, people)
-
+
mapper(Engineer, engineers, inherits=Person)
mapper(Manager, managers, inherits=Person)
mapper(Company, companies, properties={
'managers': relation(Manager, lazy=True,backref="company")
})
-
+
sess = create_session()
c = Company(name='company1')
sess.flush()
sess.clear()
- sess.query(Company).get_by(company_id=c.company_id)
+ sess.query(Company).filter_by(company_id=c.company_id).one()
assert sets.Set([e.get_name() for e in c.managers]) == sets.Set(['pointy haired boss'])
assert c.managers[0].company is c
class RoundTripTest(PolymorphTest):
pass
-
+
def generate_round_trip_test(include_base=False, lazy_relation=True, redefine_colprop=False, use_literal_join=False, polymorphic_fetch=None, use_outer_joins=False):
"""generates a round trip test.
-
+
include_base - whether or not to include the base 'person' type in the union.
lazy_relation - whether or not the Company relation to People is lazy or eager.
redefine_colprop - if we redefine the 'name' column to be 'people_name' on the base Person class
use_literal_join - primary join condition is explicitly specified
"""
def test_roundtrip(self):
- # create a union that represents both types of joins.
+ # create a union that represents both types of joins.
if not polymorphic_fetch == 'union':
person_join = None
manager_join = None
'manager':people.join(managers),
'person':people.select(people.c.type=='person'),
}, None, 'pjoin')
-
+
manager_join = people.join(managers).outerjoin(boss)
else:
if use_outer_joins:
person_mapper = mapper(Person, people, select_table=person_join, polymorphic_fetch=polymorphic_fetch, polymorphic_on=people.c.type, polymorphic_identity='person', properties= {'person_name':people.c.name})
else:
person_mapper = mapper(Person, people, select_table=person_join, polymorphic_fetch=polymorphic_fetch, polymorphic_on=people.c.type, polymorphic_identity='person')
-
+
mapper(Engineer, engineers, inherits=person_mapper, polymorphic_identity='engineer')
mapper(Manager, managers, inherits=person_mapper, select_table=manager_join, polymorphic_identity='manager')
mapper(Boss, boss, inherits=Manager, polymorphic_identity='boss')
-
+
if use_literal_join:
mapper(Company, companies, properties={
- 'employees': relation(Person, lazy=lazy_relation, primaryjoin=people.c.company_id==companies.c.company_id, private=True,
- backref="company"
+ 'employees': relation(Person, lazy=lazy_relation,
+ primaryjoin=(people.c.company_id ==
+ companies.c.company_id),
+ cascade="all,delete-orphan",
+ backref="company"
)
})
else:
mapper(Company, companies, properties={
- 'employees': relation(Person, lazy=lazy_relation, private=True,
+ 'employees': relation(Person, lazy=lazy_relation,
+ cascade="all, delete-orphan",
backref="company"
)
})
-
+
if redefine_colprop:
person_attribute_name = 'person_name'
else:
person_attribute_name = 'name'
-
+
session = create_session()
c = Company(name='company1')
c.employees.append(Manager(status='AAB', manager_name='manager1', **{person_attribute_name:'pointy haired boss'}))
c.employees.append(Engineer(status='BBA', engineer_name='engineer1', primary_language='java', **{person_attribute_name:'dilbert'}))
dilbert = c.employees[-1]
-
+
if include_base:
c.employees.append(Person(status='HHH', **{person_attribute_name:'joesmith'}))
c.employees.append(Engineer(status='CGG', engineer_name='engineer2', primary_language='python', **{person_attribute_name:'wally'}))
print session.new
session.flush()
session.clear()
-
+
dilbert = session.query(Person).get(dilbert.person_id)
assert getattr(dilbert, person_attribute_name) == 'dilbert'
session.clear()
-
+
dilbert = session.query(Person).filter(Person.person_id==dilbert.person_id).one()
assert getattr(dilbert, person_attribute_name) == 'dilbert'
session.clear()
-
+
id = c.company_id
def go():
c = session.query(Company).get(id)
self.assert_sql_count(testbase.db, go, 1)
else:
self.assert_sql_count(testbase.db, go, 5)
-
+
else:
if polymorphic_fetch=='union':
self.assert_sql_count(testbase.db, go, 2)
assert dilbert is dilbert2
session.query(Person).filter((Engineer.engineer_name=="engineer1") & (Engineer.person_id==people.c.person_id)).first()
-
+
dilbert2 = session.query(Engineer).filter(Engineer.engineer_name=="engineer1")[0]
assert dilbert is dilbert2
-
+
dilbert.engineer_name = 'hes dibert!'
session.flush()
session.clear()
c = session.query(Manager).all()
assert sets.Set([repr(x) for x in c]) == sets.Set(["Manager pointy haired boss, status AAB, manager_name manager1", "Manager jsmith, status ABA, manager_name manager2", "Boss daboss, status BBB, manager_name boss golf swing fore"]), repr([repr(x) for x in c])
-
+
c = session.query(Company).get(id)
for e in c.employees:
print e, e._instance_key
session.delete(c)
session.flush()
-
-
+
+
test_roundtrip.__name__ = "test_%s%s%s%s%s" % (
(lazy_relation and "lazy" or "eager"),
(include_base and "_inclbase" or ""),
generate_round_trip_test(include_base, lazy_relation, redefine_colprop, use_literal_join, polymorphic_fetch, use_outer_joins)
else:
generate_round_trip_test(include_base, lazy_relation, redefine_colprop, use_literal_join, polymorphic_fetch, False)
-
-if __name__ == "__main__":
- testbase.main()
+if __name__ == "__main__":
+ testbase.main()
def define_tables(self, metadata):
global people, managers
- people = Table('people', metadata,
+ people = Table('people', metadata,
Column('person_id', Integer, Sequence('person_id_seq', optional=True), primary_key=True),
Column('manager_id', Integer, ForeignKey('managers.person_id', use_alter=True, name="mpid_fq")),
Column('name', String(50)),
Column('type', String(30)))
- managers = Table('managers', metadata,
+ managers = Table('managers', metadata,
Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
Column('status', String(30)),
Column('manager_name', String(50))
def tearDown(self):
people.update(values={people.c.manager_id:None}).execute()
super(RelationTest1, self).tearDown()
-
+
def testparentrefsdescendant(self):
class Person(AttrSettable):
pass
clear_mappers()
mapper(Person, people, properties={
- 'manager':relation(Manager, primaryjoin=people.c.manager_id==managers.c.person_id, foreignkey=people.c.manager_id, uselist=False, post_update=True)
+ 'manager':relation(Manager, primaryjoin=(people.c.manager_id ==
+ managers.c.person_id),
+ foreign_keys=[people.c.manager_id],
+ uselist=False, post_update=True)
})
- mapper(Manager, managers, inherits=Person, inherit_condition=people.c.person_id==managers.c.person_id)
+ mapper(Manager, managers, inherits=Person,
+ inherit_condition=people.c.person_id==managers.c.person_id)
session = create_session()
p = Person(name='some person')
mapper(Person, people)
mapper(Manager, managers, inherits=Person, inherit_condition=people.c.person_id==managers.c.person_id, properties={
- 'employee':relation(Person, primaryjoin=people.c.manager_id==managers.c.person_id, foreignkey=people.c.manager_id, uselist=False, post_update=True)
+ 'employee':relation(Person, primaryjoin=(people.c.manager_id ==
+ managers.c.person_id),
+ foreign_keys=[people.c.manager_id],
+ uselist=False, post_update=True)
})
session = create_session()
m = session.query(Manager).get(m.person_id)
print p, m, m.employee
assert m.employee is p
-
+
class RelationTest2(ORMTest):
"""test self-referential relationships on polymorphic mappers"""
def define_tables(self, metadata):
global people, managers, data
- people = Table('people', metadata,
+ people = Table('people', metadata,
Column('person_id', Integer, Sequence('person_id_seq', optional=True), primary_key=True),
Column('name', String(50)),
Column('type', String(30)))
- managers = Table('managers', metadata,
+ managers = Table('managers', metadata,
Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
Column('manager_id', Integer, ForeignKey('people.person_id')),
Column('status', String(30)),
)
-
+
data = Table('data', metadata,
Column('person_id', Integer, ForeignKey('managers.person_id'), primary_key=True),
Column('data', String(30))
)
-
+
def testrelationonsubclass_j1_nodata(self):
self.do_test("join1", False)
def testrelationonsubclass_j2_nodata(self):
self.do_test("join3", False)
def testrelationonsubclass_j3_data(self):
self.do_test("join3", True)
-
+
def do_test(self, jointype="join1", usedata=False):
class Person(AttrSettable):
pass
elif jointype == "join3":
poly_union = None
polymorphic_on = people.c.type
-
+
if usedata:
class Data(object):
def __init__(self, data):
self.data = data
mapper(Data, data)
-
+
mapper(Person, people, select_table=poly_union, polymorphic_identity='person', polymorphic_on=polymorphic_on)
if usedata:
m.data = Data('ms data')
sess.save(m)
sess.flush()
-
+
sess.clear()
p = sess.query(Person).get(p.person_id)
m = sess.query(Manager).get(m.person_id)
"""test self-referential relationships on polymorphic mappers"""
def define_tables(self, metadata):
global people, managers, data
- people = Table('people', metadata,
+ people = Table('people', metadata,
Column('person_id', Integer, Sequence('person_id_seq', optional=True), primary_key=True),
Column('colleague_id', Integer, ForeignKey('people.person_id')),
Column('name', String(50)),
Column('type', String(30)))
- managers = Table('managers', metadata,
+ managers = Table('managers', metadata,
Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
Column('status', String(30)),
)
poly_union = people.outerjoin(managers)
elif jointype == "join4":
poly_union=None
-
+
if usedata:
mapper(Data, data)
-
+
if usedata:
mapper(Person, people, select_table=poly_union, polymorphic_identity='person', polymorphic_on=people.c.type,
properties={
'colleagues':relation(Person, primaryjoin=people.c.colleague_id==people.c.person_id, remote_side=people.c.colleague_id, uselist=True),
'data':relation(Data, uselist=False)
- }
+ }
)
else:
mapper(Person, people, select_table=poly_union, polymorphic_identity='person', polymorphic_on=people.c.type,
properties={
- 'colleagues':relation(Person, primaryjoin=people.c.colleague_id==people.c.person_id,
+ 'colleagues':relation(Person, primaryjoin=people.c.colleague_id==people.c.person_id,
remote_side=people.c.colleague_id, uselist=True)
- }
+ }
)
mapper(Manager, managers, inherits=Person, inherit_condition=people.c.person_id==managers.c.person_id, polymorphic_identity='manager')
sess.save(m)
sess.save(p)
sess.flush()
-
+
sess.clear()
p = sess.query(Person).get(p.person_id)
p2 = sess.query(Person).get(p2.person_id)
if usedata:
assert p.data.data == 'ps data'
assert m.data.data == 'ms data'
-
- do_test.__name__ = 'test_relationonbaseclass_%s_%s' % (jointype, data and "nodata" or "data")
+
+ do_test.__name__ = 'test_relationonbaseclass_%s_%s' % (jointype, data and "nodata" or "data")
return do_test
for jointype in ["join1", "join2", "join3", "join4"]:
for data in (True, False):
func = generate_test(jointype, data)
setattr(RelationTest3, func.__name__, func)
-
-
+
+
class RelationTest4(ORMTest):
def define_tables(self, metadata):
global people, engineers, managers, cars
- people = Table('people', metadata,
+ people = Table('people', metadata,
Column('person_id', Integer, primary_key=True),
Column('name', String(50)))
- engineers = Table('engineers', metadata,
+ engineers = Table('engineers', metadata,
Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
Column('status', String(30)))
- managers = Table('managers', metadata,
+ managers = Table('managers', metadata,
Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
Column('longer_status', String(70)))
- cars = Table('cars', metadata,
+ cars = Table('cars', metadata,
Column('car_id', Integer, primary_key=True),
Column('owner', Integer, ForeignKey('people.person_id')))
-
+
def testmanytoonepolymorphic(self):
"""in this test, the polymorphic union is between two subclasses, but does not include the base table by itself
in the union. however, the primaryjoin condition is going to be against the base table, and its a many-to-one
relationship (unlike the test in polymorph.py) so the column in the base table is explicit. Can the ClauseAdapter
figure out how to alias the primaryjoin to the polymorphic union ?"""
-
+
# class definitions
class Person(object):
def __init__(self, **kwargs):
def __repr__(self):
return "Car number %d" % self.car_id
- # create a union that represents both types of joins.
+ # create a union that represents both types of joins.
employee_join = polymorphic_union(
{
'engineer':people.join(engineers),
'manager':people.join(managers),
}, "type", 'employee_join')
-
+
person_mapper = mapper(Person, people, select_table=employee_join,polymorphic_on=employee_join.c.type, polymorphic_identity='person')
engineer_mapper = mapper(Engineer, engineers, inherits=person_mapper, polymorphic_identity='engineer')
manager_mapper = mapper(Manager, managers, inherits=person_mapper, polymorphic_identity='manager')
car_mapper = mapper(Car, cars, properties= {'employee':relation(person_mapper)})
-
+
print class_mapper(Person).primary_key
print person_mapper.get_select_mapper().primary_key
-
+
session = create_session()
# creating 5 managers named from M1 to E5
engineer4 = session.query(Engineer).filter(Engineer.name=="E4").first()
manager3 = session.query(Manager).filter(Manager.name=="M3").first()
-
+
car1 = Car(employee=engineer4)
session.save(car1)
car2 = Car(employee=manager3)
session.flush()
session.clear()
-
+
print "----------------------------"
car1 = session.query(Car).get(car1.car_id)
print "----------------------------"
testcar = session.query(Car).options(eagerload('employee')).get(car1.car_id)
assert str(testcar.employee) == "Engineer E4, status X"
self.assert_sql_count(testbase.db, go, 1)
-
+
session.clear()
s = session.query(Car)
c = s.join("employee").filter(Person.name=="E4")[0]
class RelationTest5(ORMTest):
def define_tables(self, metadata):
global people, engineers, managers, cars
- people = Table('people', metadata,
+ people = Table('people', metadata,
Column('person_id', Integer, primary_key=True),
Column('name', String(50)),
Column('type', String(50)))
- engineers = Table('engineers', metadata,
+ engineers = Table('engineers', metadata,
Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
Column('status', String(30)))
- managers = Table('managers', metadata,
+ managers = Table('managers', metadata,
Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
Column('longer_status', String(70)))
- cars = Table('cars', metadata,
+ cars = Table('cars', metadata,
Column('car_id', Integer, primary_key=True),
Column('owner', Integer, ForeignKey('people.person_id')))
-
+
def testeagerempty(self):
"""an easy one...test parent object with child relation to an inheriting mapper, using eager loads,
works when there are no child objects present"""
sess.save(car2)
sess.flush()
sess.clear()
-
- carlist = sess.query(Car).select()
+
+ carlist = sess.query(Car).all()
assert carlist[0].manager is None
assert carlist[1].manager.person_id == car2.manager.person_id
"""test self-referential relationships on a single joined-table inheritance mapper"""
def define_tables(self, metadata):
global people, managers, data
- people = Table('people', metadata,
+ people = Table('people', metadata,
Column('person_id', Integer, Sequence('person_id_seq', optional=True), primary_key=True),
Column('name', String(50)),
)
- managers = Table('managers', metadata,
+ managers = Table('managers', metadata,
Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
Column('colleague_id', Integer, ForeignKey('managers.person_id')),
Column('status', String(30)),
employee_join = polymorphic_union(
{
'engineer':people.join(engineers),
- 'manager':people.join(managers),
+ 'manager':people.join(managers),
}, "type", 'employee_join')
car_join = polymorphic_union(
offroad_car_mapper = mapper(Offraod_Car, offroad_cars, inherits=car_mapper, polymorphic_identity='offroad')
person_mapper = mapper(Person, people,
select_table=employee_join,polymorphic_on=employee_join.c.type,
- polymorphic_identity='person',
+ polymorphic_identity='person',
properties={
'car':relation(car_mapper)
})
session.flush()
session.clear()
- r = session.query(Person).select()
+ r = session.query(Person).all()
for p in r:
assert p.car_id == p.car.car_id
-
+
class GenerativeTest(AssertMixin):
def setUpAll(self):
# cars---owned by--- people (abstract) --- has a --- status
global metadata, status, people, engineers, managers, cars
metadata = MetaData(testbase.db)
# table definitions
- status = Table('status', metadata,
+ status = Table('status', metadata,
Column('status_id', Integer, primary_key=True),
Column('name', String(20)))
- people = Table('people', metadata,
+ people = Table('people', metadata,
Column('person_id', Integer, primary_key=True),
Column('status_id', Integer, ForeignKey('status.status_id'), nullable=False),
Column('name', String(50)))
- engineers = Table('engineers', metadata,
+ engineers = Table('engineers', metadata,
Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
Column('field', String(30)))
- managers = Table('managers', metadata,
+ managers = Table('managers', metadata,
Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
Column('category', String(70)))
- cars = Table('cars', metadata,
+ cars = Table('cars', metadata,
Column('car_id', Integer, primary_key=True),
Column('status_id', Integer, ForeignKey('status.status_id'), nullable=False),
Column('owner', Integer, ForeignKey('people.person_id'), nullable=False))
clear_mappers()
for t in metadata.table_iterator(reverse=True):
t.delete().execute()
-
+
def testjointo(self):
# class definitions
class PersistentObject(object):
def __repr__(self):
return "Car number %d" % self.car_id
- # create a union that represents both types of joins.
+ # create a union that represents both types of joins.
employee_join = polymorphic_union(
{
'engineer':people.join(engineers),
}, "type", 'employee_join')
status_mapper = mapper(Status, status)
- person_mapper = mapper(Person, people,
- select_table=employee_join,polymorphic_on=employee_join.c.type,
+ person_mapper = mapper(Person, people,
+ select_table=employee_join,polymorphic_on=employee_join.c.type,
polymorphic_identity='person', properties={'status':relation(status_mapper)})
engineer_mapper = mapper(Engineer, engineers, inherits=person_mapper, polymorphic_identity='engineer')
manager_mapper = mapper(Manager, managers, inherits=person_mapper, polymorphic_identity='manager')
session.flush()
# TODO: we haven't created assertions for all the data combinations created here
-
+
# creating 5 managers named from M1 to M5 and 5 engineers named from E1 to E5
# M4, M5, E4 and E5 are dead
for i in range(1,5):
session.flush()
# get E4
- engineer4 = session.query(engineer_mapper).get_by(name="E4")
+ engineer4 = session.query(engineer_mapper).filter_by(name="E4").one()
# create 2 cars for E4, one active and one dead
car1 = Car(employee=engineer4,status=active)
assert str(list(r)) == "[Manager M2, category YYYYYYYYY, status Status active, Engineer E2, field X, status Status active]"
r = session.query(Engineer).join('status').filter(people.c.name.in_(['E2', 'E3', 'E4', 'M4', 'M2', 'M1']) & (status.c.name=="active"))
assert str(list(r)) == "[Engineer E2, field X, status Status active, Engineer E3, field X, status Status active]"
- # this test embeds the original polymorphic union (employee_join) fully
- # into the WHERE criterion, using a correlated select. ticket #577 tracks
- # that Query's adaptation of the WHERE clause does not dig into the
+ # this test embeds the original polymorphic union (employee_join) fully
+ # into the WHERE criterion, using a correlated select. ticket #577 tracks
+ # that Query's adaptation of the WHERE clause does not dig into the
# mapped selectable itself, which permanently breaks the mapped selectable.
r = session.query(Person).filter(exists([Car.c.owner], Car.c.owner==employee_join.c.person_id))
assert str(list(r)) == "[Engineer E4, field X, status Status dead]"
-
+
class MultiLevelTest(ORMTest):
def define_tables(self, metadata):
global table_Employee, table_Engineer, table_Manager
# 'Engineer': table_Employee.join(table_Engineer).select(table_Employee.c.atype == 'Engineer'),
# 'Employee': table_Employee.select( table_Employee.c.atype == 'Employee'),
# }, None, 'pu_employee', )
-
+
mapper_Employee = mapper( Employee, table_Employee,
polymorphic_identity= 'Employee',
polymorphic_on= pu_Employee.c.atype,
session.save(b)
session.save(c)
session.flush()
- assert set(session.query(Employee).select()) == set([a,b,c])
- assert set(session.query( Engineer).select()) == set([b,c])
- assert session.query( Manager).select() == [c]
+ assert set(session.query(Employee).all()) == set([a,b,c])
+ assert set(session.query( Engineer).all()) == set([b,c])
+ assert session.query( Manager).all() == [c]
class ManyToManyPolyTest(ORMTest):
def define_tables(self, metadata):
'collection', metadata,
Column('id', Integer, primary_key=True),
Column('name', Unicode(255)))
-
+
def test_pjoin_compile(self):
- """test that remote_side columns in the secondary join table arent attempted to be
+ """test that remote_side columns in the secondary join table arent attempted to be
matched to the target polymorphic selectable"""
class BaseItem(object): pass
class Item(BaseItem): pass
polymorphic_identity='Item')
mapper(Collection, collection_table)
-
+
class_mapper(BaseItem)
class CustomPKTest(ORMTest):
def define_tables(self, metadata):
global t1, t2
- t1 = Table('t1', metadata,
+ t1 = Table('t1', metadata,
Column('id', Integer, primary_key=True),
Column('type', String(30), nullable=False),
Column('data', String(30)))
def test_custompk(self):
"""test that the primary_key attribute is propigated to the polymorphic mapper"""
-
+
class T1(object):pass
class T2(T1):pass
-
+
# create a polymorphic union with the select against the base table first.
- # with the join being second, the alias of the union will
+ # with the join being second, the alias of the union will
# pick up two "primary key" columns. technically the alias should have a
# 2-col pk in any case but the leading select has a NULL for the "t2id" column
d = util.OrderedDict()
d['t1'] = t1.select(t1.c.type=='t1')
d['t2'] = t1.join(t2)
pjoin = polymorphic_union(d, None, 'pjoin')
-
+
mapper(T1, t1, polymorphic_on=t1.c.type, polymorphic_identity='t1', select_table=pjoin, primary_key=[pjoin.c.id])
mapper(T2, t2, inherits=T1, polymorphic_identity='t2')
print [str(c) for c in class_mapper(T1).primary_key]
sess.save(ot2)
sess.flush()
sess.clear()
-
+
# query using get(), using only one value. this requires the select_table mapper
# has the same single-col primary key.
assert sess.query(T1).get(ot1.id).id == ot1.id
-
+
ot1 = sess.query(T1).get(ot1.id)
ot1.data = 'hi'
sess.flush()
def test_pk_collapses(self):
- """test that a composite primary key attribute formed by a join is "collapsed" into its
+ """test that a composite primary key attribute formed by a join is "collapsed" into its
minimal columns"""
class T1(object):pass
class T2(T1):pass
# create a polymorphic union with the select against the base table first.
- # with the join being second, the alias of the union will
+ # with the join being second, the alias of the union will
# pick up two "primary key" columns. technically the alias should have a
# 2-col pk in any case but the leading select has a NULL for the "t2id" column
d = util.OrderedDict()
mapper(T2, t2, inherits=T1, polymorphic_identity='t2')
assert len(class_mapper(T1).primary_key) == 1
assert len(class_mapper(T1).get_select_mapper().compile().primary_key) == 1
-
+
print [str(c) for c in class_mapper(T1).primary_key]
ot1 = T1()
ot2 = T2()
class InheritingEagerTest(ORMTest):
def define_tables(self, metadata):
global people, employees, tags, peopleTags
-
+
people = Table('people', metadata,
Column('id', Integer, primary_key=True),
Column('_type', String(30), nullable=False),
tags = Table('tags', metadata,
Column('id', Integer, primary_key=True),
- Column('label', String, nullable=False),
+ Column('label', String(50), nullable=False),
)
peopleTags = Table('peopleTags', metadata,
Column('person_id', Integer,ForeignKey('people.id')),
Column('tag_id', Integer,ForeignKey('tags.id')),
)
-
+
def test_basic(self):
"""test that Query uses the full set of mapper._eager_loaders when generating SQL"""
-
+
class Person(fixtures.Base):
pass
-
+
class Employee(Person):
def __init__(self, name='bob'):
self.name = name
bob = Employee()
session.save(bob)
-
+
tag = Tag('crazy')
bob.tags.append(tag)
# query from Employee with limit, query needs to apply eager limiting subquery
instance = session.query(Employee).filter_by(id=1).limit(1).first()
assert len(instance.tags) == 2
-
-
-if __name__ == "__main__":
+
+
+if __name__ == "__main__":
testbase.main()
-
nullable=True),
Column('quantity', Float, default=1.),
)
-
+
documents_table = Table('documents', metadata,
Column('document_id', Integer, primary_key=True),
Column('document_type', String(128)),
Column('data', Binary),
Column('size', Integer, default=0),
)
-
+
class Product(object):
def __init__(self, name, mark=''):
self.name = name
self.data = data
def __repr__(self):
return '<%s %s>' % (self.__class__.__name__, self.name)
-
- class RasterDocument(Document):
+
+ class RasterDocument(Document):
pass
def testone(self):
specification_mapper = mapper(SpecLine, specification_table,
properties=dict(
master=relation(Assembly,
- foreignkey=specification_table.c.master_id,
+ foreign_keys=[specification_table.c.master_id],
primaryjoin=specification_table.c.master_id==products_table.c.product_id,
lazy=True, backref=backref('specification', primaryjoin=specification_table.c.master_id==products_table.c.product_id),
uselist=False),
- slave=relation(Product,
- foreignkey=specification_table.c.slave_id,
+ slave=relation(Product,
+ foreign_keys=[specification_table.c.slave_id],
primaryjoin=specification_table.c.slave_id==products_table.c.product_id,
lazy=True, uselist=False),
quantity=specification_table.c.quantity,
session.flush()
session.clear()
- a1 = session.query(Product).get_by(name='a1')
+ a1 = session.query(Product).filter_by(name='a1').one()
new = repr(a1)
print orig
print new
specification_mapper = mapper(SpecLine, specification_table,
properties=dict(
- slave=relation(Product,
- foreignkey=specification_table.c.slave_id,
+ slave=relation(Product,
+ foreign_keys=[specification_table.c.slave_id],
primaryjoin=specification_table.c.slave_id==products_table.c.product_id,
lazy=True, uselist=False),
)
orig = repr([s, s2])
session.flush()
session.clear()
- new = repr(session.query(SpecLine).select())
+ new = repr(session.query(SpecLine).all())
print orig
print new
assert orig == new == '[<SpecLine 1.0 <Product p1>>, <SpecLine 1.0 <Detail d1>>]'
specification_mapper = mapper(SpecLine, specification_table,
properties=dict(
master=relation(Assembly, lazy=False, uselist=False,
- foreignkey=specification_table.c.master_id,
+ foreign_keys=[specification_table.c.master_id],
primaryjoin=specification_table.c.master_id==products_table.c.product_id,
backref=backref('specification', primaryjoin=specification_table.c.master_id==products_table.c.product_id, cascade="all, delete-orphan"),
),
slave=relation(Product, lazy=False, uselist=False,
- foreignkey=specification_table.c.slave_id,
+ foreign_keys=[specification_table.c.slave_id],
primaryjoin=specification_table.c.slave_id==products_table.c.product_id,
),
quantity=specification_table.c.quantity,
session.flush()
session.clear()
- a1 = session.query(Product).get_by(name='a1')
+ a1 = session.query(Product).filter_by(name='a1').one()
new = repr(a1)
print orig
print new
session.flush()
session.clear()
- a1 = session.query(Product).get_by(name='a1')
+ a1 = session.query(Product).filter_by(name='a1').one()
new = repr(a1)
print orig
print new
session.flush()
session.clear()
- a1 = session.query(Product).get_by(name='a1')
- assert len(session.query(Document).select()) == 0
+ a1 = session.query(Product).filter_by(name='a1').one()
+ assert len(session.query(Document).all()) == 0
def testfive(self):
"""tests the late compilation of mappers"""
specification_mapper = mapper(SpecLine, specification_table,
properties=dict(
master=relation(Assembly, lazy=False, uselist=False,
- foreignkey=specification_table.c.master_id,
+ foreign_keys=[specification_table.c.master_id],
primaryjoin=specification_table.c.master_id==products_table.c.product_id,
backref=backref('specification', primaryjoin=specification_table.c.master_id==products_table.c.product_id),
),
slave=relation(Product, lazy=False, uselist=False,
- foreignkey=specification_table.c.slave_id,
+ foreign_keys=[specification_table.c.slave_id],
primaryjoin=specification_table.c.slave_id==products_table.c.product_id,
),
quantity=specification_table.c.quantity,
session.flush()
session.clear()
- a1 = session.query(Product).get_by(name='a1')
+ a1 = session.query(Product).filter_by(name='a1').one()
new = repr(a1)
print orig
print new
assert orig == new == '<Assembly a1> specification=[<SpecLine 1.0 <Detail d1>>] documents=[<Document doc1>, <RasterDocument doc2>]'
-
-if __name__ == "__main__":
+
+if __name__ == "__main__":
testbase.main()
def setUpAll(self):
metadata = MetaData(testbase.db)
global employees_table
- employees_table = Table('employees', metadata,
+ employees_table = Table('employees', metadata,
Column('employee_id', Integer, primary_key=True),
Column('name', String(50)),
Column('manager_data', String(50)),
session.save(e2)
session.flush()
- assert session.query(Employee).select() == [m1, e1, e2]
- assert session.query(Engineer).select() == [e1, e2]
- assert session.query(Manager).select() == [m1]
- assert session.query(JuniorEngineer).select() == [e2]
-
+ assert session.query(Employee).all() == [m1, e1, e2]
+ assert session.query(Engineer).all() == [e1, e2]
+ assert session.query(Manager).all() == [m1]
+ assert session.query(JuniorEngineer).all() == [e2]
+
if __name__ == '__main__':
testbase.main()
metadata = MetaData(testbase.db)
info_table = Table('infos', metadata,
Column('pk', Integer, primary_key=True),
- Column('info', String))
+ Column('info', String(128)))
data_table = Table('data', metadata,
Column('data_pk', Integer, primary_key=True),
Column('info_pk', Integer, ForeignKey(info_table.c.pk)),
Column('timeval', Integer),
- Column('data_val', String))
+ Column('data_val', String(128)))
rel_table = Table('rels', metadata,
Column('rel_pk', Integer, primary_key=True),
def test_propfilters(self):
t = Table('person', MetaData(),
Column('id', Integer, primary_key=True),
- Column('type', String),
- Column('name', String),
+ Column('type', String(128)),
+ Column('name', String(128)),
Column('employee_number', Integer),
Column('boss_id', Integer, ForeignKey('person.id')),
Column('vendor_id', Integer))
metadata = MetaData()
tbl_a = Table("tbl_a", metadata,
Column("id", Integer, primary_key=True),
- Column("name", String),
+ Column("name", String(128)),
)
tbl_b = Table("tbl_b", metadata,
Column("id", Integer, primary_key=True),
- Column("name", String),
+ Column("name", String(128)),
)
tbl_c = Table("tbl_c", metadata,
Column("id", Integer, primary_key=True),
Column("tbl_a_id", Integer, ForeignKey("tbl_a.id"), nullable=False),
- Column("name", String),
+ Column("name", String(128)),
)
tbl_d = Table("tbl_d", metadata,
Column("id", Integer, primary_key=True),
Column("tbl_c_id", Integer, ForeignKey("tbl_c.id"), nullable=False),
Column("tbl_b_id", Integer, ForeignKey("tbl_b.id")),
- Column("name", String),
+ Column("name", String(128)),
)
def setUp(self):
global session
Column("jobno", Unicode(15), primary_key=True),
Column("pagename", Unicode(30), primary_key=True),
Column("comment_id", Integer, primary_key=True, autoincrement=False),
- Column("content", Unicode),
+ Column("content", UnicodeText),
ForeignKeyConstraint(["jobno", "pagename"], ["pages.jobno", "pages.pagename"])
)
session.save(li)
session.flush()
session.clear()
- newcon = session.query(Container).selectfirst()
+ newcon = session.query(Container).first()
assert con.policyNum == newcon.policyNum
assert len(newcon.lineItems) == 10
for old, new in zip(con.lineItems, newcon.lineItems):
"""test nested anonymous label generation. this
essentially tests the ANONYMOUS_LABEL regex.
"""
-
+
s1 = table1.select()
s2 = s1.alias()
s3 = select([s2], use_labels=True)
"anon_1.anon_2_description AS anon_1_anon_2_description FROM (SELECT anon_2.myid AS anon_2_myid, anon_2.name AS anon_2_name, "\
"anon_2.description AS anon_2_description FROM (SELECT mytable.myid AS myid, mytable.name AS name, mytable.description "\
"AS description FROM mytable) AS anon_2) AS anon_1")
-
+
def testmssql_noorderbyinsubquery(self):
"""test that the ms-sql dialect removes ORDER BY clauses from subqueries"""
dialect = mssql.dialect()
assert False
except exceptions.InvalidRequestError, err:
assert str(err) == "Scalar select can only be created from a Select object that has exactly one column expression.", str(err)
-
+
try:
# generic function which will look at the type of expression
func.coalesce(select([table1.c.myid]))
assert False
except exceptions.InvalidRequestError, err:
assert str(err) == "Select objects don't have a type. Call as_scalar() on this Select object to return a 'scalar' version of this Select.", str(err)
-
+
s = select([table1.c.myid], scalar=True, correlate=False)
self.assert_compile(select([table1, s]), "SELECT mytable.myid, mytable.name, mytable.description, (SELECT mytable.myid FROM mytable) AS anon_1 FROM mytable")
table2.select(order_by = [table2.c.otherid, table2.c.othername.desc()]),
"SELECT myothertable.otherid, myothertable.othername FROM myothertable ORDER BY myothertable.otherid, myothertable.othername DESC"
)
-
+
# generative order_by
self.assert_compile(
- table2.select().order_by(table2.c.otherid).order_by(table2.c.othername.desc()),
+ table2.select().order_by(table2.c.otherid).order_by(table2.c.othername.desc()),
"SELECT myothertable.otherid, myothertable.othername FROM myothertable ORDER BY myothertable.otherid, myothertable.othername DESC"
)
self.assert_compile(
- table2.select().order_by(table2.c.otherid).order_by(table2.c.othername.desc()).order_by(None),
+ table2.select().order_by(table2.c.otherid).order_by(table2.c.othername.desc()).order_by(None),
"SELECT myothertable.otherid, myothertable.othername FROM myothertable"
)
select([table2.c.othername, func.count(table2.c.otherid)]).group_by(table2.c.othername).group_by(None),
"SELECT myothertable.othername, count(myothertable.otherid) AS count_1 FROM myothertable"
)
-
+
def testgroupby_and_orderby(self):
self.assert_compile(
self.assert_compile(stmt, expected_positional_stmt, dialect=sqlite.dialect())
nonpositional = stmt.compile()
positional = stmt.compile(dialect=sqlite.dialect())
+ testing.squelch_deprecation(positional.get_params)
pp = positional.get_params()
assert [pp[k] for k in positional.positiontup] == expected_default_params_list
assert nonpositional.get_params(**test_param_dict) == expected_test_params_dict, "expected :%s got %s" % (str(expected_test_params_dict), str(nonpositional.get_params(**test_param_dict)))
pp = positional.get_params(**test_param_dict)
assert [pp[k] for k in positional.positiontup] == expected_test_params_list
+ testing.enable_deprecation(positional.get_params)
# check that params() doesnt modify original statement
s = select([table1], or_(table1.c.myid==bindparam('myid'), table2.c.otherid==bindparam('myotherid')))
# test using same 'unique' param object twice in one compile
s = select([table1.c.myid]).where(table1.c.myid==12).as_scalar()
s2 = select([table1, s], table1.c.myid==s)
- self.assert_compile(s2,
+ self.assert_compile(s2,
"SELECT mytable.myid, mytable.name, mytable.description, (SELECT mytable.myid FROM mytable WHERE mytable.myid = "\
":mytable_myid_1) AS anon_1 FROM mytable WHERE mytable.myid = (SELECT mytable.myid FROM mytable WHERE mytable.myid = :mytable_myid_1)")
positional = s2.compile(dialect=sqlite.dialect())
+ testing.squelch_deprecation(positional.get_params)
pp = positional.get_params()
+ testing.enable_deprecation(positional.get_params)
assert [pp[k] for k in positional.positiontup] == [12, 12]
-
+
# check that conflicts with "unique" params are caught
s = select([table1], or_(table1.c.myid==7, table1.c.myid==bindparam('mytable_myid_1')))
try:
import datetime
table = Table('dt', metadata,
Column('date', Date))
- self.assert_compile(table.select(table.c.date.between(datetime.date(2006,6,1), datetime.date(2006,6,5))),
+ self.assert_compile(table.select(table.c.date.between(datetime.date(2006,6,1), datetime.date(2006,6,5))),
"SELECT dt.date FROM dt WHERE dt.date BETWEEN :dt_date_1 AND :dt_date_2", checkparams={'dt_date_1':datetime.date(2006,6,1), 'dt_date_2':datetime.date(2006,6,5)})
- self.assert_compile(table.select(sql.between(table.c.date, datetime.date(2006,6,1), datetime.date(2006,6,5))),
+ self.assert_compile(table.select(sql.between(table.c.date, datetime.date(2006,6,1), datetime.date(2006,6,5))),
"SELECT dt.date FROM dt WHERE dt.date BETWEEN :param_1 AND :param_2", checkparams={'param_1':datetime.date(2006,6,1), 'param_2':datetime.date(2006,6,5)})
def test_operator_precedence(self):
def testselect(self):
# these tests will fail with the MS-SQL compiler since it will alias schema-qualified tables
self.assert_compile(table4.select(), "SELECT remotetable.rem_id, remotetable.datatype_id, remotetable.value FROM remote_owner.remotetable")
- self.assert_compile(table4.select(and_(table4.c.datatype_id==7, table4.c.value=='hi')),
+ self.assert_compile(table4.select(and_(table4.c.datatype_id==7, table4.c.value=='hi')),
"SELECT remotetable.rem_id, remotetable.datatype_id, remotetable.value FROM remote_owner.remotetable WHERE "\
"remotetable.datatype_id = :remotetable_datatype_id_1 AND remotetable.value = :remotetable_value_1")
return set([tuple(row) for row in results])
+
+def squelch_deprecation(callable_):
+ _set_deprecation(callable_, False)
+
+def enable_deprecation(callable_):
+ _set_deprecation(callable_, True)
+
+def _set_deprecation(callable_, state):
+ if hasattr(callable_, 'im_func'):
+ callable_ = callable_.im_func
+ assert hasattr(callable_, 'warn'), 'Callable is not deprecated'
+ setattr(callable_, 'warn', state)
+
class TestData(object):
"""Tracks SQL expressions as they are executed via an instrumented ExecutionContext."""
import sqlalchemy.util as util
def zblog_mappers():
- # User mapper. Here, we redefine the names of some of the columns
- # to different property names. normally the table columns are all
- # sucked in automatically.
+ # User mapper. Here, we redefine the names of some of the columns to
+ # different property names. normally the table columns are all sucked in
+ # automatically.
mapper(user.User, tables.users, properties={
'id':tables.users.c.user_id,
'name':tables.users.c.user_name,
'crypt_password':tables.users.c.password,
})
- # blog mapper. this contains a reference to the user mapper,
- # and also installs a "backreference" on that relationship to handle it
- # in both ways. this will also attach a 'blogs' property to the user mapper.
+ # blog mapper. this contains a reference to the user mapper, and also
+ # installs a "backreference" on that relationship to handle it in both
+ # ways. this will also attach a 'blogs' property to the user mapper.
mapper(Blog, tables.blogs, properties={
'id':tables.blogs.c.blog_id,
- 'owner':relation(user.User, lazy=False, backref=backref('blogs', cascade="all, delete-orphan")),
+ 'owner':relation(user.User, lazy=False,
+ backref=backref('blogs', cascade="all, delete-orphan")),
})
# topic mapper. map all topic columns to the Topic class.
mapper(Topic, tables.topics)
-
- # TopicAssocation mapper. This is an "association" object, which is similar to
- # a many-to-many relationship except extra data is associated with each pair
- # of related data. because the topic_xref table doesnt have a primary key,
- # the "primary key" columns of a TopicAssociation are defined manually here.
- mapper(TopicAssociation,tables.topic_xref,
- primary_key=[tables.topic_xref.c.post_id, tables.topic_xref.c.topic_id],
+
+ # TopicAssociation mapper. This is an "association" object, which is
+ # similar to a many-to-many relationship except extra data is associated
+ # with each pair of related data. because the topic_xref table doesn't
+ # have a primary key, the "primary key" columns of a TopicAssociation are
+ # defined manually here.
+ mapper(TopicAssociation,tables.topic_xref,
+ primary_key=[tables.topic_xref.c.post_id,
+ tables.topic_xref.c.topic_id],
properties={
'topic':relation(Topic, lazy=False),
})
- # Post mapper, these are posts within a blog.
- # since we want the count of comments for each post, create a select that will get the posts
- # and count the comments in one query.
+ # Post mapper, these are posts within a blog.
+ # since we want the count of comments for each post, create a select that
+ # will get the posts and count the comments in one query.
posts_with_ccount = select(
[c for c in tables.posts.c if c.key != 'body'] + [
func.count(tables.comments.c.comment_id).label('comment_count')
]
) .alias('postswcount')
- # then create a Post mapper on that query.
- # we have the body as "deferred" so that it loads only when needed,
- # the user as a Lazy load, since the lazy load will run only once per user and
- # its usually only one user's posts is needed per page,
- # the owning blog is a lazy load since its also probably loaded into the identity map
- # already, and topics is an eager load since that query has to be done per post in any
- # case.
+ # then create a Post mapper on that query.
+ # we have the body as "deferred" so that it loads only when needed, the
+ # user as a Lazy load, since the lazy load will run only once per user and
+ # it's usually only one user's posts that are needed per page, the owning
+ # blog is a lazy load since it's also probably loaded into the identity
+ # map already, and topics is an eager load since that query has to be done
+ # per post in any case.
mapper(Post, posts_with_ccount, properties={
'id':posts_with_ccount.c.post_id,
'body':deferred(tables.posts.c.body),
- 'user':relation(user.User, lazy=True, backref=backref('posts', cascade="all, delete-orphan")),
- 'blog':relation(Blog, lazy=True, backref=backref('posts', cascade="all, delete-orphan")),
- 'topics':relation(TopicAssociation, lazy=False, private=True, association=Topic, backref='post')
+ 'user':relation(user.User, lazy=True,
+ backref=backref('posts', cascade="all, delete-orphan")),
+ 'blog':relation(Blog, lazy=True,
+ backref=backref('posts', cascade="all, delete-orphan")),
+ 'topics':relation(TopicAssociation, lazy=False,
+ cascade="all, delete-orphan",
+ backref='post')
}, order_by=[desc(posts_with_ccount.c.datetime)])
- # comment mapper. This mapper is handling a hierarchical relationship on itself, and contains
- # a lazy reference both to its parent comment and its list of child comments.
+ # comment mapper. This mapper is handling a hierarchical relationship on
+ # itself, and contains a lazy reference both to its parent comment and its
+ # list of child comments.
mapper(Comment, tables.comments, properties={
'id':tables.comments.c.comment_id,
- 'post':relation(Post, lazy=True, backref=backref('comments', cascade="all, delete-orphan")),
- 'user':relation(user.User, lazy=False, backref=backref('comments', cascade="all, delete-orphan")),
- 'parent':relation(Comment, primaryjoin=tables.comments.c.parent_comment_id==tables.comments.c.comment_id, foreignkey=tables.comments.c.comment_id, lazy=True, uselist=False),
- 'replies':relation(Comment,primaryjoin=tables.comments.c.parent_comment_id==tables.comments.c.comment_id, lazy=True, uselist=True, cascade="all"),
+ 'post':relation(Post, lazy=True,
+ backref=backref('comments',
+ cascade="all, delete-orphan")),
+ 'user':relation(user.User, lazy=False,
+ backref=backref('comments',
+ cascade="all, delete-orphan")),
+ 'parent':relation(Comment,
+ primaryjoin=(tables.comments.c.parent_comment_id ==
+ tables.comments.c.comment_id),
+ foreign_keys=[tables.comments.c.comment_id],
+ lazy=True, uselist=False),
+ 'replies':relation(Comment,
+ primaryjoin=(tables.comments.c.parent_comment_id ==
+ tables.comments.c.comment_id),
+ lazy=True, uselist=True, cascade="all"),
})
-# we define one special find-by for the comments of a post, which is going to make its own "noload"
-# mapper and organize the comments into their correct hierarchy in one pass. hierarchical
-# data normally needs to be loaded by separate queries for each set of children, unless you
-# use a proprietary extension like CONNECT BY.
+# we define one special find-by for the comments of a post, which is going to
+# make its own "noload" mapper and organize the comments into their correct
+# hierarchy in one pass. hierarchical data normally needs to be loaded by
+# separate queries for each set of children, unless you use a proprietary
+# extension like CONNECT BY.
def find_by_post(post):
- """returns a hierarchical collection of comments based on a given criterion.
- uses a mapper that does not lazy load replies or parents, and instead
+ """returns a hierarchical collection of comments based on a given criterion.
+
+ Uses a mapper that does not lazy load replies or parents, and instead
organizes comments into a hierarchical tree when the result is produced.
"""
+
q = session().query(Comment).options(noload('replies'), noload('parent'))
comments = q.select_by(post_id=post.id)
result = []
def session():
return trans.session
-
metadata = MetaData()
-users = Table('users', metadata,
+users = Table('users', metadata,
Column('user_id', Integer, Sequence('user_id_seq', optional=True), primary_key=True),
Column('user_name', String(30), nullable=False),
Column('fullname', String(100), nullable=False),
Column('groupname', String(20), nullable=False),
)
-blogs = Table('blogs', metadata,
+blogs = Table('blogs', metadata,
Column('blog_id', Integer, Sequence('blog_id_seq', optional=True), primary_key=True),
Column('owner_id', Integer, ForeignKey('users.user_id'), nullable=False),
Column('name', String(100), nullable=False),
Column('description', String(500))
)
-
+
posts = Table('posts', metadata,
Column('post_id', Integer, Sequence('post_id_seq', optional=True), primary_key=True),
Column('blog_id', Integer, ForeignKey('blogs.blog_id'), nullable=False),
Column('user_id', Integer, ForeignKey('users.user_id'), nullable=False),
Column('datetime', DateTime, nullable=False),
Column('headline', String(500)),
- Column('summary', String),
- Column('body', String),
+ Column('summary', Text),
+ Column('body', Text),
)
-
+
topics = Table('topics', metadata,
Column('topic_id', Integer, primary_key=True),
Column('keyword', String(50), nullable=False),
Column('description', String(500))
)
-
-topic_xref = Table('topic_post_xref', metadata,
+
+topic_xref = Table('topic_post_xref', metadata,
Column('topic_id', Integer, ForeignKey('topics.topic_id'), nullable=False),
Column('is_primary', Boolean, nullable=False),
Column('post_id', Integer, ForeignKey('posts.post_id'), nullable=False)
)
-comments = Table('comments', metadata,
+comments = Table('comments', metadata,
Column('comment_id', Integer, primary_key=True),
Column('user_id', Integer, ForeignKey('users.user_id'), nullable=False),
Column('post_id', Integer, ForeignKey('posts.post_id'), nullable=False),
Column('datetime', DateTime, nullable=False),
Column('parent_comment_id', Integer, ForeignKey('comments.comment_id')),
Column('subject', String(500)),
- Column('body', String),
+ Column('body', Text),
)
-