sqltypes.CHAR: PGChar,
}
+ischema_names = {
+ 'integer' : PGInteger,
+ 'character varying' : PGString,
+ 'character' : PGChar,
+ 'text' : PGText,
+ 'numeric' : PGNumeric,
+ 'timestamp without time zone' : PGDateTime,
+ 'bytea' : PGBinary,
+}
+
def engine(opts, **params):
return PGSQLEngine(opts, **params)
Column("column_name", String),
Column("constraint_name", String),
schema="information_schema")
-
- s = columns.select(columns.c.table_name==table.name,
- from_obj=[sql.join(columns, column_constraints,
+
+ s = columns.select(columns.c.table_name==table.name, order_by=[columns.c.ordinal_position])
+
+ s.append_from(sql.join(columns, column_constraints,
sql.and_(
columns.c.table_name==column_constraints.c.table_name,
columns.c.table_schema==column_constraints.c.table_schema,
column_constraints.c.table_schema==constraints.c.table_schema,
column_constraints.c.constraint_name==constraints.c.constraint_name,
constraints.c.constraint_type=='PRIMARY KEY'
- ), isouter=True)],
- order_by=[columns.c.ordinal_position])
+ ), isouter=True)),
+
s.append_column(constraints.c.constraint_type)
+
if table.schema is not None:
s.append_whereclause(columns.c.table_schema==table.schema)
else:
for r in fromclause._get_from_objects():
self.froms[r.id] = r
+ def append_join(self, joinon, right, whereclause, **params):
+ self.append_from(self.froms[joinon], right, whereclause, **params)
def append_clause(self, keyword, clause):
if type(clause) == str:
# clear out table registry
db.tables.clear()
-# users = Table('users', db, autoload = True)
+ users = Table('users', db, autoload = True)
addresses = Table('email_addresses', db, autoload = True)
users.drop()