--- /dev/null
+.. change::
+ :tags: bug, mssql
+ :tickets: 4121
+
+ Fixed bug where the sqltypes.BINARY and sqltypes.VARBINARY datatypes
+ would not include the correct bound-value handlers for pyodbc,
+ which allow the pyodbc.NullParam value to be passed when needed;
+ this helps with FreeTDS.
+
+
+
from ...engine import reflection, default
from ... import types as sqltypes
from ...types import INTEGER, BIGINT, SMALLINT, DECIMAL, NUMERIC, \
- FLOAT, DATETIME, DATE, BINARY,\
+ FLOAT, DATETIME, DATE, BINARY, \
TEXT, VARCHAR, NVARCHAR, CHAR, NCHAR
class VARBINARY(sqltypes.VARBINARY, sqltypes.LargeBinary):
"""The MSSQL VARBINARY type.
- This type extends both :class:`.types.VARBINARY` and
- :class:`.types.LargeBinary`. In "deprecate_large_types" mode,
- the :class:`.types.LargeBinary` type will produce ``VARBINARY(max)``
- on SQL Server.
+ This type is present to support "deprecate_large_types" mode where
+ either ``VARBINARY(max)`` or IMAGE is rendered. Otherwise, this type
+ object is redundant vs. :class:`.types.VARBINARY`.
.. versionadded:: 1.0.0
"""
-from .base import MSExecutionContext, MSDialect, VARBINARY
+from .base import MSExecutionContext, MSDialect, BINARY, VARBINARY
from ...connectors.pyodbc import PyODBCConnector
from ... import types as sqltypes, util, exc
import decimal
pass
-class _VARBINARY_pyodbc(VARBINARY):
+class _ms_binary_pyodbc(object):
+ """Wraps binary values in dialect-specific Binary wrapper.
+ If the value is null, return a pyodbc-specific BinaryNull
+ object to prevent pyODBC [and FreeTDS] from defaulting binary
+ NULL types to SQLWCHAR and causing implicit conversion errors.
+ """
+
def bind_processor(self, dialect):
if dialect.dbapi is None:
return None
return process
+class _VARBINARY_pyodbc(_ms_binary_pyodbc, VARBINARY):
+ pass
+
+
+class _BINARY_pyodbc(_ms_binary_pyodbc, BINARY):
+ pass
+
+
class MSExecutionContext_pyodbc(MSExecutionContext):
_embedded_scope_identity = False
{
sqltypes.Numeric: _MSNumeric_pyodbc,
sqltypes.Float: _MSFloat_pyodbc,
+ BINARY: _BINARY_pyodbc,
+
+ # SQL Server dialect has a VARBINARY that is just to support
+ # "deprecate_large_types" w/ VARBINARY(max), but also we must
+ # handle the usual SQL standard VARBINARY
VARBINARY: _VARBINARY_pyodbc,
+ sqltypes.VARBINARY: _VARBINARY_pyodbc,
sqltypes.LargeBinary: _VARBINARY_pyodbc,
}
)
__requires__ = "non_broken_binary",
__backend__ = True
-
def test_character_binary(self):
self._test_round_trip(
mssql.MSVarBinary(800), b("some normal data")
Column('id', Integer, primary_key=True),
Column('data', type_)
)
- binary_table.create()
+ binary_table.create(engine)
if expected is None:
expected = data
deprecate_large_types=False
)
- def test_slice_one(self):
+ def test_mssql_varbinary_max(self):
+ stream1 = self._load_stream('binary_data_one.dat')
+ self._test_round_trip(
+ mssql.VARBINARY("max"),
+ stream1
+ )
+
+ def test_mssql_legacy_varbinary_max(self):
+ stream1 = self._load_stream('binary_data_one.dat')
+ self._test_round_trip(
+ mssql.VARBINARY("max"),
+ stream1,
+ deprecate_large_types=False
+ )
+
+ def test_binary_slice(self):
+ self._test_var_slice(types.BINARY)
+
+ def test_binary_slice_zeropadding(self):
+ self._test_var_slice_zeropadding(types.BINARY, True)
+
+ def test_varbinary_slice(self):
+ self._test_var_slice(types.VARBINARY)
+
+ def test_varbinary_slice_zeropadding(self):
+ self._test_var_slice_zeropadding(types.VARBINARY, False)
+
+ def test_mssql_varbinary_slice(self):
+ self._test_var_slice(mssql.VARBINARY)
+
+ def test_mssql_varbinary_slice_zeropadding(self):
+ self._test_var_slice_zeropadding(mssql.VARBINARY, False)
+
+ def _test_var_slice(self, type_):
stream1 = self._load_stream('binary_data_one.dat')
data = stream1[0:100]
self._test_round_trip(
- types.BINARY(100),
+ type_(100),
data
)
- def test_slice_zeropadding(self):
+ def _test_var_slice_zeropadding(
+ self, type_, pad, deprecate_large_types=True):
stream2 = self._load_stream('binary_data_two.dat')
data = stream2[0:99]
# the type we used here is 100 bytes
# so we will get 100 bytes zero-padded
- paddedstream = stream2[0:99] + b'\x00'
+ if pad:
+ paddedstream = stream2[0:99] + b'\x00'
+ else:
+ paddedstream = stream2[0:99]
self._test_round_trip(
- types.BINARY(100),
+ type_(100),
data, expected=paddedstream
)