"engine_or_url", "bind_to", etc. are all present, but deprecated.
they all get replaced by the single term "bind". you also
set the "bind" of MetaData using
metadata.bind = <engine or connection>. this is part of 0.4
forwards compatibility where "bind" is the only keyword.
[ticket:631]
- better error message for NoSuchColumnError [ticket:607]
- finally figured out how to get setuptools version in, available
as sqlalchemy.__version__ [ticket:428]
+ - the various "engine" arguments, such as "engine", "connectable",
+ "engine_or_url", "bind_to", etc. are all present, but deprecated.
+ they all get replaced by the single term "bind". you also
+ set the "bind" of MetaData using
+ metadata.bind = <engine or connection>
- ext
- iteration over dict association proxies is now dict-like, not
InstrumentedList-like (e.g. over keys instead of values)
DBAPI cursor, and returns a ``ResultProxy`` that has ``fetch`` methods
you would also see on a cursor::
- >>> rp = db.engine.execute('select name, email from users order by name')
+ >>> rp = db.bind.execute('select name, email from users order by name')
>>> for name, email in rp.fetchall(): print name, email
Bhargan Basepair basepair+nospam@example.edu
Joe Student student@example.edu
self.schema = None
def engine(self):
- return self._metadata._engine
+ return self._metadata.bind
engine = property(engine)
+ bind = engine
def delete(self, *args, **kwargs):
objectstore.delete(*args, **kwargs)
def _begin(self):
return SessionTransaction(self.session, self)
- def add(self, connectable):
- if self.connections.has_key(connectable.engine):
+ def add(self, bind):
+ if self.connections.has_key(bind.engine):
raise exceptions.InvalidRequestError("Session already has a Connection associated for the given Connection's Engine")
- return self.get_or_add(connectable)
+ return self.get_or_add(bind)
- def get_or_add(self, connectable):
+ def get_or_add(self, bind):
# we reference the 'engine' attribute on the given object, which in the case of
# Connection, ProxyEngine, Engine, whatever, should return the original
# "Engine" object that is handling the connection.
- if self.connections.has_key(connectable.engine):
- return self.connections[connectable.engine][0]
- e = connectable.engine
- c = connectable.contextual_connect()
+ if self.connections.has_key(bind.engine):
+ return self.connections[bind.engine][0]
+ e = bind.engine
+ c = bind.contextual_connect()
if not self.connections.has_key(e):
- self.connections[e] = (c, c.begin(), c is not connectable)
+ self.connections[e] = (c, c.begin(), c is not bind)
return self.connections[e][0]
def commit(self):
of Sessions, see the ``sqlalchemy.ext.sessioncontext`` module.
"""
- def __init__(self, bind_to=None, hash_key=None, import_session=None, echo_uow=False, weak_identity_map=False):
+ def __init__(self, bind=None, bind_to=None, hash_key=None, import_session=None, echo_uow=False, weak_identity_map=False):
if import_session is not None:
self.uow = unitofwork.UnitOfWork(identity_map=import_session.uow.identity_map, weak_identity_map=weak_identity_map)
else:
self.uow = unitofwork.UnitOfWork(weak_identity_map=weak_identity_map)
- self.bind_to = bind_to
+ self.bind = bind or bind_to
self.binds = {}
self.echo_uow = echo_uow
self.weak_identity_map = weak_identity_map
def _set_echo_uow(self, value):
self.uow.echo = value
echo_uow = property(_get_echo_uow,_set_echo_uow)
+
+ bind_to = property(lambda self:self.bind)
def create_transaction(self, **kwargs):
"""Return a new ``SessionTransaction`` corresponding to an
return _class_mapper(class_, entity_name = entity_name)
- def bind_mapper(self, mapper, bindto):
+ def bind_mapper(self, mapper, bind):
"""Bind the given `mapper` to the given ``Engine`` or ``Connection``.
All subsequent operations involving this ``Mapper`` will use the
- given `bindto`.
+ given `bind`.
"""
- self.binds[mapper] = bindto
+ self.binds[mapper] = bind
- def bind_table(self, table, bindto):
+ def bind_table(self, table, bind):
"""Bind the given `table` to the given ``Engine`` or ``Connection``.
All subsequent operations involving this ``Table`` will use the
- given `bindto`.
+ given `bind`.
"""
- self.binds[table] = bindto
+ self.binds[table] = bind
def get_bind(self, mapper):
"""Return the ``Engine`` or ``Connection`` which is used to execute
"""
if mapper is None:
- return self.bind_to
+ return self.bind
elif self.binds.has_key(mapper):
return self.binds[mapper]
elif self.binds.has_key(mapper.mapped_table):
return self.binds[mapper.mapped_table]
- elif self.bind_to is not None:
- return self.bind_to
+ elif self.bind is not None:
+ return self.bind
else:
e = mapper.mapped_table.engine
if e is None:
- raise exceptions.InvalidRequestError("Could not locate any Engine bound to mapper '%s'" % str(mapper))
+ raise exceptions.InvalidRequestError("Could not locate any Engine or Connection bound to mapper '%s'" % str(mapper))
return e
def query(self, mapper_or_class, entity_name=None, **kwargs):
return None
- def _get_engine(self):
+ def _get_engine(self, raiseerr=False):
"""Return the engine or None if no engine."""
- return self._derived_metadata().engine
+ if raiseerr:
+ e = self._derived_metadata().bind
+ if e is None:
+ raise exceptions.InvalidRequestError("This SchemaItem is not connected to any Engine or Connection.")
+ else:
+ return e
+ else:
+ return self._derived_metadata().bind
def get_engine(self):
- """Return the engine or raise an error if no engine."""
-
- e = self._get_engine()
- if e is not None:
- return e
- else:
- raise exceptions.InvalidRequestError("This SchemaItem is not connected to any Engine")
+ """Return the engine or raise an error if no engine.
+
+ Deprecated. use the "bind" attribute.
+ """
+
+ return self._get_engine(raiseerr=True)
def _set_casing_strategy(self, name, kwargs, keyname='case_sensitive'):
"""Set the "case_sensitive" argument sent via keywords to the item's constructor.
engine = property(lambda s:s._get_engine())
metadata = property(lambda s:s._derived_metadata())
-
+ bind = property(lambda s:s.engine)
+
def _get_table_key(name, schema):
if schema is None:
return name
if autoload_with:
autoload_with.reflecttable(table)
else:
- metadata.get_engine().reflecttable(table)
+ metadata._get_engine(raiseerr=True).reflecttable(table)
except exceptions.NoSuchTableError:
del metadata.tables[key]
raise
else:
return []
- def exists(self, connectable=None):
+ def exists(self, bind=None, connectable=None):
"""Return True if this table exists."""
- if connectable is None:
- connectable = self.get_engine()
+ if connectable is not None:
+ bind = connectable
+
+ if bind is None:
+ bind = self._get_engine(raiseerr=True)
def do(conn):
e = conn.engine
return e.dialect.has_table(conn, self.name, schema=self.schema)
- return connectable.run_callable(do)
+ return bind.run_callable(do)
- def create(self, connectable=None, checkfirst=False):
+ def create(self, bind=None, checkfirst=False, connectable=None):
"""Issue a ``CREATE`` statement for this table.
See also ``metadata.create_all()``."""
- self.metadata.create_all(connectable=connectable, checkfirst=checkfirst, tables=[self])
+ if connectable is not None:
+ bind = connectable
+ self.metadata.create_all(bind=bind, checkfirst=checkfirst, tables=[self])
- def drop(self, connectable=None, checkfirst=False):
+ def drop(self, bind=None, checkfirst=False, connectable=None):
"""Issue a ``DROP`` statement for this table.
See also ``metadata.drop_all()``."""
- self.metadata.drop_all(connectable=connectable, checkfirst=checkfirst, tables=[self])
+ if connectable is not None:
+ bind = connectable
+ self.metadata.drop_all(bind=bind, checkfirst=checkfirst, tables=[self])
def tometadata(self, metadata, schema=None):
"""Return a copy of this ``Table`` associated with a different ``MetaData``."""
return self.table.metadata
def _get_engine(self):
- return self.table.engine
+ return self.table.bind
def append_foreign_key(self, fk):
fk._set_parent(self)
self.column.default = self
def execute(self, **kwargs):
- return self.get_engine().execute_default(self, **kwargs)
+ return self._get_engine(raiseerr=True).execute_default(self, **kwargs)
def __repr__(self):
return "DefaultGenerator()"
column.sequence = self
def create(self):
- self.get_engine().create(self)
+ self._get_engine(raiseerr=True).create(self)
return self
def drop(self):
- self.get_engine().drop(self)
+ self._get_engine(raiseerr=True).drop(self)
def accept_visitor(self, visitor):
"""Call the visit_seauence method on the given visitor."""
if connectable is not None:
connectable.create(self)
else:
- self.get_engine().create(self)
+ self._get_engine(raiseerr=True).create(self)
return self
def drop(self, connectable=None):
if connectable is not None:
connectable.drop(self)
else:
- self.get_engine().drop(self)
+ self._get_engine(raiseerr=True).drop(self)
def accept_visitor(self, visitor):
visitor.visit_index(self)
class MetaData(SchemaItem):
"""Represent a collection of Tables and their associated schema constructs."""
- def __init__(self, engine_or_url=None, url=None, engine=None, **kwargs):
+ def __init__(self, engine_or_url=None, url=None, bind=None, engine=None, **kwargs):
"""create a new MetaData object.
-
- engine_or_url
+
+ bind
an Engine, or a string or URL instance which will be passed
to create_engine(), along with \**kwargs - this MetaData will
be bound to the resulting engine.
-
+
+ engine_or_url
+ deprecated; a synonym for 'bind'
+
url
deprecated. a string or URL instance which will be passed to
create_engine(), along with \**kwargs - this MetaData will be
# MetaData(engine_or_url=None)
name = kwargs.get('name', None)
if engine_or_url is None:
- engine_or_url = url or engine
+ engine_or_url = url or bind or engine
elif 'name' in kwargs:
- engine_or_url = engine_or_url or engine or url
+ engine_or_url = engine_or_url or bind or engine or url
else:
import sqlalchemy.engine as engine
import sqlalchemy.engine.url as url
url.make_url(engine_or_url)
except exceptions.ArgumentError:
# nope, must have been a name as 1st positional
- name, engine_or_url = engine_or_url, (url or engine)
+ name, engine_or_url = engine_or_url, (url or engine or bind)
kwargs.pop('name', None)
self.tables = {}
self.name = name
- self._engine = None
+ self._bind = None
self._set_casing_strategy(name, kwargs)
if engine_or_url:
self.connect(engine_or_url, **kwargs)
self.tables = state['tables']
self.name = state['name']
self._case_sensitive_setting = state['casesensitive']
- self._engine = None
+ self._bind = None
def is_bound(self):
"""return True if this MetaData is bound to an Engine."""
- return self._engine is not None
+ return self._bind is not None
- def connect(self, engine_or_url, **kwargs):
+ def connect(self, bind=None, **kwargs):
"""bind this MetaData to an Engine.
+
+ DEPRECATED. use metadata.bind = <engine> or metadata.bind = <url>.
- engine_or_url
+ bind
a string, URL or Engine instance. If a string or URL,
will be passed to create_engine() along with \**kwargs to
produce the engine which to connect to. otherwise connects
directly to the given Engine.
+
+ engine_or_url
+ deprecated. synonymous with "bind"
"""
+ if bind is None:
+ bind = kwargs.pop('engine_or_url', None)
+ if bind is None:
+ raise exceptions.ArguemntError("'bind' argument is required for connect()")
from sqlalchemy.engine.url import URL
- if isinstance(engine_or_url, basestring) or isinstance(engine_or_url, URL):
- self._engine = sqlalchemy.create_engine(engine_or_url, **kwargs)
+ if isinstance(bind, (basestring, URL)):
+ self._bind = sqlalchemy.create_engine(bind, **kwargs)
else:
- self._engine = engine_or_url
+ self._bind = bind
+ bind = property(lambda self:self._bind, connect, doc="""an Engine or Connection to which this MetaData is bound. this is a settable property as well.""")
+
def clear(self):
self.tables.clear()
def _get_parent(self):
return None
- def create_all(self, connectable=None, tables=None, checkfirst=True):
+ def create_all(self, bind=None, tables=None, checkfirst=True, connectable=None):
"""Create all tables stored in this metadata.
This will conditionally create tables depending on if they do
not yet exist in the database.
+ bind
+ A ``Connectable`` used to access the database; if None, uses
+ the existing bind on this ``MetaData``, if any.
+
connectable
- A ``Connectable`` used to access the database; or use the engine
- bound to this ``MetaData``.
+ deprecated. synonymous with "bind"
tables
Optional list of tables, which is a subset of the total
tables in the ``MetaData`` (others are ignored).
"""
- if connectable is None:
- connectable = self.get_engine()
- connectable.create(self, checkfirst=checkfirst, tables=tables)
+ if connectable is not None:
+ bind = connectable
+ if bind is None:
+ bind = self._get_engine(raiseerr=True)
+ bind.create(self, checkfirst=checkfirst, tables=tables)
- def drop_all(self, connectable=None, tables=None, checkfirst=True):
+ def drop_all(self, bind=None, tables=None, checkfirst=True, connectable=None):
"""Drop all tables stored in this metadata.
This will conditionally drop tables depending on if they
currently exist in the database.
+ bind
+ A ``Connectable`` used to access the database; if None, uses
+ the existing bind on this ``MetaData``, if any.
+
connectable
- A ``Connectable`` used to access the database; or use the engine
- bound to this ``MetaData``.
+ deprecated. synonymous with "bind"
tables
Optional list of tables, which is a subset of the total
tables in the ``MetaData`` (others are ignored).
"""
- if connectable is None:
- connectable = self.get_engine()
- connectable.drop(self, checkfirst=checkfirst, tables=tables)
+ if connectable is not None:
+ bind = connectable
+ if bind is None:
+ bind = self._get_engine(raiseerr=True)
+ bind.drop(self, checkfirst=checkfirst, tables=tables)
def accept_visitor(self, visitor):
visitor.visit_metadata(self)
def _derived_metadata(self):
return self
- def _get_engine(self):
+ def _get_engine(self, raiseerr=False):
if not self.is_bound():
- return None
- return self._engine
+ if raiseerr:
+ raise exceptions.InvalidRequestError("This SchemaItem is not connected to any Engine or Connection.")
+ else:
+ return None
+ return self._bind
class BoundMetaData(MetaData):
for e in self.__engines.values():
e.dispose()
- def _get_engine(self):
+ def _get_engine(self, raiseerr=False):
if hasattr(self.context, '_engine'):
return self.context._engine
else:
- return None
+ if raiseerr:
+ raise exceptions.InvalidRequestError("This SchemaItem is not connected to any Engine or Connection.")
+ else:
+ return None
engine=property(_get_engine)
-
+ bind = property(_get_engine, connect)
def DynamicMetaData(name=None, threadlocal=True, **kw):
"""Deprecated. Use ``MetaData`` or ``ThreadLocalMetaData``."""
and oracle supports "nowait" which translates to
``FOR UPDATE NOWAIT``.
- engine=None
- an ``Engine`` instance to which the resulting ``Select``
+ bind=None
+ an ``Engine`` or ``Connection`` instance to which the resulting ``Select``
object will be bound. The ``Select`` object will otherwise
- automatically bind to whatever ``Engine`` instances can be located
+ automatically bind to whatever ``Connectable`` instances can be located
within its contained ``ClauseElement`` members.
-
+
+ engine=None
+ deprecated. a synonym for "bind".
+
limit=None
a numerical value which usually compiles to a ``LIMIT`` expression
in the resulting select. Databases that don't support ``LIMIT``
else:
return _BindParamClause(key, value, type=type, shortname=shortname, unique=unique)
-def text(text, engine=None, *args, **kwargs):
+def text(text, bind=None, engine=None, *args, **kwargs):
"""Create literal text to be inserted into a query.
When constructing a query from a ``select()``, ``update()``,
to specify bind parameters; they will be compiled to their
engine-specific format.
+ bind
+ An optional connection or engine to be used for this text query.
+
engine
- An optional engine to be used for this text query.
+ deprecated. a synonym for 'bind'.
bindparams
A list of ``bindparam()`` instances which can be used to define
"""
- return _TextClause(text, engine=engine, *args, **kwargs)
+ return _TextClause(text, engine=engine, bind=bind, *args, **kwargs)
def null():
"""Return a ``_Null`` object, which compiles to ``NULL`` in a sql statement."""
defaults.
"""
- def __init__(self, dialect, statement, parameters, engine=None):
+ def __init__(self, dialect, statement, parameters, bind=None, engine=None):
"""Construct a new ``Compiled`` object.
statement
can either be the string names of columns or
``_ColumnClause`` objects.
+ bind
+ optional engine or connection which will be bound to the
+ compiled object.
+
engine
- Optional Engine to compile this statement against.
+ deprecated, a synonym for 'bind'
"""
self.dialect = dialect
self.statement = statement
self.parameters = parameters
- self.engine = engine
+ self.bind = bind or engine
self.can_execute = statement.supports_execution()
def compile(self):
def execute(self, *multiparams, **params):
"""Execute this compiled object."""
- e = self.engine
+ e = self.bind
if e is None:
- raise exceptions.InvalidRequestError("This Compiled object is not bound to any engine.")
+ raise exceptions.InvalidRequestError("This Compiled object is not bound to any Engine or Connection.")
return e.execute_compiled(self, *multiparams, **params)
def scalar(self, *multiparams, **params):
"""
try:
- if self._engine is not None:
- return self._engine
+ if self._bind is not None:
+ return self._bind
except AttributeError:
pass
for f in self._get_from_objects():
if f is self:
continue
- engine = f.engine
+ engine = f.bind
if engine is not None:
return engine
else:
return None
-
- engine = property(lambda s: s._find_engine(),
- doc="""Attempts to locate a Engine within this ClauseElement
- structure, or returns None if none found.""")
+
+ bind = property(lambda s:s._find_engine(), doc="""Returns the Engine or Connection to which this ClauseElement is bound, or None if none found.""")
+ engine = bind
def execute(self, *multiparams, **params):
"""Compile and execute this ``ClauseElement``."""
compile_params = multiparams[0]
else:
compile_params = params
- return self.compile(engine=self.engine, parameters=compile_params).execute(*multiparams, **params)
+ return self.compile(bind=self.bind, parameters=compile_params).execute(*multiparams, **params)
def scalar(self, *multiparams, **params):
"""Compile and execute this ``ClauseElement``, returning the
return self.execute(*multiparams, **params).scalar()
- def compile(self, engine=None, parameters=None, compiler=None, dialect=None):
+ def compile(self, bind=None, engine=None, parameters=None, compiler=None, dialect=None):
"""Compile this SQL expression.
Uses the given ``Compiler``, or the given ``AbstractDialect``
if compiler is None:
if dialect is not None:
compiler = dialect.compiler(self, parameters)
+ elif bind is not None:
+ compiler = bind.compiler(self, parameters)
elif engine is not None:
compiler = engine.compiler(self, parameters)
- elif self.engine is not None:
- compiler = self.engine.compiler(self, parameters)
+ elif self.bind is not None:
+ compiler = self.bind.compiler(self, parameters)
if compiler is None:
import sqlalchemy.ansisql as ansisql
Public constructor is the ``text()`` function.
"""
- def __init__(self, text = "", engine=None, bindparams=None, typemap=None):
- self._engine = engine
+ def __init__(self, text = "", bind=None, engine=None, bindparams=None, typemap=None):
+ self._bind = bind or engine
self.bindparams = {}
self.typemap = typemap
if typemap is not None:
def __init__(self, name, *clauses, **kwargs):
self.name = name
self.type = sqltypes.to_instance(kwargs.get('type', None))
- self._engine = kwargs.get('engine', None)
+ self._bind = kwargs.get('bind', kwargs.get('engine', None))
self.group = kwargs.pop('group', True)
self.clauses = ClauseList(operator=kwargs.get('operator', None), group_contents=kwargs.get('group_contents', True), *clauses)
if self.group:
def copy_container(self):
clauses = [clause.copy_container() for clause in self.clauses]
- return _CalculatedClause(type=self.type, engine=self._engine, *clauses)
+ return _CalculatedClause(type=self.type, bind=self._bind, *clauses)
def get_children(self, **kwargs):
return self.clause_expr,
def copy_container(self):
clauses = [clause.copy_container() for clause in self.clauses]
- return _Function(self.name, type=self.type, packagenames=self.packagenames, engine=self._engine, *clauses)
+ return _Function(self.name, type=self.type, packagenames=self.packagenames, bind=self._bind, *clauses)
def accept_visitor(self, visitor):
visitor.visit_function(self)
def _group_parenthesized(self):
return False
- engine = property(lambda s: s.selectable.engine)
+ bind = property(lambda s: s.selectable.bind)
+ engine = bind
class _Grouping(ColumnElement):
def __init__(self, elem):
def __init__(self, columns=None, whereclause=None, from_obj=[],
order_by=None, group_by=None, having=None,
use_labels=False, distinct=False, for_update=False,
- engine=None, limit=None, offset=None, scalar=False,
+ engine=None, bind=None, limit=None, offset=None, scalar=False,
correlate=True):
"""construct a Select object.
self.use_labels = use_labels
self.whereclause = None
self.having = None
- self._engine = engine
+ self._bind = bind or engine
self.limit = limit
self.offset = offset
self.for_update = for_update
object, or searched within the from clauses for one.
"""
- if self._engine is not None:
- return self._engine
+ if self._bind is not None:
+ return self._bind
for f in self.__froms:
if f is self:
continue
- e = f.engine
+ e = f.bind
if e is not None:
- self._engine = e
+ self._bind = e
return e
# look through the columns (largely synomous with looking
# through the FROMs except in the case of _CalculatedClause/_Function)
for c in cc.columns:
if getattr(c, 'table', None) is self:
continue
- e = c.engine
+ e = c.bind
if e is not None:
- self._engine = e
+ self._bind = e
return e
return None
return parameters
def _find_engine(self):
- return self.table.engine
+ return self.table.bind
class _Insert(_UpdateBase):
def __init__(self, table, values=None):
# connectivity, execution
'engine.parseconnect',
'engine.pool',
+ 'engine.bind',
'engine.reconnect',
'engine.execute',
'engine.transaction',
--- /dev/null
+"""tests the "bind" attribute/argument across schema, SQL, and ORM sessions,
+including the deprecated versions of these arguments"""
+
+import testbase
+import unittest, sys, datetime
+import tables
+db = testbase.db
+from sqlalchemy import *
+
+class BindTest(testbase.PersistTest):
+ def test_create_drop_explicit(self):
+ metadata = MetaData()
+ table = Table('test_table', metadata,
+ Column('foo', Integer))
+ for bind in (
+ testbase.db,
+ testbase.db.connect()
+ ):
+ for args in [
+ ([], {'connectable':bind}),
+ ([], {'bind':bind}),
+ ([bind], {})
+ ]:
+ metadata.create_all(*args[0], **args[1])
+ assert table.exists(*args[0], **args[1])
+ metadata.drop_all(*args[0], **args[1])
+ table.create(*args[0], **args[1])
+ table.drop(*args[0], **args[1])
+ assert not table.exists(*args[0], **args[1])
+
+ def test_create_drop_err(self):
+ metadata = MetaData()
+ table = Table('test_table', metadata,
+ Column('foo', Integer))
+
+ for meth in [
+ metadata.create_all,
+ table.exists,
+ metadata.drop_all,
+ table.create,
+ table.drop,
+ ]:
+ try:
+ meth()
+ assert False
+ except exceptions.InvalidRequestError, e:
+ assert str(e) == "This SchemaItem is not connected to any Engine or Connection."
+
+ def test_create_drop_bound(self):
+
+ for meta in (MetaData,ThreadLocalMetaData):
+ for bind in (
+ testbase.db,
+ testbase.db.connect()
+ ):
+ metadata = meta()
+ table = Table('test_table', metadata,
+ Column('foo', Integer))
+ metadata.bind = bind
+ assert metadata.bind is metadata.engine is table.bind is table.engine is bind
+ metadata.create_all()
+ assert table.exists()
+ metadata.drop_all()
+ table.create()
+ table.drop()
+ assert not table.exists()
+
+ metadata = meta()
+ table = Table('test_table', metadata,
+ Column('foo', Integer))
+
+ metadata.connect(bind)
+ assert metadata.bind is metadata.engine is table.bind is table.engine is bind
+ metadata.create_all()
+ assert table.exists()
+ metadata.drop_all()
+ table.create()
+ table.drop()
+ assert not table.exists()
+
+ def test_create_drop_constructor_bound(self):
+ for bind in (
+ testbase.db,
+ testbase.db.connect()
+ ):
+ for args in (
+ ([bind], {}),
+ ([], {'engine_or_url':bind}),
+ ([], {'bind':bind}),
+ ([], {'engine':bind})
+ ):
+ metadata = MetaData(*args[0], **args[1])
+ table = Table('test_table', metadata,
+ Column('foo', Integer))
+
+ assert metadata.bind is metadata.engine is table.bind is table.engine is bind
+ metadata.create_all()
+ assert table.exists()
+ metadata.drop_all()
+ table.create()
+ table.drop()
+ assert not table.exists()
+
+
+ def test_clauseelement(self):
+ metadata = MetaData()
+ table = Table('test_table', metadata,
+ Column('foo', Integer))
+ metadata.create_all(bind=testbase.db)
+ try:
+ for elem in [
+ table.select,
+ lambda **kwargs:func.current_timestamp(**kwargs).select(),
+# func.current_timestamp().select,
+ lambda **kwargs:text("select * from test_table", **kwargs)
+ ]:
+ for bind in (
+ testbase.db,
+ testbase.db.connect()
+ ):
+ e = elem(bind=bind)
+ assert e.bind is e.engine is bind
+ e.execute()
+ e = elem(engine=bind)
+ assert e.bind is e.engine is bind
+ e.execute()
+
+ try:
+ e = elem()
+ assert e.bind is e.engine is None
+ e.execute()
+ assert False
+ except exceptions.InvalidRequestError, e:
+ assert str(e) == "This Compiled object is not bound to any Engine or Connection."
+
+ finally:
+ metadata.drop_all(bind=testbase.db)
+
+ def test_session(self):
+ metadata = MetaData()
+ table = Table('test_table', metadata,
+ Column('foo', Integer, primary_key=True),
+ Column('data', String(30)))
+ class Foo(object):
+ pass
+ mapper(Foo, table)
+ metadata.create_all(bind=testbase.db)
+ try:
+ for bind in (testbase.db, testbase.db.connect()):
+ for args in ({'bind':bind}, {'bind_to':bind}):
+ sess = create_session(**args)
+ assert sess.bind is sess.bind_to is bind
+ f = Foo()
+ sess.save(f)
+ sess.flush()
+ assert sess.get(Foo, f.foo) is f
+
+ sess = create_session()
+ f = Foo()
+ sess.save(f)
+ try:
+ sess.flush()
+ assert False
+ except exceptions.InvalidRequestError, e:
+ assert str(e).startswith("Could not locate any Engine or Connection bound to mapper")
+
+ finally:
+ metadata.drop_all(bind=testbase.db)
+
+
+if __name__ == '__main__':
+ testbase.main()
\ No newline at end of file