git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Modernize SQL server binary tests
author: Mike Bayer <mike_mp@zzzcomputing.com>
Tue, 24 Oct 2017 16:40:25 +0000 (12:40 -0400)
committer: Mike Bayer <mike_mp@zzzcomputing.com>
Tue, 24 Oct 2017 16:40:25 +0000 (12:40 -0400)
In preparation for I6e3c16a69465b4fbc7b17a1927fb5e66acee93cb,
let's first use simplified binary test fixtures.

Change-Id: Ie0ac3ad194a64019bfcea0e5001cc8bdcf8a05e5

test/dialect/mssql/test_types.py

index 63a2f3bd3c29a5b6b91fe3e0c183519d4cf3653c..00955b426d07f384278ab851b339b1b2e4c380b6 100644 (file)
@@ -893,177 +893,148 @@ class TypeRoundTripTest(
                 engine.execute(tbl.delete())
 
 
-binary_table = None
-MyPickleType = None
+class BinaryTest(fixtures.TestBase):
+    __only_on__ = 'mssql'
+    __requires__ = "non_broken_binary",
+    __backend__ = True
 
 
-class BinaryTest(fixtures.TestBase, AssertsExecutionResults):
+    def test_character_binary(self):
+        self._test_round_trip(
+            mssql.MSVarBinary(800), b("some normal data")
+        )
 
-    """Test the Binary and VarBinary types"""
+    @testing.provide_metadata
+    def _test_round_trip(
+            self, type_, data, deprecate_large_types=True,
+            expected=None):
+        if testing.db.dialect.deprecate_large_types is not \
+                deprecate_large_types:
+            engine = engines.testing_engine(
+                options={"deprecate_large_types": deprecate_large_types})
+        else:
+            engine = testing.db
+
+        binary_table = Table(
+            'binary_table', self.metadata,
+            Column('id', Integer, primary_key=True),
+            Column('data', type_)
+        )
+        binary_table.create()
 
-    __only_on__ = 'mssql'
-    __requires__ = "non_broken_binary",
-    __backend__ = True
+        if expected is None:
+            expected = data
 
-    @classmethod
-    def setup_class(cls):
-        global MyPickleType
+        with engine.connect() as conn:
+            conn.execute(
+                binary_table.insert(),
+                data=data
+            )
+
+            eq_(
+                conn.scalar(select([binary_table.c.data])),
+                expected
+            )
+
+            eq_(
+                conn.scalar(
+                    text("select data from binary_table").
+                    columns(binary_table.c.data)
+                ),
+                expected
+            )
+
+            conn.execute(binary_table.delete())
+
+            conn.execute(binary_table.insert(), data=None)
+            eq_(
+                conn.scalar(select([binary_table.c.data])),
+                None
+            )
+
+            eq_(
+                conn.scalar(
+                    text("select data from binary_table").
+                    columns(binary_table.c.data)
+                ),
+                None
+            )
+
+    def test_plain_pickle(self):
+        self._test_round_trip(
+            PickleType, pickleable.Foo("im foo 1")
+        )
+
+    def test_custom_pickle(self):
 
         class MyPickleType(types.TypeDecorator):
             impl = PickleType
 
             def process_bind_param(self, value, dialect):
                 if value:
-                    value.stuff = 'this is modified stuff'
+                    value.stuff = "BIND" + value.stuff
                 return value
 
             def process_result_value(self, value, dialect):
                 if value:
-                    value.stuff = 'this is the right stuff'
+                    value.stuff = value.stuff + "RESULT"
                 return value
 
-    def teardown(self):
-        self.binary_table.drop(testing.db)
-
-    def _fixture(self, engine):
-        self.binary_table = binary_table = Table(
-            'binary_table',
-            MetaData(),
-            Column('primary_id', Integer, Sequence('binary_id_seq',
-                   optional=True), primary_key=True),
-            Column('data', mssql.MSVarBinary(8000)),
-            Column('data_image', mssql.MSImage),
-            Column('data_slice', types.BINARY(100)),
-            Column('misc', String(30)),
-            Column('pickled', PickleType),
-            Column('mypickle', MyPickleType),
-        )
-        binary_table.create(engine)
-        return binary_table
-
-    def test_character_binary(self):
-        engine = testing.db
-        binary_table = self._fixture(engine)
-        with engine.connect() as conn:
-            conn.execute(
-                binary_table.insert(),
-                primary_id=1,
-                data=b("some normal data")
-            )
+        data = pickleable.Foo("im foo 1")
+        expected = pickleable.Foo("im foo 1")
+        expected.stuff = "BINDim stuffRESULT"
 
-    def test_binary_legacy_types(self):
-        self._test_binary(False)
+        self._test_round_trip(
+            MyPickleType, data,
+            expected=expected
+        )
 
-    @testing.only_on('mssql >= 11')
-    def test_binary_updated_types(self):
-        self._test_binary(True)
+    def test_image(self):
+        stream1 = self._load_stream('binary_data_one.dat')
+        self._test_round_trip(
+            mssql.MSImage,
+            stream1
+        )
 
-    def test_binary_none_legacy_types(self):
-        self._test_binary_none(False)
+    def test_large_binary(self):
+        stream1 = self._load_stream('binary_data_one.dat')
+        self._test_round_trip(
+            sqltypes.LargeBinary,
+            stream1
+        )
 
-    @testing.only_on('mssql >= 11')
-    def test_binary_none_updated_types(self):
-        self._test_binary_none(True)
+    def test_large_legacy_types(self):
+        stream1 = self._load_stream('binary_data_one.dat')
+        self._test_round_trip(
+            sqltypes.LargeBinary,
+            stream1,
+            deprecate_large_types=False
+        )
 
-    def _test_binary(self, deprecate_large_types):
-        testobj1 = pickleable.Foo('im foo 1')
-        testobj2 = pickleable.Foo('im foo 2')
-        testobj3 = pickleable.Foo('im foo 3')
+    def test_slice_one(self):
         stream1 = self._load_stream('binary_data_one.dat')
-        stream2 = self._load_stream('binary_data_two.dat')
-        engine = engines.testing_engine(
-            options={"deprecate_large_types": deprecate_large_types})
 
-        binary_table = self._fixture(engine)
+        data = stream1[0:100]
 
-        with engine.connect() as conn:
-            conn.execute(
-                binary_table.insert(),
-                primary_id=1,
-                misc='binary_data_one.dat',
-                data=stream1,
-                data_image=stream1,
-                data_slice=stream1[0:100],
-                pickled=testobj1,
-                mypickle=testobj3,
-            )
-            conn.execute(
-                binary_table.insert(),
-                primary_id=2,
-                misc='binary_data_two.dat',
-                data=stream2,
-                data_image=stream2,
-                data_slice=stream2[0:99],
-                pickled=testobj2,
-            )
+        self._test_round_trip(
+            types.BINARY(100),
+            data
+        )
 
-        for stmt in \
-            binary_table.select(order_by=binary_table.c.primary_id), \
-                text(
-                    'select * from binary_table order by '
-                    'binary_table.primary_id',
-                    typemap=dict(
-                        data=mssql.MSVarBinary(8000),
-                        data_image=mssql.MSImage,
-                        data_slice=types.BINARY(100), pickled=PickleType,
-                        mypickle=MyPickleType),
-                    bind=testing.db):
-            with engine.connect() as conn:
-                result = conn.execute(stmt).fetchall()
-            eq_(list(stream1), list(result[0]['data']))
-            paddedstream = list(stream1[0:100])
-            paddedstream.extend(['\x00'] * (100 - len(paddedstream)))
-            eq_(paddedstream, list(result[0]['data_slice']))
-            eq_(list(stream2), list(result[1]['data']))
-            eq_(list(stream2), list(result[1]['data_image']))
-            eq_(testobj1, result[0]['pickled'])
-            eq_(testobj2, result[1]['pickled'])
-            eq_(testobj3.moredata, result[0]['mypickle'].moredata)
-            eq_(result[0]['mypickle'].stuff, 'this is the right stuff')
-
-    def _test_binary_none(self, deprecate_large_types):
-        engine = engines.testing_engine(
-            options={"deprecate_large_types": deprecate_large_types})
+    def test_slice_zeropadding(self):
+        stream2 = self._load_stream('binary_data_two.dat')
 
-        binary_table = self._fixture(engine)
+        data = stream2[0:99]
 
-        stream2 = self._load_stream('binary_data_two.dat')
+        # the type we used here is 100 bytes
+        # so we will get 100 bytes zero-padded
 
-        with engine.connect() as conn:
-            conn.execute(
-                binary_table.insert(),
-                primary_id=3,
-                misc='binary_data_two.dat', data_image=None,
-                data_slice=stream2[0:99], pickled=None)
-            for stmt in \
-                binary_table.select(), \
-                    text(
-                        'select * from binary_table',
-                        typemap=dict(
-                            data=mssql.MSVarBinary(8000),
-                            data_image=mssql.MSImage,
-                            data_slice=types.BINARY(100),
-                            pickled=PickleType,
-                            mypickle=MyPickleType),
-                        bind=testing.db):
-                row = conn.execute(stmt).first()
-                eq_(
-                    row['pickled'], None
-                )
-                eq_(
-                    row['data_image'], None
-                )
-
-                # the type we used here is 100 bytes
-                # so we will get 100 bytes zero-padded
-                paddedstream = list(stream2[0:99])
-                if util.py3k:
-                    paddedstream.extend([0] * (100 - len(paddedstream)))
-                else:
-                    paddedstream.extend(['\x00'] * (100 - len(paddedstream)))
-                eq_(
-                    list(row['data_slice']), paddedstream
-                )
+        paddedstream = stream2[0:99] + b'\x00'
+
+        self._test_round_trip(
+            types.BINARY(100),
+            data, expected=paddedstream
+        )
 
     def _load_stream(self, name, len=3000):
         fp = open(