return "CHAR(%(length)s)" % {'length' : self.length}
class PGBinary(sqltypes.Binary):
def get_col_spec(self):
- return "BLOB"
+ return "BYTEA"
class PGBoolean(sqltypes.Boolean):
def get_col_spec(self):
return "BOOLEAN"
return self.module
def reflecttable(self, table):
- raise NotImplementedError()
+
+ columns = schema.Table("columns", table.engine,
+ Column("table_schema", String),
+ Column("table_name", String),
+ Column("column_name", String),
+ Column("is_nullable", Integer),
+ Column("data_type", String),
+ Column("ordinal_position", Integer),
+ schema="information_schema")
+
+ constraints = schema.Table("table_constraints", table.engine,
+ Column("table_schema", String),
+ Column("table_name", String),
+ Column("constraint_name", String),
+ Column("constraint_type", String),
+ schema="information_schema")
+
+ column_constraints = schema.Table("constraint_column_usage", table.engine,
+ Column("table_schema", String),
+ Column("table_name", String),
+ Column("column_name", String),
+ Column("constraint_name", String),
+ schema="information_schema")
+
+ s = columns.select(columns.c.table_name==table.name,
+ from_obj=[sql.join(columns, column_constraints,
+ sql.and_(
+ columns.c.table_name==column_constraints.c.table_name,
+ columns.c.table_schema==column_constraints.c.table_schema,
+ columns.c.column_name==column_constraints.c.column_name,
+ ),
+ isouter=True).join(constraints,
+ sql.and_(
+ column_constraints.c.table_schema==constraints.c.table_schema,
+ column_constraints.c.constraint_name==constraints.c.constraint_name,
+ constraints.c.constraint_type=='PRIMARY KEY'
+ ), isouter=True)],
+ order_by=[columns.c.ordinal_position])
+ s.append_column(constraints.c.constraint_type)
+ if table.schema is not None:
+ s.append_whereclause(columns.c.table_schema==table.schema)
+ else:
+ current_schema = text("select current_schema()", table.engine).execute().fetchone()[0]
+ s.append_whereclause(columns.c.table_schema==current_schema)
+
+ c = s.execute()
+ while True:
+ row = c.fetchone()
+ if row is None:
+ break
+ print "row! " + repr(row)
+ continue
+ (name, type, nullable, primary_key) = (row[1], row[2].upper(), not row[3], row[5])
+
+ match = re.match(r'(\w+)(\(.*?\))?', type)
+ coltype = match.group(1)
+ args = match.group(2)
+
+ #print "coltype: " + repr(coltype) + " args: " + repr(args)
+ coltype = pragma_names[coltype]
+ if args is not None:
+ args = re.findall(r'(\d+)', args)
+ #print "args! " +repr(args)
+ coltype = coltype(*args)
+ table.append_item(schema.Column(name, coltype, primary_key = primary_key, nullable = nullable))
+ c = self.execute("PRAGMA foreign_key_list(" + table.name + ")", {})
+ while True:
+ row = c.fetchone()
+ if row is None:
+ break
+ (tablename, localcol, remotecol) = (row[2], row[3], row[4])
+ #print "row! " + repr(row)
+ remotetable = Table(tablename, self, autoload = True)
+ table.c[localcol].foreign_key = schema.ForeignKey(remotetable.c[remotecol])
class PGCompiler(ansisql.ANSICompiler):
def bindparam_string(self, name):
class EngineTest(PersistTest):
def testsqlitetableops(self):
import sqlalchemy.databases.sqlite as sqllite
- db = sqllite.engine(':memory:', {}, echo = testbase.echo)
+# db = sqllite.engine(':memory:', {}, echo = testbase.echo)
+ db = postgres.engine({'database':'test', 'host':'127.0.0.1', 'user':'scott', 'password':'tiger'}, echo = testbase.echo)
self.do_tableops(db)
def do_tableops(self, db):
Column('remote_user_id', Integer, foreign_key = ForeignKey(users.c.user_id)),
Column('email_address', String(20)),
)
+
+ users.drop()
+ addresses.drop()
# users.c.parent_user_id.set_foreign_key(ForeignKey(users.c.user_id))
# clear out table registry
db.tables.clear()
- users = Table('users', db, autoload = True)
+# users = Table('users', db, autoload = True)
addresses = Table('email_addresses', db, autoload = True)
users.drop()