"join sys.schemas as sch on sch.schema_id=tab.schema_id "
"where tab.name = :tabname "
"and sch.name=:schname "
- "and ind.is_primary_key=0 and ind.type != 0",
- bindparams=[
- sql.bindparam(
- "tabname",
- tablename,
- sqltypes.String(convert_unicode=True),
- ),
- sql.bindparam(
- "schname", owner, sqltypes.String(convert_unicode=True)
- ),
- ],
- typemap={"name": sqltypes.Unicode()},
+ "and ind.is_primary_key=0 and ind.type != 0"
)
+ .bindparams(
+ sql.bindparam(
+ "tabname", tablename, sqltypes.String(convert_unicode=True)
+ ),
+ sql.bindparam(
+ "schname", owner, sqltypes.String(convert_unicode=True)
+ ),
+ )
+ .columns(name=sqltypes.Unicode())
)
indexes = {}
for row in rp:
"ind_col.object_id=tab.object_id) "
"join sys.schemas as sch on sch.schema_id=tab.schema_id "
"where tab.name=:tabname "
- "and sch.name=:schname",
- bindparams=[
- sql.bindparam(
- "tabname",
- tablename,
- sqltypes.String(convert_unicode=True),
- ),
- sql.bindparam(
- "schname", owner, sqltypes.String(convert_unicode=True)
- ),
- ],
- typemap={"name": sqltypes.Unicode()},
+ "and sch.name=:schname"
)
+ .bindparams(
+ sql.bindparam(
+ "tabname", tablename, sqltypes.String(convert_unicode=True)
+ ),
+ sql.bindparam(
+ "schname", owner, sqltypes.String(convert_unicode=True)
+ ),
+ )
+ .columns(name=sqltypes.Unicode())
)
for row in rp:
if row["index_id"] in indexes:
" where "
"mod.object_id=views.object_id and "
"views.schema_id=sch.schema_id and "
- "views.name=:viewname and sch.name=:schname",
- bindparams=[
- sql.bindparam(
- "viewname",
- viewname,
- sqltypes.String(convert_unicode=True),
- ),
- sql.bindparam(
- "schname", owner, sqltypes.String(convert_unicode=True)
- ),
- ],
+ "views.name=:viewname and sch.name=:schname"
+ ).bindparams(
+ sql.bindparam(
+ "viewname", viewname, sqltypes.String(convert_unicode=True)
+ ),
+ sql.bindparam(
+ "schname", owner, sqltypes.String(convert_unicode=True)
+ ),
)
)
"select nspname from pg_namespace " "where lower(nspname)=:schema"
)
cursor = connection.execute(
- sql.text(
- query,
- bindparams=[
- sql.bindparam(
- "schema",
- util.text_type(schema.lower()),
- type_=sqltypes.Unicode,
- )
- ],
+ sql.text(query).bindparams(
+ sql.bindparam(
+ "schema",
+ util.text_type(schema.lower()),
+ type_=sqltypes.Unicode,
+ )
)
)
"select relname from pg_class c join pg_namespace n on "
"n.oid=c.relnamespace where "
"pg_catalog.pg_table_is_visible(c.oid) "
- "and relname=:name",
- bindparams=[
- sql.bindparam(
- "name",
- util.text_type(table_name),
- type_=sqltypes.Unicode,
- )
- ],
+ "and relname=:name"
+ ).bindparams(
+ sql.bindparam(
+ "name",
+ util.text_type(table_name),
+ type_=sqltypes.Unicode,
+ )
)
)
else:
sql.text(
"select relname from pg_class c join pg_namespace n on "
"n.oid=c.relnamespace where n.nspname=:schema and "
- "relname=:name",
- bindparams=[
- sql.bindparam(
- "name",
- util.text_type(table_name),
- type_=sqltypes.Unicode,
- ),
- sql.bindparam(
- "schema",
- util.text_type(schema),
- type_=sqltypes.Unicode,
- ),
- ],
+ "relname=:name"
+ ).bindparams(
+ sql.bindparam(
+ "name",
+ util.text_type(table_name),
+ type_=sqltypes.Unicode,
+ ),
+ sql.bindparam(
+ "schema",
+ util.text_type(schema),
+ type_=sqltypes.Unicode,
+ ),
)
)
return bool(cursor.first())
"SELECT relname FROM pg_class c join pg_namespace n on "
"n.oid=c.relnamespace where relkind='S' and "
"n.nspname=current_schema() "
- "and relname=:name",
- bindparams=[
- sql.bindparam(
- "name",
- util.text_type(sequence_name),
- type_=sqltypes.Unicode,
- )
- ],
+ "and relname=:name"
+ ).bindparams(
+ sql.bindparam(
+ "name",
+ util.text_type(sequence_name),
+ type_=sqltypes.Unicode,
+ )
)
)
else:
sql.text(
"SELECT relname FROM pg_class c join pg_namespace n on "
"n.oid=c.relnamespace where relkind='S' and "
- "n.nspname=:schema and relname=:name",
- bindparams=[
- sql.bindparam(
- "name",
- util.text_type(sequence_name),
- type_=sqltypes.Unicode,
- ),
- sql.bindparam(
- "schema",
- util.text_type(schema),
- type_=sqltypes.Unicode,
- ),
- ],
+ "n.nspname=:schema and relname=:name"
+ ).bindparams(
+ sql.bindparam(
+ "name",
+ util.text_type(sequence_name),
+ type_=sqltypes.Unicode,
+ ),
+ sql.bindparam(
+ "schema",
+ util.text_type(schema),
+ type_=sqltypes.Unicode,
+ ),
)
)
AND a.attnum > 0 AND NOT a.attisdropped
ORDER BY a.attnum
"""
- s = sql.text(
- SQL_COLS,
- bindparams=[sql.bindparam("table_oid", type_=sqltypes.Integer)],
- typemap={"attname": sqltypes.Unicode, "default": sqltypes.Unicode},
+ s = (
+ sql.text(SQL_COLS)
+ .bindparams(sql.bindparam("table_oid", type_=sqltypes.Integer))
+ .columns(attname=sqltypes.Unicode, default=sqltypes.Unicode)
)
c = connection.execute(s, table_oid=table_oid)
rows = c.fetchall()
WHERE a.attrelid = :table_oid
ORDER BY k.ord
"""
- t = sql.text(PK_SQL, typemap={"attname": sqltypes.Unicode})
+ t = sql.text(PK_SQL).columns(attname=sqltypes.Unicode)
c = connection.execute(t, table_oid=table_oid)
cols = [r[0] for r in c.fetchall()]
WHERE r.conrelid = :table_oid AND r.contype = 'p'
ORDER BY 1
"""
- t = sql.text(PK_CONS_SQL, typemap={"conname": sqltypes.Unicode})
+ t = sql.text(PK_CONS_SQL).columns(conname=sqltypes.Unicode)
c = connection.execute(t, table_oid=table_oid)
name = c.scalar()
r"[\s]?(INITIALLY (DEFERRED|IMMEDIATE)+)?"
)
- t = sql.text(
- FK_SQL,
- typemap={"conname": sqltypes.Unicode, "condef": sqltypes.Unicode},
+ t = sql.text(FK_SQL).columns(
+ conname=sqltypes.Unicode, condef=sqltypes.Unicode
)
c = connection.execute(t, table=table_oid)
fkeys = []
i.relname
"""
- t = sql.text(
- IDX_SQL,
- typemap={"relname": sqltypes.Unicode, "attname": sqltypes.Unicode},
+ t = sql.text(IDX_SQL).columns(
+ relname=sqltypes.Unicode, attname=sqltypes.Unicode
)
c = connection.execute(t, table_oid=table_oid)
cons.contype = 'u'
"""
- t = sql.text(UNIQUE_SQL, typemap={"col_name": sqltypes.Unicode})
+ t = sql.text(UNIQUE_SQL).columns(col_name=sqltypes.Unicode)
c = connection.execute(t, table_oid=table_oid)
uniques = defaultdict(lambda: defaultdict(dict))
# e.oid gives us label order within an enum
SQL_ENUMS += 'ORDER BY "schema", "name", e.oid'
- s = sql.text(
- SQL_ENUMS,
- typemap={"attname": sqltypes.Unicode, "label": sqltypes.Unicode},
+ s = sql.text(SQL_ENUMS).columns(
+ attname=sqltypes.Unicode, label=sqltypes.Unicode
)
if schema != "*":
WHERE t.typtype = 'd'
"""
- s = sql.text(SQL_DOMAINS, typemap={"attname": sqltypes.Unicode})
+ s = sql.text(SQL_DOMAINS).columns(attname=sqltypes.Unicode)
c = connection.execute(s)
domains = {}