from sqlalchemy.sql import util as sql_util
-__all__ = 'declarative_base', 'synonym_for', 'comparable_using'
+__all__ = 'declarative_base', 'synonym_for', 'comparable_using', 'instrument_declarative'
+def instrument_declarative(cls, registry, metadata):
+ """Given a class, configure the class declaratively,
+ using the given registry (any dictionary) and MetaData object.
+ This operation does not assume any kind of class hierarchy.
+
+ """
+ if '_decl_class_registry' in cls.__dict__:
+ raise exceptions.InvalidRequestError("Class %r already has been instrumented declaratively" % cls)
+ cls._decl_class_registry = registry
+ cls.metadata = metadata
+ _as_declarative(cls, cls.__name__, cls.__dict__)
+
+def _as_declarative(cls, classname, dict_):
+ cls._decl_class_registry[classname] = cls
+ our_stuff = util.OrderedDict()
+ for k in dict_:
+ value = dict_[k]
+ if (isinstance(value, tuple) and len(value) == 1 and
+ isinstance(value[0], (Column, MapperProperty))):
+ util.warn("Ignoring declarative-like tuple value of attribute "
+ "%s: possibly a copy-and-paste error with a comma "
+ "left at the end of the line?" % k)
+ continue
+ if not isinstance(value, (Column, MapperProperty)):
+ continue
+ prop = _deferred_relation(cls, value)
+ our_stuff[k] = prop
+
+ # set up attributes in the order they were created
+ our_stuff.sort(lambda x, y: cmp(our_stuff[x]._creation_order,
+ our_stuff[y]._creation_order))
+
+ table = None
+ if '__table__' not in cls.__dict__:
+ if '__tablename__' in cls.__dict__:
+ tablename = cls.__tablename__
+ autoload = cls.__dict__.get('__autoload__')
+ if autoload:
+ table_kw = {'autoload': True}
+ else:
+ table_kw = {}
+ cols = []
+ for key, c in our_stuff.iteritems():
+ if isinstance(c, ColumnProperty):
+ for col in c.columns:
+ if isinstance(col, Column) and col.table is None:
+ _undefer_column_name(key, col)
+ cols.append(col)
+ elif isinstance(c, Column):
+ _undefer_column_name(key, c)
+ cols.append(c)
+ cls.__table__ = table = Table(tablename, cls.metadata,
+ *cols, **table_kw)
+ else:
+ table = cls.__table__
+
+ mapper_args = getattr(cls, '__mapper_args__', {})
+ if 'inherits' not in mapper_args:
+ inherits = cls.__mro__[1]
+ inherits = cls._decl_class_registry.get(inherits.__name__, None)
+ if inherits:
+ mapper_args['inherits'] = inherits
+ if not mapper_args.get('concrete', False) and table:
+ # figure out the inherit condition with relaxed rules
+ # about nonexistent tables, to allow for ForeignKeys to
+ # not-yet-defined tables (since we know for sure that our
+ # parent table is defined within the same MetaData)
+ mapper_args['inherit_condition'] = sql_util.join_condition(
+ inherits.__table__, table,
+ ignore_nonexistent_tables=True)
+
+ if hasattr(cls, '__mapper_cls__'):
+ mapper_cls = util.unbound_method_to_callable(cls.__mapper_cls__)
+ else:
+ mapper_cls = mapper
+
+ cls.__mapper__ = mapper_cls(cls, table, properties=our_stuff,
+ **mapper_args)
class DeclarativeMeta(type):
def __init__(cls, classname, bases, dict_):
if '_decl_class_registry' in cls.__dict__:
return type.__init__(cls, classname, bases, dict_)
-
- cls._decl_class_registry[classname] = cls
- our_stuff = util.OrderedDict()
- for k in dict_:
- value = dict_[k]
- if (isinstance(value, tuple) and len(value) == 1 and
- isinstance(value[0], (Column, MapperProperty))):
- util.warn("Ignoring declarative-like tuple value of attribute "
- "%s: possibly a copy-and-paste error with a comma "
- "left at the end of the line?" % k)
- continue
- if not isinstance(value, (Column, MapperProperty)):
- continue
- prop = _deferred_relation(cls, value)
- our_stuff[k] = prop
-
- # set up attributes in the order they were created
- our_stuff.sort(lambda x, y: cmp(our_stuff[x]._creation_order,
- our_stuff[y]._creation_order))
-
- table = None
- if '__table__' not in cls.__dict__:
- if '__tablename__' in cls.__dict__:
- tablename = cls.__tablename__
- autoload = cls.__dict__.get('__autoload__')
- if autoload:
- table_kw = {'autoload': True}
- else:
- table_kw = {}
- cols = []
- for key, c in our_stuff.iteritems():
- if isinstance(c, ColumnProperty):
- for col in c.columns:
- if isinstance(col, Column) and col.table is None:
- _undefer_column_name(key, col)
- cols.append(col)
- elif isinstance(c, Column):
- _undefer_column_name(key, c)
- cols.append(c)
- cls.__table__ = table = Table(tablename, cls.metadata,
- *cols, **table_kw)
- else:
- table = cls.__table__
-
- mapper_args = getattr(cls, '__mapper_args__', {})
- if 'inherits' not in mapper_args:
- inherits = cls.__mro__[1]
- inherits = cls._decl_class_registry.get(inherits.__name__, None)
- if inherits:
- mapper_args['inherits'] = inherits
- if not mapper_args.get('concrete', False) and table:
- # figure out the inherit condition with relaxed rules
- # about nonexistent tables, to allow for ForeignKeys to
- # not-yet-defined tables (since we know for sure that our
- # parent table is defined within the same MetaData)
- mapper_args['inherit_condition'] = sql_util.join_condition(
- inherits.__table__, table,
- ignore_nonexistent_tables=True)
-
- if hasattr(cls, '__mapper_cls__'):
- mapper_cls = util.unbound_method_to_callable(cls.__mapper_cls__)
- else:
- mapper_cls = mapper
-
- cls.__mapper__ = mapper_cls(cls, table, properties=our_stuff,
- **mapper_args)
+
+ _as_declarative(cls, classname, dict_)
return type.__init__(cls, classname, bases, dict_)
def __setattr__(cls, key, value):
return comparable_property(comparator_factory, fn)
return decorate
-def declarative_base(engine=None, metadata=None, mapper=None):
+def declarative_base(engine=None, metadata=None, mapper=None, cls=object):
lcl_metadata = metadata or MetaData()
if engine:
lcl_metadata.bind = engine
- class Base(object):
+ class Base(cls):
__metaclass__ = DeclarativeMeta
metadata = lcl_metadata
if mapper:
foo = sa.orm.column_property(User.id == 5)
self.assertRaises(sa.exc.InvalidRequestError, go)
+ def test_custom_base(self):
+ class MyBase(object):
+ def foobar(self):
+ return "foobar"
+ Base = decl.declarative_base(cls=MyBase)
+ assert hasattr(Base, 'metadata')
+ assert Base().foobar() == "foobar"
+
def test_add_prop(self):
class User(Base, ComparableEntity):
__tablename__ = 'users'
eq_(a1, Address(email='two'))
eq_(a1.user, User(name='u1'))
+ def test_as_declarative(self):
+ class User(ComparableEntity):
+ __tablename__ = 'users'
+
+ id = Column('id', Integer, primary_key=True)
+ name = Column('name', String(50))
+ addresses = relation("Address", backref="user")
+
+ class Address(ComparableEntity):
+ __tablename__ = 'addresses'
+
+ id = Column('id', Integer, primary_key=True)
+ email = Column('email', String(50))
+ user_id = Column('user_id', Integer, ForeignKey('users.id'))
+
+ reg = {}
+ decl.instrument_declarative(User, reg, Base.metadata)
+ decl.instrument_declarative(Address, reg, Base.metadata)
+ Base.metadata.create_all()
+
+ u1 = User(name='u1', addresses=[
+ Address(email='one'),
+ Address(email='two'),
+ ])
+ sess = create_session()
+ sess.save(u1)
+ sess.flush()
+ sess.clear()
+ eq_(sess.query(User).all(), [User(name='u1', addresses=[
+ Address(email='one'),
+ Address(email='two'),
+ ])])
+
def test_custom_mapper(self):
class MyExt(sa.orm.MapperExtension):
def create_instance(self):