From f0a8b392381f051c38829f439ac634614433a952 Mon Sep 17 00:00:00 2001
From: Michael Tremer
Date: Wed, 25 Oct 2023 13:59:25 +0000
Subject: [PATCH] cache: Import redis cache module from PBS

Signed-off-by: Michael Tremer
---
 Makefile.am          |   1 +
 src/backend/base.py  |   4 ++
 src/backend/cache.py | 114 +++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 119 insertions(+)
 create mode 100644 src/backend/cache.py

diff --git a/Makefile.am b/Makefile.am
index c691f2d6..b05358f9 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -53,6 +53,7 @@ backend_PYTHON = \
 	src/backend/base.py \
 	src/backend/blog.py \
 	src/backend/bugzilla.py \
+	src/backend/cache.py \
 	src/backend/campaigns.py \
 	src/backend/countries.py \
 	src/backend/database.py \
diff --git a/src/backend/base.py b/src/backend/base.py
index 40e1c7a5..6e816987 100644
--- a/src/backend/base.py
+++ b/src/backend/base.py
@@ -11,6 +11,7 @@ from . import accounts
 from . import asterisk
 from . import blog
 from . import bugzilla
+from . import cache
 from . import campaigns
 from . import database
 from . import fireinfo
@@ -56,6 +57,9 @@ class Backend(object):
 		# Create HTTPClient
 		self.http_client = httpclient.HTTPClient(self)
 
+		# Initialize the cache
+		self.cache = cache.Cache(self)
+
 		# Initialize settings first.
 		self.settings = settings.Settings(self)
 		self.memcache = memcached.Memcached(self)
diff --git a/src/backend/cache.py b/src/backend/cache.py
new file mode 100644
index 00000000..19f35de5
--- /dev/null
+++ b/src/backend/cache.py
@@ -0,0 +1,114 @@
+#!/usr/bin/python3
+
+import asyncio
+import logging
+import redis.asyncio
+
+from .decorators import *
+
+# Setup logging
+log = logging.getLogger("pbs.cache")
+
+class Cache(object):
+	def __init__(self, backend):
+		self.backend = backend
+
+		# Stores connections assigned to tasks
+		self.__connections = {}
+
+		# Create a connection pool
+		self.pool = redis.asyncio.connection.ConnectionPool.from_url(
+			"redis://localhost:6379/0", decode_responses=True,
+		)
+
+	async def connection(self, *args, **kwargs):
+		"""
+			Returns a connection from the pool
+		"""
+		# Fetch the current task
+		task = asyncio.current_task()
+
+		assert task, "Could not determine task"
+
+		# Try returning the same connection to the same task
+		try:
+			return self.__connections[task]
+		except KeyError:
+			pass
+
+		# Fetch a new connection from the pool
+		conn = await redis.asyncio.Redis(
+			connection_pool=self.pool,
+			single_connection_client=True,
+		)
+
+		# Store the connection
+		self.__connections[task] = conn
+
+		log.debug("Assigning cache connection %s to %s" % (conn, task))
+
+		# When the task finishes, release the connection
+		task.add_done_callback(self.__release_connection)
+
+		return conn
+
+	def __release_connection(self, task):
+		loop = asyncio.get_running_loop()
+
+		# Retrieve the connection
+		try:
+			conn = self.__connections[task]
+		except KeyError:
+			return
+
+		log.debug("Releasing cache connection %s of %s" % (conn, task))
+
+		# Delete it
+		del self.__connections[task]
+
+		# Return the connection back into the pool
+		asyncio.run_coroutine_threadsafe(conn.close(), loop)
+
+	async def _run(self, command, *args, **kwargs):
+		# Fetch our connection
+		conn = await self.connection()
+
+		# Get the function
+		func = getattr(conn, command)
+
+		# Call the function
+		return await func(*args, **kwargs)
+
+	async def get(self, key):
+		"""
+			Fetches the value of a cached key
+		"""
+		return await self._run("get", key)
+
+	async def set(self, *args, **kwargs):
+		"""
+			Puts something into the cache
+		"""
+		return await self._run("set", *args, **kwargs)
+
+	async def delete(self, *args, **kwargs):
+		"""
+			Deletes the key from the cache
+		"""
+		return await self._run("delete", *args, **kwargs)
+
+	async def transaction(self, *args, **kwargs):
+		"""
+			Returns a new transaction
+		"""
+		conn = await self.connection()
+
+		return await conn.transaction(*args, **kwargs)
+
+	async def pipeline(self, *args, **kwargs):
+		"""
+			Returns a new pipeline
+		"""
+		conn = await self.connection()
+
+		return conn.pipeline(*args, **kwargs)
-- 
2.47.3
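For context, a minimal usage sketch of the new module, assuming an already-constructed Backend instance named "backend" (its constructor arguments are omitted here) and a Redis server reachable on localhost:6379; the key and values below are illustrative only:

	import asyncio

	async def main(backend):
		# Store a value with a ten-minute expiry ("ex" is redis-py's expiry argument, in seconds)
		await backend.cache.set("motd", "Hello, world!", ex=600)

		# Read it back; redis-py returns None on a cache miss
		print(await backend.cache.get("motd"))

		# Drop the key again
		await backend.cache.delete("motd")

Each coroutine runs on the connection that Cache.connection() pins to the current task, so repeated calls within one request reuse the same Redis connection until the task completes.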