@classmethod
def setup_class(cls):
- global binary_table, MyPickleType
+ global MyPickleType
class MyPickleType(types.TypeDecorator):
    impl = PickleType

    def process_result_value(self, value, dialect):
        if value:
            value.stuff = 'this is the right stuff'
        return value
- binary_table = Table(
+ def teardown(self):
+ self.binary_table.drop(testing.db)
+
+ def _fixture(self, engine):
+ self.binary_table = binary_table = Table(
'binary_table',
- MetaData(testing.db),
+ MetaData(),
Column('primary_id', Integer, Sequence('binary_id_seq',
optional=True), primary_key=True),
Column('misc', String(30)),
Column('data', mssql.MSVarBinary(8000)),
Column('data_image', mssql.MSImage),
Column('data_slice', types.BINARY(100)),
Column('pickled', PickleType),
Column('mypickle', MyPickleType),
)
- binary_table.create()
+ binary_table.create(engine)
+ return binary_table
- def teardown(self):
- binary_table.delete().execute()
+ def test_binary_legacy_types(self):
+ self._test_binary(False)
- @classmethod
- def teardown_class(cls):
- binary_table.drop()
+ @testing.only_on('mssql >= 11')
+ def test_binary_updated_types(self):
+ self._test_binary(True)
- def test_binary(self):
+ def test_binary_none_legacy_types(self):
+ self._test_binary_none(False)
+
+ @testing.only_on('mssql >= 11')
+ def test_binary_none_updated_types(self):
+ self._test_binary_none(True)
+
+ def _test_binary(self, deprecate_large_types):
testobj1 = pickleable.Foo('im foo 1')
testobj2 = pickleable.Foo('im foo 2')
testobj3 = pickleable.Foo('im foo 3')
- stream1 = self.load_stream('binary_data_one.dat')
- stream2 = self.load_stream('binary_data_two.dat')
- binary_table.insert().execute(
- primary_id=1,
- misc='binary_data_one.dat',
- data=stream1,
- data_image=stream1,
- data_slice=stream1[0:100],
- pickled=testobj1,
- mypickle=testobj3,
- )
- binary_table.insert().execute(
- primary_id=2,
- misc='binary_data_two.dat',
- data=stream2,
- data_image=stream2,
- data_slice=stream2[0:99],
- pickled=testobj2,
- )
+ stream1 = self._load_stream('binary_data_one.dat')
+ stream2 = self._load_stream('binary_data_two.dat')
+ engine = engines.testing_engine(
+ options={"deprecate_large_types": deprecate_large_types})
+
+ binary_table = self._fixture(engine)
+
+ with engine.connect() as conn:
+ conn.execute(
+ binary_table.insert(),
+ primary_id=1,
+ misc='binary_data_one.dat',
+ data=stream1,
+ data_image=stream1,
+ data_slice=stream1[0:100],
+ pickled=testobj1,
+ mypickle=testobj3,
+ )
+ conn.execute(
+ binary_table.insert(),
+ primary_id=2,
+ misc='binary_data_two.dat',
+ data=stream2,
+ data_image=stream2,
+ data_slice=stream2[0:99],
+ pickled=testobj2,
+ )
for stmt in \
    binary_table.select(order_by=binary_table.c.primary_id), \
    text(
        'select * from binary_table',
        typemap=dict(
            data=mssql.MSVarBinary(8000),
            data_image=mssql.MSImage,
            data_slice=types.BINARY(100), pickled=PickleType,
            mypickle=MyPickleType),
        bind=testing.db):
- l = stmt.execute().fetchall()
+ with engine.connect() as conn:
+ l = conn.execute(stmt).fetchall()
eq_(list(stream1), list(l[0]['data']))
paddedstream = list(stream1[0:100])
paddedstream.extend(['\x00'] * (100 - len(paddedstream)))
eq_(paddedstream, list(l[0]['data_slice']))
eq_(list(stream2), list(l[1]['data']))
eq_(testobj1, l[0]['pickled'])
eq_(testobj2, l[1]['pickled'])
eq_(testobj3.moredata, l[0]['mypickle'].moredata)
eq_(l[0]['mypickle'].stuff, 'this is the right stuff')
- @testing.requires.no_mssql_freetds
- def test_binary_none(self):
- # TODO: pyodbc does not seem to accept "None" for a VARBINARY
- # column (data=None). error: [Microsoft][ODBC SQL Server
- # Driver][SQL Server]Implicit conversion from data type varchar
- # to varbinary is not allowed. Use the CONVERT function to run
- # this query. (257) binary_table.insert().execute(primary_id=3,
- # misc='binary_data_two.dat', data=None, data_image=None,
- # data_slice=stream2[0:99], pickled=None)
-
- stream2 = self.load_stream('binary_data_two.dat')
-
- binary_table.insert().execute(
- primary_id=3,
- misc='binary_data_two.dat', data_image=None,
- data_slice=stream2[0:99], pickled=None)
- for stmt in \
- binary_table.select(), \
- text(
- 'select * from binary_table',
- typemap=dict(
- data=mssql.MSVarBinary(8000),
- data_image=mssql.MSImage,
- data_slice=types.BINARY(100), pickled=PickleType,
- mypickle=MyPickleType),
- bind=testing.db):
- row = stmt.execute().first()
- eq_(
- row['pickled'], None
- )
- eq_(
- row['data_image'], None
- )
- eq_(
- row['data_slice'], stream2[0:99]
- )
+ def _test_binary_none(self, deprecate_large_types):
+ engine = engines.testing_engine(
+ options={"deprecate_large_types": deprecate_large_types})
- def load_stream(self, name, len=3000):
+ binary_table = self._fixture(engine)
+
+ stream2 = self._load_stream('binary_data_two.dat')
+
+ with engine.connect() as conn:
+ conn.execute(
+ binary_table.insert(),
+ primary_id=3,
+ misc='binary_data_two.dat', data_image=None,
+ data_slice=stream2[0:99], pickled=None)
+ for stmt in \
+ binary_table.select(), \
+ text(
+ 'select * from binary_table',
+ typemap=dict(
+ data=mssql.MSVarBinary(8000),
+ data_image=mssql.MSImage,
+ data_slice=types.BINARY(100),
+ pickled=PickleType,
+ mypickle=MyPickleType),
+ bind=testing.db):
+ row = conn.execute(stmt).first()
+ eq_(
+ row['pickled'], None
+ )
+ eq_(
+ row['data_image'], None
+ )
+
+ # the type we used here is 100 bytes
+ # so we will get 100 bytes zero-padded
+ paddedstream = list(stream2[0:99])
+ paddedstream.extend(['\x00'] * (100 - len(paddedstream)))
+ eq_(
+ list(row['data_slice']), paddedstream
+ )
+
+ def _load_stream(self, name, len=3000):
fp = open(
os.path.join(os.path.dirname(__file__), "..", "..", name), 'rb')
stream = fp.read(len)