return task
+ def run_periodic_task(self, delay, callback, *args):
+ """
+ Calls the given callback periodically in the background
+ """
+ self.run_task(self._periodic_task, delay, callback, *args)
+
+ async def _periodic_task(self, delay, callback, *args):
+ """
+ Helper function for run_periodic_task() that will call the given
+ callback regulary after the timer has expired.
+ """
+ log.debug("Periodic callback %r started" % callback)
+
+ while True:
+ # Run the callback in a separate task
+ self.run_task(callback, *args)
+
+ # Wait a little moment
+ await asyncio.sleep(delay)
+
@functools.cached_property
def auth(self):
return auth.Auth(self)
debug = True,
)
+# Register background tasks
+backend.run_periodic_task(600, backend.ratelimiter.cleanup)
+
api_key_header = fastapi.security.APIKeyHeader(name="X-API-Key")
async def require_current_user(