]> git.ipfire.org Git - ipfire.org.git/blob - src/backend/cache.py
wiki: Only match usernames when a word starts with @
[ipfire.org.git] / src / backend / cache.py
1 #!/usr/bin/python3
2
3 import asyncio
4 import logging
5 import redis.asyncio
6
7 from .decorators import *
8
9 # Setup logging
10 log = logging.getLogger()
11
12 class Cache(object):
13 def __init__(self, backend):
14 self.backend = backend
15
16 # Stores connections assigned to tasks
17 self.__connections = {}
18
19 # Create a connection pool
20 self.pool = redis.asyncio.connection.ConnectionPool.from_url(
21 "redis://localhost:6379/0",
22 )
23
24 async def connection(self, *args, **kwargs):
25 """
26 Returns a connection from the pool
27 """
28 # Fetch the current task
29 task = asyncio.current_task()
30
31 assert task, "Could not determine task"
32
33 # Try returning the same connection to the same task
34 try:
35 return self.__connections[task]
36 except KeyError:
37 pass
38
39 # Fetch a new connection from the pool
40 conn = await redis.asyncio.Redis(
41 connection_pool=self.pool,
42 single_connection_client=True,
43 )
44
45 # Store the connection
46 self.__connections[task] = conn
47
48 log.debug("Assigning cache connection %s to %s" % (conn, task))
49
50 # When the task finishes, release the connection
51 task.add_done_callback(self.__release_connection)
52
53 return conn
54
55 def __release_connection(self, task):
56 loop = asyncio.get_running_loop()
57
58 # Retrieve the connection
59 try:
60 conn = self.__connections[task]
61 except KeyError:
62 return
63
64 log.debug("Releasing cache connection %s of %s" % (conn, task))
65
66 # Delete it
67 del self.__connections[task]
68
69 # Return the connection back into the pool
70 asyncio.run_coroutine_threadsafe(conn.close(), loop)
71
72 async def _run(self, command, *args, **kwargs):
73 # Fetch our connection
74 conn = await self.connection()
75
76 # Get the function
77 func = getattr(conn, command)
78
79 # Call the function
80 return await func(*args, **kwargs)
81
82 async def get(self, *args, **kwargs):
83 """
84 Fetches the value of a cached key
85 """
86 return await self._run("get", *args, **kwargs)
87
88 async def set(self, *args, **kwargs):
89 """
90 Puts something into the cache
91 """
92 return await self._run("set", *args, **kwargs)
93
94 async def delete(self, *args, **kwargs):
95 """
96 Deletes the key from the cache
97 """
98 return await self._run("delete", *args, **kwargs)
99
100 async def transaction(self, *args, **kwargs):
101 """
102 Returns a new transaction
103 """
104 conn = await self.connection()
105
106 return await conn.transaction(*args, **kwargs)
107
108 async def pipeline(self, *args, **kwargs):
109 """
110 Returns a new pipeline
111 """
112 conn = await self.connection()
113
114 return conn.pipeline(*args, **kwargs)