Operator Classes
^^^^^^^^^^^^^^^^
PostgreSQL allows the specification of an *operator class* for each column of
-an index (see http://www.postgresql.org/docs/8.3/interactive/indexes-opclass.html).
-The :class:`.Index` construct allows these to be specified via the ``postgresql_ops``
-keyword argument::
+an index (see
+http://www.postgresql.org/docs/8.3/interactive/indexes-opclass.html).
+The :class:`.Index` construct allows these to be specified via the
+``postgresql_ops`` keyword argument::
    Index('my_index', my_table.c.id, my_table.c.data,
          postgresql_ops={
              'data': 'text_pattern_ops',
              'id': 'int4_ops'
          })

.. versionadded:: 0.7.2
    ``postgresql_ops`` keyword argument to :class:`.Index` construct.
Note that the keys in the ``postgresql_ops`` dictionary are the "key" name of
-the :class:`.Column`, i.e. the name used to access it from the ``.c`` collection
-of :class:`.Table`, which can be configured to be different than the actual
-name of the column as expressed in the database.
+the :class:`.Column`, i.e. the name used to access it from the ``.c``
+collection of :class:`.Table`, which can be configured to be different than
+the actual name of the column as expressed in the database.
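A minimal sketch of that distinction, assuming a hypothetical table whose
column is named ``data_col`` in the database but is keyed as ``data`` on the
``.c`` collection::

    from sqlalchemy import (Table, Column, Integer, String,
                            MetaData, Index)

    metadata = MetaData()
    my_table = Table('my_table', metadata,
                     Column('id', Integer, primary_key=True),
                     Column('data_col', String, key='data'))

    # postgresql_ops is keyed on the Column .key ("data"),
    # not on the database name ("data_col")
    Index('my_index', my_table.c.data,
          postgresql_ops={'data': 'varchar_pattern_ops'})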
Index Types
^^^^^^^^^^^^
-PostgreSQL provides several index types: B-Tree, Hash, GiST, and GIN, as well as
-the ability for users to create their own (see
+PostgreSQL provides several index types: B-Tree, Hash, GiST, and GIN, as well
+as the ability for users to create their own (see
http://www.postgresql.org/docs/8.3/static/indexes-types.html). These can be
specified on :class:`.Index` using the ``postgresql_using`` keyword argument::
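
    # a sketch: "gin" is illustrative; the value is passed through
    # to the CREATE INDEX ... USING clause
    Index('my_gin_index', my_table.c.data, postgresql_using='gin')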
_FLOAT_TYPES = (700, 701, 1021, 1022)
_INT_TYPES = (20, 21, 23, 26, 1005, 1007, 1016)
+
class BYTEA(sqltypes.LargeBinary):
__visit_name__ = 'BYTEA'
+
class DOUBLE_PRECISION(sqltypes.Float):
__visit_name__ = 'DOUBLE_PRECISION'
+
class INET(sqltypes.TypeEngine):
__visit_name__ = "INET"
PGInet = INET
+
class CIDR(sqltypes.TypeEngine):
__visit_name__ = "CIDR"
PGCidr = CIDR
+
class MACADDR(sqltypes.TypeEngine):
__visit_name__ = "MACADDR"
PGMacAddr = MACADDR
+
class TIMESTAMP(sqltypes.TIMESTAMP):
    def __init__(self, timezone=False, precision=None):
        super(TIMESTAMP, self).__init__(timezone=timezone)
        self.precision = precision

class TIME(sqltypes.TIME):
    def __init__(self, timezone=False, precision=None):
        super(TIME, self).__init__(timezone=timezone)
        self.precision = precision
+
class INTERVAL(sqltypes.TypeEngine):
"""Postgresql INTERVAL type.
"""
__visit_name__ = 'INTERVAL'
+
def __init__(self, precision=None):
self.precision = precision
PGInterval = INTERVAL
+
class BIT(sqltypes.TypeEngine):
__visit_name__ = 'BIT'
+
    def __init__(self, length=None, varying=False):
        if not varying:
            # BIT without VARYING defaults to length 1
            self.length = length or 1
PGBit = BIT
+
class UUID(sqltypes.TypeEngine):
"""Postgresql UUID type.
"""
if as_uuid and _python_UUID is None:
raise NotImplementedError(
- "This version of Python does not support the native UUID type."
- )
+ "This version of Python does not support the native UUID type."
+ )
self.as_uuid = as_uuid
def bind_processor(self, dialect):
PGUuid = UUID
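
# A hedged usage sketch (not part of this module): with as_uuid=True the
# type round-trips Python uuid.UUID objects, assuming a hypothetical
# "tokens" table:
#
#     from sqlalchemy import Table, Column, MetaData
#     from sqlalchemy.dialects.postgresql import UUID
#
#     tokens = Table('tokens', MetaData(),
#                    Column('id', UUID(as_uuid=True), primary_key=True))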
+
class _Slice(expression.ColumnElement):
__visit_name__ = 'slice'
type = sqltypes.NULLTYPE
+
    def __init__(self, slice_, source_comparator):
        self.start = source_comparator._check_literal(
            source_comparator.expr,
            operators.getitem, slice_.start)
        self.stop = source_comparator._check_literal(
            source_comparator.expr,
            operators.getitem, slice_.stop)
+
class array(expression.Tuple):
"""A Postgresql ARRAY literal.
def self_group(self, against):
return self
+
class ARRAY(sqltypes.Concatenable, sqltypes.TypeEngine):
"""Postgresql ARRAY type.
to optimize itself to expect exactly that number of dimensions.
Note that Postgresql itself still allows N dimensions with such a type.
- SQL expressions of type :class:`.ARRAY` have support for "index" and "slice"
- behavior. The Python ``[]`` operator works normally here, given
+ SQL expressions of type :class:`.ARRAY` have support for "index" and
+ "slice" behavior. The Python ``[]`` operator works normally here, given
integer indexes or slices. Note that Postgresql arrays default
to 1-based indexing. The operator produces binary expression
constructs which will produce the appropriate SQL, both for
item_proc = self.item_type.\
dialect_impl(dialect).\
bind_processor(dialect)
+
def process(value):
if value is None:
return value
item_proc = self.item_type.\
dialect_impl(dialect).\
result_processor(dialect, coltype)
+
def process(value):
if value is None:
return value
PGArray = ARRAY
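
# A hedged sketch of the "[]" index/slice behavior described above (not
# part of this module), assuming a hypothetical table "t":
#
#     from sqlalchemy import select, Table, Column, Integer, MetaData
#     from sqlalchemy.dialects.postgresql import ARRAY
#
#     t = Table('t', MetaData(), Column('scores', ARRAY(Integer)))
#     select([t.c.scores[1]])      # index access; Postgresql is 1-based
#     select([t.c.scores[2:4]])    # slice access, renders scores[2:4]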
+
class ENUM(sqltypes.Enum):
"""Postgresql ENUM type.
self.drop(bind=bind, checkfirst=checkfirst)
colspecs = {
- sqltypes.Interval:INTERVAL,
- sqltypes.Enum:ENUM,
+ sqltypes.Interval: INTERVAL,
+ sqltypes.Enum: ENUM,
}
ischema_names = {
- 'integer' : INTEGER,
- 'bigint' : BIGINT,
- 'smallint' : SMALLINT,
- 'character varying' : VARCHAR,
- 'character' : CHAR,
- '"char"' : sqltypes.String,
- 'name' : sqltypes.String,
- 'text' : TEXT,
- 'numeric' : NUMERIC,
- 'float' : FLOAT,
- 'real' : REAL,
+ 'integer': INTEGER,
+ 'bigint': BIGINT,
+ 'smallint': SMALLINT,
+ 'character varying': VARCHAR,
+ 'character': CHAR,
+ '"char"': sqltypes.String,
+ 'name': sqltypes.String,
+ 'text': TEXT,
+ 'numeric': NUMERIC,
+ 'float': FLOAT,
+ 'real': REAL,
'inet': INET,
'cidr': CIDR,
'uuid': UUID,
'bit': BIT,
'bit varying': BIT,
'macaddr': MACADDR,
- 'double precision' : DOUBLE_PRECISION,
- 'timestamp' : TIMESTAMP,
- 'timestamp with time zone' : TIMESTAMP,
- 'timestamp without time zone' : TIMESTAMP,
- 'time with time zone' : TIME,
- 'time without time zone' : TIME,
- 'date' : DATE,
+ 'double precision': DOUBLE_PRECISION,
+ 'timestamp': TIMESTAMP,
+ 'timestamp with time zone': TIMESTAMP,
+ 'timestamp without time zone': TIMESTAMP,
+ 'time with time zone': TIME,
+ 'time without time zone': TIME,
+ 'date': DATE,
'time': TIME,
- 'bytea' : BYTEA,
- 'boolean' : BOOLEAN,
- 'interval':INTERVAL,
- 'interval year to month':INTERVAL,
- 'interval day to second':INTERVAL,
+ 'bytea': BYTEA,
+ 'boolean': BOOLEAN,
+ 'interval': INTERVAL,
+ 'interval year to month': INTERVAL,
+ 'interval day to second': INTERVAL,
}
-
class PGCompiler(compiler.SQLCompiler):
def visit_array(self, element, **kw):
elif isinstance(select._distinct, (list, tuple)):
return "DISTINCT ON (" + ', '.join(
[self.process(col) for col in select._distinct]
- )+ ") "
+ ) + ") "
else:
return "DISTINCT ON (" + self.process(select._distinct) + ") "
else:
return "EXTRACT(%s FROM %s)" % (
field, self.process(expr))
+
class PGDDLCompiler(compiler.DDLCompiler):
def get_column_specification(self, column, **kwargs):
colspec = self.preparer.format_column(column)
name = self.quote_schema(type_.schema, type_.quote) + "." + name
return name
+
class PGInspector(reflection.Inspector):
def __init__(self, conn):
return self.dialect.get_table_oid(self.bind, table_name, schema,
info_cache=self.info_cache)
+
class CreateEnumType(schema._CreateDropBase):
- __visit_name__ = "create_enum_type"
+ __visit_name__ = "create_enum_type"
+
class DropEnumType(schema._CreateDropBase):
- __visit_name__ = "drop_enum_type"
+ __visit_name__ = "drop_enum_type"
+
class PGExecutionContext(default.DefaultExecutionContext):
def fire_sequence(self, seq, type_):
col = column.name
tab = tab[0:29 + max(0, (29 - len(col)))]
col = col[0:29 + max(0, (29 - len(tab)))]
- column._postgresql_seq_name = seq_name = "%s_%s_seq" % (tab, col)
+ name = "%s_%s_seq" % (tab, col)
+ column._postgresql_seq_name = seq_name = name
sch = column.table.schema
if sch is not None:
return super(PGExecutionContext, self).get_insert_default(column)
+
class PGDialect(default.DefaultDialect):
name = 'postgresql'
supports_alter = True
return connection.scalar("select current_schema()")
def has_schema(self, connection, schema):
+ query = "select nspname from pg_namespace where lower(nspname)=:schema"
cursor = connection.execute(
sql.text(
- "select nspname from pg_namespace where lower(nspname)=:schema",
+ query,
bindparams=[
sql.bindparam(
'schema', unicode(schema.lower()),
sql.bindparam('table_name', type_=sqltypes.Unicode),
sql.bindparam('schema', type_=sqltypes.Unicode)
],
- typemap={'oid':sqltypes.Integer}
+ typemap={'oid': sqltypes.Integer}
)
c = connection.execute(s, table_name=table_name, schema=schema)
table_oid = c.scalar()
"AND '%s' = (select nspname from pg_namespace n "
"where n.oid = c.relnamespace) " %
current_schema,
- typemap = {'relname':sqltypes.Unicode}
+ typemap={'relname': sqltypes.Unicode}
)
)
return [row[0] for row in result]
-
@reflection.cache
def get_view_names(self, connection, schema=None, **kw):
if schema is not None:
# format columns
columns = []
for name, format_type, default, notnull, attnum, table_oid in rows:
- column_info = self._get_column_info(name, format_type, default,
- notnull, domains, enums, schema)
+ column_info = self._get_column_info(
+ name, format_type, default, notnull, domains, enums, schema)
columns.append(column_info)
return columns
"""
t = sql.text(FK_SQL, typemap={
- 'conname':sqltypes.Unicode,
- 'condef':sqltypes.Unicode})
+ 'conname': sqltypes.Unicode,
+ 'condef': sqltypes.Unicode})
c = connection.execute(t, table=table_oid)
fkeys = []
for conname, condef, conschema in c.fetchall():
referred_columns = [preparer._unquote_identifier(x)
for x in re.split(r'\s*,\s', referred_columns)]
fkey_d = {
- 'name' : conname,
- 'constrained_columns' : constrained_columns,
- 'referred_schema' : referred_schema,
- 'referred_table' : referred_table,
- 'referred_columns' : referred_columns
+ 'name': conname,
+ 'constrained_columns': constrained_columns,
+ 'referred_schema': referred_schema,
+ 'referred_table': referred_table,
+ 'referred_columns': referred_columns
}
fkeys.append(fkey_d)
return fkeys
i.relname
"""
- t = sql.text(IDX_SQL, typemap={'attname':sqltypes.Unicode})
+ t = sql.text(IDX_SQL, typemap={'attname': sqltypes.Unicode})
c = connection.execute(t, table_oid=table_oid)
index_names = {}
if idx_name in index_names:
index_d = index_names[idx_name]
else:
- index_d = {'column_names':[]}
+ index_d = {'column_names': []}
indexes.append(index_d)
index_names[idx_name] = index_d
index_d['name'] = idx_name
"""
s = sql.text(SQL_ENUMS, typemap={
- 'attname':sqltypes.Unicode,
- 'label':sqltypes.Unicode})
+ 'attname': sqltypes.Unicode,
+ 'label': sqltypes.Unicode})
c = connection.execute(s)
enums = {}
WHERE t.typtype = 'd'
"""
- s = sql.text(SQL_DOMAINS, typemap={'attname':sqltypes.Unicode})
+ s = sql.text(SQL_DOMAINS, typemap={'attname': sqltypes.Unicode})
c = connection.execute(s)
domains = {}
name = "%s.%s" % (domain['schema'], domain['name'])
domains[name] = {
- 'attype':attype,
+ 'attype': attype,
'nullable': domain['nullable'],
'default': domain['default']
}
return domains
-
:meth:`.Query.execution_options`, in addition to those not specific to DBAPIs:
* isolation_level - Set the transaction isolation level for the lifespan of a
- :class:`.Connection` (can only be set on a connection, not a statement or query).
- This includes the options ``SERIALIZABLE``, ``READ COMMITTED``,
+ :class:`.Connection` (can only be set on a connection, not a statement
+ or query). This includes the options ``SERIALIZABLE``, ``READ COMMITTED``,
``READ UNCOMMITTED`` and ``REPEATABLE READ``.
* stream_results - Enable or disable usage of server side cursors.
- If ``None`` or not set, the ``server_side_cursors`` option of the :class:`.Engine` is used.
+ If ``None`` or not set, the ``server_side_cursors`` option of the
+  :class:`.Engine` is used (see the sketch below).
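
A sketch of both options together, assuming a hypothetical DSN and table::

    from sqlalchemy import create_engine

    engine = create_engine(
        "postgresql+psycopg2://scott:tiger@localhost/test")

    # isolation_level applies for the lifespan of this connection;
    # stream_results requests a server side cursor for its statements
    conn = engine.connect().execution_options(
        isolation_level='READ COMMITTED',
        stream_results=True)
    result = conn.execute("select * from some_large_table")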
Unicode
-------
This overrides the encoding specified in the Postgresql client configuration.
.. versionadded:: 0.7.3
- The psycopg2-specific ``client_encoding`` parameter to :func:`.create_engine`.
+ The psycopg2-specific ``client_encoding`` parameter to
+ :func:`.create_engine`.
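
A short sketch (the DSN is hypothetical)::

    engine = create_engine(
        "postgresql+psycopg2://scott:tiger@localhost/test",
        client_encoding='utf8')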
SQLAlchemy can also be instructed to skip the usage of the psycopg2
``UNICODE`` extension and to instead utilize its own unicode encode/decode
services, which are normally reserved only for those DBAPIs that don't
-fully support unicode directly. Passing ``use_native_unicode=False``
-to :func:`.create_engine` will disable usage of ``psycopg2.extensions.UNICODE``.
+fully support unicode directly. Passing ``use_native_unicode=False`` to
+:func:`.create_engine` will disable usage of ``psycopg2.extensions.UNICODE``.
SQLAlchemy will instead encode data itself into Python bytestrings on the way
in and coerce from bytes on the way back,
using the value of the :func:`.create_engine` ``encoding`` parameter, which
raise exc.InvalidRequestError(
"Unknown PG numeric type: %d" % coltype)
+
class _PGEnum(ENUM):
def __init__(self, *arg, **kw):
super(_PGEnum, self).__init__(*arg, **kw)
self.convert_unicode = "force"
# end Py2K
+
class _PGArray(ARRAY):
def __init__(self, *arg, **kw):
super(_PGArray, self).__init__(*arg, **kw)
self.item_type.convert_unicode = "force"
# end Py2K
+
class _PGHStore(HSTORE):
def bind_processor(self, dialect):
if dialect._has_native_hstore:
_server_side_id = util.counter()
+
class PGExecutionContext_psycopg2(PGExecutionContext):
def create_cursor(self):
# TODO: coverage for server side cursors + select.for_update()
)
)
else:
- is_server_side = self.execution_options.get('stream_results', False)
+ is_server_side = \
+ self.execution_options.get('stream_results', False)
self.__is_server_side = is_server_side
if is_server_side:
value = value.replace(self.escape_quote, self.escape_to_quote)
return value.replace('%', '%%')
+
class PGDialect_psycopg2(PGDialect):
driver = 'psycopg2'
# Py2K
colspecs = util.update_copy(
PGDialect.colspecs,
{
- sqltypes.Numeric : _PGNumeric,
- ENUM : _PGEnum, # needs force_unicode
- sqltypes.Enum : _PGEnum, # needs force_unicode
- ARRAY : _PGArray, # needs force_unicode
- HSTORE : _PGHStore,
+ sqltypes.Numeric: _PGNumeric,
+ ENUM: _PGEnum, # needs force_unicode
+ sqltypes.Enum: _PGEnum, # needs force_unicode
+ ARRAY: _PGArray, # needs force_unicode
+ HSTORE: _PGHStore,
}
)
for x in m.group(1, 2, 3)
if x is not None)
-
def initialize(self, connection):
super(PGDialect_psycopg2, self).initialize(connection)
self._has_native_hstore = self.use_native_hstore and \
def _isolation_lookup(self):
extensions = __import__('psycopg2.extensions').extensions
return {
- 'READ COMMITTED':extensions.ISOLATION_LEVEL_READ_COMMITTED,
- 'READ UNCOMMITTED':extensions.ISOLATION_LEVEL_READ_UNCOMMITTED,
- 'REPEATABLE READ':extensions.ISOLATION_LEVEL_REPEATABLE_READ,
- 'SERIALIZABLE':extensions.ISOLATION_LEVEL_SERIALIZABLE
+ 'READ COMMITTED': extensions.ISOLATION_LEVEL_READ_COMMITTED,
+ 'READ UNCOMMITTED': extensions.ISOLATION_LEVEL_READ_UNCOMMITTED,
+ 'REPEATABLE READ': extensions.ISOLATION_LEVEL_REPEATABLE_READ,
+ 'SERIALIZABLE': extensions.ISOLATION_LEVEL_SERIALIZABLE
}
def set_isolation_level(self, connection, level):
return False
dialect = PGDialect_psycopg2
-