For a statement execution which is not an executemany, the returned `ResultProxy` will contain a collection accessible via `result.postfetch_cols()` which contains a list of all `Column` objects which had an inline-executed default. Similarly, all parameters which were bound to the statement, including all Python and SQL expressions which were pre-executed, are present in the `last_inserted_params()` or `last_updated_params()` collections on `ResultProxy`. The `last_inserted_ids()` collection contains a list of primary key values for the row inserted.
-#### DDL-Level Defaults {@name=passive}
+#### DDL-Level Defaults {@name=passive}
-A variant on a SQL expression default is the `PassiveDefault`, which gets placed in the CREATE TABLE statement during a `create()` operation:
+A variant on a SQL expression default is the `server_default`, which gets placed in the CREATE TABLE statement during a `create()` operation:
{python}
- t = Table('test', meta,
- Column('mycolumn', DateTime, PassiveDefault(text("sysdate")))
+ t = Table('test', meta,
+ Column('abc', String(20), server_default='abc'),
+ Column('created_at', DateTime, server_default=text("sysdate"))
)
-
+
A create call for the above table will produce:
{code}
CREATE TABLE test (
- mycolumn datetime default sysdate
+ abc varchar(20) default 'abc',
+ created_at datetime default sysdate
)
-
-The behavior of `PassiveDefault` is similar to that of a regular SQL default; if it's placed on a primary key column for a database which doesn't have a way to "postfetch" the ID, and the statement is not "inlined", the SQL expression is pre-executed; otherwise, SQLAlchemy lets the default fire off on the database side normally.
-#### Defining Sequences {@name=sequences}
+The behavior of `server_default` is similar to that of a regular SQL default; if it's placed on a primary key column for a database which doesn't have a way to "postfetch" the ID, and the statement is not "inlined", the SQL expression is pre-executed; otherwise, SQLAlchemy lets the default fire off on the database side normally.
+
+#### Triggered Columns {@name=triggered}
+
+Columns with values set by a database trigger or other external process may be called out with a marker:
+
+ {python}
+ t = Table('test', meta,
+ Column('abc', String(20), server_default=FetchedValue())
+ Column('def', String(20), server_onupdate=FetchedValue())
+ )
+
+These markers do not emit a ``default`` clause when the table is created, however they do set the same internal flags as a static `server_default` clause, providing hints to higher-level tools that a "post-fetch" of these rows should be performed after an insert or update.
+
+#### Defining Sequences {@name=sequences}
A table with a sequence looks like:
MetaData, ThreadLocalMetaData, Table, Column, ForeignKey, \
Sequence, Index, ForeignKeyConstraint, PrimaryKeyConstraint, \
CheckConstraint, UniqueConstraint, Constraint, \
- PassiveDefault, ColumnDefault, DDL
+ DefaultClause, FetchedValue, PassiveDefault, ColumnDefault, DDL
from sqlalchemy.engine import create_engine, engine_from_config
else:
return str(value)
return process
-
+
def get_col_spec(self):
return "NUMERIC"
return str(value)
return None
return process
-
+
class AcInteger(types.Integer):
def get_col_spec(self):
return "INTEGER"
return None
class AcChar(types.CHAR):
- def get_col_spec(self):
+ def get_col_spec(self):
return "TEXT" + (self.length and ("(%d)" % self.length) or "")
class AcBinary(types.Binary):
return None
return value and True or False
return process
-
+
def bind_processor(self, dialect):
def process(value):
if value is True:
else:
return value and True or False
return process
-
+
class AcTimeStamp(types.TIMESTAMP):
def get_col_spec(self):
return "TIMESTAMP"
except Exception, e:
return False
- def reflecttable(self, connection, table, include_columns):
+ def reflecttable(self, connection, table, include_columns):
# This is defined in the function, as it relies on win32com constants,
# that aren't imported until dbapi method is called
if not hasattr(self, 'ischema_names'):
const.dbBoolean: AcBoolean,
const.dbText: AcUnicode, # All Access strings are unicode
}
-
+
# A fresh DAO connection is opened for each reflection
# This is necessary, so we get the latest updates
dtbs = daoEngine.OpenDatabase(connection.engine.url.database)
-
+
try:
for tbl in dtbs.TableDefs:
if tbl.Name.lower() == table.name.lower():
elif default:
if col.Type == const.dbBoolean:
default = default == 'Yes' and '1' or '0'
- colargs['default'] = schema.PassiveDefault(sql.text(default))
+ colargs['server_default'] = schema.DefaultClause(sql.text(default))
table.append_column(schema.Column(col.Name, coltype, **colargs))
col.unique = idx.Unique
else:
pass # TBD: multi-column indexes
-
+
for fk in dtbs.Relations:
if fk.ForeignTable != table.name:
# the value comes down as "DEFAULT 'value'"
assert row['fdefault'].startswith('DEFAULT ')
defvalue = row['fdefault'][8:]
- args.append(schema.PassiveDefault(sql.text(defvalue)))
+ args.append(schema.DefaultClause(sql.text(defvalue)))
col = schema.Column(*args, **kw)
if kw['primary_key']:
import sqlalchemy.sql as sql
import sqlalchemy.exc as exc
from sqlalchemy import select, MetaData, Table, Column, String, Integer
-from sqlalchemy.schema import PassiveDefault, ForeignKeyConstraint
+from sqlalchemy.schema import DefaultClause, ForeignKeyConstraint
ischema = MetaData()
Column("numeric_scale", Integer),
Column("column_default", Integer),
schema="information_schema")
-
+
constraints = Table("table_constraints", ischema,
Column("table_schema", String),
Column("table_name", String),
def reflecttable(connection, table, include_columns, ischema_names):
key_constraints = pg_key_constraints
-
+
if table.schema is not None:
current_schema = table.schema
else:
current_schema = connection.default_schema_name()
-
- s = select([columns],
+
+ s = select([columns],
sql.and_(columns.c.table_name==table.name,
columns.c.table_schema==current_schema),
order_by=[columns.c.ordinal_position])
-
+
c = connection.execute(s)
found_table = False
while True:
# continue
found_table = True
(name, type, nullable, charlen, numericprec, numericscale, default) = (
- row[columns.c.column_name],
- row[columns.c.data_type],
- row[columns.c.is_nullable] == 'YES',
+ row[columns.c.column_name],
+ row[columns.c.data_type],
+ row[columns.c.is_nullable] == 'YES',
row[columns.c.character_maximum_length],
row[columns.c.numeric_precision],
row[columns.c.numeric_scale],
)
if include_columns and name not in include_columns:
continue
-
+
args = []
for a in (charlen, numericprec, numericscale):
if a is not None:
coltype = coltype(*args)
colargs = []
if default is not None:
- colargs.append(PassiveDefault(sql.text(default)))
+ colargs.append(DefaultClause(sql.text(default)))
table.append_column(Column(name, coltype, nullable=nullable, *colargs))
-
+
if not found_table:
raise exc.NoSuchTableError(table.name)
row[colmap[5]],
row[colmap[6]]
)
- #print "type %s on column %s to remote %s.%s.%s" % (type, constrained_column, referred_schema, referred_table, referred_column)
+ #print "type %s on column %s to remote %s.%s.%s" % (type, constrained_column, referred_schema, referred_table, referred_column)
if type == 'PRIMARY KEY':
table.primary_key.add(table.c[constrained_column])
elif type == 'FOREIGN KEY':
fk[0].append(constrained_column)
if refspec not in fk[1]:
fk[1].append(refspec)
-
+
for name, value in fks.iteritems():
- table.append_constraint(ForeignKeyConstraint(value[0], value[1], name=name))
-
+ table.append_constraint(ForeignKeyConstraint(value[0], value[1], name=name))
colargs = []
if default is not None:
- colargs.append(schema.PassiveDefault(sql.text(default)))
+ colargs.append(schema.DefaultClause(sql.text(default)))
table.append_column(schema.Column(name, coltype, nullable = (nullable == 0), *colargs))
(thetas with (+) outer indicators) are already done and available for
integration- email the devel list if you're interested in working on
this.
-"""
+"""
import datetime, itertools, re
from sqlalchemy import exc, schema, sql, util
col_kw['autoincrement'] = True
else:
# strip current numbering
- col_kw['default'] = schema.PassiveDefault(
+ col_kw['server_default'] = schema.DefaultClause(
sql.text('SERIAL'))
col_kw['autoincrement'] = True
else:
- col_kw['default'] = schema.PassiveDefault(
+ col_kw['server_default'] = schema.DefaultClause(
sql.text(func_def))
elif constant_def is not None:
- col_kw['default'] = schema.PassiveDefault(sql.text(
+ col_kw['server_default'] = schema.DefaultClause(sql.text(
"'%s'" % constant_def.replace("'", "''")))
table.append_column(schema.Column(name, type_instance, **col_kw))
# Assign DEFAULT SERIAL heuristically
elif column.primary_key and column.autoincrement:
# For SERIAL on a non-primary key member, use
- # PassiveDefault(text('SERIAL'))
+ # DefaultClause(text('SERIAL'))
try:
first = [c for c in column.table.primary_key.columns
if (c.autoincrement and
return ' '.join(colspec)
def get_column_default_string(self, column):
- if isinstance(column.default, schema.PassiveDefault):
+ if isinstance(column.server_default, schema.DefaultClause):
if isinstance(column.default.arg, basestring):
if isinstance(column.type, sqltypes.Integer):
return str(column.default.arg)
if col.default.optional:
return index, col
elif (col.default is None or
- (not isinstance(col.default, schema.PassiveDefault))):
+ (not isinstance(col.server_default, schema.DefaultClause))):
return index, col
return None, None
does **not** work around
"""
-
import datetime, operator, re, sys
from sqlalchemy import sql, schema, exc, util
coltype = coltype(*args)
colargs = []
if default is not None:
- colargs.append(schema.PassiveDefault(sql.text(default)))
+ colargs.append(schema.DefaultClause(sql.text(default)))
table.append_column(schema.Column(name, coltype, nullable=nullable, autoincrement=False, *colargs))
"""MySQL TIMESTAMP type.
To signal the orm to automatically re-select modified rows to retrieve
- the updated timestamp, add a PassiveDefault to your column specification::
+ the updated timestamp, add a DefaultClause to your column specification::
from sqlalchemy.databases import mysql
Column('updated', mysql.MSTimeStamp,
- PassiveDefault(sql.text('CURRENT_TIMESTAMP')))
+ server_default=sql.text('CURRENT_TIMESTAMP'))
The full range of MySQL 4.1+ TIMESTAMP defaults can be specified in
- the PassiveDefault::
+ the the default:
- PassiveDefault(sql.text('CURRENT TIMESTAMP ON UPDATE CURRENT_TIMESTAMP'))
+ server_default=sql.text('CURRENT TIMESTAMP ON UPDATE CURRENT_TIMESTAMP')
"""
-
def get_col_spec(self):
return "TIMESTAMP"
default = sql.text(default)
else:
default = default[1:-1]
- col_args.append(schema.PassiveDefault(default))
+ col_args.append(schema.DefaultClause(default))
table.append_column(schema.Column(name, type_instance,
*col_args, **col_kw))
colargs = []
if default is not None:
- colargs.append(schema.PassiveDefault(sql.text(default)))
+ colargs.append(schema.DefaultClause(sql.text(default)))
table.append_column(schema.Column(colname, coltype, nullable=nullable, *colargs))
# unconditionally quote the schema name. this could
# later be enhanced to obey quoting rules / "quote schema"
default = match.group(1) + ('"%s"' % sch) + '.' + match.group(2) + match.group(3)
- colargs.append(schema.PassiveDefault(sql.text(default)))
+ colargs.append(schema.DefaultClause(sql.text(default)))
table.append_column(schema.Column(name, coltype, nullable=nullable, *colargs))
def get_column_default(self, column, isinsert=True):
if column.primary_key:
# pre-execute passive defaults on primary keys
- if isinstance(column.default, schema.PassiveDefault):
- return self.execute_string("select %s" % column.default.arg)
+ if (isinstance(column.server_default, schema.DefaultClause) and
+ column.server_default.arg is not None):
+ return self.execute_string("select %s" % column.server_default.arg)
elif (isinstance(column.type, sqltypes.Integer) and column.autoincrement) and (column.default is None or (isinstance(column.default, schema.Sequence) and column.default.optional)):
sch = column.table.schema
# TODO: this has to build into the Sequence object so we can get the quoting
import datetime, re, time
-from sqlalchemy import schema, exc, pool, PassiveDefault
+from sqlalchemy import schema, exc, pool, DefaultClause
from sqlalchemy.engine import default
import sqlalchemy.types as sqltypes
import sqlalchemy.util as util
colargs = []
if has_default:
- colargs.append(PassiveDefault('?'))
+ colargs.append(DefaultClause('?'))
table.append_column(schema.Column(name, coltype, primary_key = primary_key, nullable = nullable, *colargs))
if not found_table:
coltype = coltype(*args)
colargs = []
if default is not None:
- colargs.append(schema.PassiveDefault(sql.text(default)))
+ colargs.append(schema.DefaultClause(sql.text(default)))
# any sequences ?
col = schema.Column(name, coltype, nullable=nullable, primary_key=primary_key, *colargs)
if self.__should_log_debug:
self.__log_debug("Using polymorphic identity '%s' for insert column '%s'" % (mapper.polymorphic_identity, col.key))
value = mapper.polymorphic_identity
- if col.default is None or value is not None:
+ if ((col.default is None and
+ col.server_default is None) or
+ value is not None):
params[col.key] = value
else:
value = mapper._get_state_attr_by_column(state, col)
- if col.default is None or value is not None:
+ if ((col.default is None and
+ col.server_default is None) or
+ value is not None):
if isinstance(value, sql.ClauseElement):
value_params[col] = value
else:
are defined in [sqlalchemy.sql.expression#], specifically
[sqlalchemy.schema#Table] and [sqlalchemy.schema#Column]. Since these objects
are part of the SQL expression language, they are usable as components in SQL
-expressions. """
+expressions.
+"""
import re, inspect
from sqlalchemy import types, exc, util, databases
from sqlalchemy.sql import expression, visitors
'ForeignKeyConstraint', 'PrimaryKeyConstraint', 'CheckConstraint',
'UniqueConstraint', 'DefaultGenerator', 'Constraint', 'MetaData',
'ThreadLocalMetaData', 'SchemaVisitor', 'PassiveDefault',
- 'ColumnDefault', 'DDL']
+ 'DefaulClause', 'FetchedValue', 'ColumnDefault', 'DDL']
+
class SchemaItem(object):
"""Base class for items that define a database schema."""
__metaclass__ = expression._FigureVisitName
quote = None
-
+
def _init_items(self, *args):
"""Initialize the list of child items for this SchemaItem."""
-
+
for item in args:
if item is not None:
item._set_parent(self)
quote
Force quoting of the identifier on or off, based on `True` or
- `False`. Defaults to `None`. This flag is rarely needed,
+ `False`. Defaults to `None`. This flag is rarely needed,
as quoting is normally applied
automatically for known reserved words, as well as for
"case sensitive" identifiers. An identifier is "case sensitive"
- if it contains non-lowercase letters, otherwise it's
+ if it contains non-lowercase letters, otherwise it's
considered to be "case insensitive".
quote_schema
same as 'quote' but applies to the schema identifier.
-
+
"""
super(Table, self).__init__(name)
self.metadata = metadata
list or is given a value of None. The default expression will be
converted into a ``ColumnDefault`` object upon initialization.
+ server_default
+ Defaults to None: A FetchedValue instance, str, Unicode or
+ sqlalchemy.text() string representing the DDL DEFAULT value for
+ the column.
+
+ String types will be emitted as-is, surrounded by single quotes::
+
+ Column('x', Text, server_default="val")
+
+ x TEXT DEFAULT 'val'
+
+ A sqlalchemy.text() expression will be rendered as-is, without
+ quotes::
+
+ Column('y', DateTime, server_default=text('NOW()'))0
+
+ y DATETIME DEFAULT NOW()
+
+ Strings and text() will be converted into a ``DefaultClause``
+ object upon initialization.
+
_is_oid
Defaults to False: used internally to indicate that this column is
used as the quasi-hidden "oid" column
``INSERT`` statement execution such that they will assume primary
key values are created in this manner. If a ``Column`` has an
explicit ``ColumnDefault`` object (such as via the `default`
- keyword, or a ``Sequence`` or ``PassiveDefault``), then the value
+ keyword, or a ``Sequence`` or ``DefaultClause``), then the value
of `autoincrement` is ignored and is assumed to be False.
`autoincrement` value is only significant for a column with a type
or subtype of Integer.
quote
Force quoting of the identifier on or off, based on `True` or
- `False`. Defaults to `None`. This flag is rarely needed,
+ `False`. Defaults to `None`. This flag is rarely needed,
as quoting is normally applied
automatically for known reserved words, as well as for
"case sensitive" identifiers. An identifier is "case sensitive"
- if it contains non-lowercase letters, otherwise it's
+ if it contains non-lowercase letters, otherwise it's
considered to be "case insensitive".
"""
self.nullable = kwargs.pop('nullable', not self.primary_key)
self._is_oid = kwargs.pop('_is_oid', False)
self.default = kwargs.pop('default', None)
+ self.server_default = kwargs.pop('server_default', None)
+ self.server_onupdate = kwargs.pop('server_onupdate', None)
self.index = kwargs.pop('index', None)
self.unique = kwargs.pop('unique', None)
self.quote = kwargs.pop('quote', None)
kwarg.append('onupdate')
if self.default:
kwarg.append('default')
+ if self.server_default:
+ kwarg.append('server_default')
return "Column(%s)" % ', '.join(
[repr(self.name)] + [repr(self.type)] +
[repr(x) for x in self.foreign_keys if x is not None] +
toinit = list(self.args)
if self.default is not None:
- toinit.append(ColumnDefault(self.default))
+ if isinstance(self.default, ColumnDefault):
+ toinit.append(self.default)
+ else:
+ toinit.append(ColumnDefault(self.default))
+ if self.server_default is not None:
+ if isinstance(self.server_default, FetchedValue):
+ toinit.append(self.server_default)
+ else:
+ toinit.append(DefaultClause(self.server_default))
if self.onupdate is not None:
toinit.append(ColumnDefault(self.onupdate, for_update=True))
+ if self.server_onupdate is not None:
+ if isinstance(self.server_onupdate, FetchedValue):
+ toinit.append(self.server_default)
+ else:
+ toinit.append(DefaultClause(self.server_onupdate,
+ for_update=True))
self._init_items(*toinit)
self.args = None
"""
return Column(self.name, self.type, self.default, key = self.key, primary_key = self.primary_key, nullable = self.nullable, _is_oid = self._is_oid, quote=self.quote, index=self.index, autoincrement=self.autoincrement, *[c.copy() for c in self.constraints])
-
+
def _make_proxy(self, selectable, name=None):
"""Create a *proxy* for this column.
def __repr__(self):
return "DefaultGenerator()"
-class PassiveDefault(DefaultGenerator):
- """A default that takes effect on the database side."""
-
- def __init__(self, arg, **kwargs):
- super(PassiveDefault, self).__init__(**kwargs)
- self.arg = arg
-
- def __repr__(self):
- return "PassiveDefault(%s)" % repr(self.arg)
class ColumnDefault(DefaultGenerator):
"""A plain default value on a column.
def __init__(self, arg, **kwargs):
super(ColumnDefault, self).__init__(**kwargs)
+ if isinstance(arg, FetchedValue):
+ raise exc.ArgumentError(
+ "ColumnDefault may not be a server-side default type.")
if callable(arg):
arg = self._maybe_wrap_callable(arg)
self.arg = arg
bind = _bind_or_error(self)
bind.drop(self, checkfirst=checkfirst)
+
+class FetchedValue(object):
+ """A default that takes effect on the database side."""
+
+ def __init__(self, for_update=False):
+ self.for_update = for_update
+
+ def _set_parent(self, column):
+ self.column = column
+ if self.for_update:
+ self.column.server_onupdate = self
+ else:
+ self.column.server_default = self
+
+ def __repr__(self):
+ return 'FetchedValue(for_update=%r)' % self.for_update
+
+
+class DefaultClause(FetchedValue):
+ """A DDL-specified DEFAULT column value."""
+
+ def __init__(self, arg, for_update=False):
+ util.assert_arg_type(arg, (basestring,
+ expression.ClauseElement,
+ expression._TextClause), 'arg')
+ super(DefaultClause, self).__init__(for_update)
+ self.arg = arg
+
+ def __repr__(self):
+ return "DefaultClause(%r, for_update=%r)" % (self.arg, self.for_update)
+
+# alias; deprecated starting 0.5.0
+PassiveDefault = DefaultClause
+
+
class Constraint(SchemaItem):
"""A table-level SQL constraint, such as a KEY.
def __contains__(self, x):
return x in self.columns
-
+
def contains_column(self, col):
return self.columns.contains_column(col)
-
+
def keys(self):
return self.columns.keys()
else:
values.append((c, create_bind_param(c, None)))
self.prefetch.append(c)
- elif isinstance(c.default, schema.PassiveDefault):
+ elif c.server_default is not None:
if not c.primary_key:
self.postfetch.append(c)
elif isinstance(c.default, schema.Sequence):
else:
values.append((c, create_bind_param(c, None)))
self.prefetch.append(c)
- elif isinstance(c.onupdate, schema.PassiveDefault):
+ elif c.server_onupdate is not None:
+ self.postfetch.append(c)
+ # deprecated? or remove?
+ elif isinstance(c.onupdate, schema.FetchedValue):
self.postfetch.append(c)
return values
return ''
def get_column_default_string(self, column):
- if isinstance(column.default, schema.PassiveDefault):
- if isinstance(column.default.arg, basestring):
- return "'%s'" % column.default.arg
+ if isinstance(column.server_default, schema.DefaultClause):
+ if isinstance(column.server_default.arg, basestring):
+ return "'%s'" % column.server_default.arg
else:
- return unicode(self._compile(column.default.arg, None))
+ return unicode(self._compile(column.server_default.arg, None))
else:
return None
([mysql.MSTimeStamp],
'TIMESTAMP'),
([mysql.MSTimeStamp,
- PassiveDefault(sql.text('CURRENT_TIMESTAMP'))],
+ DefaultClause(sql.text('CURRENT_TIMESTAMP'))],
"TIMESTAMP DEFAULT CURRENT_TIMESTAMP"),
([mysql.MSTimeStamp,
- PassiveDefault(sql.text("'1999-09-09 09:09:09'"))],
+ DefaultClause(sql.text("'1999-09-09 09:09:09'"))],
"TIMESTAMP DEFAULT '1999-09-09 09:09:09'"),
([mysql.MSTimeStamp,
- PassiveDefault(sql.text("'1999-09-09 09:09:09' "
+ DefaultClause(sql.text("'1999-09-09 09:09:09' "
"ON UPDATE CURRENT_TIMESTAMP"))],
"TIMESTAMP DEFAULT '1999-09-09 09:09:09' "
"ON UPDATE CURRENT_TIMESTAMP"),
([mysql.MSTimeStamp,
- PassiveDefault(sql.text("CURRENT_TIMESTAMP "
+ DefaultClause(sql.text("CURRENT_TIMESTAMP "
"ON UPDATE CURRENT_TIMESTAMP"))],
"TIMESTAMP DEFAULT CURRENT_TIMESTAMP "
"ON UPDATE CURRENT_TIMESTAMP"),
"""Test reflection of column defaults."""
def_table = Table('mysql_def', MetaData(testing.db),
- Column('c1', String(10), PassiveDefault('')),
- Column('c2', String(10), PassiveDefault('0')),
- Column('c3', String(10), PassiveDefault('abc')))
+ Column('c1', String(10), DefaultClause('')),
+ Column('c2', String(10), DefaultClause('0')),
+ Column('c3', String(10), DefaultClause('abc')))
try:
def_table.create()
reflected = Table('mysql_def', MetaData(testing.db),
autoload=True)
for t in def_table, reflected:
- assert t.c.c1.default.arg == ''
- assert t.c.c2.default.arg == '0'
- assert t.c.c3.default.arg == 'abc'
+ assert t.c.c1.server_default.arg == ''
+ assert t.c.c2.server_default.arg == '0'
+ assert t.c.c3.server_default.arg == 'abc'
finally:
def_table.drop()
try:
Table('ai_1', meta,
Column('int_y', Integer, primary_key=True),
- Column('int_n', Integer, PassiveDefault('0'),
+ Column('int_n', Integer, DefaultClause('0'),
primary_key=True))
Table('ai_2', meta,
Column('int_y', Integer, primary_key=True),
- Column('int_n', Integer, PassiveDefault('0'),
+ Column('int_n', Integer, DefaultClause('0'),
primary_key=True))
Table('ai_3', meta,
- Column('int_n', Integer, PassiveDefault('0'),
+ Column('int_n', Integer, DefaultClause('0'),
primary_key=True, autoincrement=False),
Column('int_y', Integer, primary_key=True))
Table('ai_4', meta,
- Column('int_n', Integer, PassiveDefault('0'),
+ Column('int_n', Integer, DefaultClause('0'),
primary_key=True, autoincrement=False),
- Column('int_n2', Integer, PassiveDefault('0'),
+ Column('int_n2', Integer, DefaultClause('0'),
primary_key=True, autoincrement=False))
Table('ai_5', meta,
Column('int_y', Integer, primary_key=True),
- Column('int_n', Integer, PassiveDefault('0'),
+ Column('int_n', Integer, DefaultClause('0'),
primary_key=True, autoincrement=False))
Table('ai_6', meta,
- Column('o1', String(1), PassiveDefault('x'),
+ Column('o1', String(1), DefaultClause('x'),
primary_key=True),
Column('int_y', Integer, primary_key=True))
Table('ai_7', meta,
- Column('o1', String(1), PassiveDefault('x'),
+ Column('o1', String(1), DefaultClause('x'),
primary_key=True),
- Column('o2', String(1), PassiveDefault('x'),
+ Column('o2', String(1), DefaultClause('x'),
primary_key=True),
Column('int_y', Integer, primary_key=True))
Table('ai_8', meta,
- Column('o1', String(1), PassiveDefault('x'),
+ Column('o1', String(1), DefaultClause('x'),
primary_key=True),
- Column('o2', String(1), PassiveDefault('x'),
+ Column('o2', String(1), DefaultClause('x'),
primary_key=True))
meta.create_all()
def test_domain_is_reflected(self):
metadata = MetaData(testing.db)
table = Table('testtable', metadata, autoload=True)
- self.assertEquals(str(table.columns.answer.default.arg), '42', "Reflected default value didn't equal expected value")
+ self.assertEquals(str(table.columns.answer.server_default.arg), '42', "Reflected default value didn't equal expected value")
self.assertFalse(table.columns.answer.nullable, "Expected reflected column to not be nullable.")
def test_table_is_reflected_alt_schema(self):
def test_schema_domain_is_reflected(self):
metadata = MetaData(testing.db)
table = Table('testtable', metadata, autoload=True, schema='alt_schema')
- self.assertEquals(str(table.columns.answer.default.arg), '0', "Reflected default value didn't equal expected value")
+ self.assertEquals(str(table.columns.answer.server_default.arg), '0', "Reflected default value didn't equal expected value")
self.assertTrue(table.columns.answer.nullable, "Expected reflected column to be nullable.")
def test_crosschema_domain_is_reflected(self):
metadata = MetaData(testing.db)
table = Table('crosschema', metadata, autoload=True)
- self.assertEquals(str(table.columns.answer.default.arg), '0', "Reflected default value didn't equal expected value")
+ self.assertEquals(str(table.columns.answer.server_default.arg), '0', "Reflected default value didn't equal expected value")
self.assertTrue(table.columns.answer.nullable, "Expected reflected column to be nullable.")
class MiscTest(TestBase, AssertsExecutionResults):
self.assert_((subject.c.id==referer.c.ref).compare(subject.join(referer).onclause))
finally:
meta1.drop_all()
-
+
def test_schema_roundtrips(self):
meta = MetaData(testing.db)
- users = Table('users', meta,
+ users = Table('users', meta,
Column('id', Integer, primary_key=True),
Column('name', String(50)), schema='alt_schema')
users.create()
users.insert().execute(id=2, name='name2')
users.insert().execute(id=3, name='name3')
users.insert().execute(id=4, name='name4')
-
+
self.assertEquals(users.select().where(users.c.name=='name2').execute().fetchall(), [(2, 'name2')])
self.assertEquals(users.select(use_labels=True).where(users.c.name=='name2').execute().fetchall(), [(2, 'name2')])
-
+
users.delete().where(users.c.id==3).execute()
self.assertEquals(users.select().where(users.c.name=='name3').execute().fetchall(), [])
-
+
users.update().where(users.c.name=='name4').execute(name='newname')
self.assertEquals(users.select(use_labels=True).where(users.c.id==4).execute().fetchall(), [(4, 'newname')])
-
+
finally:
users.drop()
-
+
def test_preexecute_passivedefault(self):
"""test that when we get a primary key column back
from reflecting a table which has a default value on it, we pre-execute
- that PassiveDefault upon insert."""
+ that DefaultClause upon insert."""
try:
meta = MetaData(testing.db)
mapper(Foo, footable)
metadata.create_all()
sess = create_session()
-
+
foo = Foo()
foo.id = 1
foo.intarr = [1,2,3]
sess.clear()
foo = sess.query(Foo).get(1)
self.assertEquals(foo.intarr, [1,2,3])
-
+
foo.intarr.append(4)
sess.flush()
sess.clear()
foo = sess.query(Foo).get(1)
self.assertEquals(foo.intarr, [1,2,3,4])
-
+
foo.intarr = []
sess.flush()
sess.clear()
self.assertEquals(foo.intarr, [])
-
+
foo.intarr = None
sess.flush()
sess.clear()
sess.save(foo)
sess.flush()
-class TimeStampTest(TestBase, AssertsExecutionResults):
- __only_on__ = 'postgres'
- def test_timestamp(self):
+class TimeStampTest(TestBase, AssertsExecutionResults):
+ __only_on__ = 'postgres'
+ def test_timestamp(self):
engine = testing.db
- connection = engine.connect()
- s = select([func.TIMESTAMP("12/25/07").label("ts")])
- result = connection.execute(s).fetchone()
- self.assertEqual(result[0], datetime.datetime(2007, 12, 25, 0, 0))
+ connection = engine.connect()
+ s = select([func.TIMESTAMP("12/25/07").label("ts")])
+ result = connection.execute(s).fetchone()
+ self.assertEqual(result[0], datetime.datetime(2007, 12, 25, 0, 0))
class ServerSideCursorsTest(TestBase, AssertsExecutionResults):
__only_on__ = 'postgres'
-
+
def setUpAll(self):
global ss_engine
ss_engine = engines.testing_engine(options={'server_side_cursors':True})
-
+
def tearDownAll(self):
ss_engine.dispose()
-
+
def test_roundtrip(self):
test_table = Table('test_table', MetaData(ss_engine),
Column('id', Integer, primary_key=True),
test_table.create(checkfirst=True)
try:
test_table.insert().execute(data='data1')
-
+
nextid = ss_engine.execute(Sequence('test_table_id_seq'))
test_table.insert().execute(id=nextid, data='data2')
-
+
self.assertEquals(test_table.select().execute().fetchall(), [(1, 'data1'), (2, 'data2')])
-
+
test_table.update().where(test_table.c.id==2).values(data=test_table.c.data + ' updated').execute()
self.assertEquals(test_table.select().execute().fetchall(), [(1, 'data1'), (2, 'data2 updated')])
test_table.delete().execute()
self.assertEquals(test_table.count().scalar(), 0)
finally:
test_table.drop(checkfirst=True)
-
-
+
+
if __name__ == "__main__":
testenv.main()
self._test_empty_insert,
Table('c', MetaData(testing.db),
Column('x', Integer, primary_key=True),
- Column('y', Integer, PassiveDefault('123'),
+ Column('y', Integer, DefaultClause('123'),
primary_key=True)))
@testing.exclude('sqlite', '<', (3, 4), 'no database support')
self._test_empty_insert(
Table('d', MetaData(testing.db),
Column('x', Integer, primary_key=True),
- Column('y', Integer, PassiveDefault('123'))))
+ Column('y', Integer, DefaultClause('123'))))
@testing.exclude('sqlite', '<', (3, 4), 'no database support')
def test_empty_insert_nopk1(self):
Column('test6', sa.DateTime, nullable=False),
Column('test7', sa.Text),
Column('test8', sa.Binary),
- Column('test_passivedefault2', sa.Integer, sa.PassiveDefault("5")),
+ Column('test_passivedefault2', sa.Integer, server_default='5'),
Column('test9', sa.Binary(100)),
Column('test_numeric', sa.Numeric()),
test_needs_fk=True,
def define_tables(self, metadata):
# determine a literal value for "false" based on the dialect
- # FIXME: this PassiveDefault setup is bogus.
+ # FIXME: this DefaultClause setup is bogus.
dialect = testing.db.dialect
bp = sa.Boolean().dialect_impl(dialect).bind_processor(dialect)
primary_key=True, nullable=False),
Column('owner_id', Integer, ForeignKey('owners.id'),
primary_key=True, nullable=False),
- Column('someoption', sa.Boolean, sa.PassiveDefault(false),
+ Column('someoption', sa.Boolean, server_default=false,
nullable=False))
def setup_classes(self):
dt = Table('default_t', metadata,
Column('id', Integer, primary_key=True,
test_needs_autoincrement=True),
- Column('hoho', hohotype, sa.PassiveDefault(str(hohoval))),
+ Column('hoho', hohotype, server_default=str(hohoval)),
Column('counter', Integer, default=sa.func.length("1234567")),
Column('foober', String(30), default="im foober",
onupdate="im the update"))
@testing.resolve_artifact_names
def test_insert_nopostfetch(self):
- # populates the PassiveDefaults explicitly so there is no
+ # populates from the FetchValues explicitly so there is no
# "post-update"
mapper(Hoho, default_t)
Column('id', Integer, primary_key=True),
Column('item_id', Integer, ForeignKey('items.id'),
nullable=False),
- Column('name', String(100), PassiveDefault('no name')),
+ Column('name', String(100), server_default='no name'),
test_needs_acid=True)
customers = Table('customers', metadata,
Column('id', Integer, primary_key=True),
from sqlalchemy import *
from sqlalchemy import exc, schema, util
from sqlalchemy.orm import mapper, create_session
+from testlib import sa, testing
+from testlib.testing import eq_
from testlib import *
use_function_defaults = testing.against('postgres', 'oracle')
is_oracle = testing.against('oracle')
- # select "count(1)" returns different results on different DBs
- # also correct for "current_date" compatible as column default, value differences
+ # select "count(1)" returns different results on different DBs also
+ # correct for "current_date" compatible as column default, value
+ # differences
currenttime = func.current_date(type_=Date, bind=db)
if is_oracle:
t = Table('default_test1', metadata,
# python function
- Column('col1', Integer, primary_key=True, default=mydefault),
+ Column('col1', Integer, primary_key=True,
+ default=mydefault),
# python literal
- Column('col2', String(20), default="imthedefault", onupdate="im the update"),
+ Column('col2', String(20),
+ default="imthedefault",
+ onupdate="im the update"),
# preexecute expression
- Column('col3', Integer, default=func.length('abcdef'), onupdate=func.length('abcdefghijk')),
+ Column('col3', Integer,
+ default=func.length('abcdef'),
+ onupdate=func.length('abcdefghijk')),
# SQL-side default from sql expression
- Column('col4', deftype, PassiveDefault(def1)),
+ Column('col4', deftype,
+ server_default=def1),
# SQL-side default from literal expression
- Column('col5', deftype, PassiveDefault(def2)),
+ Column('col5', deftype,
+ server_default=def2),
# preexecute + update timestamp
- Column('col6', Date, default=currenttime, onupdate=currenttime),
+ Column('col6', Date,
+ default=currenttime,
+ onupdate=currenttime),
Column('boolcol1', Boolean, default=True),
Column('boolcol2', Boolean, default=False),
# python function which uses ExecutionContext
- Column('col7', Integer, default=mydefault_using_connection, onupdate=myupdate_with_ctx),
+ Column('col7', Integer,
+ default=mydefault_using_connection,
+ onupdate=myupdate_with_ctx),
# python builtin
- Column('col8', Date, default=datetime.date.today, onupdate=datetime.date.today)
- )
+ Column('col8', Date,
+ default=datetime.date.today,
+ onupdate=datetime.date.today),
+ # combo
+ Column('col9', String(20),
+ default='py',
+ server_default='ddl'))
t.create()
def tearDownAll(self):
self.assert_(z == f)
self.assert_(f2==11)
- def testinsert(self):
+ def test_py_vs_server_default_detection(self):
+
+ def has_(name, *wanted):
+ slots = ['default', 'onupdate', 'server_default', 'server_onupdate']
+ col = tbl.c[name]
+ for slot in wanted:
+ slots.remove(slot)
+ assert getattr(col, slot) is not None, getattr(col, slot)
+ for slot in slots:
+ assert getattr(col, slot) is None, getattr(col, slot)
+
+ tbl = t
+ has_('col1', 'default')
+ has_('col2', 'default', 'onupdate')
+ has_('col3', 'default', 'onupdate')
+ has_('col4', 'server_default')
+ has_('col5', 'server_default')
+ has_('col6', 'default', 'onupdate')
+ has_('boolcol1', 'default')
+ has_('boolcol2', 'default')
+ has_('col7', 'default', 'onupdate')
+ has_('col8', 'default', 'onupdate')
+ has_('col9', 'default', 'server_default')
+
+ t2 = Table('t2', MetaData(),
+ Column('col1', Integer, Sequence('foo')),
+ Column('col2', Integer,
+ default=Sequence('foo'),
+ server_default='y'),
+ Column('col3', Integer,
+ Sequence('foo'),
+ server_default='x'),
+ Column('col4', Integer,
+ ColumnDefault('x'),
+ DefaultClause('y')),
+ Column('col4', Integer,
+ ColumnDefault('x'),
+ DefaultClause('y'),
+ DefaultClause('y', for_update=True)),
+ Column('col5', Integer,
+ ColumnDefault('x'),
+ DefaultClause('y'),
+ onupdate='z'),
+ Column('col6', Integer,
+ ColumnDefault('x'),
+ server_default='y',
+ onupdate='z'),
+ Column('col7', Integer,
+ default='x',
+ server_default='y',
+ onupdate='z'),
+ Column('col8', Integer,
+ server_onupdate='u',
+ default='x',
+ server_default='y',
+ onupdate='z'))
+ tbl = t2
+ has_('col1', 'default')
+ has_('col2', 'default', 'server_default')
+ has_('col3', 'default', 'server_default')
+ has_('col4', 'default', 'server_default', 'server_onupdate')
+ has_('col5', 'default', 'server_default', 'onupdate')
+ has_('col6', 'default', 'server_default', 'onupdate')
+ has_('col7', 'default', 'server_default', 'onupdate')
+ has_('col8', 'default', 'server_default', 'onupdate', 'server_onupdate')
+
+ def test_insert(self):
r = t.insert().execute()
assert r.lastrow_has_defaults()
- assert util.Set(r.context.postfetch_cols) == util.Set([t.c.col3, t.c.col5, t.c.col4, t.c.col6])
+ eq_(util.Set(r.context.postfetch_cols), util.Set([t.c.col3, t.c.col5, t.c.col4, t.c.col6]))
r = t.insert(inline=True).execute()
assert r.lastrow_has_defaults()
- assert util.Set(r.context.postfetch_cols) == util.Set([t.c.col3, t.c.col5, t.c.col4, t.c.col6])
+ eq_(util.Set(r.context.postfetch_cols), util.Set([t.c.col3, t.c.col5, t.c.col4, t.c.col6]))
- t.insert().execute()
t.insert().execute()
ctexec = select([currenttime.label('now')], bind=testing.db).scalar()
l = t.select().execute()
today = datetime.date.today()
- self.assertEquals(l.fetchall(), [
- (51, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today),
- (52, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today),
- (53, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today),
- (54, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today),
- ])
-
- def testinsertmany(self):
+ eq_(l.fetchall(), [
+ (x, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today, 'py')
+ for x in range(51, 54)])
+
+ t.insert().execute(col9=None)
+ assert r.lastrow_has_defaults()
+ eq_(util.Set(r.context.postfetch_cols), util.Set([t.c.col3, t.c.col5, t.c.col4, t.c.col6]))
+
+ eq_(t.select(t.c.col1==54).execute().fetchall(),
+ [(54, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today, None)])
+
+ def test_insertmany(self):
# MySQL-Python 1.2.2 breaks functions in execute_many :(
if (testing.against('mysql') and
testing.db.dialect.dbapi.version_info[:3] == (1, 2, 2)):
ctexec = currenttime.scalar()
l = t.select().execute()
today = datetime.date.today()
- self.assert_(l.fetchall() == [(51, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today), (52, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today), (53, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today)])
+ eq_(l.fetchall(),
+ [(51, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today, 'py'),
+ (52, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today, 'py'),
+ (53, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today, 'py')])
- def testinsertvalues(self):
+ def test_insert_values(self):
t.insert(values={'col3':50}).execute()
l = t.select().execute()
self.assert_(l.fetchone()['col3'] == 50)
- def testupdatemany(self):
+ def test_updatemany(self):
# MySQL-Python 1.2.2 breaks functions in execute_many :(
if (testing.against('mysql') and
testing.db.dialect.dbapi.version_info[:3] == (1, 2, 2)):
l = t.select().execute()
ctexec = currenttime.scalar()
today = datetime.date.today()
- self.assert_(l.fetchall() == [(51, 'im the update', f2, ts, ts, ctexec, False, False, 13, today), (52, 'im the update', f2, ts, ts, ctexec, True, False, 13, today), (53, 'im the update', f2, ts, ts, ctexec, True, False, 13, today)])
+ eq_(l.fetchall(),
+ [(51, 'im the update', f2, ts, ts, ctexec, False, False, 13, today, 'py'),
+ (52, 'im the update', f2, ts, ts, ctexec, True, False, 13, today, 'py'),
+ (53, 'im the update', f2, ts, ts, ctexec, True, False, 13, today, 'py')])
def testupdate(self):
r = t.insert().execute()
ctexec = currenttime.scalar()
l = t.select(t.c.col1==pk).execute()
l = l.fetchone()
- self.assert_(l == (pk, 'im the update', f2, None, None, ctexec, True, False, 13, datetime.date.today()))
+ self.assert_(l == (pk, 'im the update', f2, None, None, ctexec, True, False, 13, datetime.date.today(), 'py'))
self.assert_(f2==11)
def testupdatevalues(self):
@testing.fails_on_everything_except('postgres')
def testpassiveoverride(self):
- """primarily for postgres, tests that when we get a primary key column back
- from reflecting a table which has a default value on it, we pre-execute
- that PassiveDefault upon insert, even though PassiveDefault says
- "let the database execute this", because in postgres we must have all the primary
- key values in memory before insert; otherwise we cant locate the just inserted row."""
-
+ """
+ Primarily for postgres, tests that when we get a primary key column
+ back from reflecting a table which has a default value on it, we
+ pre-execute that DefaultClause upon insert, even though DefaultClause
+ says "let the database execute this", because in postgres we must have
+ all the primary key values in memory before insert; otherwise we can't
+ locate the just inserted row.
+
+ """
try:
meta = MetaData(testing.db)
testing.db.execute("""
Table("t2", metadata,
Column('id', Integer, Sequence('t2_id_seq', optional=True), primary_key=True),
Column('foo', String(30), primary_key=True),
- Column('bar', String(30), PassiveDefault('hi'))
+ Column('bar', String(30), server_default='hi')
),
{'foo':'hi'},
{'id':1, 'foo':'hi', 'bar':'hi'}
Table("t4", metadata,
Column('id', Integer, Sequence('t4_id_seq', optional=True), primary_key=True),
Column('foo', String(30), primary_key=True),
- Column('bar', String(30), PassiveDefault('hi'))
+ Column('bar', String(30), server_default='hi')
),
{'foo':'hi', 'id':1},
{'id':1, 'foo':'hi', 'bar':'hi'}
{'unsupported':[]},
Table("t5", metadata,
Column('id', String(10), primary_key=True),
- Column('bar', String(30), PassiveDefault('hi'))
+ Column('bar', String(30), server_default='hi')
),
{'id':'id1'},
{'id':'id1', 'bar':'hi'},
unsupported('mysql', 'not supported by database'),
unsupported('mssql', 'not supported by database'),
)
+
+def row_triggers(fn):
+ """Target must support standard statement-running EACH ROW triggers."""
+ return _chain_decorators_on(
+ fn,
+ # no access to same table
+ exclude('mysql', '<', (5, 0, 10), 'not supported by database'),
+ unsupported('postgres', 'not supported by database: no statements'),
+ )
self.assertEquals(set([f.column.name for f in c.foreign_keys]), set([f.column.name for f in reflected_c.foreign_keys]))
if c.default:
- assert isinstance(reflected_c.default, schema.PassiveDefault)
+ assert isinstance(reflected_c.server_default,
+ schema.FetchedValue)
elif against(('mysql', '<', (5, 0))):
- # ignore reflection of bogus db-generated PassiveDefault()
+ # ignore reflection of bogus db-generated DefaultClause()
pass
elif not c.primary_key or not against('postgres'):
print repr(c)