columns joined by a "group" to load as "undeferred".
- sql
+ - DynamicMetaData has been renamed to ThreadLocalMetaData
+ - BoundMetaData has been removed; regular MetaData is equivalent
- significant architectural overhaul to SQL elements (ClauseElement).
all elements share a common "mutability" framework which allows a
consistent approach to in-place modifications of elements as well as
{python title="Implicit Execution Using Engine-Bound SQL Construct"}
engine = create_engine('sqlite:///:memory:')
- meta = BoundMetaData(engine)
+ meta = MetaData(engine)
table = Table('mytable', meta, Column('col1', Integer), Column('col2', String(20)))
r = table.insert().execute(col1=5, col2='some record')
A MetaData object can be associated with one or more Engine instances. This allows the MetaData and the elements within it to perform operations automatically, using the connection resources of that Engine. This includes being able to "reflect" the columns of tables, as well as to perform create and drop operations without needing to pass an `Engine` or `Connection` around. It also allows SQL constructs to be created which know how to execute themselves (called "implicit execution").
-To bind `MetaData` to a single `Engine`, use `BoundMetaData`:
+To bind `MetaData` to a single `Engine`, supply an Engine when creating the
+`MetaData`, or use the `.connect()` method:
{python}
engine = create_engine('sqlite://', **kwargs)
- # create BoundMetaData from an Engine
- meta = BoundMetaData(engine)
+ # create MetaData from an Engine
+ meta = MetaData(engine)
# create the Engine and MetaData in one step
- meta = BoundMetaData('postgres://db/', **kwargs)
+ meta = MetaData('postgres://db/', **kwargs)
+
+ # or bind the engine later
+ meta = MetaData()
+ # ...
+ meta.connect(engine)
Another form of `MetaData` exists which can connect to an engine within the current thread (or "on a per-thread basis"), allowing other threads to be connected to different engines simultaneously:
{python}
- meta = DynamicMetaData()
+ meta = ThreadLocalMetaData()
# In thread 1, connect to an existing Engine
meta.connect(engine)
# Meanwhile in thread 2, create a new Engine and connect
    meta.connect('mysql://user@host/dsn')
-`DynamicMetaData` is intended for applications that need to use the same set of `Tables` for many different database connections in the same process, such as a CherryPy web application which handles multiple application instances in one process.
+`ThreadLocalMetaData` is intended for applications that need to use the same set of `Tables` for many different database connections in the same process, such as a CherryPy web application which handles multiple application instances in one process.
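The per-thread binding can be pictured with a small stdlib-only sketch. `ThreadLocalRegistry` below is a hypothetical stand-in used purely for illustration, not SQLAlchemy's implementation; the engine "URLs" are plain strings:

```python
import threading

class ThreadLocalRegistry:
    """Illustrative stand-in for thread-local engine binding:
    each thread sees only the engine it connected itself."""
    def __init__(self):
        self._context = threading.local()

    def connect(self, engine):
        # bind an "engine" (here, just a URL string) for the calling thread
        self._context.engine = engine

    @property
    def engine(self):
        return getattr(self._context, 'engine', None)

registry = ThreadLocalRegistry()
results = {}

def worker(name, url):
    registry.connect(url)
    results[name] = registry.engine

t1 = threading.Thread(target=worker, args=('thread1', 'sqlite://'))
t2 = threading.Thread(target=worker, args=('thread2', 'mysql://user@host/dsn'))
t1.start(); t2.start()
t1.join(); t2.join()
# each worker thread saw only its own binding;
# the main thread, which never called connect(), sees None
```

Each thread's `connect()` call is invisible to the others, which is what lets one set of `Table` objects serve many simultaneous database connections in one process.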
#### Using the global Metadata object
#### Reflecting Tables
-Once you have a `BoundMetaData` or a connected `DynamicMetaData`, you can create `Table` objects without specifying their columns, just their names, using `autoload=True`:
+Once you have a connected `MetaData` or `ThreadLocalMetaData`, you can create `Table` objects without specifying their columns, just their names, using `autoload=True`:
{python}
>>> messages = Table('messages', meta, autoload = True)
Creating and dropping individual tables can be done via the `create()` and `drop()` methods of `Table`; these methods take an optional `connectable` parameter which references an `Engine` or a `Connection`. If not supplied, the `Engine` bound to the `MetaData` is used; if no `Engine` is bound, an error is raised:
{python}
- meta = BoundMetaData('sqlite:///:memory:')
+ meta = MetaData('sqlite:///:memory:')
employees = Table('employees', meta,
Column('employee_id', Integer, primary_key=True),
Column('employee_name', String(60), nullable=False, key='name'),
{python}
# create two metadata
- meta1 = BoundMetaData('sqlite:///querytest.db')
+ meta1 = MetaData('sqlite:///querytest.db')
meta2 = MetaData()
# load 'users' from the sqlite engine
import sqlalchemy.mods.threadlocal
from sqlalchemy import *
- metadata = BoundMetaData('sqlite:///')
+ metadata = MetaData('sqlite:///')
user_table = Table('users', metadata,
Column('user_id', Integer, primary_key=True),
Column('user_name', String(50), nullable=False)
import sqlalchemy.mods.threadlocal
from sqlalchemy import *
- metadata = BoundMetaData('sqlite:///')
+ metadata = MetaData('sqlite:///')
user_table = Table('users', metadata,
Column('user_id', Integer, primary_key=True),
Column('user_name', String(50), nullable=False)
The examples below all include a dump of the generated SQL corresponding to the query object, as well as a dump of the statement's bind parameters. In all cases, bind parameters are shown as named parameters using the colon format (i.e. ':name'). When the statement is compiled into a database-specific version, the named-parameter statement and its bind values are converted to the proper paramstyle for that database automatically.
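The named-to-positional conversion can be illustrated with a short sketch. `to_qmark` is a hypothetical helper showing the idea of rewriting `:name` parameters into the positional "qmark" paramstyle; SQLAlchemy performs this conversion internally in its statement compiler:

```python
import re

def to_qmark(statement, params):
    """Rewrite a statement using ':name' bind markers into qmark
    ('?') style, returning the new SQL and the bind values in
    positional order.  Illustration only, not SQLAlchemy's compiler."""
    names = re.findall(r':(\w+)', statement)
    sql = re.sub(r':\w+', '?', statement)
    return sql, [params[name] for name in names]

sql, values = to_qmark(
    "INSERT INTO users (user_name, password) VALUES (:user_name, :password)",
    {'user_name': 'ed', 'password': 'secret'})
# sql    -> "INSERT INTO users (user_name, password) VALUES (?, ?)"
# values -> ['ed', 'secret']
```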
-For this section, we will mostly use the implcit style of execution, meaning the `Table` objects are associated with an instance of `BoundMetaData`, and constructed `ClauseElement` objects support self-execution. Assume the following configuration:
+For this section, we will mostly use the implicit style of execution, meaning the `Table` objects are associated with a bound instance of `MetaData`, and constructed `ClauseElement` objects support self-execution. Assume the following configuration:
{python}
from sqlalchemy import *
- metadata = BoundMetaData('sqlite:///mydb.db', echo=True)
+ metadata = MetaData('sqlite:///mydb.db', echo=True)
# a table to store users
users = Table('users', metadata,
### Defining Metadata, Binding to Engines {@name=metadata}
-Configuring SQLAlchemy for your database consists of creating objects called `Tables`, each of which represent an actual table in the database. A collection of `Table` objects resides in a `MetaData` object which is essentially a table collection. We will create a handy form of `MetaData` that automatically connects to our `Engine` (connecting a schema object to an Engine is called *binding*):
+Configuring SQLAlchemy for your database consists of creating objects called `Tables`, each of which represents an actual table in the database. A collection of `Table` objects resides in a `MetaData` object, which is essentially a table collection. We will create a `MetaData` and connect it to our `Engine` (connecting a schema object to an Engine is called *binding*):
{python}
- >>> metadata = BoundMetaData(db)
+ >>> metadata = MetaData()
+ >>> metadata.connect(db)
-An equivalent operation is to create the `BoundMetaData` object directly with an Engine URL, which calls the `create_engine` call for us:
+An equivalent operation is to create the `MetaData` object directly with an Engine URL, which invokes `create_engine` for us:
{python}
- >>> metadata = BoundMetaData('sqlite:///tutorial.db')
+ >>> metadata = MetaData('sqlite:///tutorial.db')
Now, when we tell "metadata" about the tables in our database, we can issue CREATE statements for those tables, as well as execute SQL statements derived from them, without needing to open or close any connections; that will all be done automatically.
... Column('password', String(10))
... )
-As you might have guessed, we have just defined a table named `users` which has three columns: `user_id` (which is a primary key column), `user_name` and `password`. Currently it is just an object that doesn't necessarily correspond to an existing table in our database. To actually create the table, we use the `create()` method. To make it interesting, we will have SQLAlchemy echo the SQL statements it sends to the database, by setting the `echo` flag on the `Engine` associated with our `BoundMetaData`:
+As you might have guessed, we have just defined a table named `users` which has three columns: `user_id` (which is a primary key column), `user_name` and `password`. Currently it is just an object that doesn't necessarily correspond to an existing table in our database. To actually create the table, we use the `create()` method. To make it interesting, we will have SQLAlchemy echo the SQL statements it sends to the database, by setting the `echo` flag on the `Engine` associated with our `MetaData`:
{python}
>>> metadata.engine.echo = True
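For reference, `users.create()` emits roughly the following DDL, sketched here with the stdlib `sqlite3` module (the `user_name` column length is assumed for illustration; SQLite treats the lengths as advisory anyway):

```python
import sqlite3

# Roughly the CREATE TABLE statement generated for the users table
# defined above; user_name's length is assumed for this sketch.
ddl = """
CREATE TABLE users (
    user_id INTEGER NOT NULL PRIMARY KEY,
    user_name VARCHAR(40),
    password VARCHAR(10)
)
"""
conn = sqlite3.connect(':memory:')
conn.execute(ddl)
cols = [row[1] for row in conn.execute("PRAGMA table_info(users)")]
# cols -> ['user_id', 'user_name', 'password']
```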
from sqlalchemy import *
from sqlalchemy.util import OrderedDict
-metadata = BoundMetaData('sqlite:///', echo=True)
+metadata = MetaData('sqlite:///', echo=True)
trees = Table('treenodes', metadata,
Column('node_id', Integer, Sequence('treenode_id_seq',optional=False), primary_key=True),
engine = create_engine('sqlite:///:memory:', echo=True)
-metadata = BoundMetaData(engine)
+metadata = MetaData(engine)
"""create the treenodes table. This is a basic adjacency list model table.
One additional column, "root_node_id", references a "root node" row and is used
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
engine = create_engine('sqlite://')
-metadata = BoundMetaData(engine)
+metadata = MetaData(engine)
orders = Table('orders', metadata,
Column('order_id', Integer, primary_key=True),
#logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
engine = create_engine('sqlite://')
-metadata = BoundMetaData(engine)
+metadata = MetaData(engine)
orders = Table('orders', metadata,
Column('order_id', Integer, primary_key=True),
"""illustrates techniques for dealing with very large collections"""
from sqlalchemy import *
-meta = BoundMetaData('sqlite://', echo=True)
+meta = MetaData('sqlite://', echo=True)
org_table = Table('organizations', meta,
Column('org_id', Integer, primary_key=True),
logging.basicConfig()
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
-meta = BoundMetaData('sqlite://')
+meta = MetaData('sqlite://')
nodes = Table('nodes', meta,
Column("nodeid", Integer, primary_key=True)
from pickle import Pickler, Unpickler
import threading
-meta = BoundMetaData('sqlite://', echo=True)
+meta = MetaData('sqlite://', echo=True)
class MyExt(MapperExtension):
def populate_instance(self, mapper, selectcontext, row, instance, identitykey, isnew):
del MyPickler.sessions.current
f = sess.query(Foo).get(f.id)
-assert f.bar.data == 'some bar'
\ No newline at end of file
+assert f.bar.data == 'some bar'
# this example illustrates a polymorphic load of two classes
-metadata = BoundMetaData('sqlite://', echo=True)
+metadata = MetaData('sqlite://', echo=True)
# a table to store companies
companies = Table('companies', metadata,
from sqlalchemy import *
-metadata = BoundMetaData('sqlite://', echo='debug')
+metadata = MetaData('sqlite://', echo='debug')
# a table to store companies
companies = Table('companies', metadata,
from sqlalchemy.orm.collections import mapped_collection
import datetime
-e = BoundMetaData('sqlite://', echo=True)
+e = MetaData('sqlite://', echo=True)
# this table represents Entity objects. each Entity gets a row in this table,
# with a primary key and a title.
for entity in entities:
session.delete(entity)
-session.flush()
\ No newline at end of file
+session.flush()
-from sqlalchemy import join, DynamicMetaData, util, Integer
+from sqlalchemy import join, ThreadLocalMetaData, util, Integer
from sqlalchemy import and_, or_
from sqlalchemy import Table, Column, ForeignKey
from sqlalchemy.orm import class_mapper, relation, mapper, create_session
#
# the "proxy" to the database engine... this can be swapped out at runtime
#
-metadata = DynamicMetaData("activemapper")
+metadata = ThreadLocalMetaData()
try:
objectstore = sqlalchemy.objectstore
or, you can re-use an existing metadata::
- >>> db = SqlSoup(BoundMetaData(e))
+ >>> db = SqlSoup(MetaData(e))
You can optionally specify a schema within the database for your
SqlSoup::
if args or kwargs:
raise ArgumentError('Extra arguments not allowed when metadata is given')
else:
- metadata = BoundMetaData(*args, **kwargs)
+ metadata = MetaData(*args, **kwargs)
self._metadata = metadata
self._cache = {}
self.schema = None
__all__ = ['SchemaItem', 'Table', 'Column', 'ForeignKey', 'Sequence', 'Index', 'ForeignKeyConstraint',
'PrimaryKeyConstraint', 'CheckConstraint', 'UniqueConstraint', 'DefaultGenerator', 'Constraint',
- 'MetaData', 'BoundMetaData', 'DynamicMetaData', 'SchemaVisitor', 'PassiveDefault', 'ColumnDefault']
+ 'MetaData', 'ThreadLocalMetaData', 'SchemaVisitor', 'PassiveDefault', 'ColumnDefault']
class SchemaItem(object):
"""Base class for items that define a database schema."""
__visit_name__ = 'metadata'
- def __init__(self, url=None, engine=None, **kwargs):
+ def __init__(self, engine_or_url=None, **kwargs):
"""create a new MetaData object.
- url
+ engine_or_url
all contained objects. defaults to True.
"""
-
+
+ if engine_or_url is None:
+            # limited backwards compatibility
+ engine_or_url = kwargs.get('url', None) or kwargs.get('engine', None)
self.tables = {}
self._engine = None
self._set_casing_strategy(kwargs)
- if engine or url:
- self.connect(engine or url, **kwargs)
+ if engine_or_url:
+ self.connect(engine_or_url, **kwargs)
def __getstate__(self):
return {'tables':self.tables, 'casesensitive':self._case_sensitive_setting}
return self
-class BoundMetaData(MetaData):
- """``MetaData`` for which the first argument is a required Engine, url string, or URL instance.
-
- """
-
- __visit_name__ = 'metadata'
-
- def __init__(self, engine_or_url, **kwargs):
- from sqlalchemy.engine.url import URL
- if isinstance(engine_or_url, basestring) or isinstance(engine_or_url, URL):
- super(BoundMetaData, self).__init__(url=engine_or_url, **kwargs)
- else:
- super(BoundMetaData, self).__init__(engine=engine_or_url, **kwargs)
-
-
-class DynamicMetaData(MetaData):
+class ThreadLocalMetaData(MetaData):
"""Build upon ``MetaData`` to provide the capability to bind to
multiple ``Engine`` implementations on a dynamically alterable,
thread-local basis.
__visit_name__ = 'metadata'
- def __init__(self, threadlocal=True, **kwargs):
- if threadlocal:
- self.context = util.ThreadLocal()
- else:
- self.context = self
+ def __init__(self, **kwargs):
+ self.context = util.ThreadLocal()
self.__engines = {}
- super(DynamicMetaData, self).__init__(**kwargs)
+ super(ThreadLocalMetaData, self).__init__(**kwargs)
def connect(self, engine_or_url, **kwargs):
from sqlalchemy.engine.url import URL
return hasattr(self.context, '_engine') and self.context._engine is not None
def dispose(self):
- """Dispose all ``Engines`` to which this ``DynamicMetaData`` has been connected."""
+ """Dispose all ``Engines`` to which this ``ThreadLocalMetaData`` has been connected."""
for e in self.__engines.values():
e.dispose()
engine = property(_get_engine, connect)
+
class SchemaVisitor(sql.ClauseVisitor):
"""Define the visiting for ``SchemaItem`` objects."""
'SMALLINT(4) UNSIGNED ZEROFILL'),
]
- table_args = ['test_mysql_numeric', BoundMetaData(db)]
+ table_args = ['test_mysql_numeric', MetaData(db)]
for index, spec in enumerate(columns):
type_, args, kw, res = spec
table_args.append(Column('c%s' % index, type_(*args, **kw)))
'''ENUM('foo','bar') UNICODE''')
]
- table_args = ['test_mysql_charset', BoundMetaData(db)]
+ table_args = ['test_mysql_charset', MetaData(db)]
for index, spec in enumerate(columns):
type_, args, kw, res = spec
table_args.append(Column('c%s' % index, type_(*args, **kw)))
def test_enum(self):
"Exercise the ENUM type"
- enum_table = Table('mysql_enum', BoundMetaData(db),
+ enum_table = Table('mysql_enum', MetaData(db),
Column('e1', mysql.MSEnum('"a"', "'b'")),
Column('e2', mysql.MSEnum('"a"', "'b'"), nullable=False),
Column('e3', mysql.MSEnum('"a"', "'b'", strict=True)),
columns = [Column('c%i' % (i + 1), t[0]) for i, t in enumerate(specs)]
- m = BoundMetaData(db)
+ m = MetaData(db)
t_table = Table('mysql_types', m, *columns)
m.drop_all()
m.create_all()
- m2 = BoundMetaData(db)
+ m2 = MetaData(db)
rt = Table('mysql_types', m2, autoload=True)
#print
@testbase.supported('postgres')
def test_table_is_reflected(self):
- metadata = BoundMetaData(db)
+ metadata = MetaData(db)
table = Table('testtable', metadata, autoload=True)
self.assertEquals(set(table.columns.keys()), set(['question', 'answer']), "Columns of reflected table didn't equal expected columns")
self.assertEquals(table.c.answer.type.__class__, postgres.PGInteger)
@testbase.supported('postgres')
def test_domain_is_reflected(self):
- metadata = BoundMetaData(db)
+ metadata = MetaData(db)
table = Table('testtable', metadata, autoload=True)
self.assertEquals(str(table.columns.answer.default.arg), '42', "Reflected default value didn't equal expected value")
self.assertFalse(table.columns.answer.nullable, "Expected reflected column to not be nullable.")
@testbase.supported('postgres')
def test_table_is_reflected_alt_schema(self):
- metadata = BoundMetaData(db)
+ metadata = MetaData(db)
table = Table('testtable', metadata, autoload=True, schema='alt_schema')
self.assertEquals(set(table.columns.keys()), set(['question', 'answer', 'anything']), "Columns of reflected table didn't equal expected columns")
self.assertEquals(table.c.anything.type.__class__, postgres.PGInteger)
@testbase.supported('postgres')
def test_schema_domain_is_reflected(self):
- metadata = BoundMetaData(db)
+ metadata = MetaData(db)
table = Table('testtable', metadata, autoload=True, schema='alt_schema')
self.assertEquals(str(table.columns.answer.default.arg), '0', "Reflected default value didn't equal expected value")
self.assertTrue(table.columns.answer.nullable, "Expected reflected column to be nullable.")
@testbase.supported('postgres')
def test_crosschema_domain_is_reflected(self):
- metadata = BoundMetaData(db)
+ metadata = MetaData(db)
table = Table('crosschema', metadata, autoload=True)
self.assertEquals(str(table.columns.answer.default.arg), '0', "Reflected default value didn't equal expected value")
self.assertTrue(table.columns.answer.nullable, "Expected reflected column to be nullable.")
class MiscTest(AssertMixin):
@testbase.supported('postgres')
def test_date_reflection(self):
- m1 = BoundMetaData(testbase.db)
+ m1 = MetaData(testbase.db)
t1 = Table('pgdate', m1,
Column('date1', DateTime(timezone=True)),
Column('date2', DateTime(timezone=False))
)
m1.create_all()
try:
- m2 = BoundMetaData(testbase.db)
+ m2 = MetaData(testbase.db)
t2 = Table('pgdate', m2, autoload=True)
assert t2.c.date1.type.timezone is True
assert t2.c.date2.type.timezone is False
@testbase.supported('postgres')
def test_checksfor_sequence(self):
- meta1 = BoundMetaData(testbase.db)
+ meta1 = MetaData(testbase.db)
t = Table('mytable', meta1,
Column('col1', Integer, Sequence('fooseq')))
try:
def test_schema_reflection(self):
"""note: this test requires that the 'alt_schema' schema be separate and accessible by the test user"""
- meta1 = BoundMetaData(testbase.db)
+ meta1 = MetaData(testbase.db)
users = Table('users', meta1,
Column('user_id', Integer, primary_key = True),
Column('user_name', String(30), nullable = False),
)
meta1.create_all()
try:
- meta2 = BoundMetaData(testbase.db)
+ meta2 = MetaData(testbase.db)
addresses = Table('email_addresses', meta2, autoload=True, schema="alt_schema")
users = Table('users', meta2, mustexist=True, schema="alt_schema")
that PassiveDefault upon insert."""
try:
- meta = BoundMetaData(testbase.db)
+ meta = MetaData(testbase.db)
testbase.db.execute("""
CREATE TABLE speedy_users
(
@testbase.supported('postgres')
def setUpAll(self):
global tztable, notztable, metadata
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
# current_timestamp() in postgres is assumed to return TIMESTAMP WITH TIMEZONE
tztable = Table('tztable', metadata,
class ExecuteTest(testbase.PersistTest):
def setUpAll(self):
global users, metadata
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
users = Table('users', metadata,
Column('user_id', INT, primary_key = True),
Column('user_name', VARCHAR(20)),
deftype2, deftype3 = Integer, Integer
defval2, defval3 = "15", "16"
- meta = BoundMetaData(testbase.db)
+ meta = MetaData(testbase.db)
users = Table('engine_users', meta,
Column('user_id', INT, primary_key = True),
def testoverridecolumns(self):
"""test that you can override columns which contain foreign keys to other reflected tables"""
- meta = BoundMetaData(testbase.db)
+ meta = MetaData(testbase.db)
users = Table('users', meta,
Column('id', Integer, primary_key=True),
Column('name', String(30)))
meta.create_all()
try:
- meta2 = BoundMetaData(testbase.db)
+ meta2 = MetaData(testbase.db)
a2 = Table('addresses', meta2,
Column('user_id', Integer, ForeignKey('users.id')),
autoload=True)
assert list(a2.c.user_id.foreign_keys)[0].parent is a2.c.user_id
assert u2.join(a2).onclause == u2.c.id==a2.c.user_id
- meta3 = BoundMetaData(testbase.db)
+ meta3 = MetaData(testbase.db)
u3 = Table('users', meta3, autoload=True)
a3 = Table('addresses', meta3,
Column('user_id', Integer, ForeignKey('users.id')),
def testoverridecolumns2(self):
"""test that you can override columns which contain foreign keys to other reflected tables,
where the foreign key column is also a primary key column"""
- meta = BoundMetaData(testbase.db)
+ meta = MetaData(testbase.db)
users = Table('users', meta,
Column('id', Integer, primary_key=True),
Column('name', String(30)))
meta.create_all()
try:
- meta2 = BoundMetaData(testbase.db)
+ meta2 = MetaData(testbase.db)
a2 = Table('addresses', meta2,
Column('id', Integer, ForeignKey('users.id'), primary_key=True, ),
autoload=True)
#sess.save(add1)
#sess.flush()
- meta3 = BoundMetaData(testbase.db)
+ meta3 = MetaData(testbase.db)
u3 = Table('users', meta3, autoload=True)
a3 = Table('addresses', meta3,
Column('id', Integer, ForeignKey('users.id'), primary_key=True),
@testbase.supported('mysql')
def testmysqltypes(self):
- meta1 = BoundMetaData(testbase.db)
+ meta1 = MetaData(testbase.db)
table = Table(
'mysql_types', meta1,
Column('id', Integer, primary_key=True),
try:
table.drop(checkfirst=True)
table.create()
- meta2 = BoundMetaData(testbase.db)
+ meta2 = MetaData(testbase.db)
t2 = Table('mysql_types', meta2, autoload=True)
assert isinstance(t2.c.num1.type, mysql.MSInteger)
assert t2.c.num1.type.unsigned
)
""")
try:
- meta = BoundMetaData(testbase.db)
+ meta = MetaData(testbase.db)
table1 = Table("django_admin_log", meta, autoload=True)
table2 = Table("django_content_type", meta, autoload=True)
j = table1.join(table2)
if (testbase.db.engine.name == 'mysql' and
testbase.db.dialect.get_version_info(testbase.db) < (4, 1, 1)):
return
- meta = BoundMetaData(testbase.db)
+ meta = MetaData(testbase.db)
table = Table(
'multi', meta,
Column('multi_id', Integer, primary_key=True),
def test_nonexistent(self):
self.assertRaises(NoSuchTableError, Table,
'fake_table',
- BoundMetaData(testbase.db), autoload=True)
+ MetaData(testbase.db), autoload=True)
def testoverride(self):
- meta = BoundMetaData(testbase.db)
+ meta = MetaData(testbase.db)
table = Table(
'override_test', meta,
Column('col1', Integer, primary_key=True),
table.create()
# clear out table registry
- meta2 = BoundMetaData(testbase.db)
+ meta2 = MetaData(testbase.db)
try:
table = Table(
'override_test', meta2,
class OverrideAttributesTest(PersistTest):
def setUpAll(self):
global metadata, table, table2
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
table = Table('sometable', metadata,
Column('id', Integer, primary_key=True),
Column('data', String(30)))
def setUp(self):
collection_class = self.collection_class
- metadata = BoundMetaData(db)
+ metadata = MetaData(db)
parents_table = Table('Parent', metadata,
Column('id', Integer, primary_key=True),
class ScalarTest(PersistTest):
def test_scalar_proxy(self):
- metadata = BoundMetaData(db)
+ metadata = MetaData(db)
parents_table = Table('Parent', metadata,
Column('id', Integer, primary_key=True),
class LazyLoadTest(PersistTest):
def setUp(self):
- metadata = BoundMetaData(db)
+ metadata = MetaData(db)
parents_table = Table('Parent', metadata,
Column('id', Integer, primary_key=True),
global metadata, slides_table, bullets_table, Slide, Bullet
- metadata = BoundMetaData(db)
+ metadata = MetaData(db)
slides_table = Table('test_Slides', metadata,
Column('id', Integer, primary_key=True),
Column('name', String))
def setUpAll(self):
self.install_threadlocal()
global foo, metadata
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
foo = Table('foo', metadata,
Column('id', Integer, Sequence('foo_id_seq'), primary_key=True),
Column('bar', Integer),
def setUpAll(self):
self.install_threadlocal()
global metadata, table1, table2
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
table1 = Table('Table1', metadata,
Column('id', Integer, primary_key=True),
)
def setUpAll(self):
self.install_threadlocal()
global metadata, table1, table2
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
table1 = Table('Table1', metadata,
Column('ID', Integer, primary_key=True),
)
class AssociationTest(testbase.PersistTest):
def setUpAll(self):
global items, item_keywords, keywords, metadata, Item, Keyword, KeywordAssociation
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
items = Table('items', metadata,
Column('item_id', Integer, primary_key=True),
Column('name', String(40)),
class AssociationTest2(testbase.PersistTest):
def setUpAll(self):
global table_originals, table_people, table_isauthor, metadata, Originals, People, IsAuthor
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
table_originals = Table('Originals', metadata,
Column('ID', Integer, primary_key=True),
Column('Title', String(200), nullable=False),
class EagerTest(AssertMixin):
def setUpAll(self):
global dbmeta, owners, categories, tests, options, Owner, Category, Test, Option, false
- dbmeta = BoundMetaData(testbase.db)
+ dbmeta = MetaData(testbase.db)
# determine a literal value for "false" based on the dialect
false = Boolean().dialect_impl(testbase.db.dialect).convert_bind_param(False, testbase.db.dialect)
class EagerTest2(AssertMixin):
def setUpAll(self):
global metadata, middle, left, right
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
middle = Table('middle', metadata,
Column('id', Integer, primary_key = True),
Column('data', String(50)),
def setUpAll(self):
global ctx, data, metadata, User, Pref, Extra
ctx = SessionContext(create_session)
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
extra = Table("extra", metadata,
Column("extra_id", Integer, Sequence("extra_id_seq", optional=True), primary_key=True),
Column("prefs_id", Integer, ForeignKey("prefs.prefs_id"))
class M2MCascadeTest(testbase.AssertMixin):
def setUpAll(self):
global metadata, a, b, atob
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
a = Table('a', metadata,
Column('id', Integer, primary_key=True),
Column('data', String(30))
def setUpAll(self):
global metadata, address_table, businesses, homes
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
address_table = Table('addresses', metadata,
Column('address_id', Integer, primary_key=True),
Column('street', String(30)),
def testone(self):
global metadata, order, employee, product, tax, orderproduct
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
order = Table('orders', metadata,
Column('id', Integer, primary_key=True),
def testtwo(self):
"""test that conflicting backrefs raises an exception"""
global metadata, order, employee, product, tax, orderproduct
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
order = Table('orders', metadata,
Column('id', Integer, primary_key=True),
assert str(e).index("Backrefs do not match") > -1
def testthree(self):
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
node_table = Table("node", metadata,
Column('node_id', Integer, primary_key=True),
Column('name_index', Integer, nullable=True),
"""tests a self-referential mapper, with an additional list of child objects."""
def setUpAll(self):
global t1, t2, metadata
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
t1 = Table('t1', metadata,
Column('c1', Integer, Sequence('t1c1_id_seq', optional=True), primary_key=True),
Column('parent_c1', Integer, ForeignKey('t1.c1')),
"""test self-referential relationship that joins on a column other than the primary key column"""
def setUpAll(self):
global table, meta
- meta = BoundMetaData(testbase.db)
+ meta = MetaData(testbase.db)
table = Table('item', meta,
Column('id', Integer, primary_key=True),
Column('uuid', String(32), unique=True, nullable=False),
class InheritTestOne(AssertMixin):
def setUpAll(self):
global parent, child1, child2, meta
- meta = BoundMetaData(testbase.db)
+ meta = MetaData(testbase.db)
parent = Table("parent", meta,
Column("id", Integer, primary_key=True),
Column("parent_data", String(50)),
"""tests two mappers with a one-to-many relation to each other."""
def setUpAll(self):
global t1, t2, metadata
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
t1 = Table('t1', metadata,
Column('c1', Integer, Sequence('t1c1_id_seq', optional=True), primary_key=True),
Column('c2', Integer, ForeignKey('t2.c1'))
"""tests two mappers with a one-to-many relation to each other, with a second one-to-many on one of the mappers"""
def setUpAll(self):
global t1, t2, t3, metadata
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
t1 = Table('t1', metadata,
Column('c1', Integer, Sequence('t1c1_id_seq', optional=True), primary_key=True),
Column('c2', Integer, ForeignKey('t2.c1')),
raise an exception when dependencies are sorted."""
def setUpAll(self):
global metadata
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
global person
global ball
ball = Table('ball', metadata,
"""test using post_update on a single self-referential mapper"""
def setUpAll(self):
global metadata, node_table
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
node_table = Table('node', metadata,
Column('id', Integer, Sequence('nodeid_id_seq', optional=True), primary_key=True),
Column('path', String(50), nullable=False),
class SelfReferentialPostUpdateTest2(AssertMixin):
def setUpAll(self):
global metadata, a_table
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
a_table = Table("a", metadata,
Column("id", Integer(), primary_key=True),
Column("fui", String()),
to have multiple primary mappers """
def setUpAll(self):
global user1, user2, address1, address2, metadata, ctx
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
ctx = SessionContext(create_session)
user1 = Table('user1', metadata,
def setUpAll(self):
self.install_threadlocal()
global foo, metadata
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
foo = Table('foo', metadata,
Column('id', Integer, Sequence('foo_id_seq'), primary_key=True),
Column('bar', Integer),
def setUpAll(self):
self.install_threadlocal()
global metadata, table1, table2
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
table1 = Table('Table1', metadata,
Column('id', Integer, primary_key=True),
)
def setUpAll(self):
self.install_threadlocal()
global metadata, table1, table2
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
table1 = Table('Table1', metadata,
Column('ID', Integer, primary_key=True),
)
# +--------------------------------------- has a ------+
global metadata, status, people, engineers, managers, cars
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
# table definitions
status = Table('status', metadata,
Column('status_id', Integer, primary_key=True),
class SingleInheritanceTest(testbase.AssertMixin):
def setUpAll(self):
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
global employees_table
employees_table = Table('employees', metadata,
Column('employee_id', Integer, primary_key=True),
class LazyTest(AssertMixin):
def setUpAll(self):
global info_table, data_table, rel_table, metadata
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
info_table = Table('infos', metadata,
Column('pk', Integer, primary_key=True),
Column('info', String))
assert True
def do_test(self):
- metadata = BoundMetaData(engine)
+ metadata = MetaData(engine)
table1 = Table("mytable", metadata,
Column('col1', Integer, primary_key=True),
class O2OTest(testbase.AssertMixin):
def setUpAll(self):
global jack, port, metadata, ctx
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
ctx = SessionContext(create_session)
jack = Table('jack', metadata,
Column('id', Integer, primary_key=True),
is 'joined to itself'."""
def setUpAll(self):
global metadata, company_tbl, employee_tbl
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
company_tbl = Table('company', metadata,
Column('company_id', Integer, primary_key=True),
def setUpAll(self):
global jobs, pageversions, pages, metadata, Job, Page, PageVersion, PageComment
import datetime
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
jobs = Table("jobs", metadata,
Column("jobno", Unicode(15), primary_key=True),
Column("created", DateTime, nullable=False, default=datetime.datetime.now),
UnitOfWorkTest.setUpAll(self)
ctx.current.clear()
global version_table
- version_table = Table('version_test', BoundMetaData(db),
+ version_table = Table('version_test', MetaData(db),
Column('id', Integer, Sequence('version_test_seq'), primary_key=True ),
Column('version_id', Integer, nullable=False),
Column('value', String(40), nullable=False)
def setUpAll(self):
UnitOfWorkTest.setUpAll(self)
global metadata, uni_table, uni_table2
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
uni_table = Table('uni_test', metadata,
Column('id', Integer, Sequence("uni_test_id_seq", optional=True), primary_key=True),
Column('txt', Unicode(50), unique=True))
def setUpAll(self):
UnitOfWorkTest.setUpAll(self)
global metadata, table
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
table = Table('mutabletest', metadata,
Column('id', Integer, Sequence('mutableidseq', optional=True), primary_key=True),
Column('data', PickleType),
def setUpAll(self):
UnitOfWorkTest.setUpAll(self)
global table, table2, table3
- metadata = BoundMetaData(db)
+ metadata = MetaData(db)
table = Table(
'multipk', metadata,
Column('multi_id', Integer, Sequence("multi_id_seq", optional=True), primary_key=True),
def setUpAll(self):
UnitOfWorkTest.setUpAll(self)
global metadata, people, peoplesites
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
people = Table("people", metadata,
Column('person', String(10), primary_key=True),
Column('firstname', String(10)),
def setUpAll(self):
UnitOfWorkTest.setUpAll(self)
global metadata, mytable,myothertable
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
mytable = Table('mytable', metadata,
Column('id', Integer, primary_key=True),
Column('data', String(30)),
self.hohoval = 9
self.althohoval = 15
global default_table
- metadata = BoundMetaData(db)
+ metadata = MetaData(db)
default_table = Table('default_test', metadata,
Column('id', Integer, Sequence("dt_seq", optional=True), primary_key=True),
Column('hoho', hohotype, PassiveDefault(str(self.hohoval))),
ctx.current.clear()
clear_mappers()
global meta, users, addresses
- meta = BoundMetaData(db)
+ meta = MetaData(db)
users = Table('users', meta,
Column('user_id', Integer, Sequence('user_id_seq', optional=True), primary_key = True),
Column('user_name', String(20)),
+import testbase
from sqlalchemy import *
+from sqlalchemy.orm import *
from testbase import Table, Column
from timeit import Timer
import sys
-meta = DynamicMetaData("time_trial")
+meta = MetaData()
orders = Table('orders', meta,
Column('id', Integer, Sequence('order_id_seq'), primary_key = True),
class LoadTest(AssertMixin):
def setUpAll(self):
global items, meta,subitems
- meta = BoundMetaData(db)
+ meta = MetaData(db)
items = Table('items', meta,
Column('item_id', Integer, primary_key=True),
Column('value', String(100)))
class LoadTest(AssertMixin):
def setUpAll(self):
global items, meta
- meta = BoundMetaData(db)
+ meta = MetaData(db)
items = Table('items', meta,
Column('item_id', Integer, primary_key=True),
Column('value', String(100)))
class SaveTest(AssertMixin):
def setUpAll(self):
global items, metadata
- metadata = BoundMetaData(db)
+ metadata = MetaData(db)
items = Table('items', metadata,
Column('item_id', Integer, primary_key=True),
Column('value', String(100)))
print psycopg
db = create_engine('postgres://scott:tiger@127.0.0.1/test',pool=psycopg,strategy='threadlocal')
print db.connection_provider._pool
-metadata = BoundMetaData(db)
+metadata = MetaData(db)
users_table = Table('users', metadata,
Column('user_id', Integer, primary_key=True),
from sqlalchemy.orm import mapperlib
from testbase import Table, Column
-meta = BoundMetaData('sqlite:///foo.db')
+meta = MetaData('sqlite:///foo.db')
t1 = Table('t1', meta,
Column('c1', Integer, primary_key=True),
logging.getLogger('sqlalchemy.pool').setLevel(logging.INFO)
threadids = set()
-#meta = BoundMetaData('postgres://scott:tiger@127.0.0.1/test')
+#meta = MetaData('postgres://scott:tiger@127.0.0.1/test')
-#meta = BoundMetaData('mysql://scott:tiger@localhost/test', poolclass=pool.SingletonThreadPool)
-meta = BoundMetaData('mysql://scott:tiger@localhost/test')
+#meta = MetaData('mysql://scott:tiger@localhost/test', poolclass=pool.SingletonThreadPool)
+meta = MetaData('mysql://scott:tiger@localhost/test')
foo = Table('foo', meta,
Column('id', Integer, primary_key=True),
Column('data', String(30)))
class CaseTest(testbase.PersistTest):
def setUpAll(self):
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
global info_table
info_table = Table('infos', metadata,
Column('pk', Integer, primary_key=True),
def setUp(self):
global metadata
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
def tearDown(self):
metadata.drop_all()
def setUpAll(self):
global t, f, f2, ts, currenttime, metadata
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
x = {'x':50}
def mydefault():
x['x'] += 1
key values in memory before insert; otherwise we can't locate the just-inserted row."""
try:
- meta = BoundMetaData(testbase.db)
+ meta = MetaData(testbase.db)
testbase.db.execute("""
CREATE TABLE speedy_users
(
class AutoIncrementTest(PersistTest):
@testbase.supported('postgres', 'mysql')
def testnonautoincrement(self):
- meta = BoundMetaData(testbase.db)
+ meta = MetaData(testbase.db)
nonai_table = Table("aitest", meta,
Column('id', Integer, autoincrement=False, primary_key=True),
Column('data', String(20)))
nonai_table.drop()
def testwithautoincrement(self):
- meta = BoundMetaData(testbase.db)
+ meta = MetaData(testbase.db)
table = Table("aitest", meta,
Column('id', Integer, primary_key=True),
Column('data', String(20)))
# TODO: what does this test do that all the various ORM tests don't?
- meta = BoundMetaData(testbase.db)
+ meta = MetaData(testbase.db)
table = Table("aitest", meta,
Column('id', Integer, primary_key=True),
Column('data', String(20)))
try:
# simulate working on a table that doesn't already exist
- meta2 = BoundMetaData(testbase.db)
+ meta2 = MetaData(testbase.db)
table2 = Table("aitest", meta2,
Column('id', Integer, primary_key=True),
Column('data', String(20)))
@testbase.supported('postgres', 'oracle')
def setUpAll(self):
global cartitems, sometable, metadata
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
cartitems = Table("cartitems", metadata,
Column("cart_id", Integer, Sequence('cart_id_seq'), primary_key=True),
Column("description", String(40)),
def setUpAll(self):
global users, addresses, metadata
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
users = Table('query_users', metadata,
Column('user_id', INT, primary_key = True),
Column('user_name', VARCHAR(20)),
def test_update_functions(self):
"""test sending functions and SQL expressions to the VALUES and SET clauses of INSERT/UPDATE instances,
and that column-level defaults get overridden"""
- meta = BoundMetaData(testbase.db)
+ meta = MetaData(testbase.db)
t = Table('t1', meta,
Column('id', Integer, Sequence('t1idseq', optional=True), primary_key=True),
Column('value', Integer)
@testbase.unsupported('oracle', 'firebird')
def test_column_accessor_shadow(self):
- meta = BoundMetaData(testbase.db)
+ meta = MetaData(testbase.db)
shadowed = Table('test_shadowed', meta,
Column('shadow_id', INT, primary_key = True),
Column('shadow_name', VARCHAR(20)),
@testbase.supported('mssql')
def test_fetchid_trigger(self):
- meta = BoundMetaData(testbase.db)
+ meta = MetaData(testbase.db)
t1 = Table('t1', meta,
Column('id', Integer, Sequence('fred', 100, 1), primary_key=True),
Column('descr', String(200)))
@testbase.supported('mssql')
def test_insertid_schema(self):
- meta = BoundMetaData(testbase.db)
+ meta = MetaData(testbase.db)
con = testbase.db.connect()
con.execute('create schema paj')
tbl = Table('test', meta, Column('id', Integer, primary_key=True), schema='paj')
different databases."""
def setUpAll(self):
global metadata, t1, t2, t3
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
t1 = Table('t1', metadata,
Column('col1', Integer, Sequence('t1pkseq'), primary_key=True),
Column('col2', String(30)),
class OperatorTest(PersistTest):
def setUpAll(self):
global metadata, flds
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
flds = Table('flds', metadata,
Column('idcol', Integer, Sequence('t1pkseq'), primary_key=True),
Column('intcol', Integer),
# such as: spaces, quote characters, punctuation characters, set up tests for those as
# well.
global table1, table2, table3
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
table1 = Table('WorstCase1', metadata,
Column('lowercase', Integer, primary_key=True),
Column('UPPERCASE', Integer),
assert(res2==[(1,2,3),(2,2,3),(4,3,2)])
def testreflect(self):
- meta2 = BoundMetaData(testbase.db)
+ meta2 = MetaData(testbase.db)
t2 = Table('WorstCase2', meta2, autoload=True, quote=True)
assert t2.c.has_key('MixedCase')
Column('MixedCase', Integer))
# first test case sensitive tables migrating via tometadata
- meta = BoundMetaData(testbase.db, case_sensitive=False)
+ meta = MetaData(testbase.db, case_sensitive=False)
lc_table1 = table1.tometadata(meta)
lc_table2 = table2.tometadata(meta)
assert lc_table1.case_sensitive is False
class FoundRowsTest(testbase.AssertMixin):
"""tests rowcount functionality"""
def setUpAll(self):
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
global employees_table
db = testbase.db
-metadata = BoundMetaData(db)
+metadata = MetaData(db)
table = Table('table1', metadata,
Column('col1', Integer, primary_key=True),
Column('col2', String(20)),
def setUpAll(self):
global users
- users = Table('type_users', BoundMetaData(db),
+ users = Table('type_users', MetaData(db),
Column('user_id', Integer, primary_key = True),
# totally custom type
Column('goofy', MyType, nullable = False),
expectedResults['float_column'] = 'float_column FLOAT(25)'
print db.engine.__module__
- testTable = Table('testColumns', BoundMetaData(db),
+ testTable = Table('testColumns', MetaData(db),
Column('int_column', Integer),
Column('smallint_column', Smallinteger),
Column('varchar_column', String(20)),
"""tests the Unicode type. also tests the TypeDecorator with instances in the types package."""
def setUpAll(self):
global unicode_table
- metadata = BoundMetaData(db)
+ metadata = MetaData(db)
unicode_table = Table('unicode_table', metadata,
Column('id', Integer, Sequence('uni_id_seq', optional=True), primary_key=True),
Column('unicode_varchar', Unicode(250)),
class BinaryTest(AssertMixin):
def setUpAll(self):
global binary_table
- binary_table = Table('binary_table', BoundMetaData(db),
+ binary_table = Table('binary_table', MetaData(db),
Column('primary_id', Integer, Sequence('binary_id_seq', optional=True), primary_key=True),
Column('data', Binary),
Column('data_slice', Binary(100)),
collist = [Column('user_id', INT, primary_key = True), Column('user_name', VARCHAR(20)), Column('user_datetime', DateTime(timezone=False)),
Column('user_date', Date), Column('user_time', Time)]
- users_with_date = Table('query_users_with_date', BoundMetaData(db), *collist)
+ users_with_date = Table('query_users_with_date', MetaData(db), *collist)
users_with_date.create()
insert_dicts = [dict(zip(fnames, d)) for d in insert_data]
class IntervalTest(AssertMixin):
def setUpAll(self):
global interval_table, metadata
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
interval_table = Table("intervaltable", metadata,
Column("id", Integer, primary_key=True),
Column("interval", Interval),
class BooleanTest(AssertMixin):
def setUpAll(self):
global bool_table
- metadata = BoundMetaData(testbase.db)
+ metadata = MetaData(testbase.db)
bool_table = Table('booltest', metadata,
Column('id', Integer, primary_key=True),
Column('value', Boolean))
t1.insert().execute({u'méil':2, u'éXXm':7})
t2.insert().execute({'a':2, 'b':2})
- meta = BoundMetaData(testbase.db)
+ meta = MetaData(testbase.db)
tt1 = Table(t1.name, meta, autoload=True)
tt2 = Table(t2.name, meta, autoload=True)
tt1.insert().execute({u'méil':1, u'éXXm':5})
ECHO = testbase.echo
db = testbase.db
-metadata = BoundMetaData(db)
+metadata = MetaData(db)
users = Table('users', metadata,
Column('user_id', Integer, Sequence('user_id_seq', optional=True), primary_key = True),
import os, unittest, StringIO, re, ConfigParser, optparse
sys.path.insert(0, os.path.join(os.getcwd(), 'lib'))
import sqlalchemy
-from sqlalchemy import sql, schema, engine, pool, BoundMetaData
+from sqlalchemy import sql, schema, engine, pool, MetaData
from sqlalchemy.orm import clear_mappers
db = None
if options.log_debug is not None:
for elem in options.log_debug:
logging.getLogger(elem).setLevel(logging.DEBUG)
- metadata = sqlalchemy.BoundMetaData(db)
+ metadata = sqlalchemy.MetaData(db)
def unsupported(*dbs):
"""a decorator that marks a test as unsupported by one or more database implementations"""
keep_data = False
def setUpAll(self):
global _otest_metadata
- _otest_metadata = BoundMetaData(db)
+ _otest_metadata = MetaData(db)
self.define_tables(_otest_metadata)
_otest_metadata.create_all()
def define_tables(self, _otest_metadata):
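Every hunk above applies the same mechanical substitution, so the migration of a large test suite can be scripted rather than edited by hand. A minimal sketch (the `migrate` helper and its regexes are illustrative, not part of the changeset): `BoundMetaData` maps directly to `MetaData`, and `DynamicMetaData` to `ThreadLocalMetaData`. Note that where per-thread binding was never actually needed, as in the time-trial script above, the diff substitutes a plain `MetaData()` instead.

```python
import re

# Word-boundary renames for the 0.4 metadata API; BoundMetaData is gone
# (plain MetaData is equivalent) and DynamicMetaData became ThreadLocalMetaData.
RENAMES = [
    (re.compile(r'\bBoundMetaData\b'), 'MetaData'),
    (re.compile(r'\bDynamicMetaData\b'), 'ThreadLocalMetaData'),
]

def migrate(source: str) -> str:
    """Apply the 0.4 metadata renames to a chunk of source text."""
    for pattern, replacement in RENAMES:
        source = pattern.sub(replacement, source)
    return source

print(migrate("metadata = BoundMetaData(testbase.db)"))
# metadata = MetaData(testbase.db)
```

Constructor arguments carry over unchanged, which is why the rename alone is sufficient for nearly every hunk in this changeset.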