--- /dev/null
+.. change::
+ :tags: reflection, usecase
+ :tickets: 2056
+
+ Added new reflection method :meth:`.Inspector.get_sequence_names` which
+ returns all the sequences defined and :meth:`.Inspector.has_sequence` to
+ check if a particular sequence exits.
+ Support for this method has been added to the backend that support
+ :class:`.Sequence`: PostgreSQL, Oracle and MariaDB >= 10.3.
return c.first() is not None
+ @reflection.cache
+ @_db_plus_owner_listing
+ def get_sequence_names(self, connection, dbname, owner, schema, **kw):
+ sequences = ischema.sequences
+
+ s = sql.select([sequences.c.sequence_name])
+ if owner:
+ s = s.where(sequences.c.sequence_schema == owner)
+
+ c = connection.execute(s)
+
+ return [row[0] for row in c]
+
@reflection.cache
def get_schema_names(self, connection, **kw):
s = sql.select(
rs.close()
def has_sequence(self, connection, sequence_name, schema=None):
+ if not self.supports_sequences:
+ self._sequences_not_supported()
if not schema:
schema = self.default_schema_name
# MariaDB implements sequences as a special type of table
cursor = connection.execute(
sql.text(
"SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES "
- "WHERE TABLE_NAME=:name AND "
+ "WHERE TABLE_TYPE='SEQUENCE' and TABLE_NAME=:name AND "
"TABLE_SCHEMA=:schema_name"
),
dict(name=sequence_name, schema_name=schema),
)
return cursor.first() is not None
+ def _sequences_not_supported(self):
+ raise NotImplementedError(
+ "Sequences are supported only by the "
+ "MariaDB series 10.3 or greater"
+ )
+
+ @reflection.cache
+ def get_sequence_names(self, connection, schema=None, **kw):
+ if not self.supports_sequences:
+ self._sequences_not_supported()
+ if not schema:
+ schema = self.default_schema_name
+ # MariaDB implements sequences as a special type of table
+ cursor = connection.execute(
+ sql.text(
+ "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES "
+ "WHERE TABLE_TYPE='SEQUENCE' and TABLE_SCHEMA=:schema_name"
+ ),
+ dict(schema_name=schema),
+ )
+ return [
+ row[0]
+ for row in self._compat_fetchall(
+ cursor, charset=self._connection_charset
+ )
+ ]
+
def initialize(self, connection):
self._connection_charset = self._detect_charset(connection)
self._detect_sql_mode(connection)
)
return [self.normalize_name(row[0]) for row in cursor]
+ @reflection.cache
+ def get_sequence_names(self, connection, schema=None, **kw):
+ if not schema:
+ schema = self.default_schema_name
+ cursor = connection.execute(
+ sql.text(
+ "SELECT sequence_name FROM all_sequences "
+ "WHERE sequence_owner = :schema_name"
+ ),
+ schema_name=self.denormalize_name(schema),
+ )
+ return [self.normalize_name(row[0]) for row in cursor]
+
@reflection.cache
def get_table_options(self, connection, table_name, schema=None, **kw):
options = {}
def has_sequence(self, connection, sequence_name, schema=None):
if schema is None:
- cursor = connection.execute(
- sql.text(
- "SELECT relname FROM pg_class c join pg_namespace n on "
- "n.oid=c.relnamespace where relkind='S' and "
- "n.nspname=current_schema() "
- "and relname=:name"
- ).bindparams(
- sql.bindparam(
- "name",
- util.text_type(sequence_name),
- type_=sqltypes.Unicode,
- )
- )
- )
- else:
- cursor = connection.execute(
- sql.text(
- "SELECT relname FROM pg_class c join pg_namespace n on "
- "n.oid=c.relnamespace where relkind='S' and "
- "n.nspname=:schema and relname=:name"
- ).bindparams(
- sql.bindparam(
- "name",
- util.text_type(sequence_name),
- type_=sqltypes.Unicode,
- ),
- sql.bindparam(
- "schema",
- util.text_type(schema),
- type_=sqltypes.Unicode,
- ),
- )
+ schema = self.default_schema_name
+ cursor = connection.execute(
+ sql.text(
+ "SELECT relname FROM pg_class c join pg_namespace n on "
+ "n.oid=c.relnamespace where relkind='S' and "
+ "n.nspname=:schema and relname=:name"
+ ).bindparams(
+ sql.bindparam(
+ "name",
+ util.text_type(sequence_name),
+ type_=sqltypes.Unicode,
+ ),
+ sql.bindparam(
+ "schema", util.text_type(schema), type_=sqltypes.Unicode,
+ ),
)
+ )
return bool(cursor.first())
)
return [name for name, in result]
+ @reflection.cache
+ def get_sequence_names(self, connection, schema=None, **kw):
+ if not schema:
+ schema = self.default_schema_name
+ cursor = connection.execute(
+ sql.text(
+ "SELECT relname FROM pg_class c join pg_namespace n on "
+ "n.oid=c.relnamespace where relkind='S' and "
+ "n.nspname=:schema"
+ ).bindparams(
+ sql.bindparam(
+ "schema", util.text_type(schema), type_=sqltypes.Unicode,
+ ),
+ )
+ )
+ return [row[0] for row in cursor]
+
@reflection.cache
def get_view_definition(self, connection, view_name, schema=None, **kw):
view_def = connection.scalar(
def get_view_names(self, connection, schema=None, **kw):
"""Return a list of all view names available in the database.
- schema:
- Optional, retrieve names from a non-default schema.
+ :param schema: schema name to query, if not the default schema.
+ """
+
+ raise NotImplementedError()
+
+ def get_sequence_names(self, connection, schema=None, **kw):
+ """Return a list of all sequence names available in the database.
+
+ :param schema: schema name to query, if not the default schema.
+
+ .. versionadded:: 1.4
"""
raise NotImplementedError()
support named schemas, behavior is undefined if ``schema`` is not
passed as ``None``. For special quoting, use :class:`.quoted_name`.
- :param order_by: Optional, may be the string "foreign_key" to sort
- the result on foreign key dependencies. Does not automatically
- resolve cycles, and will raise :class:`.CircularDependencyError`
- if cycles exist.
-
.. seealso::
:meth:`_reflection.Inspector.get_sorted_table_and_fkc_names`
def has_table(self, table_name, schema=None):
"""Return True if the backend has a table of the given name.
+
+ :param table_name: name of the table to check
+ :param schema: schema name to query, if not the default schema.
+
.. versionadded:: 1.4
"""
with self._operation_context() as conn:
return self.dialect.has_table(conn, table_name, schema)
+ def has_sequence(self, sequence_name, schema=None):
+ """Return True if the backend has a table of the given name.
+
+ :param sequence_name: name of the table to check
+ :param schema: schema name to query, if not the default schema.
+
+ .. versionadded:: 1.4
+
+ """
+ # TODO: info_cache?
+ with self._operation_context() as conn:
+ return self.dialect.has_sequence(conn, sequence_name, schema)
+
def get_sorted_table_and_fkc_names(self, schema=None):
"""Return dependency-sorted table and foreign key constraint names in
referred to within a particular schema.
conn, schema, info_cache=self.info_cache
)
+ def get_sequence_names(self, schema=None):
+ """Return all sequence names in `schema`.
+
+ :param schema: Optional, retrieve names from a non-default schema.
+ For special quoting, use :class:`.quoted_name`.
+
+ """
+
+ with self._operation_context() as conn:
+ return self.dialect.get_sequence_names(
+ conn, schema, info_cache=self.info_cache
+ )
+
def get_view_definition(self, view_name, schema=None):
"""Return definition for `view_name`.
metadata = None
tables = None
other = None
+ sequences = None
@classmethod
def setup_class(cls):
cls.other = adict()
cls.tables = adict()
+ cls.sequences = adict()
cls.bind = cls.setup_bind()
cls.metadata = sa.MetaData()
if cls.run_create_tables == "once":
cls.metadata.create_all(cls.bind)
cls.tables.update(cls.metadata.tables)
+ cls.sequences.update(cls.metadata._sequences)
def _setup_each_tables(self):
if self.run_define_tables == "each":
if self.run_create_tables == "each":
self.metadata.create_all(self.bind)
self.tables.update(self.metadata.tables)
+ self.sequences.update(self.metadata._sequences)
elif self.run_create_tables == "each":
self.metadata.create_all(self.bind)
from .. import config
from .. import fixtures
from ..assertions import eq_
+from ..assertions import is_true
from ..config import requirements
from ..schema import Column
from ..schema import Table
+from ... import inspect
from ... import Integer
from ... import MetaData
-from ... import schema
from ... import Sequence
from ... import String
from ... import testing
)
-class HasSequenceTest(fixtures.TestBase):
+class HasSequenceTest(fixtures.TablesTest):
+ run_deletes = None
+
__requires__ = ("sequences",)
__backend__ = True
- def test_has_sequence(self, connection):
- s1 = Sequence("user_id_seq")
- connection.execute(schema.CreateSequence(s1))
- try:
- eq_(
- connection.dialect.has_sequence(connection, "user_id_seq"),
- True,
+ @classmethod
+ def define_tables(cls, metadata):
+ Sequence("user_id_seq", metadata=metadata)
+ Sequence("other_seq", metadata=metadata)
+ if testing.requires.schemas.enabled:
+ Sequence(
+ "user_id_seq", schema=config.test_schema, metadata=metadata
+ )
+ Sequence(
+ "schema_seq", schema=config.test_schema, metadata=metadata
)
- finally:
- connection.execute(schema.DropSequence(s1))
+ Table(
+ "user_id_table", metadata, Column("id", Integer, primary_key=True),
+ )
+
+ def test_has_sequence(self, connection):
+ eq_(
+ inspect(connection).has_sequence("user_id_seq"), True,
+ )
+
+ def test_has_sequence_other_object(self, connection):
+ eq_(
+ inspect(connection).has_sequence("user_id_table"), False,
+ )
@testing.requires.schemas
def test_has_sequence_schema(self, connection):
- s1 = Sequence("user_id_seq", schema=config.test_schema)
- connection.execute(schema.CreateSequence(s1))
- try:
- eq_(
- connection.dialect.has_sequence(
- connection, "user_id_seq", schema=config.test_schema
- ),
- True,
- )
- finally:
- connection.execute(schema.DropSequence(s1))
+ eq_(
+ inspect(connection).has_sequence(
+ "user_id_seq", schema=config.test_schema
+ ),
+ True,
+ )
def test_has_sequence_neg(self, connection):
- eq_(connection.dialect.has_sequence(connection, "user_id_seq"), False)
+ eq_(
+ inspect(connection).has_sequence("some_sequence"), False,
+ )
@testing.requires.schemas
def test_has_sequence_schemas_neg(self, connection):
eq_(
- connection.dialect.has_sequence(
- connection, "user_id_seq", schema=config.test_schema
+ inspect(connection).has_sequence(
+ "some_sequence", schema=config.test_schema
),
False,
)
@testing.requires.schemas
def test_has_sequence_default_not_in_remote(self, connection):
- s1 = Sequence("user_id_seq")
- connection.execute(schema.CreateSequence(s1))
- try:
- eq_(
- connection.dialect.has_sequence(
- connection, "user_id_seq", schema=config.test_schema
- ),
- False,
- )
- finally:
- connection.execute(schema.DropSequence(s1))
+ eq_(
+ inspect(connection).has_sequence(
+ "other_sequence", schema=config.test_schema
+ ),
+ False,
+ )
@testing.requires.schemas
def test_has_sequence_remote_not_in_default(self, connection):
- s1 = Sequence("user_id_seq", schema=config.test_schema)
- connection.execute(schema.CreateSequence(s1))
- try:
- eq_(
- connection.dialect.has_sequence(connection, "user_id_seq"),
- False,
- )
- finally:
- connection.execute(schema.DropSequence(s1))
+ eq_(
+ inspect(connection).has_sequence("schema_seq"), False,
+ )
+
+ def test_get_sequence_names(self, connection):
+ exp = {"other_seq", "user_id_seq"}
+
+ res = set(inspect(connection).get_sequence_names())
+ is_true(res.intersection(exp) == exp)
+ is_true("schema_seq" not in res)
+
+ @testing.requires.schemas
+ def test_get_sequence_names_no_sequence_schema(self, connection):
+ eq_(
+ inspect(connection).get_sequence_names(
+ schema=config.test_schema_2
+ ),
+ [],
+ )
+
+ @testing.requires.schemas
+ def test_get_sequence_names_sequences_schema(self, connection):
+ eq_(
+ sorted(
+ inspect(connection).get_sequence_names(
+ schema=config.test_schema
+ )
+ ),
+ ["schema_seq", "user_id_seq"],
+ )
+
+
+class HasSequenceTestEmpty(fixtures.TestBase):
+ __requires__ = ("sequences",)
+ __backend__ = True
+
+ def test_get_sequence_names_no_sequence(self, connection):
+ eq_(
+ inspect(connection).get_sequence_names(), [],
+ )