from sqlalchemy import (create_engine, Column, Integer, String, select, case,
func)
-from sqlalchemy.orm import sessionmaker, MapperExtension, aliased
+from sqlalchemy.orm import Session, aliased
from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy import event
-engine = create_engine('sqlite://', echo=True)
Base = declarative_base()
-class NestedSetExtension(MapperExtension):
- def before_insert(self, mapper, connection, instance):
- if not instance.parent:
- instance.left = 1
- instance.right = 2
- else:
- personnel = mapper.mapped_table
- right_most_sibling = connection.scalar(
- select([personnel.c.rgt]).where(personnel.c.emp==instance.parent.emp)
- )
-
- connection.execute(
- personnel.update(personnel.c.rgt>=right_most_sibling).values(
- lft = case(
- [(personnel.c.lft>right_most_sibling, personnel.c.lft + 2)],
- else_ = personnel.c.lft
- ),
- rgt = case(
- [(personnel.c.rgt>=right_most_sibling, personnel.c.rgt + 2)],
- else_ = personnel.c.rgt
- )
- )
- )
- instance.left = right_most_sibling
- instance.right = right_most_sibling + 1
-
- # before_update() would be needed to support moving of nodes
- # after_delete() would be needed to support removal of nodes.
- # [ticket:1172] needs to be implemented for deletion to work as well.
-
class Employee(Base):
__tablename__ = 'personnel'
__mapper_args__ = {
- 'extension':NestedSetExtension(),
- 'batch':False # allows extension to fire for each instance before going to the next.
+ 'batch': False # allows extension to fire for each
+ # instance before going to the next.
}
parent = None
def __repr__(self):
return "Employee(%s, %d, %d)" % (self.emp, self.left, self.right)
+@event.listens_for(Employee, "before_insert")
+def before_insert(mapper, connection, instance):
+ if not instance.parent:
+ instance.left = 1
+ instance.right = 2
+ else:
+ personnel = mapper.mapped_table
+ right_most_sibling = connection.scalar(
+ select([personnel.c.rgt]).
+ where(personnel.c.emp == instance.parent.emp)
+ )
+
+ connection.execute(
+ personnel.update(
+ personnel.c.rgt >= right_most_sibling).values(
+ lft=case(
+ [(personnel.c.lft > right_most_sibling,
+ personnel.c.lft + 2)],
+ else_=personnel.c.lft
+ ),
+ rgt=case(
+ [(personnel.c.rgt >= right_most_sibling,
+ personnel.c.rgt + 2)],
+ else_=personnel.c.rgt
+ )
+ )
+ )
+ instance.left = right_most_sibling
+ instance.right = right_most_sibling + 1
+
+ # before_update() would be needed to support moving of nodes
+ # after_delete() would be needed to support removal of nodes.
+
+engine = create_engine('sqlite://', echo=True)
+
Base.metadata.create_all(engine)
-session = sessionmaker(bind=engine)()
+session = Session(bind=engine)
albert = Employee(emp='Albert')
bert = Employee(emp='Bert')
session.add_all([albert, bert, chuck, donna, eddie, fred])
session.commit()
-print session.query(Employee).all()
+print(session.query(Employee).all())
# 1. Find an employee and all his/her supervisors, no matter how deep the tree.
ealias = aliased(Employee)
-print session.query(Employee).\
+print(session.query(Employee).\
filter(ealias.left.between(Employee.left, Employee.right)).\
- filter(ealias.emp=='Eddie').all()
+ filter(ealias.emp == 'Eddie').all())
-#2. Find the employee and all his/her subordinates. (This query has a nice symmetry with the first query.)
-print session.query(Employee).\
+#2. Find the employee and all his/her subordinates.
+# (This query has a nice symmetry with the first query.)
+print(session.query(Employee).\
filter(Employee.left.between(ealias.left, ealias.right)).\
- filter(ealias.emp=='Chuck').all()
+ filter(ealias.emp == 'Chuck').all())
-#3. Find the level of each node, so you can print the tree as an indented listing.
-for indentation, employee in session.query(func.count(Employee.emp).label('indentation') - 1, ealias).\
+#3. Find the level of each node, so you can print the tree
+# as an indented listing.
+for indentation, employee in session.query(
+ func.count(Employee.emp).label('indentation') - 1, ealias).\
filter(ealias.left.between(Employee.left, Employee.right)).\
group_by(ealias.emp).\
- order_by(ealias.left):
- print " " * indentation + str(employee)
+ order_by(ealias.left):
+ print(" " * indentation + str(employee))
if __name__ == '__main__':
from sqlalchemy import (MetaData, Table, Column, Integer, Unicode,
- ForeignKey, UnicodeText, and_, not_)
+ ForeignKey, UnicodeText, and_, not_, create_engine)
from sqlalchemy.orm import mapper, relationship, Session
from sqlalchemy.orm.collections import attribute_mapped_collection
})
mapper(AnimalFact, facts)
-
- metadata.bind = 'sqlite:///'
- metadata.create_all()
- session = Session()
+ engine = create_engine("sqlite://")
+ metadata.create_all(engine)
+ session = Session(bind=engine)
stoat = Animal(u'stoat')
stoat[u'color'] = u'reddish'
critter[u'cuteness'] = u'very'
print 'changing cuteness:'
- metadata.bind.echo = True
+ engine.echo = True
session.commit()
- metadata.bind.echo = False
+ engine.echo = False
marten = Animal(u'marten')
marten[u'color'] = u'brown'
print 'just the facts', q.all()
- metadata.drop_all()