A full series of setup assuming sa/master:
- disk init name="translog", physname="/opt/sybase/data/translog.dat", size="10M"
- create database sqlalchemy on default log on translog="10M"
- sp_dboption sqlalchemy, "trunc log on chkpt", true
- sp_addlogin scott, "tiger7"
- sp_addlogin test_schema, "tiger7"
- use sqlalchemy
- sp_adduser scott
- sp_adduser test_schema
- grant all to scott
+ disk init name="translog", physname="/opt/sybase/data/translog.dat", size="10M"
+ create database sqlalchemy on default log on translog="10M"
+ sp_dboption sqlalchemy, "trunc log on chkpt", true
+ sp_addlogin scott, "tiger7"
+ sp_addlogin test_schema, "tiger7"
+ use sqlalchemy
+ sp_adduser scott
+ sp_adduser test_schema
+ grant all to scott
sp_role "grant", sa_role, scott
Sybase will still freeze for up to a minute when the log becomes
======
.. automodule:: sqlalchemy.dialects.sybase.base
+
+python-sybase notes
+-------------------
+
+.. automodule:: sqlalchemy.dialects.sybase.pysybase
+
+pyodbc notes
+------------
+
+.. automodule:: sqlalchemy.dialects.sybase.pyodbc
+
+mxodbc notes
+------------
+
+.. automodule:: sqlalchemy.dialects.sybase.mxodbc
+
from sqlalchemy.dialects.firebird.base import FBDialect, FBCompiler
-class Firebird_kinterbasdb(FBDialect):
+class FBDialect_kinterbasdb(FBDialect):
driver = 'kinterbasdb'
supports_sane_rowcount = False
supports_sane_multi_rowcount = False
def __init__(self, type_conv=200, concurrency_level=1, **kwargs):
- super(Firebird_kinterbasdb, self).__init__(**kwargs)
+ super(FBDialect_kinterbasdb, self).__init__(**kwargs)
self.type_conv = type_conv
self.concurrency_level = concurrency_level
else:
return False
-dialect = Firebird_kinterbasdb
+dialect = FBDialect_kinterbasdb
from sqlalchemy.dialects.informix.base import InformixDialect
from sqlalchemy.engine import default
-class InfoExecutionContext(default.DefaultExecutionContext):
+class InformixExecutionContext_informixdb(default.DefaultExecutionContext):
def post_exec(self):
if self.isinsert:
self._lastrowid = [self.cursor.sqlerrd[1]]
-class Informix_informixdb(InformixDialect):
+class InformixDialect_informixdb(InformixDialect):
driver = 'informixdb'
default_paramstyle = 'qmark'
- execution_context_cls = InfoExecutionContext
+ execution_context_cls = InformixExecutionContext_informixdb
@classmethod
def dbapi(cls):
return False
-dialect = Informix_informixdb
+dialect = InformixDialect_informixdb
from sqlalchemy.dialects.maxdb.base import MaxDBDialect
-class MaxDB_sapdb(MaxDBDialect):
+class MaxDBDialect_sapdb(MaxDBDialect):
driver = 'sapdb'
@classmethod
return [], opts
-dialect = MaxDB_sapdb
\ No newline at end of file
+dialect = MaxDBDialect_sapdb
\ No newline at end of file
from sqlalchemy.dialects.mssql.base import MSDialect, MSExecutionContext
from sqlalchemy.engine import base
-class MS_zxjdbcExecutionContext(MSExecutionContext):
+class MSExecutionContext_zxjdbc(MSExecutionContext):
_embedded_scope_identity = False
def pre_exec(self):
- super(MS_zxjdbcExecutionContext, self).pre_exec()
+ super(MSExecutionContext_zxjdbc, self).pre_exec()
# scope_identity after the fact returns null in jTDS so we must
# embed it
if self._select_lastrowid and self.dialect.use_scope_identity:
self.cursor.execute("SET IDENTITY_INSERT %s OFF" % table)
-class MS_zxjdbc(ZxJDBCConnector, MSDialect):
+class MSDialect_zxjdbc(ZxJDBCConnector, MSDialect):
jdbc_db_name = 'jtds:sqlserver'
jdbc_driver_name = 'net.sourceforge.jtds.jdbc.Driver'
- execution_ctx_cls = MS_zxjdbcExecutionContext
+ execution_ctx_cls = MSExecutionContext_zxjdbc
def _get_server_version_info(self, connection):
return tuple(int(x) for x in connection.connection.dbversion.split('.'))
-dialect = MS_zxjdbc
+dialect = MSDialect_zxjdbc
from sqlalchemy import exc, log, schema, sql, types as sqltypes, util
from sqlalchemy import processors
-class MySQL_mysqlconnectorExecutionContext(MySQLExecutionContext):
+class MySQLExecutionContext_mysqlconnector(MySQLExecutionContext):
def get_lastrowid(self):
return self.cursor.lastrowid
-class MySQL_mysqlconnectorCompiler(MySQLCompiler):
+class MySQLCompiler_mysqlconnector(MySQLCompiler):
def visit_mod(self, binary, **kw):
return self.process(binary.left) + " %% " + self.process(binary.right)
class _myconnpyNumeric(_DecimalType, NUMERIC):
pass
-class MySQL_mysqlconnectorIdentifierPreparer(MySQLIdentifierPreparer):
+class MySQLIdentifierPreparer_mysqlconnector(MySQLIdentifierPreparer):
def _escape_identifier(self, value):
value = value.replace(self.escape_quote, self.escape_to_quote)
return None
-class MySQL_mysqlconnector(MySQLDialect):
+class MySQLDialect_mysqlconnector(MySQLDialect):
driver = 'mysqlconnector'
supports_unicode_statements = True
supports_unicode_binds = True
supports_sane_multi_rowcount = True
default_paramstyle = 'format'
- execution_ctx_cls = MySQL_mysqlconnectorExecutionContext
- statement_compiler = MySQL_mysqlconnectorCompiler
+ execution_ctx_cls = MySQLExecutionContext_mysqlconnector
+ statement_compiler = MySQLCompiler_mysqlconnector
- preparer = MySQL_mysqlconnectorIdentifierPreparer
+ preparer = MySQLIdentifierPreparer_mysqlconnector
colspecs = util.update_copy(
MySQLDialect.colspecs,
def _compat_fetchone(self, rp, charset=None):
return rp.fetchone()
-dialect = MySQL_mysqlconnector
+dialect = MySQLDialect_mysqlconnector
from sqlalchemy import exc, log, schema, sql, types as sqltypes, util
from sqlalchemy import processors
-class MySQL_mysqldbExecutionContext(MySQLExecutionContext):
+class MySQLExecutionContext_mysqldb(MySQLExecutionContext):
@property
def rowcount(self):
return self.cursor.rowcount
-class MySQL_mysqldbCompiler(MySQLCompiler):
+class MySQLCompiler_mysqldb(MySQLCompiler):
def visit_mod(self, binary, **kw):
return self.process(binary.left) + " %% " + self.process(binary.right)
class _MySQLdbDecimal(_DecimalType, DECIMAL):
pass
-class MySQL_mysqldbIdentifierPreparer(MySQLIdentifierPreparer):
+class MySQLIdentifierPreparer_mysqldb(MySQLIdentifierPreparer):
def _escape_identifier(self, value):
value = value.replace(self.escape_quote, self.escape_to_quote)
return value.replace("%", "%%")
-class MySQL_mysqldb(MySQLDialect):
+class MySQLDialect_mysqldb(MySQLDialect):
driver = 'mysqldb'
supports_unicode_statements = False
supports_sane_rowcount = True
supports_sane_multi_rowcount = True
default_paramstyle = 'format'
- execution_ctx_cls = MySQL_mysqldbExecutionContext
- statement_compiler = MySQL_mysqldbCompiler
- preparer = MySQL_mysqldbIdentifierPreparer
+ execution_ctx_cls = MySQLExecutionContext_mysqldb
+ statement_compiler = MySQLCompiler_mysqldb
+ preparer = MySQLIdentifierPreparer_mysqldb
colspecs = util.update_copy(
MySQLDialect.colspecs,
return 'latin1'
-dialect = MySQL_mysqldb
+dialect = MySQLDialect_mysqldb
return None
-class MySQL_oursqlExecutionContext(MySQLExecutionContext):
+class MySQLExecutionContext_oursql(MySQLExecutionContext):
@property
def plain_query(self):
return self.execution_options.get('_oursql_plain_query', False)
-class MySQL_oursql(MySQLDialect):
+class MySQLDialect_oursql(MySQLDialect):
driver = 'oursql'
# Py3K
# description_encoding = None
supports_sane_rowcount = True
supports_sane_multi_rowcount = True
- execution_ctx_cls = MySQL_oursqlExecutionContext
+ execution_ctx_cls = MySQLExecutionContext_oursql
colspecs = util.update_copy(
MySQLDialect.colspecs,
return rp.first()
-dialect = MySQL_oursql
+dialect = MySQLDialect_oursql
from sqlalchemy import util
import re
-class MySQL_pyodbcExecutionContext(MySQLExecutionContext):
+class MySQLExecutionContext_pyodbc(MySQLExecutionContext):
def get_lastrowid(self):
cursor = self.create_cursor()
cursor.close()
return lastrowid
-class MySQL_pyodbc(PyODBCConnector, MySQLDialect):
+class MySQLDialect_pyodbc(PyODBCConnector, MySQLDialect):
supports_unicode_statements = False
- execution_ctx_cls = MySQL_pyodbcExecutionContext
+ execution_ctx_cls = MySQLExecutionContext_pyodbc
pyodbc_driver_name = "MySQL"
def __init__(self, **kw):
# deal with http://code.google.com/p/pyodbc/issues/detail?id=25
kw.setdefault('convert_unicode', True)
- super(MySQL_pyodbc, self).__init__(**kw)
+ super(MySQLDialect_pyodbc, self).__init__(**kw)
def _detect_charset(self, connection):
"""Sniff out the character set in use for connection results."""
else:
return None
-dialect = MySQL_pyodbc
+dialect = MySQLDialect_pyodbc
return process
-class MySQL_zxjdbcExecutionContext(MySQLExecutionContext):
+class MySQLExecutionContext_zxjdbc(MySQLExecutionContext):
def get_lastrowid(self):
cursor = self.create_cursor()
cursor.execute("SELECT LAST_INSERT_ID()")
return lastrowid
-class MySQL_zxjdbc(ZxJDBCConnector, MySQLDialect):
+class MySQLDialect_zxjdbc(ZxJDBCConnector, MySQLDialect):
jdbc_db_name = 'mysql'
jdbc_driver_name = 'com.mysql.jdbc.Driver'
- execution_ctx_cls = MySQL_zxjdbcExecutionContext
+ execution_ctx_cls = MySQLExecutionContext_zxjdbc
colspecs = util.update_copy(
MySQLDialect.colspecs,
version.append(n)
return tuple(version)
-dialect = MySQL_zxjdbc
+dialect = MySQLDialect_zxjdbc
sqltypes.NVARCHAR : _OracleNVarChar,
}
-class Oracle_cx_oracleCompiler(OracleCompiler):
+class OracleCompiler_cx_oracle(OracleCompiler):
def bindparam_string(self, name):
if self.preparer._bindparam_requires_quotes(name):
quoted_name = '"%s"' % name
return OracleCompiler.bindparam_string(self, name)
-class Oracle_cx_oracleExecutionContext(OracleExecutionContext):
+class OracleExecutionContext_cx_oracle(OracleExecutionContext):
def pre_exec(self):
quoted_bind_names = getattr(self.compiled, '_quoted_bind_names', {})
return result
-class Oracle_cx_oracle_with_unicodeExecutionContext(Oracle_cx_oracleExecutionContext):
+class OracleExecutionContext_cx_oracle_with_unicode(OracleExecutionContext_cx_oracle):
"""Support WITH_UNICODE in Python 2.xx.
WITH_UNICODE allows cx_Oracle's Python 3 unicode handling behavior under Python 2.x.
"""
def __init__(self, *arg, **kw):
- OracleExecutionContext.__init__(self, *arg, **kw)
+ OracleExecutionContext_cx_oracle.__init__(self, *arg, **kw)
self.statement = unicode(self.statement)
def _execute_scalar(self, stmt):
- return super(Oracle_cx_oracle_with_unicodeExecutionContext, self).\
+ return super(OracleExecutionContext_cx_oracle_with_unicode, self).\
_execute_scalar(unicode(stmt))
class ReturningResultProxy(base.FullyBufferedResultProxy):
return [tuple(self._returning_params["ret_%d" % i]
for i, c in enumerate(self._returning_params))]
-class Oracle_cx_oracle(OracleDialect):
- execution_ctx_cls = Oracle_cx_oracleExecutionContext
- statement_compiler = Oracle_cx_oracleCompiler
+class OracleDialect_cx_oracle(OracleDialect):
+ execution_ctx_cls = OracleExecutionContext_cx_oracle
+ statement_compiler = OracleCompiler_cx_oracle
driver = "cx_oracle"
colspecs = colspecs
"or otherwise be passed as Python unicode. Plain Python strings "
"passed as bind parameters will be silently corrupted by cx_Oracle."
)
- self.execution_ctx_cls = Oracle_cx_oracle_with_unicodeExecutionContext
+ self.execution_ctx_cls = OracleExecutionContext_cx_oracle_with_unicode
# end Py2K
else:
self._cx_oracle_with_unicode = False
def do_recover_twophase(self, connection):
pass
-dialect = Oracle_cx_oracle
+dialect = OracleDialect_cx_oracle
return process
-class Oracle_zxjdbcCompiler(OracleCompiler):
+class OracleCompiler_zxjdbc(OracleCompiler):
def returning_clause(self, stmt, returning_cols):
self.returning_cols = list(expression._select_iterables(returning_cols))
return 'RETURNING ' + ', '.join(columns) + " INTO " + ", ".join(binds)
-class Oracle_zxjdbcExecutionContext(OracleExecutionContext):
+class OracleExecutionContext_zxjdbc(OracleExecutionContext):
def pre_exec(self):
if hasattr(self.compiled, 'returning_parameters'):
self.type)
-class Oracle_zxjdbc(ZxJDBCConnector, OracleDialect):
+class OracleDialect_zxjdbc(ZxJDBCConnector, OracleDialect):
jdbc_db_name = 'oracle'
jdbc_driver_name = 'oracle.jdbc.OracleDriver'
- statement_compiler = Oracle_zxjdbcCompiler
- execution_ctx_cls = Oracle_zxjdbcExecutionContext
+ statement_compiler = OracleCompiler_zxjdbc
+ execution_ctx_cls = OracleExecutionContext_zxjdbc
colspecs = util.update_copy(
OracleDialect.colspecs,
)
def __init__(self, *args, **kwargs):
- super(Oracle_zxjdbc, self).__init__(*args, **kwargs)
+ super(OracleDialect_zxjdbc, self).__init__(*args, **kwargs)
global SQLException, zxJDBC
from java.sql import SQLException
from com.ziclix.python.sql import zxJDBC
self.DataHandler = OracleReturningDataHandler
def initialize(self, connection):
- super(Oracle_zxjdbc, self).initialize(connection)
+ super(OracleDialect_zxjdbc, self).initialize(connection)
self.implicit_returning = connection.connection.driverversion >= '10.2'
def _create_jdbc_url(self, url):
version = re.search(r'Release ([\d\.]+)', connection.connection.dbversion).group(1)
return tuple(int(x) for x in version.split('.'))
-dialect = Oracle_zxjdbc
+dialect = OracleDialect_zxjdbc
else:
raise exc.InvalidRequestError("Unknown PG numeric type: %d" % coltype)
-class PostgreSQL_pg8000ExecutionContext(PGExecutionContext):
+class PGExecutionContext_pg8000(PGExecutionContext):
pass
-class PostgreSQL_pg8000Compiler(PGCompiler):
+class PGCompiler_pg8000(PGCompiler):
def visit_mod(self, binary, **kw):
return self.process(binary.left) + " %% " + self.process(binary.right)
return text.replace('%', '%%')
-class PostgreSQL_pg8000IdentifierPreparer(PGIdentifierPreparer):
+class PGIdentifierPreparer_pg8000(PGIdentifierPreparer):
def _escape_identifier(self, value):
value = value.replace(self.escape_quote, self.escape_to_quote)
return value.replace('%', '%%')
-class PostgreSQL_pg8000(PGDialect):
+class PGDialect_pg8000(PGDialect):
driver = 'pg8000'
supports_unicode_statements = True
default_paramstyle = 'format'
supports_sane_multi_rowcount = False
- execution_ctx_cls = PostgreSQL_pg8000ExecutionContext
- statement_compiler = PostgreSQL_pg8000Compiler
- preparer = PostgreSQL_pg8000IdentifierPreparer
+ execution_ctx_cls = PGExecutionContext_pg8000
+ statement_compiler = PGCompiler_pg8000
+ preparer = PGIdentifierPreparer_pg8000
colspecs = util.update_copy(
PGDialect.colspecs,
def is_disconnect(self, e):
return "connection is closed" in str(e)
-dialect = PostgreSQL_pg8000
+dialect = PGDialect_pg8000
r'\s*SELECT',
re.I | re.UNICODE)
-class PostgreSQL_psycopg2ExecutionContext(PGExecutionContext):
+class PGExecutionContext_psycopg2(PGExecutionContext):
def create_cursor(self):
# TODO: coverage for server side cursors + select.for_update()
return base.ResultProxy(self)
-class PostgreSQL_psycopg2Compiler(PGCompiler):
+class PGCompiler_psycopg2(PGCompiler):
def visit_mod(self, binary, **kw):
return self.process(binary.left) + " %% " + self.process(binary.right)
return text.replace('%', '%%')
-class PostgreSQL_psycopg2IdentifierPreparer(PGIdentifierPreparer):
+class PGIdentifierPreparer_psycopg2(PGIdentifierPreparer):
def _escape_identifier(self, value):
value = value.replace(self.escape_quote, self.escape_to_quote)
return value.replace('%', '%%')
-class PostgreSQL_psycopg2(PGDialect):
+class PGDialect_psycopg2(PGDialect):
driver = 'psycopg2'
supports_unicode_statements = False
default_paramstyle = 'pyformat'
supports_sane_multi_rowcount = False
- execution_ctx_cls = PostgreSQL_psycopg2ExecutionContext
- statement_compiler = PostgreSQL_psycopg2Compiler
- preparer = PostgreSQL_psycopg2IdentifierPreparer
+ execution_ctx_cls = PGExecutionContext_psycopg2
+ statement_compiler = PGCompiler_psycopg2
+ preparer = PGIdentifierPreparer_psycopg2
colspecs = util.update_copy(
PGDialect.colspecs,
return
extensions.register_type(extensions.UNICODE, conn)
pool.add_listener({'first_connect': connect, 'connect':connect})
- super(PostgreSQL_psycopg2, self).visit_pool(pool)
+ super(PGDialect_psycopg2, self).visit_pool(pool)
def create_connect_args(self, url):
opts = url.translate_connect_args(username='user')
else:
return False
-dialect = PostgreSQL_psycopg2
+dialect = PGDialect_psycopg2
else:
return processors.to_float
-class PostgreSQL_pypostgresqlExecutionContext(PGExecutionContext):
+class PGExecutionContext_pypostgresql(PGExecutionContext):
pass
-class PostgreSQL_pypostgresql(PGDialect):
+class PGDialect_pypostgresql(PGDialect):
driver = 'pypostgresql'
supports_unicode_statements = True
supports_sane_rowcount = True
supports_sane_multi_rowcount = False
- execution_ctx_cls = PostgreSQL_pypostgresqlExecutionContext
+ execution_ctx_cls = PGExecutionContext_pypostgresql
colspecs = util.update_copy(
PGDialect.colspecs,
{
def is_disconnect(self, e):
return "connection is closed" in str(e)
-dialect = PostgreSQL_pypostgresql
+dialect = PGDialect_pypostgresql
from sqlalchemy.connectors.zxJDBC import ZxJDBCConnector
from sqlalchemy.dialects.postgresql.base import PGDialect
-class PostgreSQL_zxjdbc(ZxJDBCConnector, PGDialect):
+class PGDialect_zxjdbc(ZxJDBCConnector, PGDialect):
jdbc_db_name = 'postgresql'
jdbc_driver_name = 'org.postgresql.Driver'
def _get_server_version_info(self, connection):
return tuple(int(x) for x in connection.connection.dbversion.split('.'))
-dialect = PostgreSQL_zxjdbc
+dialect = PGDialect_zxjdbc
else:
return DATE.result_processor(self, dialect, coltype)
-class SQLite_pysqlite(SQLiteDialect):
+class SQLiteDialect_pysqlite(SQLiteDialect):
default_paramstyle = 'qmark'
poolclass = pool.SingletonThreadPool
def is_disconnect(self, e):
return isinstance(e, self.dbapi.ProgrammingError) and "Cannot operate on a closed database." in str(e)
-dialect = SQLite_pysqlite
+dialect = SQLiteDialect_pysqlite
class SybaseExecutionContext_mxodbc(SybaseExecutionContext):
pass
-class Sybase_mxodbc(MxODBCConnector, SybaseDialect):
+class SybaseDialect_mxodbc(MxODBCConnector, SybaseDialect):
execution_ctx_cls = SybaseExecutionContext_mxodbc
-dialect = Sybase_mxodbc
+dialect = SybaseDialect_mxodbc
pass
-class Sybase_pyodbc(PyODBCConnector, SybaseDialect):
+class SybaseDialect_pyodbc(PyODBCConnector, SybaseDialect):
execution_ctx_cls = SybaseExecutionContext_pyodbc
-dialect = Sybase_pyodbc
+dialect = SybaseDialect_pyodbc