+0.3.7
+- sql:
+ - column labels are now generated in the compilation phase, which
+ means their lengths are dialect-dependent. So on oracle a label
+ that gets truncated to 30 chars will go out to 63 characters
+ on postgres. Also, the true labelname is always attached as the
+ accessor on the parent Selectable so theres no need to be aware
+ of the genrerated label names [ticket:512].
+
0.3.6
- sql:
- bindparam() names are now repeatable! specify two
from sqlalchemy import schema, sql, engine, util, sql_util, exceptions
from sqlalchemy.engine import default
-import string, re, sets, weakref
+import string, re, sets, weakref, random
ANSI_FUNCS = sets.ImmutableSet(['CURRENT_DATE', 'CURRENT_TIME', 'CURRENT_TIMESTAMP',
'CURRENT_USER', 'LOCALTIME', 'LOCALTIMESTAMP',
# which will be passed to a ResultProxy and used for resultset-level value conversion
self.typemap = {}
- # a dictionary of select columns mapped to their name or key
- self.columns = {}
+ # a dictionary of select columns labels mapped to their "generated" label
+ self.column_labels = {}
# True if this compiled represents an INSERT
self.isinsert = False
return ""
def visit_label(self, label):
+ labelname = label.name
+ if len(labelname) >= self.dialect.max_identifier_length():
+ labelname = labelname[0:self.dialect.max_identifier_length() - 6] + "_" + hex(random.randint(0, 65535))[2:]
+
if len(self.select_stack):
- self.typemap.setdefault(label.name.lower(), label.obj.type)
- self.strings[label] = self.strings[label.obj] + " AS " + self.preparer.format_label(label)
-
+ self.typemap.setdefault(labelname.lower(), label.obj.type)
+ if isinstance(label.obj, sql._ColumnClause):
+ self.column_labels[label.obj._label] = labelname.lower()
+ self.strings[label] = self.strings[label.obj] + " AS " + self.preparer.format_label(label, labelname)
+
def visit_column(self, column):
if len(self.select_stack):
# if we are within a visit to a Select, set up the "typemap"
# for this column which is used to translate result set values
self.typemap.setdefault(column.name.lower(), column.type)
- self.columns.setdefault(column.key, column)
+ self.column_labels.setdefault(column._label, column.name.lower())
if column.table is None or not column.table.named_with_column():
self.strings[column] = self.preparer.format_column(column)
else:
def format_sequence(self, sequence):
return self.__generic_obj_format(sequence, sequence.name)
- def format_label(self, label):
- return self.__generic_obj_format(label, label.name)
+ def format_label(self, label, name=None):
+ return self.__generic_obj_format(label, name or label.name)
def format_alias(self, alias):
return self.__generic_obj_format(alias, alias.name)
def type_descriptor(self, typeobj):
return sqltypes.adapt_type(typeobj, colspecs)
+ def max_identifier_length(self):
+ return 30
+
def oid_column_name(self, column):
if not isinstance(column.table, sql.TableClause) and not isinstance(column.table, sql.Select):
return None
def create_execution_context(self):
return PGExecutionContext(self)
+ def max_identifier_length(self):
+ return 68
+
def type_descriptor(self, typeobj):
if self.version == 2:
return sqltypes.adapt_type(typeobj, pg2_colspecs)
raise NotImplementedError()
+ def max_identifier_length(self):
+ """Return the maximum length of identifier names.
+
+ Return None if no limit."""
+ return None
+
def supports_sane_rowcount(self):
"""Indicate whether the dialect properly implements statements rowcount.
proxy(str(compiled), parameters)
context.post_exec(self.__engine, proxy, compiled, parameters)
rpargs = self.__engine.dialect.create_result_proxy_args(self, cursor)
- return ResultProxy(self.__engine, self, cursor, context, typemap=compiled.typemap, columns=compiled.columns, **rpargs)
+ return ResultProxy(self.__engine, self, cursor, context, typemap=compiled.typemap, column_labels=compiled.column_labels, **rpargs)
# poor man's multimethod/generic function thingy
executors = {
else:
return object.__new__(cls, *args, **kwargs)
- def __init__(self, engine, connection, cursor, executioncontext=None, typemap=None, columns=None, should_prefetch=None):
+ def __init__(self, engine, connection, cursor, executioncontext=None, typemap=None, column_labels=None, should_prefetch=None):
"""ResultProxy objects are constructed via the execute() method on SQLEngine."""
self.connection = connection
self.cursor = cursor
self.engine = engine
self.closed = False
- self.columns = columns
+ self.column_labels = column_labels
if executioncontext is not None:
self.__executioncontext = executioncontext
self.rowcount = executioncontext.get_rowcount(cursor)
self.props = {}
self.keys = []
i = 0
+
if metadata is not None:
for item in metadata:
# sqlite possibly prepending table name to colnames so strip
try:
return self.__key_cache[key]
except KeyError:
- # TODO: use has_key on these, too many potential KeyErrors being raised
- if isinstance(key, sql.ColumnElement):
- try:
- rec = self.props[key._label.lower()]
- except KeyError:
- try:
- rec = self.props[key.key.lower()]
- except KeyError:
- try:
- rec = self.props[key.name.lower()]
- except KeyError:
- raise exceptions.NoSuchColumnError("Could not locate column in row for column '%s'" % str(key))
- elif isinstance(key, str):
- try:
- rec = self.props[key.lower()]
- except KeyError:
- try:
- if self.columns is not None:
- rec = self._convert_key(self.columns[key])
- else:
- raise
- except KeyError:
- raise exceptions.NoSuchColumnError("Could not locate column in row for column '%s'" % str(key))
- else:
- try:
- rec = self.props[key]
- except KeyError:
- raise exceptions.NoSuchColumnError("Could not locate column in row for column '%s'" % str(key))
+ if isinstance(key, int) and key in self.props:
+ rec = self.props[key]
+ elif isinstance(key, basestring) and key.lower() in self.props:
+ rec = self.props[key.lower()]
+ elif isinstance(key, sql.ColumnElement):
+ label = self.column_labels.get(key._label, key.name)
+ if label in self.props:
+ rec = self.props[label]
+
+ if not "rec" in locals():
+ raise exceptions.NoSuchColumnError("Could not locate column in row for column '%s'" % (repr(key)))
+
self.__key_cache[key] = rec
return rec
+
def _has_key(self, row, key):
try:
typeobj = typeobj()
return typeobj
+ def max_identifier_length(self):
+ # TODO: probably raise this and fill out
+ # db modules better
+ return 30
+
def oid_column_name(self, column):
return None
return [self] + self.onclause._get_from_objects() + self.left._get_from_objects() + self.right._get_from_objects()
class Alias(FromClause):
- def __init__(self, selectable, alias = None):
+ def __init__(self, selectable, alias=None):
baseselectable = selectable
while isinstance(baseselectable, Alias):
baseselectable = baseselectable.selectable
for c in self.c:
yield c
yield self.selectable
+
def accept_visitor(self, visitor):
visitor.visit_alias(self)
self.is_literal = is_literal
def _get_label(self):
+ """generate a 'label' for this column.
+
+ the label is a product of the parent table name and column name, and
+ is treated as a unique identifier of this Column across all Tables and derived
+ selectables for a particular metadata collection.
+ """
+
# for a "literal" column, we've no idea what the text is
# therefore no 'label' can be automatically generated
if self.is_literal:
if self.__label is None:
if self.table is not None and self.table.named_with_column():
self.__label = self.table.name + "_" + self.name
- if self.table.c.has_key(self.__label) or len(self.__label) >= 30:
- self.__label = self.__label[0:24] + "_" + hex(random.randint(0, 65535))[2:]
+ counter = 1
+ while self.table.c.has_key(self.__label):
+ self.__label = self.__label + "_%d" % counter
+ counter += 1
else:
self.__label = self.name
self.__label = "".join([x for x in self.__label if x in legal_characters])
'sql.select',
'sql.selectable',
'sql.case_statement',
+ 'sql.labels',
# assorted round-trip tests
'sql.query',
--- /dev/null
+import testbase
+
+from sqlalchemy import *
+
+class LongLabelsTest(testbase.PersistTest):
+ def setUpAll(self):
+ global metadata, table1
+ metadata = MetaData(engine=testbase.db)
+ table1 = Table("some_large_named_table", metadata,
+ Column("this_is_the_primary_key_column", Integer, primary_key=True),
+ Column("this_is_the_data_column", String(30))
+ )
+ metadata.create_all()
+ table1.insert().execute(**{"this_is_the_primary_key_column":1, "this_is_the_data_column":"data1"})
+ table1.insert().execute(**{"this_is_the_primary_key_column":2, "this_is_the_data_column":"data2"})
+ table1.insert().execute(**{"this_is_the_primary_key_column":3, "this_is_the_data_column":"data3"})
+ table1.insert().execute(**{"this_is_the_primary_key_column":4, "this_is_the_data_column":"data4"})
+ def tearDownAll(self):
+ metadata.drop_all()
+
+ def test_result(self):
+ r = table1.select(use_labels=True).execute()
+ result = []
+ for row in r:
+ result.append((row[table1.c.this_is_the_primary_key_column], row[table1.c.this_is_the_data_column]))
+ assert result == [
+ (1, "data1"),
+ (2, "data2"),
+ (3, "data3"),
+ (4, "data4"),
+ ]
+
+if __name__ == '__main__':
+ testbase.main()
\ No newline at end of file