return None
@reflection.cache
- def get_primary_keys(self, connection, table_name, schema=None, **kw):
+ def get_pk_constraint(self, connection, table_name, schema=None, **kw):
# Query to extract the PK/FK constrained fields of the given table
keyqry = """
SELECT se.rdb$field_name AS fname
# get primary key fields
c = connection.execute(keyqry, ["PRIMARY KEY", tablename])
pkfields = [self.normalize_name(r['fname']) for r in c.fetchall()]
- return pkfields
+ return {'constrained_columns':pkfields, 'name':None}
@reflection.cache
def get_column_sequence(self, connection,
ORDER BY r.rdb$field_position
"""
# get the PK, used to determine the eventual associated sequence
- pkey_cols = self.get_primary_keys(connection, table_name)
+ pk_constraint = self.get_pk_constraint(connection, table_name)
+ pkey_cols = pk_constraint['constrained_columns']
tablename = self.denormalize_name(table_name)
# get all of the fields for this table
and t3.tabid = t2.tabid and t3.colno = t1.colno
order by t1.colno""", table_name, schema)
- primary_cols = self.get_primary_keys(connection, table_name, schema, **kw)
+ pk_constraint = self.get_pk_constraint(connection, table_name, schema, **kw)
+ primary_cols = pk_constraint['constrained_columns']
columns = []
rows = c.fetchall()
return fkeys.values()
@reflection.cache
- def get_primary_keys(self, connection, table_name, schema=None, **kw):
+ def get_pk_constraint(self, connection, table_name, schema=None, **kw):
schema = schema or self.default_schema_name
# Select the column positions from sysindexes for sysconstraints
colpositions |= colpos
if not len(colpositions):
- return []
+ return {'constrained_columns':[], 'name':None}
# Select the column names using the columnpositions
# TODO: Maybe cache some of this column info (e.g. select all column names for one table)
table_name, *colpositions
).fetchall()
- return reduce(lambda x,y: list(x)+list(y), c, [])
+ cols = reduce(lambda x,y: list(x)+list(y), c, [])
+ return {'constrained_columns':cols, 'name':None}
@reflection.cache
def get_indexes(self, connection, table_name, schema, **kw):
return cols
@reflection.cache
- def get_primary_keys(self, connection, tablename, schema=None, **kw):
+ def get_pk_constraint(self, connection, tablename, schema=None, **kw):
current_schema = schema or self.default_schema_name
pkeys = []
# information_schema.referential_constraints
for row in c:
if 'PRIMARY' in row[TC.c.constraint_type.name]:
pkeys.append(row[0])
- return pkeys
+ return {'constrained_columns':pkeys, 'name':None}
@reflection.cache
def get_foreign_keys(self, connection, tablename, schema=None, **kw):
return parsed_state.columns
@reflection.cache
- def get_primary_keys(self, connection, table_name, schema=None, **kw):
+ def get_pk_constraint(self, connection, table_name, schema=None, **kw):
parsed_state = self._parsed_state_or_create(connection, table_name, schema, **kw)
for key in parsed_state.keys:
if key['type'] == 'PRIMARY':
# There can be only one.
- ##raise Exception, str(key)
- return [s[0] for s in key['columns']]
- return []
+ cols = [s[0] for s in key['columns']]
+ return {'constrained_columns':cols, 'name':None}
+ return {'constrained_columns':[], 'name':None}
@reflection.cache
def get_foreign_keys(self, connection, table_name, schema=None, **kw):
schema=self.denormalize_name(schema))
indexes = []
last_index_name = None
- pkeys = self.get_primary_keys(connection, table_name, schema,
- resolve_synonyms=resolve_synonyms,
- dblink=dblink,
- info_cache=kw.get('info_cache'))
+ pk_constraint = self.get_pk_constraint(
+ connection, table_name, schema, resolve_synonyms=resolve_synonyms,
+ dblink=dblink, info_cache=kw.get('info_cache'))
+ pkeys = pk_constraint['constrained_columns']
uniqueness = dict(NONUNIQUE=False, UNIQUE=True)
oracle_sys_col = re.compile(r'SYS_NC\d+\$', re.IGNORECASE)
constraint_data = rp.fetchall()
return constraint_data
- def get_primary_keys(self, connection, table_name, schema=None, **kw):
- """
-
- kw arguments can be:
-
- oracle_resolve_synonyms
-
- dblink
-
- """
- return self._get_primary_keys(connection, table_name, schema, **kw)[0]
-
@reflection.cache
- def _get_primary_keys(self, connection, table_name, schema=None, **kw):
+ def get_pk_constraint(self, connection, table_name, schema=None, **kw):
resolve_synonyms = kw.get('oracle_resolve_synonyms', False)
dblink = kw.get('dblink', '')
info_cache = kw.get('info_cache')
info_cache=kw.get('info_cache'))
for row in constraint_data:
- #print "ROW:" , row
(cons_name, cons_type, local_column, remote_table, remote_column, remote_owner) = \
row[0:2] + tuple([self.normalize_name(x) for x in row[2:6]])
if cons_type == 'P':
if constraint_name is None:
constraint_name = self.normalize_name(cons_name)
pkeys.append(local_column)
- return pkeys, constraint_name
-
- def get_pk_constraint(self, connection, table_name, schema=None, **kw):
- cols, name = self._get_primary_keys(connection, table_name, schema=schema, **kw)
-
- return {
- 'constrained_columns':cols,
- 'name':name
- }
+ return {'constrained_columns':pkeys, 'name':constraint_name}
@reflection.cache
def get_foreign_keys(self, connection, table_name, schema=None, **kw):
return columns
@reflection.cache
- def get_primary_keys(self, connection, table_name, schema=None, **kw):
+ def get_pk_constraint(self, connection, table_name, schema=None, **kw):
table_oid = self.get_table_oid(connection, table_name, schema,
info_cache=kw.get('info_cache'))
"""
t = sql.text(PK_SQL, typemap={'attname':sqltypes.Unicode})
c = connection.execute(t, table_oid=table_oid)
- primary_keys = [r[0] for r in c.fetchall()]
- return primary_keys
-
- @reflection.cache
- def get_pk_constraint(self, connection, table_name, schema=None, **kw):
- cols = self.get_primary_keys(connection, table_name,
- schema=schema, **kw)
-
- table_oid = self.get_table_oid(connection, table_name, schema,
- info_cache=kw.get('info_cache'))
+ cols = [r[0] for r in c.fetchall()]
PK_CONS_SQL = """
SELECT conname
t = sql.text(PK_CONS_SQL, typemap={'conname':sqltypes.Unicode})
c = connection.execute(t, table_oid=table_oid)
name = c.scalar()
- return {
- 'constrained_columns':cols,
- 'name':name
- }
+
+ return {'constrained_columns':cols, 'name':name}
@reflection.cache
def get_foreign_keys(self, connection, table_name, schema=None, **kw):
return columns
@reflection.cache
- def get_primary_keys(self, connection, table_name, schema=None, **kw):
+ def get_pk_constraint(self, connection, table_name, schema=None, **kw):
cols = self.get_columns(connection, table_name, schema, **kw)
pkeys = []
for col in cols:
if col['primary_key']:
pkeys.append(col['name'])
- return pkeys
+ return {'constrained_columns':pkeys, 'name':None}
@reflection.cache
def get_foreign_keys(self, connection, table_name, schema=None, **kw):
raise NotImplementedError()
- def get_primary_keys(self, connection, table_name, schema=None, **kw):
- """Return information about primary keys in `table_name`.
-
- Given a :class:`.Connection`, a string
- `table_name`, and an optional string `schema`, return primary
- key information as a list of column names.
-
- """
- raise NotImplementedError()
-
- def get_pk_constraint(self, table_name, schema=None, **kw):
+ def get_pk_constraint(self, connection, table_name, schema=None, **kw):
"""Return information about the primary key constraint on
`table_name`.
- Given a string `table_name`, and an optional string `schema`, return
- primary key information as a dictionary with these keys:
+ Given a :class:`.Connection`, a string
+ `table_name`, and an optional string `schema`, return primary
+ key information as a dictionary with these keys:
constrained_columns
a list of column names that make up the primary key
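For dialect authors, the new contract is a single dictionary rather than a bare
list of column names. A minimal sketch of a conforming implementation follows;
the dialect class, catalog table, and column names are hypothetical and only
illustrate the expected return shape:

    from sqlalchemy import sql
    from sqlalchemy.engine import reflection
    from sqlalchemy.engine.default import DefaultDialect

    class ExampleDialect(DefaultDialect):
        @reflection.cache
        def get_pk_constraint(self, connection, table_name, schema=None, **kw):
            # "example_pk_catalog" stands in for whatever system-catalog
            # query a real dialect would issue.
            q = sql.text("SELECT pk_name, col_name FROM example_pk_catalog "
                         "WHERE table_name = :tname ORDER BY position")
            rows = connection.execute(q, tname=table_name).fetchall()
            if not rows:
                # No primary key: still return the dictionary shape, not [].
                return {'constrained_columns': [], 'name': None}
            return {'constrained_columns': [r['col_name'] for r in rows],
                    'name': rows[0]['pk_name']}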
insp = reflection.Inspector.from_engine(connection)
return insp.reflecttable(table, include_columns, exclude_columns)
- def get_pk_constraint(self, conn, table_name, schema=None, **kw):
- """Compatibility method, adapts the result of get_primary_keys()
- for those dialects which don't implement get_pk_constraint().
-
- """
- return {
- 'constrained_columns':
- self.get_primary_keys(conn, table_name,
- schema=schema, **kw)
- }
-
def validate_identifier(self, ident):
if len(ident) > self.max_identifier_length:
raise exc.IdentifierError(
import sqlalchemy
from sqlalchemy import exc, sql
+from sqlalchemy import schema as sa_schema
from sqlalchemy import util
-from sqlalchemy.util import topological
from sqlalchemy.types import TypeEngine
-from sqlalchemy import schema as sa_schema
+from sqlalchemy.util import deprecated
+from sqlalchemy.util import topological
@util.decorator
col_def['type'] = coltype()
return col_defs
+ @deprecated('0.7', 'Call to deprecated method get_primary_keys.'
+ ' Use get_pk_constraint instead.')
def get_primary_keys(self, table_name, schema=None, **kw):
"""Return information about primary keys in `table_name`.
primary key information as a list of column names.
"""
- pkeys = self.dialect.get_primary_keys(self.bind, table_name, schema,
- info_cache=self.info_cache,
- **kw)
-
+ pkeys = self.dialect.get_pk_constraint(self.bind, table_name, schema,
+ info_cache=self.info_cache,
+ **kw)['constrained_columns']
return pkeys
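Calling code that still uses Inspector.get_primary_keys() keeps working through
this shim but now sees the deprecation warning; get_pk_constraint() additionally
reports the constraint name. A small usage sketch against an in-memory SQLite
database (the table and column are invented for the example):

    from sqlalchemy import create_engine, MetaData, Table, Column, Integer
    from sqlalchemy.engine.reflection import Inspector

    engine = create_engine('sqlite://')
    meta = MetaData()
    Table('users', meta, Column('user_id', Integer, primary_key=True))
    meta.create_all(engine)

    insp = Inspector.from_engine(engine)

    # Deprecated spelling: still a plain list, but emits SADeprecationWarning.
    assert insp.get_primary_keys('users') == ['user_id']

    # Preferred spelling: a dictionary carrying the columns plus the constraint
    # name (None here, since the SQLite dialect does not report one).
    pk = insp.get_pk_constraint('users')
    assert pk['constrained_columns'] == ['user_id']
    assert pk['name'] is None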
def get_pk_constraint(self, table_name, schema=None, **kw):
-from test.lib.testing import eq_, assert_raises, assert_raises_message
-import StringIO, unicodedata
+import unicodedata
+import sqlalchemy as sa
+from sqlalchemy import exc as sa_exc
from sqlalchemy import types as sql_types
from sqlalchemy import schema, events, event
-from sqlalchemy.engine.reflection import Inspector
from sqlalchemy import MetaData, Integer, String
-from test.lib.schema import Table, Column
-import sqlalchemy as sa
+from sqlalchemy.engine.reflection import Inspector
from test.lib import ComparesTables, \
- testing, engines, AssertsCompiledSQL
-from test.lib import fixtures
+ testing, engines, AssertsCompiledSQL, fixtures
+from test.lib.schema import Table, Column
+from test.lib.testing import eq_, assert_raises, assert_raises_message
create_inspector = Inspector.from_engine
self._test_get_columns(schema='test_schema', table_type='view')
@testing.provide_metadata
- def _test_get_primary_keys(self, schema=None):
+ def _test_get_pk_constraint(self, schema=None):
meta = self.metadata
- users, addresses, dingalings = createTables(meta, schema)
+ users, addresses, _ = createTables(meta, schema)
meta.create_all()
insp = Inspector(meta.bind)
- users_pkeys = insp.get_primary_keys(users.name,
- schema=schema)
+
+ users_cons = insp.get_pk_constraint(users.name, schema=schema)
+ users_pkeys = users_cons['constrained_columns']
eq_(users_pkeys, ['user_id'])
- addr_cons = insp.get_pk_constraint(addresses.name,
- schema=schema)
+ addr_cons = insp.get_pk_constraint(addresses.name, schema=schema)
addr_pkeys = addr_cons['constrained_columns']
eq_(addr_pkeys, ['address_id'])
eq_(addr_cons['name'], 'email_ad_pk')
go()
- def test_get_primary_keys(self):
- self._test_get_primary_keys()
+ def test_get_pk_constraint(self):
+ self._test_get_pk_constraint()
@testing.fails_on('sqlite', 'no schemas')
- def test_get_primary_keys_with_schema(self):
- self._test_get_primary_keys(schema='test_schema')
+ def test_get_pk_constraint_with_schema(self):
+ self._test_get_pk_constraint(schema='test_schema')
+
+ @testing.provide_metadata
+ def test_deprecated_get_primary_keys(self):
+ meta = self.metadata
+ users, _, _ = createTables(meta, schema=None)
+ meta.create_all()
+ insp = Inspector(meta.bind)
+ assert_raises_message(
+ sa_exc.SADeprecationWarning,
+ "Call to deprecated method get_primary_keys."
+ " Use get_pk_constraint instead.",
+ insp.get_primary_keys, users.name
+ )
@testing.provide_metadata
def _test_get_foreign_keys(self, schema=None):