# Create an events queue
self.queue = queue.Queue(1024)
+ # Remember the last time the database was cleaned
+ self.last_cleanup_at = None
+
# Keep references to our workers
self.workers = []
# If there was nothing in the queue, we will try again
except queue.Empty:
+ # We have time to cleanup the database
+ self.cleanup()
+
+ # Nothing else to do in this iteration...
continue
# Parse the event
log.debug("Worker %s terminated" % self.native_id)
+ _cleanup = threading.Lock()
+
+ def cleanup(self):
+ """
+ Cleanup the database
+ """
+ now = datetime.datetime.utcnow()
+
+ # Cleanup the database if it has never been cleaned up
+ if self.reporter.last_cleanup_at is None:
+ pass
+
+ # Cleanup the database if the last cleanup has been
+ elif self.reporter.last_cleanup_at + datetime.timedelta(hours=6) <= now:
+ pass
+
+ # Otherwise we won't cleanup the database
+ else:
+ return
+
+ # Acquire the lock so this will only run once
+ if self._cleanup.acquire(blocking=False):
+ try:
+ log.debug("Cleaning up the database...")
+
+ # Determine the retention time
+ retention_days = datetime.timedelta(
+ days = self.config.getint("database", "retention", fallback=365 * 5)
+ )
+
+ # Save when we performed this last
+ self.reporter.last_cleanup_at = now
+
+ # Remove everything
+ self.db.execute(
+ "DELETE FROM alerts WHERE timestamp <= ?",
+ (now - retention_days,),
+ )
+
+ # Release the lock
+ finally:
+ self._cleanup.release()
+
def process(self, event):
"""
Called whenever we have received an event