engine.execute(tbl.delete())
-binary_table = None
-MyPickleType = None
+class BinaryTest(fixtures.TestBase):
+ __only_on__ = 'mssql'
+ __requires__ = "non_broken_binary",
+ __backend__ = True
-class BinaryTest(fixtures.TestBase, AssertsExecutionResults):
+ def test_character_binary(self):
+ self._test_round_trip(
+ mssql.MSVarBinary(800), b("some normal data")
+ )
- """Test the Binary and VarBinary types"""
+ @testing.provide_metadata
+ def _test_round_trip(
+ self, type_, data, deprecate_large_types=True,
+ expected=None):
+ if testing.db.dialect.deprecate_large_types is not \
+ deprecate_large_types:
+ engine = engines.testing_engine(
+ options={"deprecate_large_types": deprecate_large_types})
+ else:
+ engine = testing.db
+
+ binary_table = Table(
+ 'binary_table', self.metadata,
+ Column('id', Integer, primary_key=True),
+ Column('data', type_)
+ )
+ binary_table.create()
- __only_on__ = 'mssql'
- __requires__ = "non_broken_binary",
- __backend__ = True
+ if expected is None:
+ expected = data
- @classmethod
- def setup_class(cls):
- global MyPickleType
+ with engine.connect() as conn:
+ conn.execute(
+ binary_table.insert(),
+ data=data
+ )
+
+ eq_(
+ conn.scalar(select([binary_table.c.data])),
+ expected
+ )
+
+ eq_(
+ conn.scalar(
+ text("select data from binary_table").
+ columns(binary_table.c.data)
+ ),
+ expected
+ )
+
+ conn.execute(binary_table.delete())
+
+ conn.execute(binary_table.insert(), data=None)
+ eq_(
+ conn.scalar(select([binary_table.c.data])),
+ None
+ )
+
+ eq_(
+ conn.scalar(
+ text("select data from binary_table").
+ columns(binary_table.c.data)
+ ),
+ None
+ )
+
+ def test_plain_pickle(self):
+ self._test_round_trip(
+ PickleType, pickleable.Foo("im foo 1")
+ )
+
+ def test_custom_pickle(self):
class MyPickleType(types.TypeDecorator):
impl = PickleType
def process_bind_param(self, value, dialect):
if value:
- value.stuff = 'this is modified stuff'
+ value.stuff = "BIND" + value.stuff
return value
def process_result_value(self, value, dialect):
if value:
- value.stuff = 'this is the right stuff'
+ value.stuff = value.stuff + "RESULT"
return value
- def teardown(self):
- self.binary_table.drop(testing.db)
-
- def _fixture(self, engine):
- self.binary_table = binary_table = Table(
- 'binary_table',
- MetaData(),
- Column('primary_id', Integer, Sequence('binary_id_seq',
- optional=True), primary_key=True),
- Column('data', mssql.MSVarBinary(8000)),
- Column('data_image', mssql.MSImage),
- Column('data_slice', types.BINARY(100)),
- Column('misc', String(30)),
- Column('pickled', PickleType),
- Column('mypickle', MyPickleType),
- )
- binary_table.create(engine)
- return binary_table
-
- def test_character_binary(self):
- engine = testing.db
- binary_table = self._fixture(engine)
- with engine.connect() as conn:
- conn.execute(
- binary_table.insert(),
- primary_id=1,
- data=b("some normal data")
- )
+ data = pickleable.Foo("im foo 1")
+ expected = pickleable.Foo("im foo 1")
+ expected.stuff = "BINDim stuffRESULT"
- def test_binary_legacy_types(self):
- self._test_binary(False)
+ self._test_round_trip(
+ MyPickleType, data,
+ expected=expected
+ )
- @testing.only_on('mssql >= 11')
- def test_binary_updated_types(self):
- self._test_binary(True)
+ def test_image(self):
+ stream1 = self._load_stream('binary_data_one.dat')
+ self._test_round_trip(
+ mssql.MSImage,
+ stream1
+ )
- def test_binary_none_legacy_types(self):
- self._test_binary_none(False)
+ def test_large_binary(self):
+ stream1 = self._load_stream('binary_data_one.dat')
+ self._test_round_trip(
+ sqltypes.LargeBinary,
+ stream1
+ )
- @testing.only_on('mssql >= 11')
- def test_binary_none_updated_types(self):
- self._test_binary_none(True)
+ def test_large_legacy_types(self):
+ stream1 = self._load_stream('binary_data_one.dat')
+ self._test_round_trip(
+ sqltypes.LargeBinary,
+ stream1,
+ deprecate_large_types=False
+ )
- def _test_binary(self, deprecate_large_types):
- testobj1 = pickleable.Foo('im foo 1')
- testobj2 = pickleable.Foo('im foo 2')
- testobj3 = pickleable.Foo('im foo 3')
+ def test_slice_one(self):
stream1 = self._load_stream('binary_data_one.dat')
- stream2 = self._load_stream('binary_data_two.dat')
- engine = engines.testing_engine(
- options={"deprecate_large_types": deprecate_large_types})
- binary_table = self._fixture(engine)
+ data = stream1[0:100]
- with engine.connect() as conn:
- conn.execute(
- binary_table.insert(),
- primary_id=1,
- misc='binary_data_one.dat',
- data=stream1,
- data_image=stream1,
- data_slice=stream1[0:100],
- pickled=testobj1,
- mypickle=testobj3,
- )
- conn.execute(
- binary_table.insert(),
- primary_id=2,
- misc='binary_data_two.dat',
- data=stream2,
- data_image=stream2,
- data_slice=stream2[0:99],
- pickled=testobj2,
- )
+ self._test_round_trip(
+ types.BINARY(100),
+ data
+ )
- for stmt in \
- binary_table.select(order_by=binary_table.c.primary_id), \
- text(
- 'select * from binary_table order by '
- 'binary_table.primary_id',
- typemap=dict(
- data=mssql.MSVarBinary(8000),
- data_image=mssql.MSImage,
- data_slice=types.BINARY(100), pickled=PickleType,
- mypickle=MyPickleType),
- bind=testing.db):
- with engine.connect() as conn:
- result = conn.execute(stmt).fetchall()
- eq_(list(stream1), list(result[0]['data']))
- paddedstream = list(stream1[0:100])
- paddedstream.extend(['\x00'] * (100 - len(paddedstream)))
- eq_(paddedstream, list(result[0]['data_slice']))
- eq_(list(stream2), list(result[1]['data']))
- eq_(list(stream2), list(result[1]['data_image']))
- eq_(testobj1, result[0]['pickled'])
- eq_(testobj2, result[1]['pickled'])
- eq_(testobj3.moredata, result[0]['mypickle'].moredata)
- eq_(result[0]['mypickle'].stuff, 'this is the right stuff')
-
- def _test_binary_none(self, deprecate_large_types):
- engine = engines.testing_engine(
- options={"deprecate_large_types": deprecate_large_types})
+ def test_slice_zeropadding(self):
+ stream2 = self._load_stream('binary_data_two.dat')
- binary_table = self._fixture(engine)
+ data = stream2[0:99]
- stream2 = self._load_stream('binary_data_two.dat')
+ # the type we used here is 100 bytes
+ # so we will get 100 bytes zero-padded
- with engine.connect() as conn:
- conn.execute(
- binary_table.insert(),
- primary_id=3,
- misc='binary_data_two.dat', data_image=None,
- data_slice=stream2[0:99], pickled=None)
- for stmt in \
- binary_table.select(), \
- text(
- 'select * from binary_table',
- typemap=dict(
- data=mssql.MSVarBinary(8000),
- data_image=mssql.MSImage,
- data_slice=types.BINARY(100),
- pickled=PickleType,
- mypickle=MyPickleType),
- bind=testing.db):
- row = conn.execute(stmt).first()
- eq_(
- row['pickled'], None
- )
- eq_(
- row['data_image'], None
- )
-
- # the type we used here is 100 bytes
- # so we will get 100 bytes zero-padded
- paddedstream = list(stream2[0:99])
- if util.py3k:
- paddedstream.extend([0] * (100 - len(paddedstream)))
- else:
- paddedstream.extend(['\x00'] * (100 - len(paddedstream)))
- eq_(
- list(row['data_slice']), paddedstream
- )
+ paddedstream = stream2[0:99] + b'\x00'
+
+ self._test_round_trip(
+ types.BINARY(100),
+ data, expected=paddedstream
+ )
def _load_stream(self, name, len=3000):
fp = open(