#!/usr/bin/python -u
-import sys,getopt,os
+#
+#
+#
+#################################################################################
+# Start off by implementing a general purpose event loop for anyones use
+#################################################################################
+
+import sys
+import getopt
+import os
import libvirt
import select
+import errno
+import time
+import threading
+
+#
+# This general purpose event loop will support waiting for file handle
+# I/O and errors events, as well as scheduling repeatable timers with
+# a fixed interval.
+#
+# It is a pure python implementation based around the poll() API
+#
+class virEventLoopPure:
+ # This class contains the data we need to track for a
+ # single file handle
+ class virEventLoopPureHandle:
+ def __init__(self, handle, fd, events, cb, opaque):
+ self.handle = handle
+ self.fd = fd
+ self.events = events
+ self.cb = cb
+ self.opaque = opaque
+
+ def get_id(self):
+ return self.handle
+
+ def get_fd(self):
+ return self.fd
+
+ def get_events(self):
+ return self.events
+
+ def set_events(self, events):
+ self.events = events
+
+ def dispatch(self, events):
+ self.cb(self.handle,
+ self.fd,
+ events,
+ self.opaque[0],
+ self.opaque[1])
+
+ # This class contains the data we need to track for a
+ # single periodic timer
+ class virEventLoopPureTimer:
+ def __init__(self, timer, interval, cb, opaque):
+ self.timer = timer
+ self.interval = interval
+ self.cb = cb
+ self.opaque = opaque
+ self.lastfired = 0
+
+ def get_id(self):
+ return self.timer
+
+ def get_interval(self):
+ return self.interval
+
+ def set_interval(self, interval):
+ self.interval = interval
+
+ def get_last_fired(self):
+ return self.lastfired
+
+ def set_last_fired(self, now):
+ self.lastfired = now
+
+ def dispatch(self):
+ self.cb(self.timer,
+ self.opaque[0],
+ self.opaque[1])
+
+
+ def __init__(self, debug=False):
+ self.debugOn = debug
+ self.poll = select.poll()
+ self.pipetrick = os.pipe()
+ self.nextHandleID = 1
+ self.nextTimerID = 1
+ self.handles = []
+ self.timers = []
+ self.quit = False
+
+ # The event loop can be used from multiple threads at once.
+ # Specifically while the main thread is sleeping in poll()
+ # waiting for events to occur, another thread may come along
+ # and add/update/remove a file handle, or timer. When this
+ # happens we need to interrupt the poll() sleep in the other
+ # thread, so that it'll see the file handle / timer changes.
+ #
+ # Using OS level signals for this is very unreliable and
+ # hard to implement correctly. Thus we use the real classic
+ # "self pipe" trick. A anonymous pipe, with one end registered
+ # with the event loop for input events. When we need to force
+ # the main thread out of a poll() sleep, we simple write a
+ # single byte of data to the other end of the pipe.
+ self.debug("Self pipe watch %d write %d" %(self.pipetrick[0], self.pipetrick[1]))
+ self.poll.register(self.pipetrick[0], select.POLLIN)
+
+ def debug(self, msg):
+ if self.debugOn:
+ print msg
+
+
+ # Calculate when the next timeout is due to occurr, returning
+ # the absolute timestamp for the next timeout, or 0 if there is
+ # no timeout due
+ def next_timeout(self):
+ next = 0
+ for t in self.timers:
+ last = t.get_last_fired()
+ interval = t.get_interval()
+ if interval < 0:
+ continue
+ if next == 0 or (last + interval) < next:
+ next = last + interval
+
+ return next
+
+ # Lookup a virEventLoopPureHandle object based on file descriptor
+ def get_handle_by_fd(self, fd):
+ for h in self.handles:
+ if h.get_fd() == fd:
+ return h
+ return None
+
+ # Lookup a virEventLoopPureHandle object based on its event loop ID
+ def get_handle_by_id(self, handleID):
+ for h in self.handles:
+ if h.get_id() == handleID:
+ return h
+ return None
+
+
+ # This is the heart of the event loop, performing one single
+ # iteration. It asks when the next timeout is due, and then
+ # calcuates the maximum amount of time it is able to sleep
+ # for in poll() pending file handle events.
+ #
+ # It then goes into the poll() sleep.
+ #
+ # When poll() returns, there will zero or more file handle
+ # events which need to be dispatched to registered callbacks
+ # It may also be time to fire some periodic timers.
+ #
+ # Due to the coarse granularity of schedular timeslices, if
+ # we ask for a sleep of 500ms in order to satisfy a timer, we
+ # may return upto 1 schedular timeslice early. So even though
+ # our sleep timeout was reached, the registered timer may not
+ # technically be at its expiry point. This leads to us going
+ # back around the loop with a crazy 5ms sleep. So when checking
+ # if timeouts are due, we allow a margin of 20ms, to avoid
+ # these pointless repeated tiny sleeps.
+ def run_once(self):
+ sleep = -1
+ next = self.next_timeout()
+ self.debug("Next timeout due at %d" % next)
+ if next > 0:
+ now = int(time.time() * 1000)
+ if now >= next:
+ sleep = 0
+ else:
+ sleep = next - now
+
+ self.debug("Poll with a sleep of %d" % sleep)
+ events = self.poll.poll(sleep / 1000.0)
+
+ # Dispatch any file handle events that occurred
+ for (fd, revents) in events:
+ # See if the events was from the self-pipe
+ # telling us to wakup. if so, then discard
+ # the data just continue
+ if fd == self.pipetrick[0]:
+ data = os.read(fd, 1)
+ continue
+
+ h = self.get_handle_by_fd(fd)
+ if h:
+ self.debug("Dispatch fd %d handle %d events %d" % (fd, h.get_id(), revents))
+ h.dispatch(self.events_from_poll(revents))
+
+ now = int(time.time() * 1000)
+ for t in self.timers:
+ interval = t.get_interval()
+ if interval < 0:
+ continue
+
+ want = t.get_last_fired() + interval
+ # Deduct 20ms, since schedular timeslice
+ # means we could be ever so slightly early
+ if now >= (want-20):
+ self.debug("Dispatch timer %d now %s want %s" % (t.get_id(), str(now), str(want)))
+ t.set_last_fired(now)
+ t.dispatch()
+
+
+ # Actually the event loop forever
+ def run_loop(self):
+ self.quit = False
+ while not self.quit:
+ self.run_once()
+
+ def interrupt(self):
+ os.write(self.pipetrick[1], 'c')
+
+
+ # Registers a new file handle 'fd', monitoring for 'events' (libvirt
+ # event constants), firing the callback cb() when an event occurs.
+ # Returns a unique integer identier for this handle, that should be
+ # used to later update/remove it
+ def add_handle(self, fd, events, cb, opaque):
+ handleID = self.nextHandleID + 1
+ self.nextHandleID = self.nextHandleID + 1
-mypoll = select.poll()
-TIMEOUT_MS = 1000
+ h = self.virEventLoopPureHandle(handleID, fd, events, cb, opaque)
+ self.handles.append(h)
-debug = False
+ self.poll.register(fd, self.events_to_poll(events))
+ self.interrupt()
-# handle globals
-h_fd = 0
-h_events = 0
-h_cb = None
-h_opaque = None
+ self.debug("Add handle %d fd %d events %d" % (handleID, fd, events))
-# timeout globals
-t_active = 0
-t_timeout = -1
-t_cb = None
-t_opaque = None
+ return handleID
-#####################################################
-# Callback Functions
-#####################################################
+ # Registers a new timer with periodic expiry at 'interval' ms,
+ # firing cb() each time the timer expires. If 'interval' is -1,
+ # then the timer is registered, but not enabled
+ # Returns a unique integer identier for this handle, that should be
+ # used to later update/remove it
+ def add_timer(self, interval, cb, opaque):
+ timerID = self.nextTimerID + 1
+ self.nextTimerID = self.nextTimerID + 1
+
+ h = self.virEventLoopPureTimer(timerID, interval, cb, opaque)
+ self.timers.append(h)
+ self.interrupt()
+
+ self.debug("Add timer %d interval %d" % (timerID, interval))
+
+ return timerID
+
+ # Change the set of events to be monitored on the file handle
+ def update_handle(self, handleID, events):
+ h = self.get_handle_by_id(handleID)
+ if h:
+ h.set_events(events)
+ self.poll.unregister(h.get_fd())
+ self.poll.register(h.get_fd(), self.events_to_poll(events))
+ self.interrupt()
+
+ self.debug("Update handle %d fd %d events %d" % (handleID, h.get_fd(), events))
+
+ # Change the periodic frequency of the timer
+ def update_timer(self, timerID, interval):
+ for h in self.timers:
+ if h.get_id() == timerID:
+ h.set_interval(interval);
+ self.interrupt()
+
+ self.debug("Update timer %d interval %d" % (timerID, interval))
+ break
+
+ # Stop monitoring for events on the file handle
+ def remove_handle(self, handleID):
+ handles = []
+ for h in self.handles:
+ if h.get_id() == handleID:
+ self.poll.unregister(h.get_fd())
+ self.debug("Remove handle %d fd %d" % (handleID, h.get_fd()))
+ else:
+ handles.append(h)
+ self.handles = handles
+ self.interrupt()
+
+ # Stop firing the periodic timer
+ def remove_timer(self, timerID):
+ timers = []
+ for h in self.timers:
+ if h.get_id() != timerID:
+ timers.append(h)
+ self.debug("Remove timer %d" % timerID)
+ self.timers = timers
+ self.interrupt()
+
+ # Convert from libvirt event constants, to poll() events constants
+ def events_to_poll(self, events):
+ ret = 0
+ if events & libvirt.VIR_EVENT_HANDLE_READABLE:
+ ret |= select.POLLIN
+ if events & libvirt.VIR_EVENT_HANDLE_WRITABLE:
+ ret |= select.POLLOUT
+ if events & libvirt.VIR_EVENT_HANDLE_ERROR:
+ ret |= select.POLLERR;
+ if events & libvirt.VIR_EVENT_HANDLE_HANGUP:
+ ret |= select.POLLHUP;
+ return ret
+
+ # Convert from poll() event constants, to libvirt events constants
+ def events_from_poll(self, events):
+ ret = 0;
+ if events & select.POLLIN:
+ ret |= libvirt.VIR_EVENT_HANDLE_READABLE;
+ if events & select.POLLOUT:
+ ret |= libvirt.VIR_EVENT_HANDLE_WRITABLE;
+ if events & select.POLLNVAL:
+ ret |= libvirt.VIR_EVENT_HANDLE_ERROR;
+ if events & select.POLLERR:
+ ret |= libvirt.VIR_EVENT_HANDLE_ERROR;
+ if events & select.POLLHUP:
+ ret |= libvirt.VIR_EVENT_HANDLE_HANGUP;
+ return ret;
+
+
+###########################################################################
+# Now glue an instance of the general event loop into libvirt's event loop
+###########################################################################
+
+# This single global instance of the event loop wil be used for
+# monitoring libvirt events
+eventLoop = virEventLoopPure(debug=False)
+
+# This keeps track of what thread is running the event loop,
+# (if it is run in a background thread)
+eventLoopThread = None
+
+
+# These next set of 6 methods are the glue between the official
+# libvirt events API, and our particular impl of the event loop
+#
+# There is no reason why the 'virEventLoopPure' has to be used.
+# An application could easily may these 6 glue methods hook into
+# another event loop such as GLib's, or something like the python
+# Twisted event framework.
+
+def virEventAddHandleImpl(fd, events, cb, opaque):
+ global eventLoop
+ return eventLoop.add_handle(fd, events, cb, opaque)
+
+def virEventUpdateHandleImpl(handleID, events):
+ global eventLoop
+ return eventLoop.update_handle(handleID, events)
+
+def virEventRemoveHandleImpl(handleID):
+ global eventLoop
+ return eventLoop.remove_handle(handleID)
+
+def virEventAddTimerImpl(interval, cb, opaque):
+ global eventLoop
+ return eventLoop.add_timer(interval, cb, opaque)
+
+def virEventUpdateTimerImpl(timerID, interval):
+ global eventLoop
+ return eventLoop.update_timer(timerID, interval)
+
+def virEventRemoveTimerImpl(timerID):
+ global eventLoop
+ return eventLoop.remove_timer(timerID)
+
+# This tells libvirt what event loop implementation it
+# should use
+def virEventLoopPureRegister():
+ libvirt.virEventRegisterImpl(virEventAddHandleImpl,
+ virEventUpdateHandleImpl,
+ virEventRemoveHandleImpl,
+ virEventAddTimerImpl,
+ virEventUpdateTimerImpl,
+ virEventRemoveTimerImpl)
+
+# Directly run the event loop in the current thread
+def virEventLoopPureRun():
+ global eventLoop
+ eventLoop.run_loop()
+
+# Spawn a background thread to run the event loop
+def virEventLoopPureStart():
+ global eventLoopThread
+ virEventLoopPureRegister()
+ eventLoopThread = threading.Thread(target=virEventLoopPureRun, name="libvirtEventLoop")
+ eventLoopThread.setDaemon(True)
+ eventLoopThread.start()
+
+
+##########################################################################
+# Everything that now follows is a simple demo of domain lifecycle events
+##########################################################################
def eventToString(event):
eventStrings = ( "Added",
"Removed",
def myDomainEventCallback2 (conn, dom, event, detail, opaque):
print "myDomainEventCallback2 EVENT: Domain %s(%s) %s %d" % (dom.name(), dom.ID(), eventToString(event), detail)
-#####################################################
-# EventImpl Functions
-#####################################################
-def myEventHandleTypeToPollEvent(events):
- ret = 0
- if events & libvirt.VIR_EVENT_HANDLE_READABLE:
- ret |= select.POLLIN
- if events & libvirt.VIR_EVENT_HANDLE_WRITABLE:
- ret |= select.POLLOUT
- if events & libvirt.VIR_EVENT_HANDLE_ERROR:
- ret |= select.POLLERR;
- if events & libvirt.VIR_EVENT_HANDLE_HANGUP:
- ret |= select.POLLHUP;
- return ret
-
-def myPollEventToEventHandleType(events):
- ret = 0;
- if events & select.POLLIN:
- ret |= libvirt.VIR_EVENT_HANDLE_READABLE;
- if events & select.POLLOUT:
- ret |= libvirt.VIR_EVENT_HANDLE_WRITABLE;
- if events & select.POLLERR:
- ret |= libvirt.VIR_EVENT_HANDLE_ERROR;
- if events & select.POLLHUP:
- ret |= libvirt.VIR_EVENT_HANDLE_HANGUP;
- return ret;
-
-def myAddHandle(fd, events, cb, opaque):
- global h_fd, h_events, h_cb, h_opaque, debug
- if debug:
- print "Adding Handle %s %s %s %s" % (str(fd), str(events), str(cb), str(opaque))
- h_fd = fd
- h_events = events
- h_cb = cb
- h_opaque = opaque
- mypoll.register(fd, myEventHandleTypeToPollEvent(events))
- return 0
-
-def myUpdateHandle(watch, event):
- global h_fd, h_events, debug
- if debug:
- print "Updating Handle %s %s" % (str(h_fd), str(event))
- h_events = event
- mypoll.unregister(h_fd)
- mypoll.register(h_fd, myEventHandleTypeToPollEvent(event))
-
-def myRemoveHandle(watch):
- global h_fd, debug
- if debug:
- print "Removing Handle %s" % str(h_fd)
- mypoll.unregister(h_fd)
- h_fd = 0
- return h_opaque
-
-def myAddTimeout(timeout, cb, opaque):
- global t_active, t_timeout, t_cb, t_opaque, debug
- if debug:
- print "Adding Timeout %s %s %s" % (str(timeout), str(cb), str(opaque))
- if timeout == -1:
- t_active = 0
- else:
- t_active = 1
- t_timeout = timeout;
- t_cb = cb;
- t_opaque = opaque;
- return 0
-
-def myUpdateTimeout(timer, timeout):
- global t_timeout, t_active, debug
- if debug:
- print "Updating Timeout %s %s" % (str(timer), str(timeout))
- if timeout == -1:
- t_active = 0
- else:
- t_active = 1
- t_timeout = timeout;
-
-def myRemoveTimeout(timer):
- global t_active, debug
- if debug:
- print "Removing Timeout %s" % str(timer)
- t_active = 0;
- return t_opaque
-
-##########################################
-# Main
-##########################################
-
def usage():
print "usage: "+os.path.basename(sys.argv[0])+" [uri]"
print " uri will default to qemu:///system"
print "Using uri:" + uri
- libvirt.virEventRegisterImpl( myAddHandle,
- myUpdateHandle,
- myRemoveHandle,
- myAddTimeout,
- myUpdateTimeout,
- myRemoveTimeout );
+ # Run a background thread with the event loop
+ virEventLoopPureStart()
+
vc = libvirt.open(uri)
# Close connection on exit (to test cleanup paths)
vc.domainEventRegister(myDomainEventCallback1,None)
vc.domainEventRegister(myDomainEventCallback2,None)
+ # The rest of your app would go here normally, but for sake
+ # of demo we'll just go to sleep. The other option is to
+ # run the event loop in your main thread if your app is
+ # totally event based.
while 1:
- try:
- if debug:
- print "Poll sleep %d" % t_active
- sts = mypoll.poll(TIMEOUT_MS)
- except select.error, err:
- if err[0] == errno.EINTR:
- continue
- raise
- except KeyboardInterrupt:
- print "Keyboard Interrupt caught - exiting cleanly"
- break
-
- if t_cb and t_active == 1:
- if debug:
- print "Invoking Timeout CB"
- t_cb(t_timeout, t_opaque[0], t_opaque[1])
-
- if not sts:
- if debug:
- print "Timed out"
- continue
-
- rfd = sts[0][0]
- revents = sts[0][1]
-
- if revents & select.POLLHUP:
- print "Reset by peer";
- return -1;
-
- if h_cb != None:
- #print "Invoking Handle CB"
- h_cb(0, h_fd, myPollEventToEventHandleType(revents & h_events),
- h_opaque[0], h_opaque[1])
-
- #print "DEBUG EXIT"
- #break
+ time.sleep(1)
+
if __name__ == "__main__":
main()