- postgres reflection moved to use pg_schema tables, can be overridden
with use_information_schema=True argument to create_engine
[ticket:60], [ticket:71]
-- added natural_case argument to Table, Column, semi-experimental
-flag for use with table reflection to help with quoting rules
-[ticket:155]
+- added case_sensitive argument to MetaData, Table, Column, determines
+itself automatically based on if a parent schemaitem has a non-None
+setting for the flag, or if not, then whether the identifier name is all lower
+case or not. when set to True, quoting is applied to identifiers with mixed or
+uppercase identifiers. quoting is also applied automatically in all cases to
+identifiers that are known to be reserved words or contain other non-standard
+characters. various database dialects can override all of this behavior, but
+currently they are all using the default behavior. tested with postgres, mysql,
+sqlite. needs more testing with firebird, oracle, ms-sql. part of the ongoing
+work with [ticket:155]
- unit tests updated to run without any pysqlite installed; pool
test uses a mock DBAPI
- urls support escaped characters in passwords [ticket:281]
])
+RESERVED_WORDS = util.Set(['all', 'analyse', 'analyze', 'and', 'any', 'array', 'as', 'asc', 'asymmetric', 'authorization', 'between', 'binary', 'both', 'case', 'cast', 'check', 'collate', 'column', 'constraint', 'create', 'cross', 'current_date', 'current_role', 'current_time', 'current_timestamp', 'current_user', 'default', 'deferrable', 'desc', 'distinct', 'do', 'else', 'end', 'except', 'false', 'for', 'foreign', 'freeze', 'from', 'full', 'grant', 'group', 'having', 'ilike', 'in', 'initially', 'inner', 'intersect', 'into', 'is', 'isnull', 'join', 'leading', 'left', 'like', 'limit', 'localtime', 'localtimestamp', 'natural', 'new', 'not', 'notnull', 'null', 'off', 'offset', 'old', 'on', 'only', 'or', 'order', 'outer', 'overlaps', 'placing', 'primary', 'references', 'right', 'select', 'session_user', 'similar', 'some', 'symmetric', 'table', 'then', 'to', 'trailing', 'true', 'union', 'unique', 'user', 'using', 'verbose', 'when', 'where'])
+
+LEGAL_CHARACTERS = util.Set(string.ascii_lowercase + string.ascii_uppercase + string.digits + '_$')
+ILLEGAL_INITIAL_CHARACTERS = util.Set(string.digits + '$')
+
def create_engine():
return engine.ComposedSQLEngine(None, ANSIDialect())
# some tests would need to be rewritten if this is done.
#return value.upper()
- def _requires_quotes(self, value, natural_case):
+ def _reserved_words(self):
+ return RESERVED_WORDS
+
+ def _legal_characters(self):
+ return LEGAL_CHARACTERS
+
+ def _illegal_initial_characters(self):
+ return ILLEGAL_INITIAL_CHARACTERS
+
+ def _requires_quotes(self, value, case_sensitive):
"""return true if the given identifier requires quoting."""
- return False
+ return \
+ value in self._reserved_words() \
+ or (value[0] in self._illegal_initial_characters()) \
+ or bool(len([x for x in str(value) if x not in self._legal_characters()])) \
+ or (case_sensitive and value.lower() != value)
def visit_table(self, table):
if table in self.__visited:
return
- if table.quote or self._requires_quotes(table.name, table.natural_case):
+ if table.quote or self._requires_quotes(table.name, table.case_sensitive):
tablestring = self._quote_identifier(table.name)
else:
tablestring = table.name
if table.schema:
- if table.quote_schema or self._requires_quotes(table.schema, table.natural_case_schema):
+ if table.quote_schema or self._requires_quotes(table.schema, table.case_sensitive_schema):
schemastring = self._quote_identifier(table.schema)
else:
schemastring = table.schema
def visit_column(self, column):
if column in self.__visited:
return
- if column.quote or self._requires_quotes(column.name, column.natural_case):
+ if column.quote or self._requires_quotes(column.name, column.case_sensitive):
self.__strings[column] = self._quote_identifier(column.name)
else:
self.__strings[column] = column.name
def visit_sequence(self, sequence):
if sequence in self.__visited:
return
- if sequence.quote or self._requires_quotes(sequence.name, sequence.natural_case):
+ if sequence.quote or self._requires_quotes(sequence.name, sequence.case_sensitive):
self.__strings[sequence] = self._quote_identifier(sequence.name)
else:
self.__strings[sequence] = sequence.name
'time' : PG1Time
})
-reserved_words = util.Set(['all', 'analyse', 'analyze', 'and', 'any', 'array', 'as', 'asc', 'asymmetric', 'authorization', 'between', 'binary', 'both', 'case', 'cast', 'check', 'collate', 'column', 'constraint', 'create', 'cross', 'current_date', 'current_role', 'current_time', 'current_timestamp', 'current_user', 'default', 'deferrable', 'desc', 'distinct', 'do', 'else', 'end', 'except', 'false', 'for', 'foreign', 'freeze', 'from', 'full', 'grant', 'group', 'having', 'ilike', 'in', 'initially', 'inner', 'intersect', 'into', 'is', 'isnull', 'join', 'leading', 'left', 'like', 'limit', 'localtime', 'localtimestamp', 'natural', 'new', 'not', 'notnull', 'null', 'off', 'offset', 'old', 'on', 'only', 'or', 'order', 'outer', 'overlaps', 'placing', 'primary', 'references', 'right', 'select', 'session_user', 'similar', 'some', 'symmetric', 'table', 'then', 'to', 'trailing', 'true', 'union', 'unique', 'user', 'using', 'verbose', 'when', 'where'])
-
-legal_characters = util.Set(string.ascii_lowercase + string.digits + '_$')
-illegal_initial_characters = util.Set(string.digits + '$')
def engine(opts, **params):
return PGSQLEngine(opts, **params)
colargs= []
if default is not None:
colargs.append(PassiveDefault(sql.text(default)))
- table.append_item(schema.Column(name, coltype, nullable=nullable, natural_case=natural_case, *colargs))
+ table.append_item(schema.Column(name, coltype, nullable=nullable, case_sensitive=not natural_case, *colargs))
if not found_table:
if referred_schema is not None:
natural_case_schema = preparer._is_natural_case(referred_schema)
schema.Table(referred_table, table.metadata, autoload=True, schema=referred_schema,
- autoload_with=connection, natural_case=natural_case, natural_case_schema = natural_case_schema)
+ autoload_with=connection, case_sensitive= not natural_case, case_sensitive_schema = not natural_case_schema)
for column in referred_columns:
refspec.append(".".join([referred_schema, referred_table, column]))
else:
- schema.Table(referred_table, table.metadata, autoload=True, autoload_with=connection, natural_case=natural_case)
+ schema.Table(referred_table, table.metadata, autoload=True, autoload_with=connection, case_sensitive=not natural_case)
for column in referred_columns:
refspec.append(".".join([referred_table, column]))
# TODO: this has to build into the Sequence object so we can get the quoting
# logic from it
if sch is not None:
- exc = "select nextval('%s.%s_%s_seq')" % (sch, column.table.name, column.name)
+ exc = "select nextval('\"%s.%s_%s_seq\"')" % (sch, column.table.name, column.name)
else:
- exc = "select nextval('%s_%s_seq')" % (column.table.name, column.name)
+ exc = "select nextval('\"%s_%s_seq\"')" % (column.table.name, column.name)
c = self.proxy(exc)
return c.fetchone()[0]
else:
class PGIdentifierPreparer(ansisql.ANSIIdentifierPreparer):
def _fold_identifier_case(self, value):
return value.lower()
- def _requires_quotes(self, value, natural_case):
- if natural_case:
- value = self._fold_identifier_case(str(value))
- retval = bool(len([x for x in str(value) if x not in legal_characters]))
- if not retval and (value[0] in illegal_initial_characters or value in reserved_words):
- retval = True
- return retval
def _unquote_identifier(self, value):
if value[0] == self.initial_quote:
value = value[1:-1].replace('""','"')
for item in args:
if item is not None:
item._set_parent(self)
+ def _get_parent(self):
+ raise NotImplementedError()
def _set_parent(self, parent):
"""associate with this SchemaItem's parent object."""
raise NotImplementedError()
return None
def _get_engine(self):
return self._derived_metadata().engine
+
+ def _set_casing_strategy(self, name, kwargs, keyname='case_sensitive'):
+ setattr(self, '_SchemaItem__%s_setting' % keyname, kwargs.pop(keyname, None))
+ def _determine_case_sensitive(self, name, keyname='case_sensitive'):
+ local = getattr(self, '_SchemaItem__%s_setting' % keyname, None)
+ if local is not None:
+ return local
+ parent = self
+ while parent is not None:
+ parent = parent._get_parent()
+ if parent is not None:
+ parentval = getattr(parent, '_SchemaItem__case_sensitive_setting', None)
+ if parentval is not None:
+ return parentval
+ return name is not None and name.lower() != name
+ def _get_case_sensitive(self):
+ try:
+ return self.__case_sensitive
+ except AttributeError:
+ self.__case_sensitive = self._determine_case_sensitive(self.name)
+ return self.__case_sensitive
+ case_sensitive = property(_get_case_sensitive)
+
engine = property(lambda s:s._get_engine())
metadata = property(lambda s:s._derived_metadata())
quote_schema=False : indicates that the Namespace identifier must be properly escaped and quoted before being sent
to the database. This flag overrides all other quoting behavior.
- natural_case=True : indicates that the identifier should be interpreted by the database in the natural case for identifiers.
+ case_sensitive=True : indicates that the identifier should be interpreted by the database in the natural case for identifiers.
Mixed case is not sufficient to cause this identifier to be quoted; it must contain an illegal character.
- natural_case_schema=True : indicates that the identifier should be interpreted by the database in the natural case for identifiers.
+ case_sensitive_schema=True : indicates that the identifier should be interpreted by the database in the natural case for identifiers.
Mixed case is not sufficient to cause this identifier to be quoted; it must contain an illegal character.
"""
super(Table, self).__init__(name)
self.indexes = util.OrderedProperties()
self.constraints = []
self.primary_key = PrimaryKeyConstraint()
-
+ self.quote = kwargs.get('quote', False)
+ self.quote_schema = kwargs.get('quote_schema', False)
if self.schema is not None:
self.fullname = "%s.%s" % (self.schema, self.name)
else:
self.fullname = self.name
self.owner = kwargs.pop('owner', None)
- self.quote = kwargs.pop('quote', False)
- self.quote_schema = kwargs.pop('quote_schema', False)
- default_natural_case = metadata.natural_case
- if default_natural_case is None:
- default_natural_case = True
- self.natural_case = kwargs.pop('natural_case', default_natural_case)
- self.natural_case_schema = kwargs.pop('natural_case_schema', default_natural_case)
+
+ self._set_casing_strategy(name, kwargs)
+ self._set_casing_strategy(self.schema or '', kwargs, keyname='case_sensitive_schema')
self.kwargs = kwargs
+ def _get_case_sensitive_schema(self):
+ try:
+ return getattr(self, '_SchemaItem__case_sensitive_schema')
+ except AttributeError:
+ setattr(self, '_SchemaItem__case_sensitive_schema', self._determine_case_sensitive(self.schema or '', keyname='case_sensitive_schema'))
+ return getattr(self, '_SchemaItem__case_sensitive_schema')
+ case_sensitive_schema = property(_get_case_sensitive_schema)
+
def _set_primary_key(self, pk):
if getattr(self, '_primary_key', None) in self.constraints:
self.constraints.remove(self._primary_key)
def append_index(self, index):
self.indexes[index.name] = index
-
+
+ def _get_parent(self):
+ return self._metadata
def _set_parent(self, metadata):
metadata.tables[_get_table_key(self.name, self.schema)] = self
self._metadata = metadata
quote=False : indicates that the Column identifier must be properly escaped and quoted before being sent
to the database.
- natural_case=True : indicates that the identifier should be interpreted by the database in the natural case for identifiers.
+ case_sensitive=True : indicates that the identifier should be interpreted by the database in the natural case for identifiers.
Mixed case is not sufficient to cause this identifier to be quoted; it must contain an illegal character.
"""
name = str(name) # in case of incoming unicode
self.index = kwargs.pop('index', None)
self.unique = kwargs.pop('unique', None)
self.quote = kwargs.pop('quote', False)
- self.natural_case = kwargs.pop('natural_case', True)
+ self._set_casing_strategy(name, kwargs)
self.onupdate = kwargs.pop('onupdate', None)
if self.index is not None and self.unique is not None:
raise exceptions.ArgumentError("Column may not define both index and unique")
self.primary_key = True
self.nullable = False
self.table.primary_key.append(self)
-
+
+ def _get_parent(self):
+ return self.table
def _set_parent(self, table):
if getattr(self, 'table', None) is not None:
raise exceptions.ArgumentError("this Column already has a table!")
table.append_column(self)
- if self.table.metadata.natural_case is not None:
- self.natural_case = self.table.metadata.natural_case
if self.index or self.unique:
table.append_index_column(self, index=self.index,
unique=self.unique)
def copy(self):
"""creates a copy of this Column, unitialized"""
- return Column(self.name, self.type, self.default, key = self.key, primary_key = self.primary_key, nullable = self.nullable, hidden = self.hidden)
+ return Column(self.name, self.type, self.default, key = self.key, primary_key = self.primary_key, nullable = self.nullable, hidden = self.hidden, case_sensitive=self.case_sensitive, quote=self.quote)
def _make_proxy(self, selectable, name = None):
"""creates a copy of this Column, initialized the way this Column is"""
fk = None
else:
fk = self.foreign_key.copy()
- c = Column(name or self.name, self.type, fk, self.default, key = name or self.key, primary_key = self.primary_key, nullable = self.nullable, hidden = self.hidden)
+ c = Column(name or self.name, self.type, fk, self.default, key = name or self.key, primary_key = self.primary_key, nullable = self.nullable, hidden = self.hidden, case_sensitive=self.case_sensitive, quote=self.quote)
c.table = selectable
c.orig_set = self.orig_set
c._parent = self
def accept_schema_visitor(self, visitor):
"""calls the visit_foreign_key method on the given visitor."""
visitor.visit_foreign_key(self)
-
+
+ def _get_parent(self):
+ return self.parent
def _set_parent(self, column):
self.parent = column
return self.column.table.metadata
except AttributeError:
return self._metadata
+ def _get_parent(self):
+ return self.column
def _set_parent(self, column):
self.column = column
self._metadata = self.column.table.metadata
class Sequence(DefaultGenerator):
"""represents a sequence, which applies to Oracle and Postgres databases."""
- def __init__(self, name, start = None, increment = None, optional=False, quote=False, natural_case=True, **kwargs):
+ def __init__(self, name, start = None, increment = None, optional=False, quote=False, **kwargs):
super(Sequence, self).__init__(**kwargs)
self.name = name
self.start = start
self.increment = increment
self.optional=optional
- self.natural_case = natural_case
self.quote = quote
+ self._set_casing_strategy(name, kwargs)
def __repr__(self):
return "Sequence(%s)" % string.join(
[repr(self.name)] +
def _set_parent(self, column):
super(Sequence, self)._set_parent(column)
column.sequence = self
- if column.metadata.natural_case is not None:
- self.natural_case = column.metadata.natural_case
def create(self):
self.engine.create(self)
return self
self.columns[index] = item
def copy(self):
raise NotImplementedError()
+ def _get_parent(self):
+ return self.table
class ForeignKeyConstraint(Constraint):
"""table-level foreign key constraint, represents a colleciton of ForeignKey objects."""
super(PrimaryKeyConstraint, self).__init__(name=kwargs.pop('name', None))
self.__colnames = list(columns)
def _set_parent(self, table):
+ self.table = table
table.primary_key = self
for c in self.__colnames:
self.append(table.c[c])
super(Constraint, self).__init__(name)
self.__colnames = list(columns)
def _set_parent(self, table):
+ self.table = table
table.constraints.append(self)
for c in self.__colnames:
self.append(table.c[c])
def _init_items(self, *args):
for column in args:
self.append_column(column)
-
+ def _get_parent(self):
+ return self.table
def append_column(self, column):
# make sure all columns are from the same table
# and no column is repeated
class MetaData(SchemaItem):
"""represents a collection of Tables and their associated schema constructs."""
- def __init__(self, name=None, natural_case=None, **kwargs):
+ def __init__(self, name=None, **kwargs):
# a dictionary that stores Table objects keyed off their name (and possibly schema name)
self.tables = {}
self.name = name
- self.natural_case = natural_case
+ self._set_casing_strategy(name, kwargs)
def is_bound(self):
return False
def clear(self):
self.tables.clear()
def table_iterator(self, reverse=True):
return self._sort_tables(self.tables.values(), reverse=reverse)
-
+ def _get_parent(self):
+ return None
def create_all(self, connectable=None, tables=None, engine=None):
"""create all tables stored in this metadata.
)
person = Table('person', metadata,
Column('id', Integer, Sequence('person_id_seq', optional=True), primary_key=True),
- Column('favoriteBall_id', Integer, ForeignKey('ball.id')),
-# Column('favoriteBall_id', Integer),
+ Column('favorite_ball_id', Integer, ForeignKey('ball.id')),
+# Column('favorite_ball_id', Integer),
)
ball.create()
person.create()
-# person.c.favoriteBall_id.append_item(ForeignKey('ball.id'))
+# person.c.favorite_ball_id.append_item(ForeignKey('ball.id'))
ball.c.person_id.append_item(ForeignKey('person.id'))
# make the test more complete for postgres
Ball.mapper = mapper(Ball, ball)
Person.mapper = mapper(Person, person, properties= dict(
balls = relation(Ball.mapper, primaryjoin=ball.c.person_id==person.c.id, foreignkey=ball.c.person_id),
- favorateBall = relation(Ball.mapper, primaryjoin=person.c.favoriteBall_id==ball.c.id, foreignkey=person.c.favoriteBall_id),
+ favorateBall = relation(Ball.mapper, primaryjoin=person.c.favorite_ball_id==ball.c.id, foreignkey=person.c.favorite_ball_id),
)
)
Ball.mapper = mapper(Ball, ball)
Person.mapper = mapper(Person, person, properties= dict(
balls = relation(Ball.mapper, primaryjoin=ball.c.person_id==person.c.id, foreignkey=ball.c.person_id, post_update=False, private=True),
- favorateBall = relation(Ball.mapper, primaryjoin=person.c.favoriteBall_id==ball.c.id, foreignkey=person.c.favoriteBall_id, post_update=True),
+ favorateBall = relation(Ball.mapper, primaryjoin=person.c.favorite_ball_id==ball.c.id, foreignkey=person.c.favorite_ball_id, post_update=True),
)
)
self.assert_sql(db, lambda: sess.flush(), [
(
- "INSERT INTO person (favoriteBall_id) VALUES (:favoriteBall_id)",
- {'favoriteBall_id': None}
+ "INSERT INTO person (favorite_ball_id) VALUES (:favorite_ball_id)",
+ {'favorite_ball_id': None}
),
(
"INSERT INTO ball (person_id) VALUES (:person_id)",
lambda ctx:{'person_id':p.id}
),
(
- "UPDATE person SET favoriteBall_id=:favoriteBall_id WHERE person.id = :person_id",
- lambda ctx:{'favoriteBall_id':p.favorateBall.id,'person_id':p.id}
+ "UPDATE person SET favorite_ball_id=:favorite_ball_id WHERE person.id = :person_id",
+ lambda ctx:{'favorite_ball_id':p.favorateBall.id,'person_id':p.id}
)
],
with_sequences= [
(
- "INSERT INTO person (id, favoriteBall_id) VALUES (:id, :favoriteBall_id)",
- lambda ctx:{'id':ctx.last_inserted_ids()[0], 'favoriteBall_id': None}
+ "INSERT INTO person (id, favorite_ball_id) VALUES (:id, :favorite_ball_id)",
+ lambda ctx:{'id':ctx.last_inserted_ids()[0], 'favorite_ball_id': None}
),
(
"INSERT INTO ball (id, person_id) VALUES (:id, :person_id)",
),
# heres the post update
(
- "UPDATE person SET favoriteBall_id=:favoriteBall_id WHERE person.id = :person_id",
- lambda ctx:{'favoriteBall_id':p.favorateBall.id,'person_id':p.id}
+ "UPDATE person SET favorite_ball_id=:favorite_ball_id WHERE person.id = :person_id",
+ lambda ctx:{'favorite_ball_id':p.favorateBall.id,'person_id':p.id}
)
])
sess.delete(p)
self.assert_sql(db, lambda: sess.flush(), [
# heres the post update (which is a pre-update with deletes)
(
- "UPDATE person SET favoriteBall_id=:favoriteBall_id WHERE person.id = :person_id",
- lambda ctx:{'person_id': p.id, 'favoriteBall_id': None}
+ "UPDATE person SET favorite_ball_id=:favorite_ball_id WHERE person.id = :person_id",
+ lambda ctx:{'person_id': p.id, 'favorite_ball_id': None}
),
(
"DELETE FROM ball WHERE ball.id = :id",
Ball.mapper = mapper(Ball, ball)
Person.mapper = mapper(Person, person, properties= dict(
balls = relation(Ball.mapper, primaryjoin=ball.c.person_id==person.c.id, foreignkey=ball.c.person_id, private=True, post_update=True),
- favorateBall = relation(Ball.mapper, primaryjoin=person.c.favoriteBall_id==ball.c.id, foreignkey=person.c.favoriteBall_id),
+ favorateBall = relation(Ball.mapper, primaryjoin=person.c.favorite_ball_id==ball.c.id, foreignkey=person.c.favorite_ball_id),
)
)
{'person_id':None}
),
(
- "INSERT INTO person (favoriteBall_id) VALUES (:favoriteBall_id)",
- lambda ctx:{'favoriteBall_id':b.id}
+ "INSERT INTO person (favorite_ball_id) VALUES (:favorite_ball_id)",
+ lambda ctx:{'favorite_ball_id':b.id}
),
# heres the post update on each one-to-many item
(
lambda ctx:{'id':ctx.last_inserted_ids()[0], 'person_id':None}
),
(
- "INSERT INTO person (id, favoriteBall_id) VALUES (:id, :favoriteBall_id)",
- lambda ctx:{'id':ctx.last_inserted_ids()[0], 'favoriteBall_id':b.id}
+ "INSERT INTO person (id, favorite_ball_id) VALUES (:id, :favorite_ball_id)",
+ lambda ctx:{'id':ctx.last_inserted_ids()[0], 'favorite_ball_id':b.id}
),
(
"UPDATE ball SET person_id=:person_id WHERE ball.id = :ball_id",
table1 = Table('WorstCase1', metadata,
Column('lowercase', Integer, primary_key=True),
Column('UPPERCASE', Integer),
- Column('MixedCase', Integer, quote=True),
- Column('ASC', Integer, quote=True),
- quote=True)
+ Column('MixedCase', Integer),
+ Column('ASC', Integer))
table2 = Table('WorstCase2', metadata,
- Column('desc', Integer, quote=True, primary_key=True),
- Column('Union', Integer, quote=True),
- Column('MixedCase', Integer, quote=True),
- quote=True)
+ Column('desc', Integer, primary_key=True),
+ Column('Union', Integer),
+ Column('MixedCase', Integer))
table1.create()
table2.create()
res2 = select([table2.c.desc, table2.c.Union, table2.c.MixedCase], use_labels=True).execute().fetchall()
print res2
assert(res2==[(1,2,3),(2,2,3),(4,3,2)])
+
+ def testcascade(self):
+ lcmetadata = MetaData(case_sensitive=False)
+ t1 = Table('SomeTable', lcmetadata,
+ Column('UcCol', Integer),
+ Column('normalcol', String))
+ t2 = Table('othertable', lcmetadata,
+ Column('UcCol', Integer),
+ Column('normalcol', String, ForeignKey('SomeTable.normalcol')))
+ assert lcmetadata.case_sensitive is False
+ assert t1.c.UcCol.case_sensitive is False
+ assert t2.c.normalcol.case_sensitive is False
+
if __name__ == "__main__":
testbase.main()
def testcalculatedcolumns(self):
value_tbl = table('values',
- Column('id', Integer),
- Column('val1', Float),
- Column('val2', Float),
+ column('id', Integer),
+ column('val1', Float),
+ column('val2', Float),
)
self.runtest(
def testcast(self):
tbl = table('casttest',
- Column('id', Integer),
- Column('v1', Float),
- Column('v2', Float),
- Column('ts', TIMESTAMP),
+ column('id', Integer),
+ column('v1', Float),
+ column('v2', Float),
+ column('ts', TIMESTAMP),
)
def check_results(dialect, expected_results, literal):