--- /dev/null
+#!/usr/bin/python3
+
+import asyncio
+import logging
+import redis.asyncio
+
+from .decorators import *
+
+# Setup logging
+log = logging.getLogger("pbs.cache")
+
+class Cache(object):
+ def __init__(self, backend):
+ self.backend = backend
+
+ # Stores connections assigned to tasks
+ self.__connections = {}
+
+ # Create a connection pool
+ self.pool = redis.asyncio.connection.ConnectionPool.from_url(
+ "redis://localhost:6379/0", decode_responses=True,
+ )
+
+ async def connection(self, *args, **kwargs):
+ """
+ Returns a connection from the pool
+ """
+ # Fetch the current task
+ task = asyncio.current_task()
+
+ assert task, "Could not determine task"
+
+ # Try returning the same connection to the same task
+ try:
+ return self.__connections[task]
+ except KeyError:
+ pass
+
+ # Fetch a new connection from the pool
+ conn = await redis.asyncio.Redis(
+ connection_pool=self.pool,
+ single_connection_client=True,
+ )
+
+ # Store the connection
+ self.__connections[task] = conn
+
+ log.debug("Assigning cache connection %s to %s" % (conn, task))
+
+ # When the task finishes, release the connection
+ task.add_done_callback(self.__release_connection)
+
+ return conn
+
+ def __release_connection(self, task):
+ loop = asyncio.get_running_loop()
+
+ # Retrieve the connection
+ try:
+ conn = self.__connections[task]
+ except KeyError:
+ return
+
+ log.debug("Releasing cache connection %s of %s" % (conn, task))
+
+ # Delete it
+ del self.__connections[task]
+
+ # Return the connection back into the pool
+ asyncio.run_coroutine_threadsafe(conn.close(), loop)
+
+ async def _run(self, command, *args, **kwargs):
+ # Fetch our connection
+ conn = await self.connection()
+
+ # Get the function
+ func = getattr(conn, command)
+
+ # Call the function
+ return await func(*args, **kwargs)
+
+ async def get(self, key):
+ """
+ Fetches the value of a cached key
+ """
+ return await self._run("get", *args, **kwargs)
+
+ async def set(self, *args, **kwargs):
+ """
+ Puts something into the cache
+ """
+ return await self._run("set", *args, **kwargs)
+
+ async def delete(self, *args, **kwargs):
+ """
+ Deletes the key from the cache
+ """
+ return await self._run("delete", *args, **kwargs)
+
+ async def transaction(self, *args, **kwargs):
+ """
+ Returns a new transaction
+ """
+ conn = await self.connection()
+
+ return await conn.transaction(*args, **kwargs)
+
+ async def pipeline(self, *args, **kwargs):
+ """
+ Returns a new pipeline
+ """
+ conn = await self.connection()
+
+ return conn.pipeline(*args, **kwargs)