def __init__(self, class_, *columns, **kwargs):
super(CompositeProperty, self).__init__(*columns, **kwargs)
+ self._col_position_map = dict((c, i) for i, c in enumerate(columns))
self.composite_class = class_
self.comparator_factory = kwargs.pop('comparator', CompositeProperty.Comparator)
self.strategy_class = strategies.CompositeColumnLoader
return self.get_col_value(column, obj)
def setattr(self, state, value, column):
- # TODO: test coverage for this method
+
obj = state.get_impl(self.key).get(state)
if obj is None:
obj = self.composite_class(*[None for c in self.columns])
state.get_impl(self.key).set(state, obj, None)
- for a, b in zip(self.columns, value.__composite_values__()):
- if a is column:
- setattr(obj, b, value)
-
+ if hasattr(obj, '__set_composite_values__'):
+ values = list(obj.__composite_values__())
+ values[self._col_position_map[column]] = value
+ obj.__set_composite_values__(*values)
+ else:
+ setattr(obj, column.key, value)
+
def get_col_value(self, column, value):
+ if value is None:
+ return None
for a, b in zip(self.columns, value.__composite_values__()):
if a is column:
return b
def __eq__(self, other):
if other is None:
- return sql.and_(*[a==None for a in self.prop.columns])
+ values = [None] * len(self.prop.columns)
else:
- return sql.and_(*[a==b for a, b in
- zip(self.prop.columns,
- other.__composite_values__())])
-
+ values = other.__composite_values__()
+ return sql.and_(*[a==b for a, b in zip(self.prop.columns, values)])
def __ne__(self, other):
- return sql.or_(*[a!=b for a, b in
- zip(self.prop.columns,
- other.__composite_values__())])
+ return sql.not_(self.__eq__(other))
def __str__(self):
return str(self.parent.class_.__name__) + "." + self.key
def define_tables(self, metadata):
Table('graphs', metadata,
Column('id', Integer, primary_key=True),
- Column('version_id', Integer, primary_key=True),
+ Column('version_id', Integer, primary_key=True, nullable=True),
Column('name', String(30)))
Table('edges', metadata,
['graph_id', 'graph_version_id'],
['graphs.id', 'graphs.version_id']))
+ Table('foobars', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('x1', Integer, default=2),
+ Column('x2', Integer),
+ Column('x3', Integer, default=15),
+ Column('x4', Integer)
+ )
+
@testing.resolve_artifact_names
def test_basic(self):
class Point(object):
self.id = id
self.version = version
def __composite_values__(self):
- # a tuple this time
return (self.id, self.version)
def __eq__(self, other):
return other.id == self.id and other.version == self.version
def __init__(self, version):
self.version = version
- mapper(Graph, graphs, properties={
+ mapper(Graph, graphs, allow_null_pks=True, properties={
'version':sa.orm.composite(Version, graphs.c.id,
graphs.c.version_id)})
g2 = sess.query(Graph).get(Version(1, 1))
eq_(g.version, g2.version)
+
+ # TODO: can't seem to get NULL in for a PK value
+ # in either mysql or postgres, autoincrement=False etc.
+ # notwithstanding
+ @testing.fails_on_everything_except("sqlite")
+ def go():
+ g = Graph(Version(2, None))
+ sess.save(g)
+ sess.flush()
+ sess.clear()
+ g2 = sess.query(Graph).filter_by(version=Version(2, None)).one()
+ eq_(g.version, g2.version)
+ go()
+
+ @testing.resolve_artifact_names
+ def test_attributes_with_defaults(self):
+ class Foobar(object):
+ pass
+
+ class FBComposite(object):
+ def __init__(self, x1, x2, x3, x4):
+ self.x1 = x1
+ self.x2 = x2
+ self.x3 = x3
+ self.x4 = x4
+ def __composite_values__(self):
+ return self.x1, self.x2, self.x3, self.x4
+ def __eq__(self, other):
+ return other.x1 == self.x1 and other.x2 == self.x2 and other.x3 == self.x3 and other.x4 == self.x4
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+ mapper(Foobar, foobars, properties=dict(
+ foob=sa.orm.composite(FBComposite, foobars.c.x1, foobars.c.x2, foobars.c.x3, foobars.c.x4)
+ ))
+
+ sess = create_session()
+ f1 = Foobar()
+ f1.foob = FBComposite(None, 5, None, None)
+ sess.save(f1)
+ sess.flush()
+
+ assert f1.foob == FBComposite(2, 5, 15, None)
+
+ @testing.resolve_artifact_names
+ def test_set_composite_values(self):
+ class Foobar(object):
+ pass
+
+ class FBComposite(object):
+ def __init__(self, x1, x2, x3, x4):
+ self.x1val = x1
+ self.x2val = x2
+ self.x3 = x3
+ self.x4 = x4
+ def __composite_values__(self):
+ return self.x1val, self.x2val, self.x3, self.x4
+ def __set_composite_values__(self, x1, x2, x3, x4):
+ self.x1val = x1
+ self.x2val = x2
+ self.x3 = x3
+ self.x4 = x4
+ def __eq__(self, other):
+ return other.x1val == self.x1val and other.x2val == self.x2val and other.x3 == self.x3 and other.x4 == self.x4
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+ mapper(Foobar, foobars, properties=dict(
+ foob=sa.orm.composite(FBComposite, foobars.c.x1, foobars.c.x2, foobars.c.x3, foobars.c.x4)
+ ))
+
+ sess = create_session()
+ f1 = Foobar()
+ f1.foob = FBComposite(None, 5, None, None)
+ sess.save(f1)
+ sess.flush()
+
+ assert f1.foob == FBComposite(2, 5, 15, None)
+
+ @testing.resolve_artifact_names
+ def test_save_null(self):
+ """test saving a null composite value
+
+ See google groups thread for more context:
+ http://groups.google.com/group/sqlalchemy/browse_thread/thread/0c6580a1761b2c29
+
+ """
+ class Point(object):
+ def __init__(self, x, y):
+ self.x = x
+ self.y = y
+ def __composite_values__(self):
+ return [self.x, self.y]
+ def __eq__(self, other):
+ return other.x == self.x and other.y == self.y
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+ class Graph(object):
+ pass
+ class Edge(object):
+ def __init__(self, start, end):
+ self.start = start
+ self.end = end
+
+ mapper(Graph, graphs, properties={
+ 'edges':relation(Edge)
+ })
+ mapper(Edge, edges, properties={
+ 'start':sa.orm.composite(Point, edges.c.x1, edges.c.y1),
+ 'end':sa.orm.composite(Point, edges.c.x2, edges.c.y2)
+ })
+
+ sess = create_session()
+ g = Graph()
+ g.id = 1
+ g.version_id=1
+ e = Edge(None, None)
+ g.edges.append(e)
+
+ sess.save(g)
+ sess.flush()
+
+ sess.clear()
+
+ g2 = sess.query(Graph).get([1, 1])
+ assert g2.edges[-1].start.x is None
+ assert g2.edges[-1].start.y is None
class NoLoadTest(_fixtures.FixtureTest):