related error messages. Additionally, when a "connection no
longer open" condition is detected, the entire connection pool
is discarded and replaced with a new instance. #516
+ - the dialects within sqlalchemy.databases become a setuptools
+ entry points. loading the built-in database dialects works the
+ same as always, but if none found will fall back to trying
+ pkg_resources to load an external module [ticket:521]
- sql:
- preliminary support for unicode table names, column names and
SQL statements added, for databases which can support them.
import sqlalchemy.types as sqltypes
import sqlalchemy.exceptions as exceptions
-def dbapi():
- import kinterbasdb
- return kinterbasdb
_initialized_kb = False
self.type_conv = type_conv
self.concurrency_level= concurrency_level
+ def dbapi(cls):
+ import kinterbasdb
+ return kinterbasdb
+ dbapi = classmethod(dbapi)
+
def create_connect_args(self, url):
opts = url.translate_connect_args(['host', 'database', 'user', 'password', 'port'])
if opts.get('port'):
import sqlalchemy.types as sqltypes
import sqlalchemy.exceptions as exceptions
-def dbapi(module_name=None):
- if module_name:
- try:
- dialect_cls = dialect_mapping[module_name]
- return dialect_cls.import_dbapi()
- except KeyError:
- raise exceptions.InvalidRequestError("Unsupported MSSQL module '%s' requested (must be adodbpi, pymssql or pyodbc)" % module_name)
- else:
- for dialect_cls in [MSSQLDialect_adodbapi, MSSQLDialect_pymssql, MSSQLDialect_pyodbc]:
- try:
- return dialect_cls.import_dbapi()
- except ImportError, e:
- pass
- else:
- raise ImportError('No DBAPI module detected for MSSQL - please install adodbapi, pymssql or pyodbc')
class MSNumeric(sqltypes.Numeric):
def convert_result_value(self, value, dialect):
self.auto_identity_insert = auto_identity_insert
self.text_as_varchar = False
self.set_default_schema_name("dbo")
-
+
+ def dbapi(cls, module_name=None):
+ if module_name:
+ try:
+ dialect_cls = dialect_mapping[module_name]
+ return dialect_cls.import_dbapi()
+ except KeyError:
+ raise exceptions.InvalidRequestError("Unsupported MSSQL module '%s' requested (must be adodbpi, pymssql or pyodbc)" % module_name)
+ else:
+ for dialect_cls in [MSSQLDialect_adodbapi, MSSQLDialect_pymssql, MSSQLDialect_pyodbc]:
+ try:
+ return dialect_cls.import_dbapi()
+ except ImportError, e:
+ pass
+ else:
+ raise ImportError('No DBAPI module detected for MSSQL - please install adodbapi, pymssql or pyodbc')
+ dbapi = classmethod(dbapi)
+
def create_connect_args(self, url):
opts = url.translate_connect_args(['host', 'database', 'user', 'password', 'port'])
opts.update(url.query)
import sqlalchemy.exceptions as exceptions
from array import array
-def dbapi():
- import MySQLdb as mysql
- return mysql
def kw_colspec(self, spec):
if self.unsigned:
def __init__(self, **kwargs):
ansisql.ANSIDialect.__init__(self, default_paramstyle='format', **kwargs)
+ def dbapi(cls):
+ import MySQLdb as mysql
+ return mysql
+ dbapi = classmethod(dbapi)
+
def create_connect_args(self, url):
opts = url.translate_connect_args(['host', 'db', 'user', 'passwd', 'port'])
opts.update(url.query)
from sqlalchemy.engine import default, base
import sqlalchemy.types as sqltypes
-def dbapi():
- import cx_Oracle
- return cx_Oracle
class OracleNumeric(sqltypes.Numeric):
self.ORACLE_BINARY_TYPES = [getattr(self.dbapi, k) for k in ["BFILE", "CLOB", "NCLOB", "BLOB", "LONG_BINARY", "LONG_STRING"] if hasattr(self.dbapi, k)]
else:
self.ORACLE_BINARY_TYPES = []
-
+
+ def dbapi(cls):
+ import cx_Oracle
+ return cx_Oracle
+ dbapi = classmethod(dbapi)
+
def create_connect_args(self, url):
if url.database:
# if we have a database, then we have a remote host
except:
mxDateTime = None
-def dbapi():
- try:
- import psycopg2 as psycopg
- except ImportError, e:
- try:
- import psycopg
- except ImportError, e2:
- raise e
- return psycopg
class PGInet(sqltypes.TypeEngine):
def get_col_spec(self):
self.use_information_schema = use_information_schema
self.paramstyle = 'pyformat'
+ def dbapi(cls):
+ try:
+ import psycopg2 as psycopg
+ except ImportError, e:
+ try:
+ import psycopg
+ except ImportError, e2:
+ raise e
+ return psycopg
+ dbapi = classmethod(dbapi)
+
def create_connect_args(self, url):
opts = url.translate_connect_args(['host', 'database', 'user', 'password', 'port'])
if opts.has_key('port'):
import sqlalchemy.types as sqltypes
import datetime,time
-def dbapi():
- try:
- from pysqlite2 import dbapi2 as sqlite
- except ImportError, e:
- try:
- from sqlite3 import dbapi2 as sqlite #try the 2.5+ stdlib name.
- except ImportError:
- try:
- sqlite = __import__('sqlite') # skip ourselves
- except ImportError:
- raise e
- return sqlite
class SLNumeric(sqltypes.Numeric):
def get_col_spec(self):
return tuple([int(x) for x in num.split('.')])
self.supports_cast = (self.dbapi is None or vers(self.dbapi.sqlite_version) >= vers("3.2.3"))
+ def dbapi(cls):
+ try:
+ from pysqlite2 import dbapi2 as sqlite
+ except ImportError, e:
+ try:
+ from sqlite3 import dbapi2 as sqlite #try the 2.5+ stdlib name.
+ except ImportError:
+ try:
+ sqlite = __import__('sqlite') # skip ourselves
+ except ImportError:
+ raise e
+ return sqlite
+ dbapi = classmethod(dbapi)
+
def compiler(self, statement, bindparams, **kwargs):
return SQLiteCompiler(self, statement, bindparams, **kwargs)
super(SQLiteIdentifierPreparer, self).__init__(dialect, omit_schema=True)
dialect = SQLiteDialect
-poolclass = pool.SingletonThreadPool
+dialect.poolclass = pool.SingletonThreadPool
# create url.URL object
u = url.make_url(name_or_url)
- # get module from sqlalchemy.databases
- module = u.get_module()
+ dialect_cls = u.get_dialect()
dialect_args = {}
# consume dialect arguments from kwargs
- for k in util.get_cls_kwargs(module.dialect):
+ for k in util.get_cls_kwargs(dialect_cls):
if k in kwargs:
dialect_args[k] = kwargs.pop(k)
dbapi = kwargs.pop('module', None)
if dbapi is None:
dbapi_args = {}
- for k in util.get_func_kwargs(module.dbapi):
+ for k in util.get_func_kwargs(dialect_cls.dbapi):
if k in kwargs:
dbapi_args[k] = kwargs.pop(k)
- dbapi = module.dbapi(**dbapi_args)
+ dbapi = dialect_cls.dbapi(**dbapi_args)
dialect_args['dbapi'] = dbapi
# create dialect
- dialect = module.dialect(**dialect_args)
+ dialect = dialect_cls(**dialect_args)
# assemble connection arguments
(cargs, cparams) = dialect.create_connect_args(u)
raise exceptions.DBAPIError("Connection failed", e)
creator = kwargs.pop('creator', connect)
- poolclass = kwargs.pop('poolclass', getattr(module, 'poolclass', poollib.QueuePool))
+ poolclass = kwargs.pop('poolclass', getattr(dialect_cls, 'poolclass', poollib.QueuePool))
pool_args = {}
# consume pool arguments from kwargs, translating a few of the arguments
# create url.URL object
u = url.make_url(name_or_url)
- # get module from sqlalchemy.databases
- module = u.get_module()
+ dialect_cls = u.get_dialect()
dialect_args = {}
# consume dialect arguments from kwargs
- for k in util.get_cls_kwargs(module.dialect):
+ for k in util.get_cls_kwargs(dialect_cls):
if k in kwargs:
dialect_args[k] = kwargs.pop(k)
# create dialect
- dialect = module.dialect(**dialect_args)
+ dialect = dialect_cls(**dialect_args)
return MockEngineStrategy.MockConnection(dialect, executor)
s += '?' + "&".join(["%s=%s" % (k, self.query[k]) for k in keys])
return s
- def get_module(self):
- """Return the SQLAlchemy database module corresponding to this URL's driver name."""
+ def get_dialect(self):
+ """Return the SQLAlchemy database dialect class corresponding to this URL's driver name."""
+ dialect=None
if self.drivername == 'ansi':
import sqlalchemy.ansisql
- return sqlalchemy.ansisql
-
+ return sqlalchemy.ansisql.dialect
+
try:
- return getattr(__import__('sqlalchemy.databases.%s' % self.drivername).databases, self.drivername)
+ module=getattr(__import__('sqlalchemy.databases.%s' % self.drivername).databases, self.drivername)
+ dialect=module.dialect
except ImportError:
if sys.exc_info()[2].tb_next is None:
- raise exceptions.ArgumentError('unknown database %r' % self.drivername)
- raise
-
+ import pkg_resources
+ for res in pkg_resources.iter_entry_points('sqlalchemy.databases'):
+ if res.name==self.drivername:
+ dialect=res.load()
+ else:
+ raise
+ if dialect is not None:
+ return dialect
+ raise ImportError('unknown database %r' % self.drivername)
+
def translate_connect_args(self, names):
"""Translate this URL's attributes into a dictionary of connection arguments.
url = "http://www.sqlalchemy.org",
packages = find_packages('lib'),
package_dir = {'':'lib'},
+ entry_points = {
+ 'sqlalchemy.databases': [
+ '%s = sqlalchemy.databases.%s:dialect' % (f,f) for f in
+ ['sqlite', 'postgres', 'mysql', 'oracle', 'mssql', 'firebird']]},
license = "MIT License",
long_description = """\
SQLAlchemy is:
def testbadargs(self):
# good arg, use MockDBAPI to prevent oracle import errors
e = create_engine('oracle://', use_ansi=True, module=MockDBAPI())
-
+
+ try:
+ e = create_engine("foobar://", module=MockDBAPI())
+ assert False
+ except ImportError:
+ assert True
+
# bad arg
try:
e = create_engine('postgres://', use_ansi=True, module=MockDBAPI())
class AdaptTest(PersistTest):
def testadapt(self):
- e1 = url.URL('postgres').get_module().dialect()
- e2 = url.URL('mysql').get_module().dialect()
- e3 = url.URL('sqlite').get_module().dialect()
+ e1 = url.URL('postgres').get_dialect()()
+ e2 = url.URL('mysql').get_dialect()()
+ e3 = url.URL('sqlite').get_dialect()()
type = String(40)