#import pdb;pdb.set_trace()
# get all of the fields for this table
c = connection.execute(tblqry, [table.name.upper()])
+ found_table = False
while True:
row = c.fetchone()
if not row: break
+ found_table = True
args = [row['FNAME']]
kw = {}
# get the data types and lengths
# is it a primary key?
table.append_item(schema.Column(*args, **kw))
# does the field have indexes
+
+ if not found_table:
+ raise exceptions.NoSuchTableError(table.name)
def last_inserted_ids(self):
return self.context.last_inserted_ids
import sqlalchemy.schema as schema
import sqlalchemy.ansisql as ansisql
import sqlalchemy.types as sqltypes
-from sqlalchemy.exceptions import *
+import sqlalchemy.exceptions as exceptions
from sqlalchemy import *
from sqlalchemy.ansisql import *
order_by=[columns.c.ordinal_position])
c = connection.execute(s)
+ found_table = False
while True:
row = c.fetchone()
if row is None:
break
#print "row! " + repr(row)
# continue
+ found_table = True
(name, type, nullable, charlen, numericprec, numericscale, default) = (
row[columns.c.column_name],
row[columns.c.data_type],
if default is not None:
colargs.append(PassiveDefault(sql.text(default)))
table.append_item(schema.Column(name, coltype, nullable=nullable, *colargs))
+
+ if not found_table:
+ raise exceptions.NoSuchTableError(table.name)
s = select([constraints.c.constraint_name, constraints.c.constraint_type, constraints.c.table_name, key_constraints], use_labels=True, from_obj=[constraints.join(column_constraints, column_constraints.c.constraint_name==constraints.c.constraint_name).join(key_constraints, key_constraints.c.constraint_name==column_constraints.c.constraint_name)])
if not use_mysql:
order_by=[columns.c.ordinal_position])
c = connection.execute(s)
+ found_table = False
while True:
row = c.fetchone()
if row is None:
break
-
+ found_table = True
(name, type, nullable, charlen, numericprec, numericscale, default) = (
row[columns.c.column_name],
row[columns.c.data_type],
colargs.append(PassiveDefault(sql.text(default)))
table.append_item(schema.Column(name, coltype, nullable=nullable, *colargs))
-
-
+
+ if not found_table:
+ raise exceptions.NoSuchTableError(table.name)
+
# We also run an sp_columns to check for identity columns:
# FIXME: note that this only fetches the existence of an identity column, not it's properties like (seed, increment)
# also, add a check to make sure we specify the schema name of the table
# to use information_schema:
#ischema.reflecttable(self, table, ischema_names, use_mysql=True)
- tabletype, foreignkeyD = self.moretableinfo(connection, table=table)
- table.kwargs['mysql_engine'] = tabletype
-
c = connection.execute("describe " + table.name, {})
+ found_table = False
while True:
row = c.fetchone()
if row is None:
break
#print "row! " + repr(row)
+ if not found_table:
+ tabletype, foreignkeyD = self.moretableinfo(connection, table=table)
+ table.kwargs['mysql_engine'] = tabletype
+ found_table = True
+
(name, type, nullable, primary_key, default) = (row[0], row[1], row[2] == 'YES', row[3] == 'PRI', row[4])
match = re.match(r'(\w+)(\(.*?\))?', type)
nullable=nullable,
default=default
)))
+ if not found_table:
+ raise exceptions.NoSuchTableError(table.name)
def moretableinfo(self, connection, table):
"""Return (tabletype, {colname:foreignkey,...})
self.append("\nDROP INDEX " + index.name + " ON " + index.table.name)
self.execute()
-dialect = MySQLDialect
\ No newline at end of file
+dialect = MySQLDialect
def reflecttable(self, connection, table):
c = connection.execute ("select COLUMN_NAME, DATA_TYPE, DATA_LENGTH, DATA_PRECISION, DATA_SCALE, NULLABLE, DATA_DEFAULT from ALL_TAB_COLUMNS where TABLE_NAME = :table_name", {'table_name':table.name.upper()})
+ found_table = False
while True:
row = c.fetchone()
if row is None:
break
+ found_table = True
#print "ROW:" , row
(name, coltype, length, precision, scale, nullable, default) = (row[0], row[1], row[2], row[3], row[4], row[5]=='Y', row[6])
table.append_item (schema.Column(name, coltype, nullable=nullable, *colargs))
-
+ if not found_table:
+ raise exceptions.NoSuchTableError(table.name)
+
c = connection.execute(constraintSQL, {'table_name' : table.name.upper()})
while True:
row = c.fetchone()
def reflecttable(self, connection, table):
c = connection.execute("PRAGMA table_info(" + table.name + ")", {})
+ found_table = False
while True:
row = c.fetchone()
if row is None:
break
#print "row! " + repr(row)
+ found_table = True
(name, type, nullable, primary_key) = (row[1], row[2].upper(), not row[3], row[5])
match = re.match(r'(\w+)(\(.*?\))?', type)
#print "args! " +repr(args)
coltype = coltype(*[int(a) for a in args])
table.append_item(schema.Column(name, coltype, primary_key = primary_key, nullable = nullable))
+
+ if not found_table:
+ raise exceptions.NoSuchTableError(table.name)
+
c = connection.execute("PRAGMA foreign_key_list(" + table.name + ")", {})
while True:
row = c.fetchone()
This error generally corresponds to runtime state errors."""
pass
+class NoSuchTableError(InvalidRequestError):
+ """sqlalchemy was asked to load a table's definition from the database,
+ but the table doesn't exist."""
+ pass
+
class AssertionError(SQLAlchemyError):
"""corresponds to internal state being detected in an invalid state"""
pass
import sqlalchemy.databases.postgres as postgres
from sqlalchemy import *
+from sqlalchemy.exceptions import *
from testbase import PersistTest
import testbase
assert not table2.c.name.nullable
assert table2.c.description.nullable
+ def test_nonexistent(self):
+ self.assertRaises(NoSuchTableError, Table,
+ 'fake_table',
+ testbase.db, autoload=True)
+
def testoverride(self):
table = Table(
'override_test', testbase.db,