- cached TypeEngine classes are cached per-dialect class
instead of per-dialect.
-
+
+ - new UserDefinedType should be used as a base class for
+ new types, which preserves the 0.5 behavior of
+ get_col_spec().
+
+ - The result_processor() method of all type classes now
+ accepts a second argument "coltype", which is the DBAPI
+ type argument from cursor.description. This argument
+ can help some types decide on the most efficient processing
+ of result values.
+
- Deprecated Dialect.get_params() removed.
- Dialect.get_rowcount() has been renamed to a descriptor
return value
return process
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
def process(value):
if value is not None:
return PersistentGisElement(value)
class AcNumeric(types.Numeric):
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
return None
def bind_processor(self, dialect):
def bind_processor(self, dialect):
return None
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
return None
class AcChar(types.CHAR):
def get_col_spec(self):
return "YESNO"
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
def process(value):
if value is None:
return None
class _FBBoolean(sqltypes.Boolean):
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
def process(value):
if value is None:
return None
return value
return process
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
def process(value):
if isinstance(value, datetime.datetime):
return value.time()
class InfoBoolean(sqltypes.Boolean):
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
def process(value):
if value is None:
return None
return value
return process
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
def process(value):
while True:
if value is None:
dialect.datetimeformat,))
return process
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
def process(value):
if value is None:
return None
dialect.datetimeformat,))
return process
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
def process(value):
if value is None:
return None
dialect.datetimeformat,))
return process
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
def process(value):
if value is None:
return None
return str(value)
return process
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
def process(value):
if value is None:
return None
import sys
class MSDateTime_adodbapi(MSDateTime):
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
def process(value):
# adodbapi will return datetimes with empty time values as datetime.date() objects.
# Promote them back to full datetime.datetime()
class _MSNumeric(sqltypes.Numeric):
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
if self.asdecimal:
def process(value):
if value is not None:
return process
_reg = re.compile(r"(\d+)-(\d+)-(\d+)")
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
def process(value):
if isinstance(value, datetime.datetime):
return value.date()
return process
_reg = re.compile(r"(\d+):(\d+):(\d+)(?:\.(\d+))?")
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
def process(value):
if isinstance(value, datetime.datetime):
return value.time()
__visit_name__ = 'BIT'
class _MSBoolean(sqltypes.Boolean):
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
def process(value):
if value is None:
return None
"""
self.length = length
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
"""Convert a MySQL's 64 bit, variable length binary string to a long."""
def process(value):
if value is not None:
__visit_name__ = 'TIME'
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
time = datetime.time
def process(value):
# convert from a timedelta value
length = max([len(v) for v in strip_values] + [0])
super(SET, self).__init__(length=length, **kw)
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
def process(value):
# The good news:
# No ',' quoting issues- commas aren't allowed in SET values
__visit_name__ = 'BOOLEAN'
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
def process(value):
if value is None:
return None
class _DecimalType(_NumericType):
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
if self.asdecimal:
return
def process(value):
class _oursqlNumeric(NUMERIC):
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
if self.asdecimal:
return
def process(value):
class _oursqlBIT(BIT):
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
"""oursql already converts mysql bits, so."""
return None
from sqlalchemy.dialects.mysql.base import BIT, MySQLDialect, MySQLExecutionContext
class _ZxJDBCBit(BIT):
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
"""Converts boolean or byte arrays from MySQL Connector/J to longs."""
def process(value):
if value is None:
def get_dbapi_type(self, dbapi):
return dbapi.NUMBER
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
def process(value):
if value is None:
return None
def bind_processor(self, dialect):
return None
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
def process(value):
if not isinstance(value, datetime):
return value
return process
class _OracleDateTime(sqltypes.DateTime):
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
def process(value):
if value is None or isinstance(value, datetime):
return value
# only if cx_oracle contains TIMESTAMP
class _OracleTimestamp(sqltypes.TIMESTAMP):
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
def process(value):
if value is None or isinstance(value, datetime):
return value
return process
class _LOBMixin(object):
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
if not dialect.auto_convert_lobs:
# return the cx_oracle.LOB directly.
# don't even call super.result_processor here.
return None
- super_process = super(_LOBMixin, self).result_processor(dialect)
+ super_process = super(_LOBMixin, self).result_processor(dialect, coltype)
lob = dialect.dbapi.LOB
if super_process:
def process(value):
return dbapi.FIXED_CHAR
class _OracleNVarChar(sqltypes.NVARCHAR):
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
if dialect._cx_oracle_native_nvarchar:
return None
else:
- return sqltypes.NVARCHAR.result_processor(self, dialect)
+ return sqltypes.NVARCHAR.result_processor(self, dialect, coltype)
class _OracleText(_LOBMixin, sqltypes.Text):
def get_dbapi_type(self, dbapi):
return dbapi.NCLOB
class _OracleInteger(sqltypes.Integer):
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
def to_int(val):
if val is not None:
val = int(val)
for bind, name in self.compiled.bind_names.iteritems():
if name in self.out_parameters:
type = bind.type
- result_processor = type.dialect_impl(self.dialect).\
- result_processor(self.dialect)
+ impl_type = type.dialect_impl(self.dialect)
+ dbapi_type = impl_type.get_dbapi_type(self.dialect.dbapi)
+ result_processor = impl_type.\
+ result_processor(self.dialect,
+ dbapi_type)
if result_processor is not None:
out_parameters[name] = \
- result_processor(self.out_parameters[name].getvalue())
+ result_processor(self.out_parameters[name].getvalue(), dbapi_type)
else:
out_parameters[name] = self.out_parameters[name].getvalue()
else:
class _ZxJDBCDate(sqltypes.Date):
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
def process(value):
if value is None:
return None
class _ZxJDBCNumeric(sqltypes.Numeric):
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
if self.asdecimal:
def process(value):
if isinstance(value, decimal.Decimal):
return [convert_item(item) for item in value]
return process
- def result_processor(self, dialect):
- item_proc = self.item_type.result_processor(dialect)
+ def result_processor(self, dialect, coltype):
+ item_proc = self.item_type.result_processor(dialect, coltype)
def process(value):
if value is None:
return value
"""
from sqlalchemy.engine import default
import decimal
-from sqlalchemy import util
+from sqlalchemy import util, exc
from sqlalchemy import types as sqltypes
-from sqlalchemy.dialects.postgresql.base import PGDialect, PGCompiler, PGIdentifierPreparer, PGExecutionContext
+from sqlalchemy.dialects.postgresql.base import PGDialect, \
+ PGCompiler, PGIdentifierPreparer, PGExecutionContext
class _PGNumeric(sqltypes.Numeric):
def bind_processor(self, dialect):
- return None
-
- def result_processor(self, dialect):
+ def process(value):
+ if value is not None:
+ return float(value)
+ else:
+ return value
+ return process
+
+ def result_processor(self, dialect, coltype):
if self.asdecimal:
- return None
+ if coltype in (700, 701):
+ def process(value):
+ if value is not None:
+ return decimal.Decimal(str(value))
+ else:
+ return value
+ return process
+ elif coltype == 1700:
+ # pg8000 returns Decimal natively for 1700
+ return None
+ else:
+ raise exc.InvalidRequestError("Unknown PG numeric type: %d" % coltype)
else:
- def process(value):
- if isinstance(value, decimal.Decimal):
- return float(value)
- else:
- return value
- return process
-
+ if coltype in (700, 701):
+ # pg8000 returns float natively for 701
+ return None
+ elif coltype == 1700:
+ def process(value):
+ if value is not None:
+ return float(value)
+ else:
+ return value
+ return process
+ else:
+ raise exc.InvalidRequestError("Unknown PG numeric type: %d" % coltype)
class PostgreSQL_pg8000ExecutionContext(PGExecutionContext):
pass
PGDialect.colspecs,
{
sqltypes.Numeric : _PGNumeric,
- sqltypes.Float: sqltypes.Float, # prevents _PGNumeric from being used
}
)
def bind_processor(self, dialect):
return None
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
if self.asdecimal:
- return None
+ if coltype in (700, 701):
+ def process(value):
+ if value is not None:
+ return decimal.Decimal(str(value))
+ else:
+ return value
+ return process
+ elif coltype == 1700:
+ # pg8000 returns Decimal natively for 1700
+ return None
+ else:
+ raise exc.InvalidRequestError("Unknown PG numeric type: %d" % coltype)
else:
- def process(value):
- if isinstance(value, decimal.Decimal):
- return float(value)
- else:
- return value
- return process
-
+ if coltype in (700, 701):
+ # pg8000 returns float natively for 701
+ return None
+ elif coltype == 1700:
+ def process(value):
+ if value is not None:
+ return float(value)
+ else:
+ return value
+ return process
+ else:
+ raise exc.InvalidRequestError("Unknown PG numeric type: %d" % coltype)
class _PGEnum(ENUM):
def __init__(self, *arg, **kw):
PGDialect.colspecs,
{
sqltypes.Numeric : _PGNumeric,
- sqltypes.Float: sqltypes.Float, # prevents _PGNumeric from being used
ENUM : _PGEnum, # needs force_unicode
sqltypes.Enum : _PGEnum, # needs force_unicode
ARRAY : _PGArray, # needs force_unicode
def bind_processor(self, dialect):
return None
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
if self.asdecimal:
return None
else:
)
_reg = re.compile(r"(\d+)-(\d+)-(\d+)(?: (\d+):(\d+):(\d+)(?:\.(\d+))?)?")
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
return self._result_processor(datetime.datetime, self._reg)
class _SLDate(_DateTimeMixin, sqltypes.Date):
)
_reg = re.compile(r"(\d+)-(\d+)-(\d+)")
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
return self._result_processor(datetime.date, self._reg)
class _SLTime(_DateTimeMixin, sqltypes.Time):
)
_reg = re.compile(r"(\d+):(\d+):(\d+)(?:\.(\d+))?")
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
return self._result_processor(datetime.time, self._reg)
return value and 1 or 0
return process
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
def process(value):
if value is None:
return None
__visit_name__ = "UNIQUEIDENTIFIER"
class SybaseBoolean(sqltypes.Boolean):
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
def process(value):
if value is None:
return None
typemap = self.dialect.dbapi_type_map
for i, (colname, coltype) in enumerate(m[0:2] for m in metadata):
-
if self.dialect.description_encoding:
colname = colname.decode(self.dialect.description_encoding)
name, obj, type_ = (colname, None, typemap.get(coltype, types.NULLTYPE))
processor = type_.dialect_impl(self.dialect).\
- result_processor(self.dialect)
+ result_processor(self.dialect, coltype)
if processor:
def make_colfunc(processor, index):
return value
def bind_processor(self, dialect):
- """Defines a bind parameter processing function."""
+ """Defines a bind parameter processing function.
+
+ :param dialect: Dialect instance in use.
+
+ """
return None
- def result_processor(self, dialect):
- """Defines a result-column processing function."""
+ def result_processor(self, dialect, coltype):
+ """Defines a result-column processing function.
+
+ :param dialect: Dialect instance in use.
+
+ :param coltype: DBAPI coltype argument received in cursor.description.
+
+ """
return None
"""
return None
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
"""Return a conversion function for processing result row values.
Returns a callable which will receive a result row column
return value
return process
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
def process(value):
return value
return process
else:
return self.impl.bind_processor(dialect)
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
if self.__class__.process_result_value.func_code is not TypeDecorator.process_result_value.func_code:
process_value = self.process_result_value
- impl_processor = self.impl.result_processor(dialect)
+ impl_processor = self.impl.result_processor(dialect, coltype)
if impl_processor:
def process(value):
return process_value(impl_processor(value), dialect)
return process_value(value, dialect)
return process
else:
- return self.impl.result_processor(dialect)
+ return self.impl.result_processor(dialect, coltype)
def copy(self):
instance = self.__class__.__new__(self.__class__)
else:
return None
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
if (not dialect.returns_unicode_strings or self.convert_unicode == 'force') \
and (self.convert_unicode or dialect.convert_unicode):
def process(value):
self.asdecimal = asdecimal
def adapt(self, impltype):
- return impltype(precision=self.precision, scale=self.scale, asdecimal=self.asdecimal)
+ return impltype(
+ precision=self.precision,
+ scale=self.scale,
+ asdecimal=self.asdecimal)
def get_dbapi_type(self, dbapi):
return dbapi.NUMBER
return value
return process
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
if self.asdecimal:
def process(value):
if value is not None:
return None
return process
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
if util.jython:
def process(value):
if value is not None:
return value
return process
- def result_processor(self, dialect):
- impl_processor = self.impl.result_processor(dialect)
+ def result_processor(self, dialect, coltype):
+ impl_processor = self.impl.result_processor(dialect, coltype)
loads = self.pickler.loads
if impl_processor:
def process(value):
return value
return process
- def result_processor(self, dialect):
- impl_processor = self.impl.result_processor(dialect)
+ def result_processor(self, dialect, coltype):
+ impl_processor = self.impl.result_processor(dialect, coltype)
epoch = self.epoch
if impl_processor:
def process(value):
from sqlalchemy.test.testing import eq_, assert_raises, assert_raises_message
from sqlalchemy.test import engines
import datetime
+import decimal
from sqlalchemy import *
from sqlalchemy.orm import *
from sqlalchemy import exc, schema, types
from sqlalchemy.test import *
from sqlalchemy.sql import table, column
from sqlalchemy.test.testing import eq_
+from test.engine._base import TablesTest
class SequenceTest(TestBase, AssertsCompiledSQL):
def test_basic(self):
"SELECT EXTRACT(%s FROM t.col1::timestamp) AS anon_1 "
"FROM t" % field)
+class FloatCoercionTest(TablesTest, AssertsExecutionResults):
+ __only_on__ = 'postgresql'
+ __dialect__ = postgresql.dialect()
+
+ @classmethod
+ def define_tables(cls, metadata):
+ data_table = Table('data_table', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('data', Integer)
+ )
+
+ @classmethod
+ @testing.resolve_artifact_names
+ def insert_data(cls):
+ data_table.insert().execute(
+ {'data':3},
+ {'data':5},
+ {'data':7},
+ {'data':2},
+ {'data':15},
+ {'data':12},
+ {'data':6},
+ {'data':478},
+ {'data':52},
+ {'data':9},
+ )
+
+ def _round(self, x):
+ if isinstance(x, float):
+ return round(x, 9)
+ elif isinstance(x, decimal.Decimal):
+ # really ?
+ x = x.shift(decimal.Decimal(9)).to_integral() / pow(10, 9)
+ return x
+ @testing.resolve_artifact_names
+ def test_float_coercion(self):
+ for type_, result in [
+ (Numeric, decimal.Decimal('140.381230939')),
+ (Float, 140.381230939),
+ (Float(asdecimal=True), decimal.Decimal('140.381230939')),
+ (Numeric(asdecimal=False), 140.381230939),
+ ]:
+ ret = testing.db.execute(
+ select([
+ func.stddev_pop(data_table.c.data, type_=type_)
+ ])
+ ).scalar()
+
+ eq_(self._round(ret), result)
+
+ ret = testing.db.execute(
+ select([
+ cast(func.stddev_pop(data_table.c.data), type_)
+ ])
+ ).scalar()
+ eq_(self._round(ret), result)
+
+
+
class EnumTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL):
__only_on__ = 'postgresql'
__dialect__ = postgresql.dialect()
bp = sldt.bind_processor(None)
eq_(bp(dt), '2008-06-27 12:00:00.000125')
- rp = sldt.result_processor(None)
+ rp = sldt.result_processor(None, None)
eq_(rp(bp(dt)), dt)
sldt.__legacy_microseconds__ = True
def process(value):
return "BIND_IN"+ value
return process
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
def process(value):
return value + "BIND_OUT"
return process
def process(value):
return "BIND_IN"+ impl_processor(value)
return process
- def result_processor(self, dialect):
- impl_processor = super(MyDecoratedType, self).result_processor(dialect) or (lambda value:value)
+ def result_processor(self, dialect, coltype):
+ impl_processor = super(MyDecoratedType, self).result_processor(dialect, coltype) or (lambda value:value)
def process(value):
return impl_processor(value) + "BIND_OUT"
return process
return "BIND_IN"+ impl_processor(value)
return process
- def result_processor(self, dialect):
- impl_processor = super(MyUnicodeType, self).result_processor(dialect) or (lambda value:value)
+ def result_processor(self, dialect, coltype):
+ impl_processor = super(MyUnicodeType, self).result_processor(dialect, coltype) or (lambda value:value)
def process(value):
return impl_processor(value) + "BIND_OUT"
return process
def process(value):
return value * 10
return process
- def result_processor(self, dialect):
+ def result_processor(self, dialect, coltype):
def process(value):
return value / 10
return process