--- /dev/null
+from sqlalchemy import *
+from sqlalchemy.ext.proxy import ProxyEngine
+
+from testbase import PersistTest
+import testbase
+
+#
+# Define an engine, table and mapper at the module level, to show that the
+# table and mapper can be used with different real engines in multiple threads
+#
+module_engine = ProxyEngine()
+
+users = Table('users', module_engine,
+ Column('user_id', Integer, primary_key=True),
+ Column('user_name', String(16)),
+ Column('password', String(20))
+ )
+
+class User(object):
+ pass
+
+assign_mapper(User, users)
+
+class ProxyEngineTest(PersistTest):
+
+ def setUp(self):
+ objectstore.clear()
+
+ def test_engine_connect(self):
+
+ # connect to a real engine
+ module_engine.connect(testbase.db_uri)
+ users.create()
+
+ objectstore.begin()
+
+ user = User()
+ user.user_name='fred'
+ user.password='*'
+ objectstore.commit()
+
+ # select
+ sqluser = User.select_by(user_name='fred')[0]
+ assert sqluser.user_name == 'fred'
+
+ # modify
+ sqluser.user_name = 'fred jones'
+
+ # commit - saves everything that changed
+ objectstore.commit()
+
+ allusers = [ user.user_name for user in User.select() ]
+ assert allusers == [ 'fred jones' ]
+ users.drop()
+
+ def test_multi_thread(self):
+ from threading import Thread
+ from Queue import Queue
+
+ # start 2 threads with different connection params
+ # and perform simultaneous operations, showing that the
+ # 2 threads don't share a connection
+ qa = Queue()
+ qb = Queue()
+ def run(db_uri, uname, queue):
+ def test():
+ try:
+ module_engine.connect(db_uri)
+ users.create()
+
+ objectstore.begin()
+
+ all = User.select()[:]
+ assert all == []
+
+ u = User()
+ u.user_name = uname
+ u.password = 'whatever'
+ objectstore.commit()
+
+ names = [ us.user_name for us in User.select() ]
+ assert names == [ uname ]
+ users.drop()
+
+ except Exception, e:
+ import traceback
+ traceback.print_exc()
+ queue.put(e)
+ else:
+ queue.put(False)
+ return test
+
+ # NOTE: I'm not sure how to give the test runner the option to
+ # override these uris, or how to safely clear them after test runs
+ a = Thread(target=run('sqlite://filename=threadtesta.db', 'jim', qa))
+ b = Thread(target=run('sqlite://filename=threadtestb.db', 'joe', qb))
+
+ a.start()
+ b.start()
+
+ # block and wait for the threads to push their results
+ res = qa.get(True)
+ if res != False:
+ raise res
+
+ res = qb.get(True)
+ if res != False:
+ raise res
+
+ def test_table_singleton_a(self):
+ """set up for table singleton check
+ """
+
+ #
+ # For this 'test', create a proxy engine instance, connect it
+ # to a real engine, and make it do some work
+ #
+ engine = ProxyEngine()
+ cats = Table('cats', engine,
+ Column('cat_id', Integer, primary_key=True),
+ Column('cat_name', String))
+
+ engine.connect(testbase.db_uri)
+ cats.create()
+ cats.drop()
+
+ ProxyEngineTest.cats_table_a = cats
+ assert isinstance(cats, Table)
+
+ def test_table_singleton_b(self):
+ """check that a table on a 2nd proxy engine instance gets 2nd table
+ instance
+ """
+
+ #
+ # Now create a new proxy engine instance and attach the same
+ # table as the first test. This should result in 2 table instances,
+ # since different proxy engine instances can't attach to the
+ # same table instance
+ #
+ engine = ProxyEngine()
+ cats = Table('cats', engine,
+ Column('cat_id', Integer, primary_key=True),
+ Column('cat_name', String))
+ assert id(cats) != id(ProxyEngineTest.cats_table_a)
+
+ # the real test -- if we're still using the old engine reference,
+ # this will fail because the old reference's local storage will
+ # not have the default attributes
+ engine.connect(testbase.db_uri)
+ cats.create()
+ cats.drop()
+
+ def test_type_engine_caching(self):
+ from sqlalchemy.engine import SQLEngine
+ import sqlalchemy.types as sqltypes
+
+ class EngineA(SQLEngine):
+ def __init__(self):
+ pass
+
+ def hash_key(self):
+ return 'a'
+
+ def type_descriptor(self, typeobj):
+ if typeobj == type(1):
+ return TypeEngineX2()
+ else:
+ return TypeEngineSTR()
+
+ class EngineB(SQLEngine):
+ def __init__(self):
+ pass
+
+ def hash_key(self):
+ return 'b'
+
+ def type_descriptor(self, typeobj):
+ return TypeEngineMonkey()
+
+ class TypeEngineX2(sqltypes.TypeEngine):
+ def convert_bind_param(self, value, engine):
+ return value * 2
+
+ class TypeEngineSTR(sqltypes.TypeEngine):
+ def convert_bind_param(self, value, engine):
+ return repr(str(value))
+
+ class TypeEngineMonkey(sqltypes.TypeEngine):
+ def convert_bind_param(self, value, engine):
+ return 'monkey'
+
+ engine = ProxyEngine()
+ engine.storage.engine = EngineA()
+
+ a = engine.type_descriptor(type(1))
+ assert a.convert_bind_param(12, engine) == 24
+ assert a.convert_bind_param([1,2,3], engine) == [1, 2, 3, 1, 2, 3]
+
+ a2 = engine.type_descriptor(type('hi'))
+ assert a2.convert_bind_param(12, engine) == "'12'"
+ assert a2.convert_bind_param([1,2,3], engine) == "'[1, 2, 3]'"
+
+ engine.storage.engine = EngineB()
+ b = engine.type_descriptor(type(1))
+ assert b.convert_bind_param(12, engine) == 'monkey'
+ assert b.convert_bind_param([1,2,3], engine) == 'monkey'
+
+
+if __name__ == "__main__":
+ testbase.main()
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
echo = True
#echo = 'debug'
db = None
-
+db_uri = None
def parse_argv():
# we are using the unittest main runner, so we are just popping out the
else:
DBTYPE = 'sqlite'
- global db
+ global db, db_uri
if DBTYPE == 'sqlite':
try:
- db = engine.create_engine('sqlite://filename=:memory:', echo=echo, default_ordering=True)
+ db_uri = 'sqlite://filename=:memory:'
+ db = engine.create_engine(db_uri, echo=echo, default_ordering=True)
except:
raise "Could not create sqlite engine. specify --db <sqlite|sqlite_file|postgres|mysql|oracle> to test runner."
elif DBTYPE == 'sqlite_file':
- db = engine.create_engine('sqlite://filename=querytest.db', echo=echo, default_ordering=True)
+ db_uri = 'sqlite://filename=querytest.db'
+ db = engine.create_engine(db_uri, echo=echo, default_ordering=True)
elif DBTYPE == 'postgres':
- db = engine.create_engine('postgres://database=test&port=5432&host=127.0.0.1&user=scott&password=tiger',
- echo=echo, default_ordering=True)
+ db_uri = 'postgres://database=test&port=5432&host=127.0.0.1&user=scott&password=tiger'
+ db = engine.create_engine(db_uri, echo=echo, default_ordering=True)
elif DBTYPE == 'mysql':
- db = engine.create_engine('mysql://db=test&host=127.0.0.1&user=scott&passwd=tiger', echo=echo, default_ordering=True)
+ db_uri = 'mysql://db=test&host=127.0.0.1&user=scott&passwd=tiger'
+ db = engine.create_engine(db_uri, echo=echo, default_ordering=True)
elif DBTYPE == 'oracle':
- db = engine.create_engine('oracle://user=scott&password=tiger', echo=echo, default_ordering=True)
+ db_uri = 'oracle://user=scott&password=tiger'
+ db = engine.create_engine(db_uri, echo=echo, default_ordering=True)
db = EngineAssert(db)
class PersistTest(unittest.TestCase):
def main():
unittest.main()
+
+
--- /dev/null
+"""Interactive wsgi test
+
+Small WSGI application that uses a table and mapper defined at the module
+level, with per-application uris enabled by the ProxyEngine.
+
+Requires the wsgiutils package from:
+
+http://www.owlfish.com/software/wsgiutils/
+
+Run the script with python wsgi_test.py, then visit http://localhost:8080/a
+and http://localhost:8080/b with a browser. You should see two distinct lists
+of colors.
+"""
+
+from sqlalchemy import *
+from sqlalchemy.ext.proxy import ProxyEngine
+from wsgiutils import wsgiServer
+
+engine = ProxyEngine()
+
+colors = Table('colors', engine,
+ Column('id', Integer, primary_key=True),
+ Column('name', String(32)),
+ Column('hex', String(6)))
+
+class Color(object):
+ pass
+
+assign_mapper(Color, colors)
+
+data = { 'a': (('fff','white'), ('aaa','gray'), ('000','black'),
+ ('f00', 'red'), ('0f0', 'green')),
+ 'b': (('00f','blue'), ('ff0', 'yellow'), ('0ff','purple')) }
+
+db_uri = { 'a': 'sqlite://filename=wsgi_db_a.db',
+ 'b': 'sqlite://filename=wsgi_db_b.db' }
+
+def app(dataset):
+ print '... connecting to database %s: %s' % (dataset, db_uri[dataset])
+ engine.connect(db_uri[dataset], echo=True, echo_pool=True)
+ colors.create()
+
+ print '... populating data into %s' % db_uri[dataset]
+ for hex, name in data[dataset]:
+ c = Color()
+ c.hex = hex
+ c.name = name
+ objectstore.commit()
+ objectstore.clear()
+
+ def call(environ, start_response):
+ engine.connect(db_uri[dataset], echo=True, echo_pool=True)
+
+ # NOTE: must clear objectstore on each request, or you'll see
+ # objects from another thread here
+ objectstore.clear()
+ objectstore.begin()
+
+ c = Color.select()
+
+ start_response('200 OK', [('content-type','text/html')])
+ yield '<html><head><title>Test dataset %s</title></head>' % dataset
+ yield '<body>'
+ yield '<p>uri: %s</p>' % db_uri[dataset]
+ yield '<p>engine: <xmp>%s</xmp></p>' % engine.engine
+ yield '<p>Colors!</p>'
+ for color in c:
+ yield '<div style="background: #%s">%s</div>' % (color.hex,
+ color.name)
+ yield '</body></html>'
+ return call
+
+def cleanup():
+ for uri in db_uri.values():
+ print "Cleaning db %s" % uri
+ engine.connect(uri)
+ colors.drop()
+
+def run_server(apps, host='localhost', port=8080):
+ print "Serving test app at http://%s:%s/" % (host, port)
+ print "Visit http://%(host)s:%(port)s/a and " \
+ "http://%(host)s:%(port)s/b to test apps" % {'host': host,
+ 'port': port}
+
+ server = wsgiServer.WSGIServer((host, port), apps, serveFiles=False)
+ try:
+ server.serve_forever()
+ except:
+ cleanup()
+ raise
+
+if __name__ == '__main__':
+ run_server({'/a':app('a'), '/b':app('b')})
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+