way to do things, i.e. filter(), filter_by(), all(), one(),
etc. Deprecated methods are docstring'ed with their
new replacements.
- - query.list() replaced with query.all()
- removed ancient query.select_by_attributename() capability.
- added "aliased joins" positional argument to the front of
filter_by(). This allows auto-creation of joins that are aliased
locally to the individual filter_by() call, enabling the
auto-construction of joins which cross the same paths but
- are querying divergent criteria.
+ are querying divergent criteria. ClauseElements at the front
+ of filter_by() are removed (use filter()).
- along with recent speedups to ResultProxy, total number of
function calls significantly reduced for large loads.
test/perf/masseagerload.py reports 0.4 as having the fewest
number of function calls of any SQLAlchemy version.
- general
- finally figured out how to get setuptools version in, available
as sqlalchemy.__version__ [ticket:428]
+- ext
+ - iteration over dict association proxies is now dict-like, not
+ InstrumentedList-like (e.g. over keys instead of values)
+ - association proxies no longer bind tightly to source collections
+ [ticket:597], and are constructed with a thunk instead
+- orm
+ - forwards-compatibility with 0.4: added one(), first(), and
+ all() to Query (see the sketch at the end of this section)
+ - added synchronization to the mapper() construction step, to avoid
+ thread collisions when pre-existing mappers are compiling in a
+ different thread [ticket:613]
+ - fixed very stupid bug when deleting items with many-to-many
+ uselist=False relations
+ - remember all that stuff about polymorphic_union? For joined
+ table inheritance? Funny thing... you sort of don't need it for
+ joined table inheritance; you can just string all the tables
+ together via outerjoin(). The UNION still applies if concrete
+ tables are involved, though (since there's nothing to join them
+ on). See the sketch at the end of this section.
+ - small fix to eager loading to better work with eager loads
+ to polymorphic mappers that are using a straight "outerjoin"
+ clause
- sql
- result.last_inserted_ids() should return a list that is identically
sized to the primary key constraint of the table. values that were
"passively" created and not available via cursor.lastrowid will be
None.
- fixed bug where selectable.corresponding_column(selectable.c.col)
would not return selectable.c.col, if the selectable is a join
of a table and another join involving the same table. messed
up ORM decision making [ticket:593]
+ - added Interval type to types.py [ticket:595]
- mysql
- - added 'fields' to reserved words [ticket:590]
-
+ - added 'fields' to reserved words [ticket:590]
- oracle
- datetime fixes: got subsecond TIMESTAMP to work [ticket:604],
added OracleDate which supports types.Date with only year/month/day
- sqlite
- better handles datetime/date/time objects mixed and matched
with various Date/Time/DateTime columns
- string PK column inserts don't get overwritten with OID [ticket:603]
+- extensions
+ - added selectone_by() to assignmapper
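
  A minimal sketch of the forwards-compatible Query usage noted in the
  "orm" section above (the User class and session are illustrative
  assumptions, not part of this changelog):

      session = create_session()
      q = session.query(User).filter(User.c.user_name == 'jack')
      q.all()    # list of all matching instances
      q.first()  # first matching instance, or None
      q.one()    # exactly one matching instance, else an error

  Likewise, a hedged sketch of stringing joined-inheritance tables
  together via outerjoin() instead of polymorphic_union (the table and
  mapper names here are assumptions):

      pjoin = people.outerjoin(engineers).outerjoin(managers)
      mapper(Person, people, select_table=pjoin,
             polymorphic_on=people.c.type)
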
0.3.8
- engines
from sqlalchemy.engine import default, base
import sqlalchemy.types as sqltypes
+import datetime
class OracleNumeric(sqltypes.Numeric):
    def get_col_spec(self):
        return "NUMERIC(%(precision)s, %(length)s)" % {'precision': self.precision, 'length': self.length}
+class OracleDate(sqltypes.Date):
+ def get_col_spec(self):
+ return "DATE"
+ def convert_bind_param(self, value, dialect):
+ return value
+ def convert_result_value(self, value, dialect):
+ if not isinstance(value, datetime.datetime):
+ return value
+ else:
+ return value.date()
+
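+# Illustrative note (an assumption, not part of this change): cx_oracle
+# hands back DATE columns as datetime.datetime, so OracleDate trims the
+# time portion on the way out, e.g.:
+#   OracleDate().convert_result_value(datetime.datetime(2007, 5, 1, 12, 30), dialect)
+#   => datetime.date(2007, 5, 1)
+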
class OracleDateTime(sqltypes.DateTime):
def get_col_spec(self):
return "DATE"
# Oracle does not support TIME columns
# TIMESTAMP is supported only if cx_oracle provides the TIMESTAMP type
-class OracleTimestamp(sqltypes.DateTime):
+class OracleTimestamp(sqltypes.TIMESTAMP):
def get_col_spec(self):
return "TIMESTAMP"
def get_dbapi_type(self, dialect):
return dialect.TIMESTAMP
+class OracleString(sqltypes.String):
+ def get_col_spec(self):
+ return "VARCHAR(%(length)s)" % {'length' : self.length}
+
class OracleText(sqltypes.TEXT):
def get_dbapi_type(self, dbapi):
        return dbapi.CLOB
    def convert_result_value(self, value, dialect):
        if value is None:
            return None
        else:
            return value.read()
-class OracleString(sqltypes.String):
- def get_col_spec(self):
- return "VARCHAR(%(length)s)" % {'length' : self.length}
-
class OracleRaw(sqltypes.Binary):
def get_col_spec(self):
return "RAW(%(length)s)" % {'length' : self.length}
sqltypes.Numeric : OracleNumeric,
sqltypes.Float : OracleNumeric,
sqltypes.DateTime : OracleDateTime,
- sqltypes.Date : OracleDateTime,
+ sqltypes.Date : OracleDate,
sqltypes.String : OracleString,
sqltypes.Binary : OracleBinary,
sqltypes.Boolean : OracleBoolean,
ischema_names = {
'VARCHAR2' : OracleString,
- 'DATE' : OracleDateTime,
+ 'DATE' : OracleDate,
'DATETIME' : OracleDateTime,
'NUMBER' : OracleNumeric,
'BLOB' : OracleBinary,
class DateTimeMixin(object):
def convert_bind_param(self, value, dialect):
if value is not None:
- return str(value)
+            if getattr(value, 'microsecond', None) is not None:
+                # zero-pad microseconds so they survive the int() round-trip
+                # in _cvt() below
+                return value.strftime(self.__format__) + "." + ("%06d" % value.microsecond)
+            else:
+                return value.strftime(self.__format__)
else:
return None
- def _cvt(self, value, dialect, fmt):
+ def _cvt(self, value, dialect):
if value is None:
return None
        try:
            (value, microsecond) = value.split('.')
            microsecond = int(microsecond)
except ValueError:
(value, microsecond) = (value, 0)
- return time.strptime(value, fmt)[0:6] + (microsecond,)
+ return time.strptime(value, self.__format__)[0:6] + (microsecond,)
class SLDateTime(DateTimeMixin,sqltypes.DateTime):
+ __format__ = "%Y-%m-%d %H:%M:%S"
+
def get_col_spec(self):
return "TIMESTAMP"
def convert_result_value(self, value, dialect):
- tup = self._cvt(value, dialect, "%Y-%m-%d %H:%M:%S")
+ tup = self._cvt(value, dialect)
return tup and datetime.datetime(*tup)
class SLDate(DateTimeMixin, sqltypes.Date):
+ __format__ = "%Y-%m-%d"
+
def get_col_spec(self):
return "DATE"
def convert_result_value(self, value, dialect):
- tup = self._cvt(value, dialect, "%Y-%m-%d")
+ tup = self._cvt(value, dialect)
return tup and datetime.date(*tup[0:3])
class SLTime(DateTimeMixin, sqltypes.Time):
+ __format__ = "%H:%M:%S"
+
def get_col_spec(self):
return "TIME"
def convert_result_value(self, value, dialect):
- tup = self._cvt(value, dialect, "%H:%M:%S")
+ tup = self._cvt(value, dialect)
return tup and datetime.time(*tup[3:7])
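
# A hedged round-trip sketch of the shared __format__ scheme above (the
# dialect argument is unused by these converters, so None suffices):
#   SLDateTime().convert_bind_param(datetime.datetime(2007, 5, 1, 12, 30, 0, 7), None)
#   => '2007-05-01 12:30:00.000007'
#   SLDateTime().convert_result_value('2007-05-01 12:30:00.000007', None)
#   => datetime.datetime(2007, 5, 1, 12, 30, 0, 7)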
class SLText(sqltypes.TEXT):
return _AssociationList(lazy_collection, creator, getter, setter)
elif self.collection_class is dict:
kv_setter = lambda o, k, v: setattr(o, value_attr, v)
- return _AssociationDict(lazy_collection, creator, getter, setter)
+ return _AssociationDict(lazy_collection, creator, getter, kv_setter)
elif self.collection_class is util.Set:
return _AssociationSet(lazy_collection, creator, getter, setter)
else:
from sqlalchemy.orm.session import object_session, attribute_manager
__all__ = ['relation', 'column_property', 'backref', 'eagerload', 'lazyload', 'noload', 'deferred', 'defer', 'undefer', 'undefer_group', 'extension',
- 'mapper', 'clear_mappers', 'compile_mappers', 'clear_mapper', 'class_mapper', 'object_mapper', 'MapperExtension', 'Query',
+ 'mapper', 'clear_mappers', 'compile_mappers', 'class_mapper', 'object_mapper', 'MapperExtension', 'Query',
'polymorphic_union', 'create_session', 'synonym', 'contains_alias', 'contains_eager', 'EXT_PASS', 'object_session'
]
classes as their primary mapper.
"""
- for mapper in mapper_registry.values():
- mapper.dispose()
- mapper_registry.clear()
- sautil.ArgSingleton.instances.clear()
-
-def clear_mapper(m):
- """Remove the given mapper from the storage of mappers.
-
- When a new mapper is created for the previous mapper's class, it
- will be used as that classes' new primary mapper.
- """
-
- del mapper_registry[m.class_key]
- attribute_manager.reset_class_managed(m.class_)
- if hasattr(m.class_, 'c'):
- del m.class_.c
- m.class_key.dispose()
-
+ mapperlib._COMPILE_MUTEX.acquire()
+ try:
+ for mapper in mapper_registry.values():
+ mapper.dispose()
+ mapper_registry.clear()
+ sautil.ArgSingleton.instances.clear()
+ finally:
+ mapperlib._COMPILE_MUTEX.release()
+
def extension(ext):
"""Return a ``MapperOption`` that will insert the given
``MapperExtension`` to the beginning of the list of extensions
childlist = self.get_object_dependencies(obj, uowcommit, passive=self.passive_deletes)
if childlist is not None:
for child in childlist.deleted_items() + childlist.unchanged_items():
- if reverse_dep and (reverse_dep, "manytomany", child, obj) in uowcommit.attributes:
+ if child is None or (reverse_dep and (reverse_dep, "manytomany", child, obj) in uowcommit.attributes):
continue
associationrow = {}
self._synchronize(obj, child, associationrow, False, uowcommit)
childlist = self.get_object_dependencies(obj, uowcommit)
if childlist is None: continue
for child in childlist.added_items():
- if reverse_dep and (reverse_dep, "manytomany", child, obj) in uowcommit.attributes:
+ if child is None or (reverse_dep and (reverse_dep, "manytomany", child, obj) in uowcommit.attributes):
continue
associationrow = {}
self._synchronize(obj, child, associationrow, False, uowcommit)
uowcommit.attributes[(self, "manytomany", obj, child)] = True
secondary_insert.append(associationrow)
for child in childlist.deleted_items():
- if reverse_dep and (reverse_dep, "manytomany", child, obj) in uowcommit.attributes:
+ if child is None or (reverse_dep and (reverse_dep, "manytomany", child, obj) in uowcommit.attributes):
continue
associationrow = {}
self._synchronize(obj, child, associationrow, False, uowcommit)
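
# Illustrative note (an assumption drawn from the changelog): with a
# many-to-many relation declared uselist=False, an unset relation can
# surface as None in the child list; without the "child is None" guard
# added above, a bogus association row would be synchronized for it.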
pass
init._oldinit = oldinit
self.class_.__init__ = init
-
- mapper_registry[self.class_key] = self
+
+ _COMPILE_MUTEX.acquire()
+ try:
+ mapper_registry[self.class_key] = self
+ finally:
+ _COMPILE_MUTEX.release()
+
if self.entity_name is None:
self.class_.c = self.c
self.is_scalar = scalar
if scalar:
# allow corresponding_column to return None
- self.orig_set = []
+ self.orig_set = util.Set()
# indicates if this select statement, as a subquery, should automatically correlate
# its FROM clause to that of an enclosing select, update, or delete statement.
'INT', 'CHAR', 'VARCHAR', 'NCHAR', 'TEXT', 'FLOAT', 'DECIMAL',
'TIMESTAMP', 'DATETIME', 'CLOB', 'BLOB', 'BOOLEAN', 'String', 'Integer', 'SmallInteger','Smallinteger',
'Numeric', 'Float', 'DateTime', 'Date', 'Time', 'Binary', 'Boolean', 'Unicode', 'PickleType', 'NULLTYPE',
- 'SMALLINT', 'DATE', 'TIME'
+    'SMALLINT', 'DATE', 'TIME', 'Interval'
]
from sqlalchemy import util, exceptions
import inspect
+import datetime as dt
try:
import cPickle as pickle
except:
return self._impl_dict[dialect]
except AttributeError:
self._impl_dict = {}
- return self._impl_dict.setdefault(dialect, self._create_dialect_impl(dialect))
except KeyError:
- return self._impl_dict.setdefault(dialect, self._create_dialect_impl(dialect))
+ pass
- def _create_dialect_impl(self, dialect):
- typedesc = dialect.type_descriptor(self.impl)
+ typedesc = self.load_dialect_impl(dialect)
tt = self.copy()
if not isinstance(tt, self.__class__):
raise exceptions.AssertionError("Type object %s does not properly implement the copy() method, it must return an object of type %s" % (self, self.__class__))
tt.impl = typedesc
+ self._impl_dict[dialect] = tt
return tt
+    def load_dialect_impl(self, dialect):
+        """Load the dialect-specific implementation of this type.
+
+        By default calls dialect.type_descriptor(self.impl), but can be
+        overridden to provide different behavior.
+        """
+
+ return dialect.type_descriptor(self.impl)
+
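+    # Hedged illustration of the per-dialect caching above (the dialect
+    # instance and type are assumptions for the example):
+    #   d = sqlite.SQLiteDialect()
+    #   sometype.dialect_impl(d) is sometype.dialect_impl(d)
+    #   => True; the second call is served from _impl_dict
+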
def __getattr__(self, key):
"""Proxy all other undefined accessors to the underlying implementation."""
class Boolean(TypeEngine):
pass
+
+class Interval(TypeDecorator):
+    """Type to be used in Column statements to store python timedeltas.
+
+    Where possible, native engine features are used to store timedeltas
+    (currently only the PostgreSQL Interval type); otherwise this type
+    falls back to DateTime storage, converting to and from timedelta on
+    the fly.
+
+    The conversion is simple: the epoch (zero timestamp, 1970-01-01) is
+    used as the base, so a timedelta of 1 day (24 hours) is stored as
+    the DateTime '1970-01-02 00:00'.  See convert_bind_param and
+    convert_result_value for the actual conversion code.
+    """
+ impl = None
+
+    def __init__(self, *args, **kwargs):
+        # skip TypeDecorator.__init__, which would try to instantiate
+        # self.impl (still None here; the real impl is chosen per-dialect
+        # in load_dialect_impl())
+        pass
+
+    def load_dialect_impl(self, dialect):
+        """Check whether the dialect has a native implementation of the
+        python timedelta type; if so, return the type that handles it,
+        otherwise fall back to the dialect's DateTime implementation.
+        """
+        import sqlalchemy.databases.postgres as pg
+
+        if self.__hasNativeImpl(dialect):
+            # for now, only PostgreSQL has native timedelta support
+            return pg.PGInterval()
+        else:
+            # all others fall back to DateTime
+            return dialect.type_descriptor(DateTime)
+
+ def __hasNativeImpl(self,dialect):
+ import sqlalchemy.databases.postgres as pg
+ return dialect.__class__ in [pg.PGDialect]
+
+    def convert_bind_param(self, value, dialect):
+        if value is None:
+            return None
+        if not self.__hasNativeImpl(dialect):
+            # no native INTERVAL type: store as epoch + delta
+            tmpval = dt.datetime.utcfromtimestamp(0) + value
+            return self.impl.convert_bind_param(tmpval, dialect)
+        else:
+            return self.impl.convert_bind_param(value, dialect)
+
+    def convert_result_value(self, value, dialect):
+        retval = self.impl.convert_result_value(value, dialect)
+        if retval is None:
+            return None
+        if not self.__hasNativeImpl(dialect):
+            # recover the delta by subtracting the epoch base
+            return retval - dt.datetime.utcfromtimestamp(0)
+        else:
+            return retval
+
+    def is_mutable(self):
+        # neither datetime nor PGInterval is a mutable type
+        return False
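+
+# A hedged usage sketch for Interval (table and metadata names are
+# illustrative assumptions):
+#   t = Table('events', metadata, Column('duration', Interval))
+#   t.insert().execute(duration=datetime.timedelta(days=1, hours=12))
+# On PostgreSQL this round-trips through a native INTERVAL column; on
+# other databases it is stored as the DateTime '1970-01-02 12:00:00'.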
class FLOAT(Float):pass
class TEXT(String):pass
self.assert_(len(p1._children) == 3)
self.assert_(len(p1.children) == 3)
+ p1.children['d'] = 'new d'
+ assert p1.children['d'] == 'new d'
+
p1._children = {}
self.assert_(len(p1.children) == 0)
def testrefresh2(self):
"""test a hang condition that was occuring on expire/refresh"""
+
s = create_session()
- mapper(Address, addresses)
-
- mapper(User, users, properties = dict(addresses=relation(Address,private=True,lazy=False)) )
+ m1 = mapper(Address, addresses)
+ m2 = mapper(User, users, properties = dict(addresses=relation(Address,private=True,lazy=False)) )
+ assert m1._Mapper__is_compiled is False
+ assert m2._Mapper__is_compiled is False
+
+# compile_mappers()
+ print "NEW USER"
u=User()
+ print "NEW USER DONE"
+ assert m2._Mapper__is_compiled is True
u.user_name='Justin'
a = Address()
a.address_id=17 # to work around the hardcoded IDs in this test suite....
ctx.current.flush()
assert itemkeywords.count().scalar() == 0
+ def testscalar(self):
+ """test that dependency.py doesnt try to delete an m2m relation referencing None."""
+
+ mapper(Keyword, keywords)
+
+ mapper(Item, orderitems, properties = dict(
+ keyword = relation(Keyword, secondary=itemkeywords, uselist=False),
+ ))
+
+ i = Item()
+ ctx.current.flush()
+ ctx.current.delete(i)
+ ctx.current.flush()
+
+
def testmanytomanyupdate(self):
"""tests some history operations on a many to many"""
-# tests the COMPILE_MUTEX in mapper compilation
+"""test that mapper compilation is threadsafe, including
+when additional mappers are created while the existing
+collection is being compiled."""
from sqlalchemy import *
-import thread, time, random
+import thread, time
from sqlalchemy.orm import mapperlib
from testbase import Table, Column
Column('c2', String(30)),
Column('t1c1', None, ForeignKey('t1.c1'))
)
+t3 = Table('t3', meta,
+ Column('c1', Integer, primary_key=True),
+ Column('c2', String(30)),
+)
meta.create_all()
class T1(object):
# should produce thread collisions
#mapperlib._COMPILE_MUTEX = FakeLock()
-existing_compile_all = mapperlib.Mapper._compile_all
-state = [False]
-# decorate mapper's _compile_all() method; the mutex in mapper.compile()
-# should insure that this method is only called once by a single thread only
-def monkeypatch_compile_all(self):
- if state[0]:
- raise "thread collision"
- state[0] = True
- try:
- print "compile", thread.get_ident()
- time.sleep(1 + random.random())
- existing_compile_all(self)
- finally:
- state[0] = False
-mapperlib.Mapper._compile_all = monkeypatch_compile_all
-
def run1():
- print "T1", thread.get_ident()
- class_mapper(T1)
+ for i in range(50):
+ print "T1", thread.get_ident()
+ class_mapper(T1)
+ time.sleep(.05)
def run2():
- print "T2", thread.get_ident()
- class_mapper(T2)
+ for i in range(50):
+ print "T2", thread.get_ident()
+ class_mapper(T2)
+ time.sleep(.057)
-for i in range(0,1):
- clear_mappers()
- mapper(T1, t1, properties={'t2':relation(T2, backref="t1")})
- mapper(T2, t2)
- #compile_mappers()
- print "START"
- for j in range(0, 5):
- thread.start_new_thread(run1, ())
- thread.start_new_thread(run2, ())
- print "WAIT"
- time.sleep(5)
+def run3():
+ for i in range(50):
+ def foo():
+ print "FOO", thread.get_ident()
+ class Foo(object):pass
+ mapper(Foo, t3)
+ class_mapper(Foo).compile()
+ foo()
+ time.sleep(.05)
+
+mapper(T1, t1, properties={'t2':relation(T2, backref="t1")})
+mapper(T2, t2)
+print "START"
+for j in range(0, 5):
+ thread.start_new_thread(run1, ())
+ thread.start_new_thread(run2, ())
+ thread.start_new_thread(run3, ())
+ thread.start_new_thread(run3, ())
+ thread.start_new_thread(run3, ())
+print "WAIT"
+time.sleep(5)
import string,datetime, re, sys, os
import sqlalchemy.engine.url as url
import sqlalchemy.types
-from sqlalchemy.databases import mssql, oracle
+from sqlalchemy.databases import mssql, oracle, mysql
from testbase import Table, Column
dialect_type = col.type.dialect_impl(dialect)
assert isinstance(dialect_type.impl, oracle.OracleText), repr(dialect_type.impl)
+ def testoracletimestamp(self):
+ dialect = oracle.OracleDialect()
+ t1 = oracle.OracleTimestamp
+ t2 = oracle.OracleTimestamp()
+ t3 = types.TIMESTAMP
+ assert isinstance(dialect.type_descriptor(t1), oracle.OracleTimestamp)
+ assert isinstance(dialect.type_descriptor(t2), oracle.OracleTimestamp)
+ assert isinstance(dialect.type_descriptor(t3), oracle.OracleTimestamp)
+
+ def testmysqlbinary(self):
+ dialect = mysql.MySQLDialect()
+ t1 = mysql.MSVarBinary
+ t2 = mysql.MSVarBinary()
+ assert isinstance(dialect.type_descriptor(t1), mysql.MSVarBinary)
+ assert isinstance(dialect.type_descriptor(t2), mysql.MSVarBinary)
+
+
class OverrideTest(PersistTest):
"""tests user-defined types, including a full type as well as a TypeDecorator"""
global users_with_date, insert_data
if db.engine.name == 'oracle':
- # still trying to get oracle sub-second resolution to work
- oracle_subsecond = False
- if oracle_subsecond:
- import sqlalchemy.databases.oracle as oracle
- insert_data = [
- [7, 'jack', datetime.datetime(2005, 11, 10, 0, 0), datetime.date(2005,11,10), datetime.datetime(2005, 11, 10, 0, 0, 0, 29384)],
- [8, 'roy', datetime.datetime(2005, 11, 10, 11, 52, 35), datetime.date(2005,10,10), datetime.datetime(2006, 5, 10, 15, 32, 47, 6754)],
- [9, 'foo', datetime.datetime(2005, 11, 10, 11, 52, 35, 54839), datetime.date(1970,4,1), datetime.datetime(2004, 9, 18, 4, 0, 52, 1043)],
- [10, 'colber', None, None, None]
- ]
-
- fnames = ['user_id', 'user_name', 'user_datetime', 'user_date', 'user_time']
-
- collist = [Column('user_id', INT, primary_key = True), Column('user_name', VARCHAR(20)), Column('user_datetime', DateTime),
- Column('user_date', Date), Column('user_time', oracle.OracleTimestamp)]
- else:
- insert_data = [
- [7, 'jack', datetime.datetime(2005, 11, 10, 0, 0), datetime.datetime(2005, 11, 10, 0, 0, 0)],
- [8, 'roy', datetime.datetime(2005, 11, 10, 11, 52, 35), datetime.datetime(2006, 5, 10, 15, 32, 47)],
- [9, 'foo', datetime.datetime(2005, 11, 10, 11, 52, 35), datetime.datetime(2004, 9, 18, 4, 0, 52)],
- [10, 'colber', None, None]
- ]
-
- fnames = ['user_id', 'user_name', 'user_datetime', 'user_date', 'user_time']
-
- collist = [Column('user_id', INT, primary_key = True), Column('user_name', VARCHAR(20)), Column('user_datetime', DateTime),
- Column('user_date', DateTime)]
+ import sqlalchemy.databases.oracle as oracle
+ insert_data = [
+ [7, 'jack', datetime.datetime(2005, 11, 10, 0, 0), datetime.date(2005,11,10), datetime.datetime(2005, 11, 10, 0, 0, 0, 29384)],
+ [8, 'roy', datetime.datetime(2005, 11, 10, 11, 52, 35), datetime.date(2005,10,10), datetime.datetime(2006, 5, 10, 15, 32, 47, 6754)],
+ [9, 'foo', datetime.datetime(2006, 11, 10, 11, 52, 35), datetime.date(1970,4,1), datetime.datetime(2004, 9, 18, 4, 0, 52, 1043)],
+ [10, 'colber', None, None, None]
+ ]
+
+ fnames = ['user_id', 'user_name', 'user_datetime', 'user_date', 'user_time']
+
+ collist = [Column('user_id', INT, primary_key = True), Column('user_name', VARCHAR(20)), Column('user_datetime', DateTime),
+ Column('user_date', Date), Column('user_time', TIMESTAMP)]
elif db.engine.name == 'mysql' or db.engine.name == 'mssql':
        # these don't really support the TIME type at all
insert_data = [
#x = db.text("select * from query_users_with_date where user_datetime=:date", bindparams=[bindparam('date', )]).execute(date=datetime.datetime(2005, 11, 10, 11, 52, 35)).fetchall()
#print repr(x)
- @testbase.unsupported('sqlite')
def testdate2(self):
- t = Table('testdate', testbase.metadata, Column('id', Integer, primary_key=True),
+ t = Table('testdate', testbase.metadata, Column('id', Integer, Sequence('datetest_id_seq', optional=True), primary_key=True),
Column('adate', Date), Column('adatetime', DateTime))
t.create()
try:
finally:
t.drop()
+class IntervalTest(AssertMixin):
+ def setUpAll(self):
+ global interval_table, metadata
+ metadata = BoundMetaData(testbase.db)
+ interval_table = Table("intervaltable", metadata,
+ Column("id", Integer, primary_key=True),
+ Column("interval", Interval),
+ )
+ metadata.create_all()
+
+ def tearDownAll(self):
+ metadata.drop_all()
+
+ def test_roundtrip(self):
+ delta = datetime.datetime(2006, 10, 5) - datetime.datetime(2005, 8, 17)
+ interval_table.insert().execute(interval=delta)
+ assert interval_table.select().execute().fetchone()['interval'] == delta
+
+
class TimezoneTest(AssertMixin):
"""test timezone-aware datetimes. psycopg will return a datetime with a tzinfo attached to it,
if postgres returns it. python then will not let you compare a datetime with a tzinfo to a datetime