got sqlite, postgres, mysql all working again (one unresolved failure for mysql)
else:
primary_key = self.primary_key_columns
s = sql.select([sql.func.count(list(primary_key)[0])], whereclause, from_obj=from_obj, **context.select_args())
- return self.session.scalar(self.mapper, s, params=self._params)
+ return self.session.scalar(s, params=self._params, mapper=self.mapper)
def compile(self):
"""compiles and returns a SQL statement based on the criterion and conditions within this Query."""
for c in param_names:
params[c.name] = mapper.get_attr_by_column(instance, c)
- result = session.execute(mapper, statement, params)
+ result = session.execute(statement, params, mapper=mapper)
try:
row = result.fetchone()
for prop in group:
statement = sql.select([self.columns[0]], clause, from_obj=[localparent.mapped_table], use_labels=True)
if group is not None:
- result = session.execute(localparent, statement, params)
+ result = session.execute(statement, params, mapper=localparent)
try:
row = result.fetchone()
for prop in group:
finally:
result.close()
else:
- return session.scalar(localparent, sql.select([self.columns[0]], clause, from_obj=[localparent.mapped_table], use_labels=True),params)
+ return session.scalar(sql.select([self.columns[0]], clause, from_obj=[localparent.mapped_table], use_labels=True),params, mapper=localparent)
return lazyload
"""
if len(clauses) == 1:
return clauses[0]
- return ClauseList(operator='AND', negate='OR', *clauses)
+ return ClauseList(operator='AND', *clauses)
def or_(*clauses):
"""Join a list of clauses together using the ``OR`` operator.
if len(clauses) == 1:
return clauses[0]
- return ClauseList(operator='OR', negate='AND', *clauses)
+ return ClauseList(operator='OR', *clauses)
def not_(clause):
"""Return a negation of the given clause, i.e. ``NOT(clause)``.
if hasattr(self, 'negation_clause'):
return self.negation_clause
elif self.negate_operator is None:
- return super(ClauseList, self).negate()
+ return super(ClauseList, self)._negate()
else:
return ClauseList(operator=self.negate_operator, negate=self.operator, *(not_(c) for c in self.clauses))
assert str(e) == "This Compiled object is not bound to any Engine or Connection."
finally:
+ bind.close()
metadata.drop_all(bind=testbase.db)
def test_session(self):
assert str(e).startswith("Could not locate any Engine or Connection bound to mapper")
finally:
+ bind.close()
metadata.drop_all(bind=testbase.db)
def suite():
unittest_modules = ['ext.activemapper',
- 'ext.selectresults',
'ext.assignmapper',
'ext.orderinglist',
'ext.associationproxy']
+++ /dev/null
-from testbase import PersistTest, AssertMixin
-import testbase
-import tables
-
-from sqlalchemy import *
-from sqlalchemy.orm import *
-from testbase import Table, Column
-
-from sqlalchemy.ext.selectresults import SelectResultsExt, SelectResults
-
-class Foo(object):
- pass
-
-class SelectResultsTest(PersistTest):
- def setUpAll(self):
- self.install_threadlocal()
- global foo, metadata
- metadata = MetaData(testbase.db)
- foo = Table('foo', metadata,
- Column('id', Integer, Sequence('foo_id_seq'), primary_key=True),
- Column('bar', Integer),
- Column('range', Integer))
-
- assign_mapper(Foo, foo, extension=SelectResultsExt())
- metadata.create_all()
- for i in range(100):
- Foo(bar=i, range=i%10)
- objectstore.flush()
-
- def setUp(self):
- self.query = Query(Foo)
- self.orig = self.query.select_whereclause()
- self.res = self.query.select()
-
- def tearDownAll(self):
- metadata.drop_all()
- self.uninstall_threadlocal()
- clear_mappers()
-
- def test_selectby(self):
- res = self.query.select_by(range=5)
- assert res.order_by([Foo.c.bar])[0].bar == 5
- assert res.order_by([desc(Foo.c.bar)])[0].bar == 95
-
- @testbase.unsupported('mssql')
- def test_slice(self):
- assert self.res[1] == self.orig[1]
- assert list(self.res[10:20]) == self.orig[10:20]
- assert list(self.res[10:]) == self.orig[10:]
- assert list(self.res[:10]) == self.orig[:10]
- assert list(self.res[:10]) == self.orig[:10]
- assert list(self.res[10:40:3]) == self.orig[10:40:3]
- assert list(self.res[-5:]) == self.orig[-5:]
- assert self.res[10:20][5] == self.orig[10:20][5]
-
- @testbase.supported('mssql')
- def test_slice_mssql(self):
- assert list(self.res[:10]) == self.orig[:10]
- assert list(self.res[:10]) == self.orig[:10]
-
- def test_aggregate(self):
- assert self.res.count() == 100
- assert self.res.filter(foo.c.bar<30).min(foo.c.bar) == 0
- assert self.res.filter(foo.c.bar<30).max(foo.c.bar) == 29
-
- @testbase.unsupported('mysql')
- def test_aggregate_1(self):
- # this one fails in mysql as the result comes back as a string
- assert self.res.filter(foo.c.bar<30).sum(foo.c.bar) == 435
-
- @testbase.unsupported('postgres', 'mysql', 'firebird', 'mssql')
- def test_aggregate_2(self):
- assert self.res.filter(foo.c.bar<30).avg(foo.c.bar) == 14.5
-
- @testbase.supported('postgres', 'mysql', 'firebird', 'mssql')
- def test_aggregate_2_int(self):
- assert int(self.res.filter(foo.c.bar<30).avg(foo.c.bar)) == 14
-
- def test_filter(self):
- assert self.res.count() == 100
- assert self.res.filter(Foo.c.bar < 30).count() == 30
- res2 = self.res.filter(Foo.c.bar < 30).filter(Foo.c.bar > 10)
- assert res2.count() == 19
-
- def test_options(self):
- class ext1(MapperExtension):
- def populate_instance(self, mapper, selectcontext, row, instance, **flags):
- instance.TEST = "hello world"
- return EXT_PASS
- objectstore.clear()
- assert self.res.options(extension(ext1()))[0].TEST == "hello world"
-
- def test_order_by(self):
- assert self.res.order_by([Foo.c.bar])[0].bar == 0
- assert self.res.order_by([desc(Foo.c.bar)])[0].bar == 99
-
- def test_offset(self):
- assert list(self.res.order_by([Foo.c.bar]).offset(10))[0].bar == 10
-
- def test_offset(self):
- assert len(list(self.res.limit(10))) == 10
-
-class Obj1(object):
- pass
-class Obj2(object):
- pass
-
-class SelectResultsTest2(PersistTest):
- def setUpAll(self):
- self.install_threadlocal()
- global metadata, table1, table2
- metadata = MetaData(testbase.db)
- table1 = Table('Table1', metadata,
- Column('id', Integer, primary_key=True),
- )
- table2 = Table('Table2', metadata,
- Column('t1id', Integer, ForeignKey("Table1.id"), primary_key=True),
- Column('num', Integer, primary_key=True),
- )
- assign_mapper(Obj1, table1, extension=SelectResultsExt())
- assign_mapper(Obj2, table2, extension=SelectResultsExt())
- metadata.create_all()
- table1.insert().execute({'id':1},{'id':2},{'id':3},{'id':4})
- table2.insert().execute({'num':1,'t1id':1},{'num':2,'t1id':1},{'num':3,'t1id':1},\
-{'num':4,'t1id':2},{'num':5,'t1id':2},{'num':6,'t1id':3})
-
- def setUp(self):
- self.query = Query(Obj1)
- #self.orig = self.query.select_whereclause()
- #self.res = self.query.select()
-
- def tearDownAll(self):
- metadata.drop_all()
- self.uninstall_threadlocal()
- clear_mappers()
-
- def test_distinctcount(self):
- res = self.query.select()
- assert res.count() == 4
- res = self.query.select(and_(table1.c.id==table2.c.t1id,table2.c.t1id==1))
- assert res.count() == 3
- res = self.query.select(and_(table1.c.id==table2.c.t1id,table2.c.t1id==1), distinct=True)
- self.assertEqual(res.count(), 1)
-
-class RelationsTest(AssertMixin):
- def setUpAll(self):
- tables.create()
- tables.data()
- def tearDownAll(self):
- tables.drop()
- def tearDown(self):
- clear_mappers()
- def test_jointo(self):
- """test the join_to and outerjoin_to functions on SelectResults"""
- mapper(tables.User, tables.users, properties={
- 'orders':relation(mapper(tables.Order, tables.orders, properties={
- 'items':relation(mapper(tables.Item, tables.orderitems))
- }))
- })
- session = create_session()
- query = SelectResults(session.query(tables.User))
- x = query.join(['orders','items']).select(tables.Item.c.item_id==2)
- print x.compile()
- self.assert_result(list(x), tables.User, tables.user_result[2])
- def test_outerjointo(self):
- """test the join_to and outerjoin_to functions on SelectResults"""
- mapper(tables.User, tables.users, properties={
- 'orders':relation(mapper(tables.Order, tables.orders, properties={
- 'items':relation(mapper(tables.Item, tables.orderitems))
- }))
- })
- session = create_session()
- query = SelectResults(session.query(tables.User))
- x = query.outerjoin(['orders', 'items']).select(or_(tables.Order.c.order_id==None,tables.Item.c.item_id==2))
- print x.compile()
- self.assert_result(list(x), tables.User, *tables.user_result[1:3])
- def test_outerjointo_count(self):
- """test the join_to and outerjoin_to functions on SelectResults"""
- mapper(tables.User, tables.users, properties={
- 'orders':relation(mapper(tables.Order, tables.orders, properties={
- 'items':relation(mapper(tables.Item, tables.orderitems))
- }))
- })
- session = create_session()
- query = SelectResults(session.query(tables.User))
- x = query.outerjoin(['orders', 'items']).select(or_(tables.Order.c.order_id==None,tables.Item.c.item_id==2)).count()
- assert x==2
- def test_from(self):
- mapper(tables.User, tables.users, properties={
- 'orders':relation(mapper(tables.Order, tables.orders, properties={
- 'items':relation(mapper(tables.Item, tables.orderitems))
- }))
- })
- session = create_session()
- query = SelectResults(session.query(tables.User))
- x = query.select_from([tables.users.outerjoin(tables.orders).outerjoin(tables.orderitems)]).\
- filter(or_(tables.Order.c.order_id==None,tables.Item.c.item_id==2))
- print x.compile()
- self.assert_result(list(x), tables.User, *tables.user_result[1:3])
-
-
-class CaseSensitiveTest(PersistTest):
- def setUpAll(self):
- self.install_threadlocal()
- global metadata, table1, table2
- metadata = MetaData(testbase.db)
- table1 = Table('Table1', metadata,
- Column('ID', Integer, primary_key=True),
- )
- table2 = Table('Table2', metadata,
- Column('T1ID', Integer, ForeignKey("Table1.ID"), primary_key=True),
- Column('NUM', Integer, primary_key=True),
- )
- assign_mapper(Obj1, table1, extension=SelectResultsExt())
- assign_mapper(Obj2, table2, extension=SelectResultsExt())
- metadata.create_all()
- table1.insert().execute({'ID':1},{'ID':2},{'ID':3},{'ID':4})
- table2.insert().execute({'NUM':1,'T1ID':1},{'NUM':2,'T1ID':1},{'NUM':3,'T1ID':1},\
-{'NUM':4,'T1ID':2},{'NUM':5,'T1ID':2},{'NUM':6,'T1ID':3})
-
- def setUp(self):
- self.query = Query(Obj1)
- #self.orig = self.query.select_whereclause()
- #self.res = self.query.select()
-
- def tearDownAll(self):
- metadata.drop_all()
- self.uninstall_threadlocal()
- clear_mappers()
-
- def test_distinctcount(self):
- res = self.query.select()
- assert res.count() == 4
- res = self.query.select(and_(table1.c.ID==table2.c.T1ID,table2.c.T1ID==1))
- assert res.count() == 3
- res = self.query.select(and_(table1.c.ID==table2.c.T1ID,table2.c.T1ID==1), distinct=True)
- self.assertEqual(res.count(), 1)
-
-
-if __name__ == "__main__":
- testbase.main()
from testbase import Table, Column
class Foo(object):
- pass
+ def __init__(self, **kwargs):
+ for k in kwargs:
+ setattr(self, k, kwargs[k])
class GenerativeQueryTest(PersistTest):
def setUpAll(self):
- self.install_threadlocal()
global foo, metadata
metadata = MetaData(testbase.db)
foo = Table('foo', metadata,
Column('bar', Integer),
Column('range', Integer))
- assign_mapper(Foo, foo)
+ mapper(Foo, foo)
metadata.create_all()
+
+ sess = create_session()
for i in range(100):
- Foo(bar=i, range=i%10)
- objectstore.flush()
+ sess.save(Foo(bar=i, range=i%10))
+ sess.flush()
- def setUp(self):
- self.query = Foo.query()
- self.orig = self.query.select_whereclause()
- self.res = self.query
-
def tearDownAll(self):
metadata.drop_all()
- self.uninstall_threadlocal()
clear_mappers()
def test_selectby(self):
- res = self.query.filter_by(range=5)
+ res = create_session().query(Foo).filter_by(range=5)
assert res.order_by([Foo.c.bar])[0].bar == 5
assert res.order_by([desc(Foo.c.bar)])[0].bar == 95
@testbase.unsupported('mssql')
def test_slice(self):
- assert self.query[1] == self.orig[1]
- assert list(self.query[10:20]) == self.orig[10:20]
- assert list(self.query[10:]) == self.orig[10:]
- assert list(self.query[:10]) == self.orig[:10]
- assert list(self.query[:10]) == self.orig[:10]
- assert list(self.query[10:40:3]) == self.orig[10:40:3]
- assert list(self.query[-5:]) == self.orig[-5:]
- assert self.query[10:20][5] == self.orig[10:20][5]
+ sess = create_session()
+ query = sess.query(Foo)
+ orig = query.all()
+ assert query[1] == orig[1]
+ assert list(query[10:20]) == orig[10:20]
+ assert list(query[10:]) == orig[10:]
+ assert list(query[:10]) == orig[:10]
+ assert list(query[:10]) == orig[:10]
+ assert list(query[10:40:3]) == orig[10:40:3]
+ assert list(query[-5:]) == orig[-5:]
+ assert query[10:20][5] == orig[10:20][5]
@testbase.supported('mssql')
def test_slice_mssql(self):
- assert list(self.query[:10]) == self.orig[:10]
- assert list(self.query[:10]) == self.orig[:10]
+ sess = create_session()
+ query = sess.query(Foo)
+ orig = query.all()
+ assert list(query[:10]) == orig[:10]
+ assert list(query[:10]) == orig[:10]
def test_aggregate(self):
- assert self.query.count() == 100
- assert self.query.filter(foo.c.bar<30).min(foo.c.bar) == 0
- assert self.query.filter(foo.c.bar<30).max(foo.c.bar) == 29
- assert self.query.filter(foo.c.bar<30).apply_max(foo.c.bar).first() == 29
- assert self.query.filter(foo.c.bar<30).apply_max(foo.c.bar).one() == 29
+ sess = create_session()
+ query = sess.query(Foo)
+ assert query.count() == 100
+ assert query.filter(foo.c.bar<30).min(foo.c.bar) == 0
+ assert query.filter(foo.c.bar<30).max(foo.c.bar) == 29
+ assert query.filter(foo.c.bar<30).apply_max(foo.c.bar).first() == 29
+ assert query.filter(foo.c.bar<30).apply_max(foo.c.bar).one() == 29
@testbase.unsupported('mysql')
def test_aggregate_1(self):
# this one fails in mysql as the result comes back as a string
- assert self.query.filter(foo.c.bar<30).sum(foo.c.bar) == 435
+ query = create_session().query(Foo)
+ assert query.filter(foo.c.bar<30).sum(foo.c.bar) == 435
@testbase.unsupported('postgres', 'mysql', 'firebird', 'mssql')
def test_aggregate_2(self):
- assert self.res.filter(foo.c.bar<30).avg(foo.c.bar) == 14.5
+ query = create_session().query(Foo)
+ assert query.filter(foo.c.bar<30).avg(foo.c.bar) == 14.5
@testbase.supported('postgres', 'mysql', 'firebird', 'mssql')
def test_aggregate_2_int(self):
- assert int(self.res.filter(foo.c.bar<30).avg(foo.c.bar)) == 14
+ query = create_session().query(Foo)
+ assert int(query.filter(foo.c.bar<30).avg(foo.c.bar)) == 14
@testbase.unsupported('postgres', 'mysql', 'firebird', 'mssql')
def test_aggregate_3(self):
- assert self.res.filter(foo.c.bar<30).apply_avg(foo.c.bar).first() == 14.5
- assert self.res.filter(foo.c.bar<30).apply_avg(foo.c.bar).one() == 14.5
+ query = create_session().query(Foo)
+ assert query.filter(foo.c.bar<30).apply_avg(foo.c.bar).first() == 14.5
+ assert query.filter(foo.c.bar<30).apply_avg(foo.c.bar).one() == 14.5
def test_filter(self):
- assert self.query.count() == 100
- assert self.query.filter(Foo.c.bar < 30).count() == 30
- res2 = self.query.filter(Foo.c.bar < 30).filter(Foo.c.bar > 10)
+ query = create_session().query(Foo)
+ assert query.count() == 100
+ assert query.filter(Foo.c.bar < 30).count() == 30
+ res2 = query.filter(Foo.c.bar < 30).filter(Foo.c.bar > 10)
assert res2.count() == 19
def test_options(self):
+ query = create_session().query(Foo)
class ext1(MapperExtension):
def populate_instance(self, mapper, selectcontext, row, instance, **flags):
instance.TEST = "hello world"
return EXT_PASS
- objectstore.clear()
- assert self.res.options(extension(ext1()))[0].TEST == "hello world"
+ assert query.options(extension(ext1()))[0].TEST == "hello world"
def test_order_by(self):
- assert self.res.order_by([Foo.c.bar])[0].bar == 0
- assert self.res.order_by([desc(Foo.c.bar)])[0].bar == 99
+ query = create_session().query(Foo)
+ assert query.order_by([Foo.c.bar])[0].bar == 0
+ assert query.order_by([desc(Foo.c.bar)])[0].bar == 99
def test_offset(self):
- assert list(self.res.order_by([Foo.c.bar]).offset(10))[0].bar == 10
+ query = create_session().query(Foo)
+ assert list(query.order_by([Foo.c.bar]).offset(10))[0].bar == 10
def test_offset(self):
- assert len(list(self.res.limit(10))) == 10
+ query = create_session().query(Foo)
+ assert len(list(query.limit(10))) == 10
class Obj1(object):
pass
class GenerativeTest2(PersistTest):
def setUpAll(self):
- self.install_threadlocal()
global metadata, table1, table2
metadata = MetaData(testbase.db)
table1 = Table('Table1', metadata,
Column('t1id', Integer, ForeignKey("Table1.id"), primary_key=True),
Column('num', Integer, primary_key=True),
)
- assign_mapper(Obj1, table1)
- assign_mapper(Obj2, table2)
+ mapper(Obj1, table1)
+ mapper(Obj2, table2)
metadata.create_all()
table1.insert().execute({'id':1},{'id':2},{'id':3},{'id':4})
table2.insert().execute({'num':1,'t1id':1},{'num':2,'t1id':1},{'num':3,'t1id':1},\
{'num':4,'t1id':2},{'num':5,'t1id':2},{'num':6,'t1id':3})
- def setUp(self):
- self.query = Query(Obj1)
- #self.orig = self.query.select_whereclause()
- #self.res = self.query.select()
-
def tearDownAll(self):
metadata.drop_all()
- self.uninstall_threadlocal()
clear_mappers()
def test_distinctcount(self):
- res = self.query
- assert res.count() == 4
- res = self.query.filter(and_(table1.c.id==table2.c.t1id,table2.c.t1id==1))
+ query = create_session().query(Obj1)
+ assert query.count() == 4
+ res = query.filter(and_(table1.c.id==table2.c.t1id,table2.c.t1id==1))
assert res.count() == 3
- res = self.query.filter(and_(table1.c.id==table2.c.t1id,table2.c.t1id==1)).distinct()
+ res = query.filter(and_(table1.c.id==table2.c.t1id,table2.c.t1id==1)).distinct()
self.assertEqual(res.count(), 1)
class RelationsTest(AssertMixin):
class CaseSensitiveTest(PersistTest):
def setUpAll(self):
- self.install_threadlocal()
global metadata, table1, table2
metadata = MetaData(testbase.db)
table1 = Table('Table1', metadata,
Column('T1ID', Integer, ForeignKey("Table1.ID"), primary_key=True),
Column('NUM', Integer, primary_key=True),
)
- assign_mapper(Obj1, table1)
- assign_mapper(Obj2, table2)
+ mapper(Obj1, table1)
+ mapper(Obj2, table2)
metadata.create_all()
table1.insert().execute({'ID':1},{'ID':2},{'ID':3},{'ID':4})
table2.insert().execute({'NUM':1,'T1ID':1},{'NUM':2,'T1ID':1},{'NUM':3,'T1ID':1},\
{'NUM':4,'T1ID':2},{'NUM':5,'T1ID':2},{'NUM':6,'T1ID':3})
- def setUp(self):
- self.query = Query(Obj1)
- #self.orig = self.query.select_whereclause()
- #self.res = self.query.select()
-
def tearDownAll(self):
metadata.drop_all()
- self.uninstall_threadlocal()
clear_mappers()
def test_distinctcount(self):
- res = self.query
- assert res.count() == 4
- res = self.query.filter(and_(table1.c.ID==table2.c.T1ID,table2.c.T1ID==1))
+ q = create_session().query(Obj1)
+ assert q.count() == 4
+ res = q.filter(and_(table1.c.ID==table2.c.T1ID,table2.c.T1ID==1))
assert res.count() == 3
- res = self.query.filter(and_(table1.c.ID==table2.c.T1ID,table2.c.T1ID==1)).distinct()
+ res = q.filter(and_(table1.c.ID==table2.c.T1ID,table2.c.T1ID==1)).distinct()
self.assertEqual(res.count(), 1)
class SelfRefTest(ORMTest):
class A(object):pass
class B(object):pass
- assign_mapper(B, b)
+ mapper(B, b)
- assign_mapper(A, a,
+ mapper(A, a,
properties = {
'tbs' : relation(B, primaryjoin=and_(b.c.a1==a.c.a1, b.c.b2 == True), lazy=False),
}
)
- assign_mapper(C, c,
+ mapper(C, c,
properties = {
'a1s' : relation(A, secondary=c2a1, lazy=False),
'a2s' : relation(A, secondary=c2a2, lazy=False)
l = q.add_column("count").add_column("concat").from_statement(s).all()
assert l == expected
- assert [user8] == create_session().query(User).select_from(s).filter(s.c.count==3).all()
-
q = create_session().query(User).add_column(func.count(addresses.c.id))\
.add_column(("Name:" + users.c.name)).select_from(users.outerjoin(addresses))\
.group_by([c for c in users.c]).order_by(users.c.id)
def setUp(self):
global session
session = create_session(bind=testbase.db)
- conn = session.connect()
+ conn = testbase.db.connect()
conn.create(tbl_a)
conn.create(tbl_b)
conn.create(tbl_c)
session.save_or_update(b)
def tearDown(self):
- conn = session.connect()
+ conn = testbase.db.connect()
conn.drop(tbl_d)
conn.drop(tbl_c)
conn.drop(tbl_b)
parser.add_option("--quiet", action="store_true", dest="quiet", help="suppress unittest output")
parser.add_option("--log-info", action="append", dest="log_info", help="turn on info logging for <LOG> (multiple OK)")
parser.add_option("--log-debug", action="append", dest="log_debug", help="turn on debug logging for <LOG> (multiple OK)")
- parser.add_option("--nothreadlocal", action="store_true", dest="nothreadlocal", help="dont use thread-local mod")
parser.add_option("--enginestrategy", action="store", default=None, dest="enginestrategy", help="engine strategy (plain or threadlocal, defaults to plain)")
parser.add_option("--coverage", action="store_true", dest="coverage", help="Dump a full coverage report after running")
parser.add_option("--reversetop", action="store_true", dest="topological", help="Reverse the collection ordering for topological sorts (helps reveal dependency issues)")
if options.mysql_engine:
table_options['mysql_engine'] = options.mysql_engine
- if not options.nothreadlocal:
- __import__('sqlalchemy.mods.threadlocal')
- sqlalchemy.mods.threadlocal.uninstall_plugin()
global echo
echo = options.verbose and not options.quiet
"""DEPRECATED. use print <statement>"""
echo_text(text)
- def install_threadlocal(self):
- """DEPRECATED."""
- sqlalchemy.mods.threadlocal.install_plugin()
-
- def uninstall_threadlocal(self):
- """DEPRECATED."""
- sqlalchemy.mods.threadlocal.uninstall_plugin()
def setUpAll(self):
pass