else:
return False
- def reflecttable(self, connection, table, desired_columns):
+ def reflecttable(self, connection, table, include_columns):
#TODO: map these better
column_func = {
14 : lambda r: sqltypes.String(r['FLEN']), # TEXT
while row:
name = row['FNAME']
python_name = lower_if_possible(name)
- if desired_columns and python_name not in desired_columns:
+ if include_columns and python_name not in include_columns:
continue
args = [python_name]
return self.cache[name]
-def reflecttable(connection, table, desired_columns, ischema_names):
+def reflecttable(connection, table, include_columns, ischema_names):
key_constraints = pg_key_constraints
if table.schema is not None:
row[columns.c.numeric_scale],
row[columns.c.column_default]
)
- if desired_columns and name not in desired_columns:
+ if include_columns and name not in include_columns:
continue
args = []
cursor = connection.execute("""select tabname from systables where tabname=?""", table_name.lower() )
return bool( cursor.fetchone() is not None )
- def reflecttable(self, connection, table, desired_columns):
+ def reflecttable(self, connection, table, include_columns):
c = connection.execute ("select distinct OWNER from systables where tabname=?", table.name.lower() )
rows = c.fetchall()
if not rows :
for name , colattr , collength , default , colno in rows:
name = name.lower()
- if desired_columns and name not in desired_columns:
+ if include_columns and name not in include_columns:
continue
# in 7.31, coltype = 0x000
row = c.fetchone()
return row is not None
- def reflecttable(self, connection, table, desired_columns):
+ def reflecttable(self, connection, table, include_columns):
import sqlalchemy.databases.information_schema as ischema
# Get base columns
row[columns.c.numeric_scale],
row[columns.c.column_default]
)
- if desired_columns and name not in desired_columns:
+ if include_columns and name not in include_columns:
continue
args = []
version.append(n)
return tuple(version)
- def reflecttable(self, connection, table, desired_columns):
+ def reflecttable(self, connection, table, include_columns):
"""Load column definitions from the server."""
decode_from = self._detect_charset(connection)
# leave column names as unicode
name = name.decode(decode_from)
- if desired_columns and name not in desired_columns:
+ if include_columns and name not in include_columns:
continue
match = re.match(r'(\w+)(\(.*?\))?\s*(\w+)?\s*(\w+)?', type)
return name, owner, dblink
raise
- def reflecttable(self, connection, table, desired_columns):
+ def reflecttable(self, connection, table, include_columns):
preparer = self.identifier_preparer
if not preparer.should_quote(table):
name = table.name.upper()
if (colname.upper() == colname):
colname = colname.lower()
- if desired_columns and colname not in desired_columns:
+ if include_columns and colname not in include_columns:
continue
# INTEGER if the scale is 0 and precision is null
else:
return False
- def reflecttable(self, connection, table, desired_columns):
+ def reflecttable(self, connection, table, include_columns):
if self.use_information_schema:
- ischema.reflecttable(connection, table, desired_columns, ischema_names)
+ ischema.reflecttable(connection, table, include_columns, ischema_names)
else:
preparer = self.identifier_preparer
if table.schema is not None:
domains = self._load_domains(connection)
for name, format_type, default, notnull, attnum, table_oid in rows:
- if desired_columns and name not in desired_columns:
+ if include_columns and name not in include_columns:
continue
## strip (30) from character varying(30)
return (row is not None)
- def reflecttable(self, connection, table, desired_columns):
+ def reflecttable(self, connection, table, include_columns):
c = connection.execute("PRAGMA table_info(%s)" % self.preparer().format_table(table), {})
found_table = False
while True:
found_table = True
(name, type, nullable, has_default, primary_key) = (row[1], row[2].upper(), not row[3], row[4] is not None, row[5])
name = re.sub(r'^\"|\"$', '', name)
- if desired_columns and name not in desired_columns:
+ if include_columns and name not in include_columns:
continue
match = re.match(r'(\w+)(\(.*?\))?', type)
if match:
raise NotImplementedError()
- def reflecttable(self, connection, table, desired_columns=None):
+ def reflecttable(self, connection, table, include_columns=None):
"""Load table description from the database.
Given a [sqlalchemy.engine#Connection] and a [sqlalchemy.schema#Table] object, reflect its
- columns and properties from the database. If desired_columns (a list or set) is specified, limit the autoload
+ columns and properties from the database. If include_columns (a list or set) is specified, limit the autoload
to the given column names.
"""
return self.__engine.drop(entity, connection=self, **kwargs)
- def reflecttable(self, table, desired_columns=None):
+ def reflecttable(self, table, include_columns=None):
"""Reflect the columns in the given string table name from the database."""
- return self.__engine.reflecttable(table, self, desired_columns)
+ return self.__engine.reflecttable(table, self, include_columns)
def default_schema_name(self):
return self.__engine.dialect.get_default_schema_name(self)
return Connection(self, close_with_result=close_with_result, **kwargs)
- def reflecttable(self, table, connection=None, desired_columns=None):
+ def reflecttable(self, table, connection=None, include_columns=None):
"""Given a Table object, reflects its columns and properties from the database."""
if connection is None:
else:
conn = connection
try:
- self.dialect.reflecttable(conn, table, desired_columns)
+ self.dialect.reflecttable(conn, table, include_columns)
finally:
if connection is None:
conn.close()
autoload_with = kwargs.pop('autoload_with', False)
mustexist = kwargs.pop('mustexist', False)
useexisting = kwargs.pop('useexisting', False)
+ include_columns = kwargs.pop('include_columns', None)
key = _get_table_key(name, schema)
try:
table = metadata.tables[key]
# we do it after the table is in the singleton dictionary to support
# circular foreign keys
if autoload:
- try:
- iter(autoload)
- except:
- columns = None
- else:
- columns = autoload
try:
if autoload_with:
- autoload_with.reflecttable(table, desired_columns=columns)
+ autoload_with.reflecttable(table, include_columns=include_columns)
else:
- metadata._get_engine(raiseerr=True).reflecttable(table, desired_columns=columns)
+ metadata._get_engine(raiseerr=True).reflecttable(table, include_columns=include_columns)
except exceptions.NoSuchTableError:
del metadata.tables[key]
raise
options include:
schema
- Defaults to None: the *schema name* for this table, which is
+ The *schema name* for this table, which is
required if the table resides in a schema other than the
default selected schema for the engine's database
- connection.
+ connection. Defaults to ``None``.
autoload
Defaults to False: the Columns for this table should be
reflected from the database. Usually there will be no
Column objects in the constructor if this property is set.
+ autoload_with
+ if autoload==True, this is an optional Engine or Connection
+ instance to be used for the table reflection. If ``None``,
+ the underlying MetaData's bound connectable will be used.
+
+ include_columns
+ A list of strings indicating a subset of columns to be
+ loaded via the ``autoload`` operation; table columns who
+ aren't present in this list will not be represented on the resulting
+ ``Table`` object. Defaults to ``None`` which indicates all
+ columns should be reflected.
+
mustexist
Defaults to False: indicates that this Table must already
have been defined elsewhere in the application, else an
addresses.drop()
users.drop()
+ def test_autoload_partial(self):
+ meta = MetaData(testbase.db)
+ foo = Table('foo', meta,
+ Column('a', String(30)),
+ Column('b', String(30)),
+ Column('c', String(30)),
+ Column('d', String(30)),
+ Column('e', String(30)),
+ Column('f', String(30)),
+ )
+ meta.create_all()
+ try:
+ meta2 = MetaData(testbase.db)
+ foo2 = Table('foo', meta2, autoload=True, include_columns=['b', 'f', 'e'])
+ # test that cols come back in original order
+ assert [c.name for c in foo2.c] == ['b', 'e', 'f']
+ for c in ('b', 'f', 'e'):
+ assert c in foo2.c
+ for c in ('a', 'c', 'd'):
+ assert c not in foo2.c
+ finally:
+ meta.drop_all()
+
def testoverridecolumns(self):
"""test that you can override columns which contain foreign keys to other reflected tables"""
meta = MetaData(testbase.db)