return self._extend("NCHAR")
-class MSBinary(sqltypes.Binary):
+class MSGenericBinary(sqltypes.Binary):
+ """The Binary type assumes that a Binary specification without a length
+ is an unbound Binary type whereas one with a length specification results
+ in a fixed length Binary type.
+
+ If you want standard MSSQL ``BINARY`` behavior use the ``MSBinary`` type.
+
+ """
+
+ def get_col_spec(self):
+ if self.length:
+ return "BINARY(%s)" % self.length
+ else:
+ return "IMAGE"
+
+
+class MSBinary(MSGenericBinary):
def get_col_spec(self):
if self.length:
return "BINARY(%s)" % self.length
return "BINARY"
-class MSVarBinary(MSBinary):
+class MSVarBinary(MSGenericBinary):
def get_col_spec(self):
if self.length:
return "VARBINARY(%s)" % self.length
return "VARBINARY"
-class MSImage(MSBinary):
+class MSImage(MSGenericBinary):
def get_col_spec(self):
return "IMAGE"
return value and True or False
return process
+
class MSTimeStamp(sqltypes.TIMESTAMP):
def get_col_spec(self):
return "TIMESTAMP"
+
class MSMoney(sqltypes.TypeEngine):
def get_col_spec(self):
return "MONEY"
+
class MSSmallMoney(MSMoney):
def get_col_spec(self):
return "SMALLMONEY"
+
class MSUniqueIdentifier(sqltypes.TypeEngine):
def get_col_spec(self):
return "UNIQUEIDENTIFIER"
+
class MSVariant(sqltypes.TypeEngine):
def get_col_spec(self):
return "SQL_VARIANT"
sqltypes.Date : MSDate,
sqltypes.Time : MSTime,
sqltypes.String : MSString,
- sqltypes.Binary : MSBinary,
+ sqltypes.Binary : MSGenericBinary,
sqltypes.Boolean : MSBoolean,
sqltypes.Text : MSText,
sqltypes.UnicodeText : MSNText,
import testenv; testenv.configure_for_tests()
-import re
+import os, pickleable, re
from sqlalchemy import *
+from sqlalchemy import types, exc
from sqlalchemy.orm import *
-from sqlalchemy import exc
from sqlalchemy.sql import table, column
from sqlalchemy.databases import mssql
import sqlalchemy.engine.url as url
testing.db, None, None).get_column_specification(c)
+class BinaryTest(TestBase, AssertsExecutionResults):
+ """Test the Binary and VarBinary types"""
+ def setUpAll(self):
+ global binary_table, MyPickleType
+
+ class MyPickleType(types.TypeDecorator):
+ impl = PickleType
+
+ def process_bind_param(self, value, dialect):
+ if value:
+ value.stuff = 'this is modified stuff'
+ return value
+
+ def process_result_value(self, value, dialect):
+ if value:
+ value.stuff = 'this is the right stuff'
+ return value
+
+ binary_table = Table('binary_table', MetaData(testing.db),
+ Column('primary_id', Integer, Sequence('binary_id_seq', optional=True), primary_key=True),
+ Column('data', mssql.MSVarBinary(8000)),
+ Column('data_image', mssql.MSImage),
+ Column('data_slice', Binary(100)),
+ Column('misc', String(30)),
+ # construct PickleType with non-native pickle module, since cPickle uses relative module
+ # loading and confuses this test's parent package 'sql' with the 'sqlalchemy.sql' package relative
+ # to the 'types' module
+ Column('pickled', PickleType),
+ Column('mypickle', MyPickleType)
+ )
+ binary_table.create()
+
+ def tearDown(self):
+ binary_table.delete().execute()
+
+ def tearDownAll(self):
+ binary_table.drop()
+
+ def testbinary(self):
+ testobj1 = pickleable.Foo('im foo 1')
+ testobj2 = pickleable.Foo('im foo 2')
+ testobj3 = pickleable.Foo('im foo 3')
+
+ stream1 =self.load_stream('binary_data_one.dat')
+ stream2 =self.load_stream('binary_data_two.dat')
+ binary_table.insert().execute(primary_id=1, misc='binary_data_one.dat', data=stream1, data_image=stream1, data_slice=stream1[0:100], pickled=testobj1, mypickle=testobj3)
+ binary_table.insert().execute(primary_id=2, misc='binary_data_two.dat', data=stream2, data_image=stream2, data_slice=stream2[0:99], pickled=testobj2)
+ binary_table.insert().execute(primary_id=3, misc='binary_data_two.dat', data=None, data_image=None, data_slice=stream2[0:99], pickled=None)
+
+ for stmt in (
+ binary_table.select(order_by=binary_table.c.primary_id),
+ text("select * from binary_table order by binary_table.primary_id", typemap={'pickled':PickleType, 'mypickle':MyPickleType}, bind=testing.db)
+ ):
+ l = stmt.execute().fetchall()
+ self.assertEquals(list(stream1), list(l[0]['data']))
+
+ paddedstream = list(stream1[0:100])
+ paddedstream.extend(['\x00'] * (100 - len(paddedstream)))
+ self.assertEquals(paddedstream, list(l[0]['data_slice']))
+
+ self.assertEquals(list(stream2), list(l[1]['data']))
+ self.assertEquals(list(stream2), list(l[1]['data_image']))
+ self.assertEquals(testobj1, l[0]['pickled'])
+ self.assertEquals(testobj2, l[1]['pickled'])
+ self.assertEquals(testobj3.moredata, l[0]['mypickle'].moredata)
+ self.assertEquals(l[0]['mypickle'].stuff, 'this is the right stuff')
+
+ def load_stream(self, name, len=3000):
+ f = os.path.join(os.path.dirname(testenv.__file__), name)
+ return file(f).read(len)
+
+
if __name__ == "__main__":
testenv.main()