]> git.ipfire.org Git - ipfire.org.git/commitdiff
cache: Import redis cache module from PBS
authorMichael Tremer <michael.tremer@ipfire.org>
Wed, 25 Oct 2023 13:59:25 +0000 (13:59 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Wed, 25 Oct 2023 13:59:25 +0000 (13:59 +0000)
Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
Makefile.am
src/backend/base.py
src/backend/cache.py [new file with mode: 0644]

index c691f2d65cded7b2e104d83102a87cbb06001eac..b05358f9c9db7dbdd2091b0fd8acce6278fe076e 100644 (file)
@@ -53,6 +53,7 @@ backend_PYTHON = \
        src/backend/base.py \
        src/backend/blog.py \
        src/backend/bugzilla.py \
+       src/backend/cache.py \
        src/backend/campaigns.py \
        src/backend/countries.py \
        src/backend/database.py \
index 40e1c7a5a39933f9c1ddfb576432586bb6c3bf1b..6e816987b88c1ce874b0169d31ec5c14e379c759 100644 (file)
@@ -11,6 +11,7 @@ from . import accounts
 from . import asterisk
 from . import blog
 from . import bugzilla
+from . import cache
 from . import campaigns
 from . import database
 from . import fireinfo
@@ -56,6 +57,9 @@ class Backend(object):
                # Create HTTPClient
                self.http_client = httpclient.HTTPClient(self)
 
+               # Initialize the cache
+               self.cache = cache.Cache(self)
+
                # Initialize settings first.
                self.settings = settings.Settings(self)
                self.memcache = memcached.Memcached(self)
diff --git a/src/backend/cache.py b/src/backend/cache.py
new file mode 100644 (file)
index 0000000..19f35de
--- /dev/null
@@ -0,0 +1,114 @@
+#!/usr/bin/python3
+
+import asyncio
+import logging
+import redis.asyncio
+
+from .decorators import *
+
+# Setup logging
+log = logging.getLogger("pbs.cache")
+
+class Cache(object):
+       def __init__(self, backend):
+               self.backend = backend
+
+               # Stores connections assigned to tasks
+               self.__connections = {}
+
+               # Create a connection pool
+               self.pool = redis.asyncio.connection.ConnectionPool.from_url(
+                       "redis://localhost:6379/0", decode_responses=True,
+               )
+
+       async def connection(self, *args, **kwargs):
+               """
+                       Returns a connection from the pool
+               """
+               # Fetch the current task
+               task = asyncio.current_task()
+
+               assert task, "Could not determine task"
+
+               # Try returning the same connection to the same task
+               try:
+                       return self.__connections[task]
+               except KeyError:
+                       pass
+
+               # Fetch a new connection from the pool
+               conn = await redis.asyncio.Redis(
+                       connection_pool=self.pool,
+                       single_connection_client=True,
+               )
+
+               # Store the connection
+               self.__connections[task] = conn
+
+               log.debug("Assigning cache connection %s to %s" % (conn, task))
+
+               # When the task finishes, release the connection
+               task.add_done_callback(self.__release_connection)
+
+               return conn
+
+       def __release_connection(self, task):
+               loop = asyncio.get_running_loop()
+
+               # Retrieve the connection
+               try:
+                       conn = self.__connections[task]
+               except KeyError:
+                       return
+
+               log.debug("Releasing cache connection %s of %s" % (conn, task))
+
+               # Delete it
+               del self.__connections[task]
+
+               # Return the connection back into the pool
+               asyncio.run_coroutine_threadsafe(conn.close(), loop)
+
+       async def _run(self, command, *args, **kwargs):
+               # Fetch our connection
+               conn = await self.connection()
+
+               # Get the function
+               func = getattr(conn, command)
+
+               # Call the function
+               return await func(*args, **kwargs)
+
+       async def get(self, key):
+               """
+                       Fetches the value of a cached key
+               """
+               return await self._run("get", *args, **kwargs)
+
+       async def set(self, *args, **kwargs):
+               """
+                       Puts something into the cache
+               """
+               return await self._run("set", *args, **kwargs)
+
+       async def delete(self, *args, **kwargs):
+               """
+                       Deletes the key from the cache
+               """
+               return await self._run("delete", *args, **kwargs)
+
+       async def transaction(self, *args, **kwargs):
+               """
+                       Returns a new transaction
+               """
+               conn = await self.connection()
+
+               return await conn.transaction(*args, **kwargs)
+
+       async def pipeline(self, *args, **kwargs):
+               """
+                       Returns a new pipeline
+               """
+               conn = await self.connection()
+
+               return conn.pipeline(*args, **kwargs)