- support for MSSQL/PyODBC/FreeTDS on a mac. Win32 on deck..
and if on an older version of SQL Server, the operation fails. The behavior is exactly
the same except the error is raised by SQL server instead of the dialect, and no
flag setting is required to enable it.
+ - the "auto_identity_insert" flag is removed. This feature always takes effect
+ when an INSERT statement overrides a column that is known to have a sequence on it.
+ As with "has_window_funcs", if the underlying driver doesn't support this, then you
+ can't do this operation in any case, so there's no point in having a flag.
- using new dialect.initialize() feature to set up version-dependent behavior.
- types
For example, MySQL has a ``BIGINTEGER`` type and PostgreSQL has an
``INET`` type. To use these, import them from the module explicitly::
- from sqlalchemy.databases.mysql import MSBigInteger, MSEnum
+ from sqlalchemy.dialect.mysql import dialect as mysql
table = Table('foo', meta,
- Column('id', MSBigInteger),
- Column('enumerates', MSEnum('a', 'b', 'c'))
+ Column('id', mysql.BIGINTEGER),
+ Column('enumerates', mysql.ENUM('a', 'b', 'c'))
)
Or some PostgreSQL types::
- from sqlalchemy.databases.postgres import PGInet, PGArray
+ from sqlalchemy.dialect.postgres import dialect as postgresql
table = Table('foo', meta,
- Column('ipaddress', PGInet),
- Column('elements', PGArray(str))
+ Column('ipaddress', postgresql.INET),
+ Column('elements', postgresql.ARRAY(str))
)
+Each dialect should provide the full set of typenames supported by
+that backend, so that a backend-specific schema can be created without
+the need to locate types::
+
+ from sqlalchemy.dialects.postgresql import dialect as pg
+
+ t = Table('mytable', metadata,
+ Column('id', pg.INTEGER, primary_key=True),
+ Column('name', pg.VARCHAR(300)),
+ Column('inetaddr', pg.INET)
+ )
+
+Where above, the INTEGER and VARCHAR types are ultimately from
+sqlalchemy.types, but the Postgresql dialect makes them available.
Custom Types
------------
class that makes it easy to augment the bind parameter and result
processing capabilities of one of the built in types.
-To build a type object from scratch, subclass `:class:TypeEngine`.
+To build a type object from scratch, subclass `:class:UserDefinedType`.
.. autoclass:: TypeDecorator
:members:
:inherited-members:
:show-inheritance:
+.. autoclass:: UserDefinedType
+ :members:
+ :undoc-members:
+ :inherited-members:
+ :show-inheritance:
+
.. autoclass:: TypeEngine
:members:
:undoc-members:
arguments on the URL, or as keyword argument to
:func:`~sqlalchemy.create_engine()` are:
-* *auto_identity_insert* - enables support for IDENTITY inserts by
- automatically turning IDENTITY INSERT ON and OFF as required.
- Defaults to ``True``.
-
* *query_timeout* - allows you to override the default query timeout.
Defaults to ``None``. This is only supported on pymssql.
* *use_scope_identity* - allows you to specify that SCOPE_IDENTITY
should be used in place of the non-scoped version @@IDENTITY.
- Defaults to ``False``. On pymssql this defaults to ``True``, and on
- pyodbc this defaults to ``True`` if the version of pyodbc being
- used supports it.
+ Defaults to True.
* *max_identifier_length* - allows you to se the maximum length of
identfiers supported by the database. Defaults to 128. For pymssql
return tbl._ms_has_sequence
class MSExecutionContext(default.DefaultExecutionContext):
- IINSERT = False
- HASIDENT = False
-
+ _enable_identity_insert = False
+ _select_lastrowid = False
+
def pre_exec(self):
"""Activate IDENTITY_INSERT if needed."""
if self.isinsert:
tbl = self.compiled.statement.table
seq_column = _table_sequence_column(tbl)
- self.HASIDENT = bool(seq_column)
- if self.dialect.auto_identity_insert and self.HASIDENT:
- self.IINSERT = tbl._ms_has_sequence.key in self.compiled_parameters[0]
+ insert_has_sequence = bool(seq_column)
+
+ if insert_has_sequence:
+ self._enable_identity_insert = tbl._ms_has_sequence.key in self.compiled_parameters[0]
else:
- self.IINSERT = False
-
- if self.IINSERT:
+ self._enable_identity_insert = False
+
+ self._select_lastrowid = insert_has_sequence and \
+ not self._enable_identity_insert and \
+ not self.executemany
+
+ if self._enable_identity_insert:
self.cursor.execute("SET IDENTITY_INSERT %s ON" %
- self.dialect.identifier_preparer.format_table(self.compiled.statement.table))
+ self.dialect.identifier_preparer.format_table(tbl))
+
+ def post_exec(self):
+ """Disable IDENTITY_INSERT if enabled."""
+
+ if self._select_lastrowid:
+ if self.dialect.use_scope_identity:
+ self.cursor.execute("SELECT scope_identity() AS lastrowid")
+ else:
+ self.cursor.execute("SELECT @@identity AS lastrowid")
+ row = self.cursor.fetchall()[0] # fetchall() ensures the cursor is consumed without closing it
+ self._last_inserted_ids = [int(row[0])] + self._last_inserted_ids[1:]
+
+ if self._enable_identity_insert:
+ self.cursor.execute("SET IDENTITY_INSERT %s OFF" % self.dialect.identifier_preparer.format_table(self.compiled.statement.table))
def handle_dbapi_exception(self, e):
- if self.IINSERT:
+ if self._enable_identity_insert:
try:
self.cursor.execute("SET IDENTITY_INSERT %s OFF" % self.dialect.identifier_preparer.format_table(self.compiled.statement.table))
except:
pass
- def post_exec(self):
- """Disable IDENTITY_INSERT if enabled."""
-
- if self.isinsert and not self.executemany and self.HASIDENT and not self.IINSERT:
- if not self._last_inserted_ids or self._last_inserted_ids[0] is None:
- if self.dialect.use_scope_identity:
- self.cursor.execute("SELECT scope_identity() AS lastrowid")
- else:
- self.cursor.execute("SELECT @@identity AS lastrowid")
- row = self.cursor.fetchone()
- self._last_inserted_ids = [int(row[0])] + self._last_inserted_ids[1:]
-
- if self.IINSERT:
- self.cursor.execute("SET IDENTITY_INSERT %s OFF" % self.dialect.identifier_preparer.format_table(self.compiled.statement.table))
colspecs = {
sqltypes.Unicode : MSNVarchar,
name = 'mssql'
supports_default_values = True
supports_empty_insert = False
- auto_identity_insert = True
execution_ctx_cls = MSExecutionContext
text_as_varchar = False
- use_scope_identity = False
+ use_scope_identity = True
max_identifier_length = 128
schema_name = "dbo"
colspecs = colspecs
preparer = MSIdentifierPreparer
def __init__(self,
- auto_identity_insert=True, query_timeout=None,
- use_scope_identity=False,
+ query_timeout=None,
+ use_scope_identity=True,
max_identifier_length=None,
schema_name="dbo", **opts):
- self.auto_identity_insert = bool(auto_identity_insert)
self.query_timeout = int(query_timeout or 0)
self.schema_name = schema_name
- self.use_scope_identity = bool(use_scope_identity)
+ self.use_scope_identity = use_scope_identity
self.max_identifier_length = int(max_identifier_length or 0) or \
self.max_identifier_length
super(MSDialect, self).__init__(**opts)
pass
def get_default_schema_name(self, connection):
- query = "SELECT user_name() as user_name;"
- user_name = connection.scalar(sql.text(query))
+ user_name = connection.scalar("SELECT user_name() as user_name;")
if user_name is not None:
# now, get the default schema
query = """
SELECT default_schema_name FROM
sys.database_principals
- WHERE name = :user_name
+ WHERE name = ?
AND type = 'S'
"""
try:
- default_schema_name = connection.scalar(sql.text(query),
- user_name=user_name)
+ default_schema_name = connection.scalar(query, [user_name])
if default_schema_name is not None:
return default_schema_name
except:
-from sqlalchemy import Table, MetaData, Column, ForeignKey, String, Unicode, Integer
+from sqlalchemy import Table, MetaData, Column, ForeignKey
+from sqlalchemy.types import String, Unicode, Integer, TypeDecorator
ischema = MetaData()
+class CoerceUnicode(TypeDecorator):
+ impl = Unicode
+
+ def process_bind_param(self, value, dialect):
+ if isinstance(value, str):
+ value = value.decode(dialect.encoding)
+ return value
+
schemata = Table("SCHEMATA", ischema,
- Column("CATALOG_NAME", Unicode, key="catalog_name"),
- Column("SCHEMA_NAME", Unicode, key="schema_name"),
- Column("SCHEMA_OWNER", Unicode, key="schema_owner"),
+ Column("CATALOG_NAME", CoerceUnicode, key="catalog_name"),
+ Column("SCHEMA_NAME", CoerceUnicode, key="schema_name"),
+ Column("SCHEMA_OWNER", CoerceUnicode, key="schema_owner"),
schema="INFORMATION_SCHEMA")
tables = Table("TABLES", ischema,
- Column("TABLE_CATALOG", Unicode, key="table_catalog"),
- Column("TABLE_SCHEMA", Unicode, key="table_schema"),
- Column("TABLE_NAME", Unicode, key="table_name"),
+ Column("TABLE_CATALOG", CoerceUnicode, key="table_catalog"),
+ Column("TABLE_SCHEMA", CoerceUnicode, key="table_schema"),
+ Column("TABLE_NAME", CoerceUnicode, key="table_name"),
Column("TABLE_TYPE", String, key="table_type"),
schema="INFORMATION_SCHEMA")
columns = Table("COLUMNS", ischema,
- Column("TABLE_SCHEMA", Unicode, key="table_schema"),
- Column("TABLE_NAME", Unicode, key="table_name"),
- Column("COLUMN_NAME", Unicode, key="column_name"),
+ Column("TABLE_SCHEMA", CoerceUnicode, key="table_schema"),
+ Column("TABLE_NAME", CoerceUnicode, key="table_name"),
+ Column("COLUMN_NAME", CoerceUnicode, key="column_name"),
Column("IS_NULLABLE", Integer, key="is_nullable"),
Column("DATA_TYPE", String, key="data_type"),
Column("ORDINAL_POSITION", Integer, key="ordinal_position"),
schema="INFORMATION_SCHEMA")
constraints = Table("TABLE_CONSTRAINTS", ischema,
- Column("TABLE_SCHEMA", Unicode, key="table_schema"),
- Column("TABLE_NAME", Unicode, key="table_name"),
- Column("CONSTRAINT_NAME", Unicode, key="constraint_name"),
+ Column("TABLE_SCHEMA", CoerceUnicode, key="table_schema"),
+ Column("TABLE_NAME", CoerceUnicode, key="table_name"),
+ Column("CONSTRAINT_NAME", CoerceUnicode, key="constraint_name"),
Column("CONSTRAINT_TYPE", String, key="constraint_type"),
schema="INFORMATION_SCHEMA")
column_constraints = Table("CONSTRAINT_COLUMN_USAGE", ischema,
- Column("TABLE_SCHEMA", Unicode, key="table_schema"),
- Column("TABLE_NAME", Unicode, key="table_name"),
- Column("COLUMN_NAME", Unicode, key="column_name"),
- Column("CONSTRAINT_NAME", Unicode, key="constraint_name"),
+ Column("TABLE_SCHEMA", CoerceUnicode, key="table_schema"),
+ Column("TABLE_NAME", CoerceUnicode, key="table_name"),
+ Column("COLUMN_NAME", CoerceUnicode, key="column_name"),
+ Column("CONSTRAINT_NAME", CoerceUnicode, key="constraint_name"),
schema="INFORMATION_SCHEMA")
key_constraints = Table("KEY_COLUMN_USAGE", ischema,
- Column("TABLE_SCHEMA", Unicode, key="table_schema"),
- Column("TABLE_NAME", Unicode, key="table_name"),
- Column("COLUMN_NAME", Unicode, key="column_name"),
- Column("CONSTRAINT_NAME", Unicode, key="constraint_name"),
+ Column("TABLE_SCHEMA", CoerceUnicode, key="table_schema"),
+ Column("TABLE_NAME", CoerceUnicode, key="table_name"),
+ Column("COLUMN_NAME", CoerceUnicode, key="column_name"),
+ Column("CONSTRAINT_NAME", CoerceUnicode, key="constraint_name"),
Column("ORDINAL_POSITION", Integer, key="ordinal_position"),
schema="INFORMATION_SCHEMA")
ref_constraints = Table("REFERENTIAL_CONSTRAINTS", ischema,
- Column("CONSTRAINT_CATALOG", Unicode, key="constraint_catalog"),
- Column("CONSTRAINT_SCHEMA", Unicode, key="constraint_schema"),
- Column("CONSTRAINT_NAME", Unicode, key="constraint_name"),
- Column("UNIQUE_CONSTRAINT_CATLOG", Unicode, key="unique_constraint_catalog"), # TODO: is CATLOG misspelled ?
- Column("UNIQUE_CONSTRAINT_SCHEMA", Unicode, key="unique_constraint_schema"),
- Column("UNIQUE_CONSTRAINT_NAME", Unicode, key="unique_constraint_name"),
+ Column("CONSTRAINT_CATALOG", CoerceUnicode, key="constraint_catalog"),
+ Column("CONSTRAINT_SCHEMA", CoerceUnicode, key="constraint_schema"),
+ Column("CONSTRAINT_NAME", CoerceUnicode, key="constraint_name"),
+ Column("UNIQUE_CONSTRAINT_CATLOG", CoerceUnicode, key="unique_constraint_catalog"), # TODO: is CATLOG misspelled ?
+ Column("UNIQUE_CONSTRAINT_SCHEMA", CoerceUnicode, key="unique_constraint_schema"),
+ Column("UNIQUE_CONSTRAINT_NAME", CoerceUnicode, key="unique_constraint_name"),
Column("MATCH_OPTION", String, key="match_option"),
Column("UPDATE_RULE", String, key="update_rule"),
Column("DELETE_RULE", String, key="delete_rule"),
schema="INFORMATION_SCHEMA")
views = Table("VIEWS", ischema,
- Column("TABLE_CATALOG", Unicode, key="table_catalog"),
- Column("TABLE_SCHEMA", Unicode, key="table_schema"),
- Column("TABLE_NAME", Unicode, key="table_name"),
- Column("VIEW_DEFINITION", Unicode, key="view_definition"),
+ Column("TABLE_CATALOG", CoerceUnicode, key="table_catalog"),
+ Column("TABLE_SCHEMA", CoerceUnicode, key="table_schema"),
+ Column("TABLE_NAME", CoerceUnicode, key="table_name"),
+ Column("VIEW_DEFINITION", CoerceUnicode, key="view_definition"),
Column("CHECK_OPTION", String, key="check_option"),
Column("IS_UPDATABLE", String, key="is_updatable"),
schema="INFORMATION_SCHEMA")
from sqlalchemy.dialects.mssql.base import MSExecutionContext, MSDialect
from sqlalchemy.connectors.pyodbc import PyODBCConnector
from sqlalchemy import types as sqltypes
-
+import re
import sys
class MSExecutionContext_pyodbc(MSExecutionContext):
+ _embedded_scope_identity = False
+
def pre_exec(self):
- """where appropriate, issue "select scope_identity()" in the same statement"""
+ """where appropriate, issue "select scope_identity()" in the same statement.
+
+ Background on why "scope_identity()" is preferable to "@@identity":
+ http://msdn.microsoft.com/en-us/library/ms190315.aspx
+
+ Background on why we attempt to embed "scope_identity()" into the same
+ statement as the INSERT:
+ http://code.google.com/p/pyodbc/wiki/FAQs#How_do_I_retrieve_autogenerated/identity_values?
+
+ """
+
super(MSExecutionContext_pyodbc, self).pre_exec()
- if self.isinsert and self.HASIDENT and not self.IINSERT \
- and len(self.parameters) == 1 and self.dialect.use_scope_identity:
+
+ # don't embed the scope_identity select into an "INSERT .. DEFAULT VALUES"
+ if self._select_lastrowid and \
+ self.dialect.use_scope_identity and \
+ len(self.parameters[0]):
+ self._embedded_scope_identity = True
+
self.statement += "; select scope_identity()"
def post_exec(self):
- if self.HASIDENT and not self.IINSERT and self.dialect.use_scope_identity and not self.executemany:
+ if self._embedded_scope_identity:
# Fetch the last inserted id from the manipulated statement
# We may have to skip over a number of result sets with no data (due to triggers, etc.)
while True:
try:
- row = self.cursor.fetchone()
+ # fetchall() ensures the cursor is consumed without closing it (FreeTDS particularly)
+ row = self.cursor.fetchall()[0]
break
except self.dialect.dbapi.Error, e:
+ # no way around this - nextset() consumes the previous set
+ # so we need to just keep flipping
self.cursor.nextset()
- self._last_inserted_ids = [int(row[0])]
+
+ self._last_inserted_ids = [int(row[0])] + self._last_inserted_ids[1:]
else:
super(MSExecutionContext_pyodbc, self).post_exec()
class MSDialect_pyodbc(PyODBCConnector, MSDialect):
supports_sane_rowcount = False
supports_sane_multi_rowcount = False
- # PyODBC unicode is broken on UCS-4 builds
- supports_unicode = sys.maxunicode == 65535
- supports_unicode_binds = supports_unicode
- supports_unicode_statements = supports_unicode
+
execution_ctx_cls = MSExecutionContext_pyodbc
pyodbc_driver_name = 'SQL Server'
self.description_encoding = description_encoding
self.use_scope_identity = self.dbapi and hasattr(self.dbapi.Cursor, 'nextset')
- def is_disconnect(self, e):
- if isinstance(e, self.dbapi.ProgrammingError):
- return "The cursor's connection has been closed." in str(e) or 'Attempt to use a closed connection.' in str(e)
- elif isinstance(e, self.dbapi.Error):
- return '[08S01]' in str(e)
- else:
- return False
-
+ def initialize(self, connection):
+ pyodbc = self.dbapi
+ self.server_version_info = self._get_server_version_info(connection)
+
+ dbapi_con = connection.connection
+
+ self._free_tds = re.match(r".*libtdsodbc.*\.so", dbapi_con.getinfo(pyodbc.SQL_DRIVER_NAME))
+
+ # the "Py2K only" part here is theoretical.
+ # have not tried pyodbc + python3.1 yet.
+ # Py2K
+ self.supports_unicode_statements = not self._free_tds
+ self.supports_unicode_binds = not self._free_tds
+ # end Py2K
+
dialect = MSDialect_pyodbc
from sqlalchemy.sql import operators as sql_operators
from sqlalchemy import types as sqltypes
+from sqlalchemy.types import INTEGER, BIGINT, SMALLINT, VARCHAR, \
+ CHAR, TEXT, FLOAT, NUMERIC, \
+ TIMESTAMP, TIME, DATE, BOOLEAN
-class PGInet(sqltypes.TypeEngine):
+class REAL(sqltypes.Float):
+ __visit_name__ = "REAL"
+
+class BYTEA(sqltypes.Binary):
+ __visit_name__ = 'BYTEA'
+
+class DOUBLE_PRECISION(sqltypes.Float):
+ __visit_name__ = 'DOUBLE_PRECISION'
+
+class INET(sqltypes.TypeEngine):
__visit_name__ = "INET"
+PGInet = INET
-class PGCidr(sqltypes.TypeEngine):
+class CIDR(sqltypes.TypeEngine):
__visit_name__ = "CIDR"
+PGCidr = CIDR
-class PGMacAddr(sqltypes.TypeEngine):
+class MACADDR(sqltypes.TypeEngine):
__visit_name__ = "MACADDR"
+PGMacAddr = MACADDR
-class PGInterval(sqltypes.TypeEngine):
+class INTERVAL(sqltypes.TypeEngine):
__visit_name__ = 'INTERVAL'
+PGInterval = INTERVAL
-class PGBit(sqltypes.TypeEngine):
+class BIT(sqltypes.TypeEngine):
__visit_name__ = 'BIT'
+PGBit = BIT
-class PGUuid(sqltypes.TypeEngine):
+class UUID(sqltypes.TypeEngine):
__visit_name__ = 'UUID'
+PGUuid = UUID
-class PGArray(sqltypes.MutableType, sqltypes.Concatenable, sqltypes.TypeEngine):
+class ARRAY(sqltypes.MutableType, sqltypes.Concatenable, sqltypes.TypeEngine):
__visit_name__ = 'ARRAY'
def __init__(self, item_type, mutable=True):
return item
return [convert_item(item) for item in value]
return process
-
+PGArray = ARRAY
colspecs = {
- sqltypes.Interval:PGInterval
+ sqltypes.Interval:INTERVAL
}
ischema_names = {
- 'integer' : sqltypes.INTEGER,
- 'bigint' : sqltypes.BigInteger,
- 'smallint' : sqltypes.SMALLINT,
- 'character varying' : sqltypes.VARCHAR,
- 'character' : sqltypes.CHAR,
+ 'integer' : INTEGER,
+ 'bigint' : BIGINT,
+ 'smallint' : SMALLINT,
+ 'character varying' : VARCHAR,
+ 'character' : CHAR,
'"char"' : sqltypes.String,
'name' : sqltypes.String,
- 'text' : sqltypes.TEXT,
- 'numeric' : sqltypes.NUMERIC,
- 'float' : sqltypes.FLOAT,
- 'real' : sqltypes.Float,
- 'inet': PGInet,
- 'cidr': PGCidr,
- 'uuid': PGUuid,
- 'bit':PGBit,
- 'macaddr': PGMacAddr,
- 'double precision' : sqltypes.Float,
- 'timestamp' : sqltypes.TIMESTAMP,
- 'timestamp with time zone' : sqltypes.TIMESTAMP,
- 'timestamp without time zone' : sqltypes.TIMESTAMP,
- 'time with time zone' : sqltypes.TIME,
- 'time without time zone' : sqltypes.TIME,
- 'date' : sqltypes.DATE,
- 'time': sqltypes.TIME,
- 'bytea' : sqltypes.Binary,
- 'boolean' : sqltypes.BOOLEAN,
- 'interval':PGInterval,
+ 'text' : TEXT,
+ 'numeric' : NUMERIC,
+ 'float' : FLOAT,
+ 'real' : REAL,
+ 'inet': INET,
+ 'cidr': CIDR,
+ 'uuid': UUID,
+ 'bit':BIT,
+ 'macaddr': MACADDR,
+ 'double precision' : DOUBLE_PRECISION,
+ 'timestamp' : TIMESTAMP,
+ 'timestamp with time zone' : TIMESTAMP,
+ 'timestamp without time zone' : TIMESTAMP,
+ 'time with time zone' : TIME,
+ 'time without time zone' : TIME,
+ 'date' : DATE,
+ 'time': TIME,
+ 'bytea' : BYTEA,
+ 'boolean' : BOOLEAN,
+ 'interval':INTERVAL,
}
if (isinstance(column.server_default, schema.DefaultClause) and
column.server_default.arg is not None):
return self.execute_string("select %s" % column.server_default.arg)
- elif (isinstance(column.type, sqltypes.Integer) and column.autoincrement) and (column.default is None or (isinstance(column.default, schema.Sequence) and column.default.optional)):
+ elif (isinstance(column.type, sqltypes.Integer) and column.autoincrement) \
+ and (column.default is None or (isinstance(column.default, schema.Sequence) and column.default.optional)):
sch = column.table.schema
# TODO: this has to build into the Sequence object so we can get the quoting
# logic from it
return "FLOAT"
else:
return "FLOAT(%(precision)s)" % {'precision': type_.precision}
-
+
+ def visit_DOUBLE_PRECISION(self, type_):
+ return "DOUBLE PRECISION"
+
def visit_BIGINT(self, type_):
return "BIGINT"
def visit_BYTEA(self, type_):
return "BYTEA"
+ def visit_REAL(self, type_):
+ return "REAL"
+
def visit_ARRAY(self, type_):
return self.process(type_.item_type) + '[]'
# seems like case gets folded in pg_class...
if schema is None:
cursor = connection.execute(
- sql.text("""select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where n.nspname=current_schema() and lower(relname)=:name""",
+ sql.text("select relname from pg_class c join pg_namespace n on "
+ "n.oid=c.relnamespace where n.nspname=current_schema() and lower(relname)=:name",
bindparams=[sql.bindparam('name', unicode(table_name.lower()), type_=sqltypes.Unicode)]
)
)
else:
cursor = connection.execute(
- sql.text("""select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where n.nspname=:schema and lower(relname)=:name""",
+ sql.text("select relname from pg_class c join pg_namespace n on "
+ "n.oid=c.relnamespace where n.nspname=:schema and lower(relname)=:name",
bindparams=[sql.bindparam('name', unicode(table_name.lower()), type_=sqltypes.Unicode),
sql.bindparam('schema', unicode(schema), type_=sqltypes.Unicode)]
)
3. the "ischema_names" and "colspecs" dictionaries are now required members on
the Dialect class.
-4. "colspecs" now is a dictionary of generic or uppercased types from sqlalchemy.types
+4. The names of types within dialects are now important. If a dialect-specific type
+is a subclass of an existing generic type and is only provided for bind/result behavior,
+the current mixed case naming can remain, i.e. PGNumeric for Numeric - in this case,
+end users would never need to use PGNumeric directly. However, if a dialect-specific
+type is specifying a type *or* arguments that are not present generically, it should
+match the real name of the type on that backend, in uppercase. E.g. postgres.INET,
+mysql.ENUM, postgres.ARRAY.
+
+Ideally one should be able to specify a schema using names imported completely from a
+dialect, all matching the real name on that backend:
+
+ from sqlalchemy.dialects.postgres import base as pg
+
+ t = Table('mytable', metadata,
+ Column('id', pg.INTEGER, primary_key=True),
+ Column('name', pg.VARCHAR(300)),
+ Column('inetaddr', pg.INET)
+ )
+
+where above, the INTEGER and VARCHAR types are ultimately from sqlalchemy.types,
+but the PG dialect makes them available in its own namespace.
+
+5. "colspecs" now is a dictionary of generic or uppercased types from sqlalchemy.types
linked to types specified in the dialect. Again, if a type in the dialect does not
specify any special behavior for bind_processor() or result_processor() and does not
indicate a special type only available in this database, it must be *removed* from the
module and from this dictionary.
-5. "ischema_names" indicates string descriptions of types as returned from the database
+6. "ischema_names" indicates string descriptions of types as returned from the database
linked to TypeEngine classes.
a. The string name should be matched to the most specific type possible within
b. If the dialect contains a matching dialect-specific type that takes extra arguments
which the generic one does not, then point to the dialect-specific type. E.g.
- mssql MSString takes a "collation" parameter which should be preserved.
+ mssql.VARCHAR takes a "collation" parameter which should be preserved.
c. For an exact or almost exact match, point to the uppercase type. i.e. "float"
should point to "FLOAT", "varchar" should point to "VARCHAR"
# using VARCHAR/NCHAR so that we dont get the genericized "String"
# type which usually resolves to TEXT/CLOB
type_map = {
- str : VARCHAR,
+ str: String,
# Py2K
- unicode : NCHAR,
+ unicode : String,
# end Py2K
int : Integer,
float : Numeric,
self.assert_(last not in ids)
ids.add(last)
+ eq_(ids, set([1,2,3,4]))
+
eq_(list(bind.execute(aitable.select().order_by(aitable.c.id))),
[(1, 1, None), (2, None, 'row 2'), (3, 3, 'row 3'), (4, 4, None)])
r = single.insert().execute()
id_ = r.last_inserted_ids()[0]
- assert id_ is not None
+ eq_(id_, 1)
eq_(1, sa.select([func.count(sa.text('*'))], from_obj=single).scalar())
def test_autoincrement_fk(self):