from exceptions import *
import mapping as mapperlib
from mapping import *
+
+import sqlalchemy.schema
+import sqlalchemy.ext.proxy
+sqlalchemy.schema.default_engine = sqlalchemy.ext.proxy.ProxyEngine()
self.opts['port'] = str(self.opts['port'])
ansisql.ANSISQLEngine.__init__(self, **params)
-
+
def connect_args(self):
return [[], self.opts]
else:
ischema_names = pg1_ischema_names
- ischema.reflecttable(self, table, ischema_names)
+ # give ischema the given table's engine with which to look up
+ # other tables, not 'self', since it could be a ProxyEngine
+ ischema.reflecttable(table.engine, table, ischema_names)
class PGCompiler(ansisql.ANSICompiler):
break
(tablename, localcol, remotecol) = (row[2], row[3], row[4])
#print "row! " + repr(row)
- remotetable = Table(tablename, self, autoload = True)
+ # look up the table based on the given table's engine, not 'self',
+ # since it could be a ProxyEngine
+ remotetable = Table(tablename, table.engine, autoload = True)
table.c[localcol].append_item(schema.ForeignKey(remotetable.c[remotecol]))
# check for UNIQUE indexes
c = self.execute("PRAGMA index_list(" + table.name + ")", {})
# get a handle on the connection pool via the connect arguments
# this insures the SQLEngine instance integrates with the pool referenced
# by direct usage of pool.manager(<module>).connect(*args, **params)
+ schema.SchemaEngine.__init__(self)
(cargs, cparams) = self.connect_args()
if pool is None:
params['echo'] = echo_pool
self.echo_uow = echo_uow
self.convert_unicode = convert_unicode
self.context = util.ThreadLocal(raiseerror=False)
- self.tables = {}
self._ischema = None
self._figure_paramstyle()
if logger is None:
def hash_key(self):
return "%s(%s)" % (self.__class__.__name__, repr(self.connect_args()))
+
+ def _get_name(self):
+ return sys.modules[self.__module__].descriptor()['name']
+ name = property(_get_name)
def dispose(self):
"""disposes of the underlying pool manager for this SQLEngine."""
from sqlalchemy import sql
from sqlalchemy.engine import create_engine
from sqlalchemy.types import TypeEngine
-
+import sqlalchemy.schema as schema
import thread, weakref
-class BaseProxyEngine(object):
+class BaseProxyEngine(schema.SchemaEngine):
'''
Basis for all proxy engines
'''
- def __init__(self):
- self.tables = {}
-
+
def get_engine(self):
raise NotImplementedError
engine = property(get_engine, set_engine)
+ def reflecttable(self, table):
+ return self.get_engine().reflecttable(table)
+
def hash_key(self):
return "%s(%s)" % (self.__class__.__name__, id(self))
classes for TypeEngine.
"""
- def __init__(self):
+ def __init__(self, **kwargs):
BaseProxyEngine.__init__(self)
# create the local storage for uri->engine map and current engine
self.storage = local()
self.storage.connection = {}
self.storage.engine = None
+ self.kwargs = kwargs
def connect(self, uri, opts=None, **kwargs):
"""Establish connection to a real engine.
"""
+ kw = self.kwargs.copy()
+ kw.update(kwargs)
+ kwargs = kw
key = "%s(%s,%s)" % (uri, repr(opts), repr(kwargs))
try:
map = self.storage.connection
class TableSingleton(type):
"""a metaclass used by the Table object to provide singleton behavior."""
- def __call__(self, name, engine, *args, **kwargs):
+ def __call__(self, name, engine=None, *args, **kwargs):
try:
+ if engine is None:
+ engine = default_engine
name = str(name) # in case of incoming unicode
schema = kwargs.get('schema', None)
autoload = kwargs.pop('autoload', False)
metaclass constructor."""
self._clear()
- print "RELOAD VALUES", args
self._init_items(*args)
def append_item(self, item):
if isinstance(self._colspec, str):
return self._colspec
elif self._colspec.table.schema is not None:
- return "%s.%s.%s" % (self._colspec.table.schema, self._colspec.table.name, self._colspec.column.key)
+ return "%s.%s.%s" % (self._colspec.table.schema, self._colspec.table.name, self._colspec.key)
else:
return "%s.%s" % (self._colspec.table.name, self._colspec.key)
self._column = table.c[colname]
else:
self._column = self._colspec
-
return self._column
column = property(lambda s: s._init_column())
class SchemaEngine(object):
"""a factory object used to create implementations for schema objects. This object
is the ultimate base class for the engine.SQLEngine class."""
+
+ def __init__(self):
+ # a dictionary that stores Table objects keyed off their name (and possibly schema name)
+ self.tables = {}
+
def reflecttable(self, table):
"""given a table, will query the database and populate its Column and ForeignKey
objects."""
mysql_engine='InnoDB'
)
- print repr(users)
- print repr(addresses)
# users.c.parent_user_id.set_foreign_key(ForeignKey(users.c.user_id))
# clear out table registry
users.deregister()
addresses.deregister()
-
+
try:
users = Table('engine_users', testbase.db, autoload = True)
addresses = Table('engine_email_addresses', testbase.db, autoload = True)
finally:
addresses.drop()
users.drop()
-
+
users.create()
addresses.create()
try:
# we can now as long as we use InnoDB
# if testbase.db.engine.__module__.endswith('mysql'):
# addresses.c.remote_user_id.append_item(ForeignKey('engine_users.user_id'))
+ print users
+ print addresses
j = join(users, addresses)
print str(j.onclause)
self.assert_((users.c.user_id==addresses.c.remote_user_id).compare(j.onclause))
b = Bar('barfoo')
objectstore.commit()
- b.foos.append(Foo('subfoo1'))
- b.foos.append(Foo('subfoo2'))
+ f1 = Foo('subfoo1')
+ f2 = Foo('subfoo2')
+ b.foos.append(f1)
+ b.foos.append(f2)
objectstore.commit()
objectstore.clear()
print l[0].foos
self.assert_result(l, Bar,
# {'id':1, 'data':'barfoo', 'bid':1, 'foos':(Foo, [{'id':2,'data':'subfoo1'}, {'id':3,'data':'subfoo2'}])},
- {'id':1, 'data':'barfoo', 'foos':(Foo, [{'id':2,'data':'subfoo1'}, {'id':3,'data':'subfoo2'}])},
+ {'id':b.id, 'data':'barfoo', 'foos':(Foo, [{'id':f1.id,'data':'subfoo1'}, {'id':f2.id,'data':'subfoo2'}])},
)
that PassiveDefault upon insert, even though PassiveDefault says
"let the database execute this", because in postgres we must have all the primary
key values in memory before insert; otherwise we cant locate the just inserted row."""
- if not db.engine.__module__.endswith('postgres'):
+ if db.engine.name != 'postgres':
return
try:
db.execute("""
x['x'] += 1
return x['x']
- use_function_defaults = db.engine.__module__.endswith('postgres') or db.engine.__module__.endswith('oracle')
- is_oracle = db.engine.__module__.endswith('oracle')
+ use_function_defaults = db.engine.name == 'postgres' or db.engine.name == 'oracle'
+ is_oracle = db.engine.name == 'oracle'
# select "count(1)" from the DB which returns different results
# on different DBs
import unittest
import StringIO
import sqlalchemy.engine as engine
+import sqlalchemy.ext.proxy
import re, sys
echo = True
global db, db_uri
DBTYPE = 'sqlite'
-
+ PROXY = False
+
if len(sys.argv) >= 3:
if sys.argv[1] == '--dburi':
(param, db_uri) = (sys.argv.pop(1), sys.argv.pop(1))
elif sys.argv[1] == '--db':
(param, DBTYPE) = (sys.argv.pop(1), sys.argv.pop(1))
+
if (None == db_uri):
+ p = DBTYPE.split('.')
+ if len(p) > 1:
+ arg = p[0]
+ DBTYPE = p[1]
+ if arg == 'proxy':
+ PROXY = True
if DBTYPE == 'sqlite':
db_uri = 'sqlite://filename=:memory:'
elif DBTYPE == 'sqlite_file':
if not db_uri:
raise "Could not create engine. specify --db <sqlite|sqlite_file|postgres|mysql|oracle> to test runner."
- db = engine.create_engine(db_uri, echo=echo, default_ordering=True)
+ if PROXY:
+ db = sqlalchemy.ext.proxy.ProxyEngine(echo=echo, default_ordering=True)
+ db.connect(db_uri)
+ else:
+ db = engine.create_engine(db_uri, echo=echo, default_ordering=True)
db = EngineAssert(db)
class PersistTest(unittest.TestCase):
else:
self.assert_(getattr(rowobj, key) == value, "attribute %s value %s does not match %s" % (key, getattr(rowobj, key), value))
def assert_sql(self, db, callable_, list, with_sequences=None):
- if with_sequences is not None and (db.engine.__module__.endswith('postgres') or db.engine.__module__.endswith('oracle')):
+ if with_sequences is not None and (db.engine.name == 'postgres' or db.engine.name == 'oracle'):
db.set_assert_list(self, with_sequences)
else:
db.set_assert_list(self, list)
callable_()
finally:
self.assert_(db.sql_count == count, "desired statement count %d does not match %d" % (count, db.sql_count))
-
+
class EngineAssert(object):
"""decorates a SQLEngine object to match the incoming queries against a set of assertions."""
def __init__(self, engine):
self.engine = engine
self.realexec = engine.post_exec
- engine.post_exec = self.post_exec
+ self.realexec.im_self.post_exec = self.post_exec
self.logger = engine.logger
self.set_assert_list(None, None)
self.sql_count = 0