+++ /dev/null
-#!/usr/bin/python3
-
-import asyncio
-import logging
-import redis.asyncio
-
-from .decorators import *
-
-# Setup logging
-log = logging.getLogger("pbs.cache")
-
-class Cache(object):
- def __init__(self, backend):
- self.backend = backend
-
- # Stores connections assigned to tasks
- self.__connections = {}
-
- # Create a connection pool
- self.pool = redis.asyncio.connection.ConnectionPool.from_url(
- "redis://localhost:6379/0", decode_responses=True,
- )
-
- async def connection(self, *args, **kwargs):
- """
- Returns a connection from the pool
- """
- # Fetch the current task
- task = asyncio.current_task()
-
- assert task, "Could not determine task"
-
- # Try returning the same connection to the same task
- try:
- return self.__connections[task]
- except KeyError:
- pass
-
- # Fetch a new connection from the pool
- conn = await redis.asyncio.Redis(
- connection_pool=self.pool,
- single_connection_client=True,
- )
-
- # Store the connection
- self.__connections[task] = conn
-
- log.debug("Assigning cache connection %s to %s" % (conn, task))
-
- # When the task finishes, release the connection
- task.add_done_callback(self.__release_connection)
-
- return conn
-
- def __release_connection(self, task):
- loop = asyncio.get_running_loop()
-
- # Retrieve the connection
- try:
- conn = self.__connections[task]
- except KeyError:
- return
-
- log.debug("Releasing cache connection %s of %s" % (conn, task))
-
- # Delete it
- del self.__connections[task]
-
- # Return the connection back into the pool
- asyncio.run_coroutine_threadsafe(conn.close(), loop)
-
- async def _run(self, command, *args, **kwargs):
- # Fetch our connection
- conn = await self.connection()
-
- # Get the function
- func = getattr(conn, command)
-
- # Call the function
- return await func(*args, **kwargs)
-
- async def get(self, key):
- """
- Fetches the value of a cached key
- """
- return await self._run("get", *args, **kwargs)
-
- async def set(self, *args, **kwargs):
- """
- Puts something into the cache
- """
- return await self._run("set", *args, **kwargs)
-
- async def delete(self, *args, **kwargs):
- """
- Deletes the key from the cache
- """
- return await self._run("delete", *args, **kwargs)
-
- async def transaction(self, *args, **kwargs):
- """
- Returns a new transaction
- """
- conn = await self.connection()
-
- return await conn.transaction(*args, **kwargs)
-
- async def pipeline(self, *args, **kwargs):
- """
- Returns a new pipeline
- """
- conn = await self.connection()
-
- return conn.pipeline(*args, **kwargs)