"""Support for the PostgreSQL database.
-For information on connecting using specific drivers, see the documentation section
-regarding that driver.
+For information on connecting using specific drivers, see the documentation
+section regarding that driver.
Sequences/SERIAL
----------------
-PostgreSQL supports sequences, and SQLAlchemy uses these as the default means of creating
-new primary key values for integer-based primary key columns. When creating tables,
-SQLAlchemy will issue the ``SERIAL`` datatype for integer-based primary key columns,
-which generates a sequence corresponding to the column and associated with it based on
-a naming convention.
+PostgreSQL supports sequences, and SQLAlchemy uses these as the default means
+of creating new primary key values for integer-based primary key columns. When
+creating tables, SQLAlchemy will issue the ``SERIAL`` datatype for
+integer-based primary key columns, which generates a sequence and a
+server-side default corresponding to the column.
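+
+For example, given a plain integer primary key column (a sketch; the table
+and column names are illustrative only)::
+
+Table('sometable', metadata,
+Column('id', Integer, primary_key=True)
+)
+
+the PostgreSQL dialect renders the column roughly as ``id SERIAL NOT NULL``
+within the CREATE TABLE statement.
+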
-To specify a specific named sequence to be used for primary key generation, use the
-:func:`~sqlalchemy.schema.Sequence` construct::
+To specify a specific named sequence to be used for primary key generation,
+use the :func:`~sqlalchemy.schema.Sequence` construct::
Table('sometable', metadata,
Column('id', Integer, Sequence('some_id_seq'), primary_key=True)
)
-Currently, when SQLAlchemy issues a single insert statement, to fulfill the contract of
-having the "last insert identifier" available, the sequence is executed independently
-beforehand and the new value is retrieved, to be used in the subsequent insert. Note
-that when an :func:`~sqlalchemy.sql.expression.insert()` construct is executed using
-"executemany" semantics, the sequence is not pre-executed and normal PG SERIAL behavior
-is used.
-
-PostgreSQL 8.2 supports an ``INSERT...RETURNING`` syntax which SQLAlchemy supports
-as well. A future release of SQLA will use this feature by default in lieu of
-sequence pre-execution in order to retrieve new primary key values, when available.
+When SQLAlchemy issues a single INSERT statement, to fulfill the contract of
+having the "last insert identifier" available, a RETURNING clause is added to
+the INSERT statement which specifies that the primary key columns should be
+returned after the statement completes. The RETURNING functionality only
+takes place if PostgreSQL 8.2 or later is in use. As a fallback approach,
+the sequence, whether specified explicitly or implicitly via ``SERIAL``, is
+executed independently beforehand and the returned value is used in the
+subsequent INSERT. Note that when an
+:func:`~sqlalchemy.sql.expression.insert()` construct is executed using
+"executemany" semantics, the "last inserted identifier" functionality does not
+apply; no RETURNING clause is emitted nor is the sequence pre-executed in this
+case.
+
+To disable the usage of RETURNING by default, specify the flag
+``implicit_returning=False`` to :func:`create_engine`.
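+
+For example (a sketch; the connection URL below is illustrative only)::
+
+from sqlalchemy import create_engine
+
+engine = create_engine('postgresql://scott:tiger@localhost/test',
+implicit_returning=False)
+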
Transaction Isolation Level
---------------------------
-:func:`create_engine` accepts an ``isolation_level`` parameter which results in
-the command ``SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL <level>``
-being invoked for every new connection. Valid values for this parameter are
-``READ_COMMITTED``, ``READ_UNCOMMITTED``, ``REPEATABLE_READ``, and ``SERIALIZABLE``.
+:func:`create_engine` accepts an ``isolation_level`` parameter which results
+in the command ``SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL
+<level>`` being invoked for every new connection. Valid values for this
+parameter are ``READ_COMMITTED``, ``READ_UNCOMMITTED``, ``REPEATABLE_READ``,
+and ``SERIALIZABLE``.
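+
+For example (a sketch; the connection URL below is illustrative only)::
+
+from sqlalchemy import create_engine
+
+engine = create_engine('postgresql://scott:tiger@localhost/test',
+isolation_level='SERIALIZABLE')
+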
INSERT/UPDATE...RETURNING
-------------------------
-The dialect supports PG 8.2's ``INSERT..RETURNING``, ``UPDATE..RETURNING`` and ``DELETE..RETURNING`` syntaxes,
-but must be explicitly enabled on a per-statement basis::
+The dialect supports PG 8.2's ``INSERT..RETURNING``, ``UPDATE..RETURNING`` and
+``DELETE..RETURNING`` syntaxes. ``INSERT..RETURNING`` is used by default
+for single-row INSERT statements in order to fetch newly generated
+primary key identifiers. To specify an explicit ``RETURNING`` clause,
+use the :meth:`_UpdateBase.returning` method on a per-statement basis::
# INSERT..RETURNING
result = table.insert().returning(table.c.col1, table.c.col2).\\
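+
+# UPDATE..RETURNING (a sketch; assumes the same ``table`` also has a
+# ``name`` column, which is not shown here)
+result = table.update().returning(table.c.col1, table.c.col2).\\
+where(table.c.name=='foo').values(name='bar')
+print result.fetchall()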
Arguments are:
- :param item_type: The data type of items of this array. Note that dimensionality is
- irrelevant here, so multi-dimensional arrays like ``INTEGER[][]``, are constructed as
- ``ARRAY(Integer)``, not as ``ARRAY(ARRAY(Integer))`` or such. The type mapping figures
- out on the fly
+ :param item_type: The data type of items of this array. Note that
+ dimensionality is irrelevant here, so multi-dimensional arrays like
+ ``INTEGER[][]`` are constructed as ``ARRAY(Integer)``, not as
+ ``ARRAY(ARRAY(Integer))`` or such. The type mapping figures this
+ out on the fly.
- :param mutable: Defaults to True: specify whether lists passed to this class should be
- considered mutable. If so, generic copy operations (typically used by the ORM) will
- shallow-copy values.
+ :param mutable: Defaults to True: specify whether lists passed to this
+ class should be considered mutable. If so, generic copy operations
+ (typically used by the ORM) will shallow-copy values.
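+
+ A brief usage sketch (table and column names are illustrative only)::
+
+ Table('example', metadata,
+ Column('data', ARRAY(Integer))
+ )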
"""
if isinstance(item_type, ARRAY):
if not bind.dialect.supports_native_enum:
return
- if not checkfirst or not bind.dialect.has_type(bind, self.name, schema=self.schema):
+ if not checkfirst or \
+ not bind.dialect.has_type(bind, self.name, schema=self.schema):
bind.execute(CreateEnumType(self))
def drop(self, bind=None, checkfirst=True):
if not bind.dialect.supports_native_enum:
return
- if not checkfirst or bind.dialect.has_type(bind, self.name, schema=self.schema):
+ if not checkfirst or \
+ bind.dialect.has_type(bind, self.name, schema=self.schema):
bind.execute(DropEnumType(self))
def _on_table_create(self, event, target, bind, **kw):
class PGCompiler(compiler.SQLCompiler):
def visit_match_op(self, binary, **kw):
- return "%s @@ to_tsquery(%s)" % (self.process(binary.left), self.process(binary.right))
+ return "%s @@ to_tsquery(%s)" % (
+ self.process(binary.left),
+ self.process(binary.right))
def visit_ilike_op(self, binary, **kw):
escape = binary.modifiers.get("escape", None)
- return '%s ILIKE %s' % (self.process(binary.left), self.process(binary.right)) \
+ return '%s ILIKE %s' % \
+ (self.process(binary.left), self.process(binary.right)) \
+ (escape and
(' ESCAPE ' + self.render_literal_value(escape, None))
or '')
def visit_notilike_op(self, binary, **kw):
escape = binary.modifiers.get("escape", None)
- return '%s NOT ILIKE %s' % (self.process(binary.left), self.process(binary.right)) \
+ return '%s NOT ILIKE %s' % \
+ (self.process(binary.left), self.process(binary.right)) \
+ (escape and
(' ESCAPE ' + self.render_literal_value(escape, None))
or '')
return "DISTINCT "
elif isinstance(select._distinct, (list, tuple)):
return "DISTINCT ON (" + ', '.join(
- [(isinstance(col, basestring) and col or self.process(col)) for col in select._distinct]
+ [(isinstance(col, basestring) and col
+ or self.process(col)) for col in select._distinct]
)+ ") "
else:
return "DISTINCT ON (" + unicode(select._distinct) + ") "
column.autoincrement and \
isinstance(column.type, sqltypes.Integer) and \
not isinstance(column.type, sqltypes.SmallInteger) and \
- (column.default is None or (isinstance(column.default, schema.Sequence) and column.default.optional)):
+ (column.default is None or
+ (isinstance(column.default, schema.Sequence) and
+ column.default.optional)):
if isinstance(column.type, sqltypes.BigInteger):
colspec += " BIGSERIAL"
else:
if index.unique:
text += "UNIQUE "
text += "INDEX %s ON %s (%s)" \
- % (preparer.quote(self._validate_identifier(index.name, True), index.quote),
- preparer.format_table(index.table),
- ', '.join([preparer.format_column(c) for c in index.columns]))
+ % (preparer.quote(
+ self._validate_identifier(index.name, True), index.quote),
+ preparer.format_table(index.table),
+ ', '.join([preparer.format_column(c)
+ for c in index.columns]))
if "postgres_where" in index.kwargs:
whereclause = index.kwargs['postgres_where']
- util.warn_deprecated("The 'postgres_where' argument has been renamed to 'postgresql_where'.")
+ util.warn_deprecated(
+ "The 'postgres_where' argument has been renamed "
+ "to 'postgresql_where'.")
elif 'postgresql_where' in index.kwargs:
whereclause = index.kwargs['postgresql_where']
else:
def visit_TIMESTAMP(self, type_):
return "TIMESTAMP%s %s" % (
- getattr(type_, 'precision', None) and "(%d)" % type_.precision or "",
+ getattr(type_, 'precision', None) and "(%d)" %
+ type_.precision or "",
(type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE"
)
def visit_TIME(self, type_):
return "TIME%s %s" % (
- getattr(type_, 'precision', None) and "(%d)" % type_.precision or "",
+ getattr(type_, 'precision', None) and "(%d)" %
+ type_.precision or "",
(type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE"
)
class PGIdentifierPreparer(compiler.IdentifierPreparer):
def _unquote_identifier(self, value):
if value[0] == self.initial_quote:
- value = value[1:-1].replace(self.escape_to_quote, self.escape_quote)
+ value = value[1:-1].\
+ replace(self.escape_to_quote, self.escape_quote)
return value
def format_type(self, type_, use_schema=True):
def fire_sequence(self, seq):
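+ # a Sequence marked "optional" is only a fallback for databases that
+ # have no other way to generate primary key values; on PostgreSQL the
+ # SERIAL default handles it, so no nextval() is emitted in that case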
if not seq.optional:
return self._execute_scalar(("select nextval('%s')" % \
- self.dialect.identifier_preparer.format_sequence(seq)))
+ self.dialect.identifier_preparer.format_sequence(seq)))
else:
return None
column.server_default.arg is not None):
# pre-execute passive defaults on primary key columns
- return self._execute_scalar("select %s" % column.server_default.arg)
+ return self._execute_scalar("select %s" %
+ column.server_default.arg)
elif column is column.table._autoincrement_column \
- and (column.default is None or (isinstance(column.default, schema.Sequence) and column.default.optional)):
+ and (column.default is None or
+ (isinstance(column.default, schema.Sequence) and
+ column.default.optional)):
- # execute the sequence associated with a SERIAL primary key column.
- # for non-primary-key SERIAL, the ID just generates server side.
+ # execute the sequence associated with a SERIAL primary
+ # key column. for non-primary-key SERIAL, the ID just
+ # generates server side.
sch = column.table.schema
if sch is not None:
- exc = "select nextval('\"%s\".\"%s_%s_seq\"')" % (sch, column.table.name, column.name)
+ exc = "select nextval('\"%s\".\"%s_%s_seq\"')" % \
+ (sch, column.table.name, column.name)
else:
- exc = "select nextval('\"%s_%s_seq\"')" % (column.table.name, column.name)
+ exc = "select nextval('\"%s_%s_seq\"')" % \
+ (column.table.name, column.name)
return self._execute_scalar(exc)
def initialize(self, connection):
super(PGDialect, self).initialize(connection)
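+ # default RETURNING support requires a server version greater than
+ # 8.2; also honor an implicit_returning=False passed to create_engine()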
self.implicit_returning = self.server_version_info > (8, 2) and \
- self.__dict__.get('implicit_returning', True)
+ self.__dict__.get('implicit_returning', True)
self.supports_native_enum = self.server_version_info >= (8, 3)
if not self.supports_native_enum:
self.colspecs = self.colspecs.copy()
if self.isolation_level is not None:
def connect(conn):
cursor = conn.cursor()
- cursor.execute("SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL %s"
- % self.isolation_level)
+ cursor.execute(
+ "SET SESSION CHARACTERISTICS AS TRANSACTION "
+ "ISOLATION LEVEL %s" % self.isolation_level)
cursor.execute("COMMIT")
cursor.close()
return connect
def do_prepare_twophase(self, connection, xid):
connection.execute("PREPARE TRANSACTION '%s'" % xid)
- def do_rollback_twophase(self, connection, xid, is_prepared=True, recover=False):
+ def do_rollback_twophase(self, connection, xid,
+ is_prepared=True, recover=False):
if is_prepared:
if recover:
- #FIXME: ugly hack to get out of transaction context when commiting recoverable transactions
- # Must find out a way how to make the dbapi not open a transaction.
+#FIXME: ugly hack to get out of transaction
+ # context when committing recoverable transactions.
+ # Must find a way to make the dbapi not
+ # open a transaction.
connection.execute("ROLLBACK")
connection.execute("ROLLBACK PREPARED '%s'" % xid)
connection.execute("BEGIN")
else:
self.do_rollback(connection.connection)
- def do_commit_twophase(self, connection, xid, is_prepared=True, recover=False):
+ def do_commit_twophase(self, connection, xid,
+ is_prepared=True, recover=False):
if is_prepared:
if recover:
connection.execute("ROLLBACK")
self.do_commit(connection.connection)
def do_recover_twophase(self, connection):
- resultset = connection.execute(sql.text("SELECT gid FROM pg_prepared_xacts"))
+ resultset = connection.execute(
+ sql.text("SELECT gid FROM pg_prepared_xacts"))
return [row[0] for row in resultset]
def _get_default_schema_name(self, connection):
# seems like case gets folded in pg_class...
if schema is None:
cursor = connection.execute(
- sql.text("select relname from pg_class c join pg_namespace n on "
- "n.oid=c.relnamespace where n.nspname=current_schema() and "
- "lower(relname)=:name",
- bindparams=[
- sql.bindparam('name', unicode(table_name.lower()),
- type_=sqltypes.Unicode)]
+ sql.text(
+ "select relname from pg_class c join pg_namespace n on "
+ "n.oid=c.relnamespace where n.nspname=current_schema() and "
+ "lower(relname)=:name",
+ bindparams=[
+ sql.bindparam('name', unicode(table_name.lower()),
+ type_=sqltypes.Unicode)]
)
)
else:
cursor = connection.execute(
- sql.text("select relname from pg_class c join pg_namespace n on "
- "n.oid=c.relnamespace where n.nspname=:schema and lower(relname)=:name",
+ sql.text(
+ "select relname from pg_class c join pg_namespace n on "
+ "n.oid=c.relnamespace where n.nspname=:schema and "
+ "lower(relname)=:name",
bindparams=[
- sql.bindparam('name', unicode(table_name.lower()), type_=sqltypes.Unicode),
- sql.bindparam('schema', unicode(schema), type_=sqltypes.Unicode)]
+ sql.bindparam('name',
+ unicode(table_name.lower()), type_=sqltypes.Unicode),
+ sql.bindparam('schema',
+ unicode(schema), type_=sqltypes.Unicode)]
)
)
return bool(cursor.first())
def has_sequence(self, connection, sequence_name, schema=None):
if schema is None:
cursor = connection.execute(
- sql.text("SELECT relname FROM pg_class c join pg_namespace n on "
- "n.oid=c.relnamespace where relkind='S' and n.nspname=current_schema()"
- " and lower(relname)=:name",
- bindparams=[
- sql.bindparam('name', unicode(sequence_name.lower()),
- type_=sqltypes.Unicode)
- ]
- )
- )
+ sql.text(
+ "SELECT relname FROM pg_class c join pg_namespace n on "
+ "n.oid=c.relnamespace where relkind='S' and "
+ "n.nspname=current_schema() "
+ "and lower(relname)=:name",
+ bindparams=[
+ sql.bindparam('name', unicode(sequence_name.lower()),
+ type_=sqltypes.Unicode)
+ ]
+ )
+ )
else:
cursor = connection.execute(
- sql.text("SELECT relname FROM pg_class c join pg_namespace n on "
- "n.oid=c.relnamespace where relkind='S' and n.nspname=:schema and "
- "lower(relname)=:name",
- bindparams=[
- sql.bindparam('name', unicode(sequence_name.lower()),
- type_=sqltypes.Unicode),
- sql.bindparam('schema', unicode(schema), type_=sqltypes.Unicode)
- ]
- )
- )
+ sql.text(
+ "SELECT relname FROM pg_class c join pg_namespace n on "
+ "n.oid=c.relnamespace where relkind='S' and "
+ "n.nspname=:schema and lower(relname)=:name",
+ bindparams=[
+ sql.bindparam('name', unicode(sequence_name.lower()),
+ type_=sqltypes.Unicode),
+ sql.bindparam('schema',
+ unicode(schema), type_=sqltypes.Unicode)
+ ]
+ )
+ )
return bool(cursor.first())
v = connection.execute("select version()").scalar()
m = re.match('PostgreSQL (\d+)\.(\d+)(?:\.(\d+))?(?:devel)?', v)
if not m:
- raise AssertionError("Could not determine version from string '%s'" % v)
+ raise AssertionError(
+ "Could not determine version from string '%s'" % v)
return tuple([int(x) for x in m.group(1, 2, 3) if x is not None])
@reflection.cache
# Py3K
#view_names = [row[0] for row in connection.execute(s)]
# Py2K
- view_names = [row[0].decode(self.encoding) for row in connection.execute(s)]
+ view_names = [row[0].decode(self.encoding)
+ for row in connection.execute(s)]
# end Py2K
return view_names
SQL_COLS = """
SELECT a.attname,
pg_catalog.format_type(a.atttypid, a.atttypmod),
- (SELECT substring(pg_catalog.pg_get_expr(d.adbin, d.adrelid) for 128)
+ (SELECT substring(pg_catalog.pg_get_expr(d.adbin, d.adrelid)
+ for 128)
FROM pg_catalog.pg_attrdef d
- WHERE d.adrelid = a.attrelid AND d.adnum = a.attnum AND a.atthasdef)
+ WHERE d.adrelid = a.attrelid AND d.adnum = a.attnum
+ AND a.atthasdef)
AS DEFAULT,
a.attnotnull, a.attnum, a.attrelid as table_oid
FROM pg_catalog.pg_attribute a
# format columns
columns = []
for name, format_type, default, notnull, attnum, table_oid in rows:
- ## strip (5) from character varying(5), timestamp(5) with time zone, etc
+ ## strip (5) from character varying(5), timestamp(5)
+ # with time zone, etc
attype = re.sub(r'\([\d,]+\)', '', format_type)
# strip '[]' from integer[], etc.
args = (53, )
elif attype == 'integer':
args = (32, 0)
- elif attype in ('timestamp with time zone', 'time with time zone'):
+ elif attype in ('timestamp with time zone',
+ 'time with time zone'):
kwargs['timezone'] = True
if charlen:
kwargs['precision'] = int(charlen)
args = ()
- elif attype in ('timestamp without time zone', 'time without time zone', 'time'):
+ elif attype in ('timestamp without time zone',
+ 'time without time zone', 'time'):
kwargs['timezone'] = False
if charlen:
kwargs['precision'] = int(charlen)
args = ()
- elif attype in ('interval','interval year to month','interval day to second'):
+ elif attype in ('interval','interval year to month',
+ 'interval day to second'):
if charlen:
kwargs['precision'] = int(charlen)
args = ()
# A table can't override whether the domain is nullable.
nullable = domain['nullable']
if domain['default'] and not default:
- # It can, however, override the default value, but can't set it to null.
+ # It can, however, override the default
+ # value, but can't set it to null.
default = domain['default']
coltype = self.ischema_names[domain['attype']]
else:
sch = schema
if '.' not in match.group(2) and sch is not None:
# unconditionally quote the schema name. this could
- # later be enhanced to obey quoting rules / "quote schema"
- default = match.group(1) + ('"%s"' % sch) + '.' + match.group(2) + match.group(3)
+ # later be enhanced to obey quoting rules /
+ # "quote schema"
+ default = match.group(1) + \
+ ('"%s"' % sch) + '.' + \
+ match.group(2) + match.group(3)
column_info = dict(name=name, type=coltype, nullable=nullable,
default=default, autoincrement=autoincrement)
@reflection.cache
def get_pk_constraint(self, connection, table_name, schema=None, **kw):
- cols = self.get_primary_keys(connection, table_name, schema=schema, **kw)
+ cols = self.get_primary_keys(connection, table_name,
+ schema=schema, **kw)
table_oid = self.get_table_oid(connection, table_name, schema,
info_cache=kw.get('info_cache'))
ORDER BY 1
"""
- t = sql.text(FK_SQL, typemap={'conname':sqltypes.Unicode, 'condef':sqltypes.Unicode})
+ t = sql.text(FK_SQL, typemap={
+ 'conname':sqltypes.Unicode,
+ 'condef':sqltypes.Unicode})
c = connection.execute(t, table=table_oid)
fkeys = []
for conname, condef in c.fetchall():
- m = re.search('FOREIGN KEY \((.*?)\) REFERENCES (?:(.*?)\.)?(.*?)\((.*?)\)', condef).groups()
- (constrained_columns, referred_schema, referred_table, referred_columns) = m
- constrained_columns = [preparer._unquote_identifier(x) for x in re.split(r'\s*,\s*', constrained_columns)]
+ m = re.search('FOREIGN KEY \((.*?)\) REFERENCES '
+ '(?:(.*?)\.)?(.*?)\((.*?)\)', condef).groups()
+ constrained_columns, referred_schema, \
+ referred_table, referred_columns = m
+ constrained_columns = [preparer._unquote_identifier(x)
+ for x in re.split(r'\s*,\s*', constrained_columns)]
if referred_schema:
- referred_schema = preparer._unquote_identifier(referred_schema)
+ referred_schema = \
+ preparer._unquote_identifier(referred_schema)
elif schema is not None and schema == self.default_schema_name:
# no schema (i.e. its the default schema), and the table we're
# reflecting has the default schema explicit, then use that.
# i.e. try to use the user's conventions
referred_schema = schema
referred_table = preparer._unquote_identifier(referred_table)
- referred_columns = [preparer._unquote_identifier(x) for x in re.split(r'\s*,\s', referred_columns)]
+ referred_columns = [preparer._unquote_identifier(x)
+ for x in re.split(r'\s*,\s*', referred_columns)]
fkey_d = {
'name' : conname,
'constrained_columns' : constrained_columns,
if expr:
if idx_name != sv_idx_name:
util.warn(
- "Skipped unsupported reflection of expression-based index %s"
+ "Skipped unsupported reflection of "
+ "expression-based index %s"
% idx_name)
sv_idx_name = idx_name
continue
## Load data types for enums:
SQL_ENUMS = """
SELECT t.typname as "name",
- -- t.typdefault as "default", -- no enum defaults in 8.4 at least
- pg_catalog.pg_type_is_visible(t.oid) as "visible",
- n.nspname as "schema",
- e.enumlabel as "label"
+ -- no enum defaults in 8.4 at least
+ -- t.typdefault as "default",
+ pg_catalog.pg_type_is_visible(t.oid) as "visible",
+ n.nspname as "schema",
+ e.enumlabel as "label"
FROM pg_catalog.pg_type t
LEFT JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
LEFT JOIN pg_catalog.pg_constraint r ON t.oid = r.contypid
ORDER BY "name", e.oid -- e.oid gives us label order
"""
- s = sql.text(SQL_ENUMS, typemap={'attname':sqltypes.Unicode, 'label':sqltypes.Unicode})
+ s = sql.text(SQL_ENUMS, typemap={
+ 'name':sqltypes.Unicode,
+ 'label':sqltypes.Unicode})
c = connection.execute(s)
enums = {}
## Load data types for domains:
SQL_DOMAINS = """
SELECT t.typname as "name",
- pg_catalog.format_type(t.typbasetype, t.typtypmod) as "attype",
- not t.typnotnull as "nullable",
- t.typdefault as "default",
- pg_catalog.pg_type_is_visible(t.oid) as "visible",
- n.nspname as "schema"
+ pg_catalog.format_type(t.typbasetype, t.typtypmod) as "attype",
+ not t.typnotnull as "nullable",
+ t.typdefault as "default",
+ pg_catalog.pg_type_is_visible(t.oid) as "visible",
+ n.nspname as "schema"
FROM pg_catalog.pg_type t
- LEFT JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
- LEFT JOIN pg_catalog.pg_constraint r ON t.oid = r.contypid
+ LEFT JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
+ LEFT JOIN pg_catalog.pg_constraint r ON t.oid = r.contypid
WHERE t.typtype = 'd'
"""