]>
git.ipfire.org Git - ipfire.org.git/blob - src/backend/cache.py
7 from .decorators
import *
10 log
= logging
.getLogger()
13 def __init__(self
, backend
):
14 self
.backend
= backend
16 # Stores connections assigned to tasks
17 self
.__connections
= {}
19 # Create a connection pool
20 self
.pool
= redis
.asyncio
.connection
.ConnectionPool
.from_url(
21 "redis://localhost:6379/0",
24 async def connection(self
, *args
, **kwargs
):
26 Returns a connection from the pool
28 # Fetch the current task
29 task
= asyncio
.current_task()
31 assert task
, "Could not determine task"
33 # Try returning the same connection to the same task
35 return self
.__connections
[task
]
39 # Fetch a new connection from the pool
40 conn
= await redis
.asyncio
.Redis(
41 connection_pool
=self
.pool
,
42 single_connection_client
=True,
45 # Store the connection
46 self
.__connections
[task
] = conn
48 log
.debug("Assigning cache connection %s to %s" % (conn
, task
))
50 # When the task finishes, release the connection
51 task
.add_done_callback(self
.__release
_connection
)
55 def __release_connection(self
, task
):
56 loop
= asyncio
.get_running_loop()
58 # Retrieve the connection
60 conn
= self
.__connections
[task
]
64 log
.debug("Releasing cache connection %s of %s" % (conn
, task
))
67 del self
.__connections
[task
]
69 # Return the connection back into the pool
70 asyncio
.run_coroutine_threadsafe(conn
.close(), loop
)
72 async def _run(self
, command
, *args
, **kwargs
):
73 # Fetch our connection
74 conn
= await self
.connection()
77 func
= getattr(conn
, command
)
80 return await func(*args
, **kwargs
)
82 async def get(self
, *args
, **kwargs
):
84 Fetches the value of a cached key
86 return await self
._run
("get", *args
, **kwargs
)
88 async def set(self
, *args
, **kwargs
):
90 Puts something into the cache
92 return await self
._run
("set", *args
, **kwargs
)
94 async def delete(self
, *args
, **kwargs
):
96 Deletes the key from the cache
98 return await self
._run
("delete", *args
, **kwargs
)
100 async def transaction(self
, *args
, **kwargs
):
102 Returns a new transaction
104 conn
= await self
.connection()
106 return await conn
.transaction(*args
, **kwargs
)
108 async def pipeline(self
, *args
, **kwargs
):
110 Returns a new pipeline
112 conn
= await self
.connection()
114 return conn
.pipeline(*args
, **kwargs
)