]> git.ipfire.org Git - people/jschlag/pbs.git/blame - src/buildservice/scheduler.py
Run all cronjobs as user pakfire
[people/jschlag/pbs.git] / src / buildservice / scheduler.py
CommitLineData
83be3106
MT
1#!/usr/bin/python
2
3import logging
52d18d04 4import multiprocessing
83be3106
MT
5import time
6import traceback
7
57859ebc 8from . import Backend
52d18d04 9
83be3106
MT
10class Event(object):
11 interval = None
12
13 priority = 0
14
52d18d04
MT
15 def __init__(self, *args, **kwargs):
16 self.args = args
17 self.kwargs = kwargs
83be3106
MT
18
19 self._next_start_time = 0
20
21 self.scheduler = None
22
23 def __repr__(self):
24 if hasattr(self, "_next_start_time"):
25 return "<%s next_start_in=%ds>" % \
26 (self.__class__.__name__, self._next_start_time - time.time())
27
28 return "<%s>" % self.__class__.__name__
29
30 def run(self, *args, **kwargs):
31 raise NotImplemented
32
52d18d04
MT
33 def run_subprocess_background(self, method, *args):
34 arguments = [method,] + list(args)
35
36 process = multiprocessing.Process(target=self.fork, args=arguments)
37 process.daemon = False
38
39 # Start the process.
40 process.start()
41
42 return process
43
44 def run_subprocess(self, *args):
45 process = self.run_subprocess_background(*args)
46
47 # Wait until process has finished.
48 process.join()
49
50 @staticmethod
51 def fork(method, *args, **kwargs):
52 # Create new pakfire instance.
57859ebc 53 backend = Backend()
52d18d04 54
57859ebc 55 return method(backend, *args, **kwargs)
52d18d04 56
83be3106
MT
57
58class Scheduler(object):
59 def __init__(self):
60 self._queue = []
61
62 def add_event(self, event, start_time=None):
63 event.scheduler = self
64
65 self._queue.append(event)
66
67 # Set initial start time.
68 if start_time is None:
69 start_time = time.time()
70
71 event._next_start_time = start_time
72
73 def sort_queue(self):
74 self._queue.sort(key=lambda e: (e.priority, e._next_start_time))
75
76 def run(self):
77 while self._queue:
78 self.sort_queue()
52d18d04 79 print self._queue
83be3106
MT
80
81 for event in self._queue:
82 # If the event has to be started some time in
83 # the future.
84 if event._next_start_time <= time.time():
85 try:
86 logging.info("Running %s..." % event)
87
52d18d04 88 event.run(*event.args, **event.kwargs)
83be3106
MT
89
90 # In case the user interrupts the scheduler.
91 except KeyboardInterrupt:
92 # Stop immediately.
93 return
94
95 except:
96 traceback.print_exc()
97
98 finally:
99 # Set the next execution time if the event
100 # should be run again.
101 if event.interval:
102 event._next_start_time = time.time() + event.interval
103
104 # Otherwise remove it from the queue.
105 else:
106 self._queue.remove(event)
107
108 # Get back to outer loop and sort the queue again.
109 break
110
111 # Sleep a bit.
112 time.sleep(1)