]>
Commit | Line | Data |
---|---|---|
f0a8b392 MT |
1 | #!/usr/bin/python3 |
2 | ||
3 | import asyncio | |
4 | import logging | |
5 | import redis.asyncio | |
6 | ||
7 | from .decorators import * | |
8 | ||
9 | # Setup logging | |
8392330e | 10 | log = logging.getLogger() |
f0a8b392 MT |
11 | |
12 | class Cache(object): | |
13 | def __init__(self, backend): | |
14 | self.backend = backend | |
15 | ||
16 | # Stores connections assigned to tasks | |
17 | self.__connections = {} | |
18 | ||
19 | # Create a connection pool | |
20 | self.pool = redis.asyncio.connection.ConnectionPool.from_url( | |
4b73b7c8 | 21 | "redis://localhost:6379/0", |
f0a8b392 MT |
22 | ) |
23 | ||
24 | async def connection(self, *args, **kwargs): | |
25 | """ | |
26 | Returns a connection from the pool | |
27 | """ | |
28 | # Fetch the current task | |
29 | task = asyncio.current_task() | |
30 | ||
31 | assert task, "Could not determine task" | |
32 | ||
33 | # Try returning the same connection to the same task | |
34 | try: | |
35 | return self.__connections[task] | |
36 | except KeyError: | |
37 | pass | |
38 | ||
39 | # Fetch a new connection from the pool | |
40 | conn = await redis.asyncio.Redis( | |
41 | connection_pool=self.pool, | |
42 | single_connection_client=True, | |
43 | ) | |
44 | ||
45 | # Store the connection | |
46 | self.__connections[task] = conn | |
47 | ||
48 | log.debug("Assigning cache connection %s to %s" % (conn, task)) | |
49 | ||
50 | # When the task finishes, release the connection | |
51 | task.add_done_callback(self.__release_connection) | |
52 | ||
53 | return conn | |
54 | ||
55 | def __release_connection(self, task): | |
56 | loop = asyncio.get_running_loop() | |
57 | ||
58 | # Retrieve the connection | |
59 | try: | |
60 | conn = self.__connections[task] | |
61 | except KeyError: | |
62 | return | |
63 | ||
64 | log.debug("Releasing cache connection %s of %s" % (conn, task)) | |
65 | ||
66 | # Delete it | |
67 | del self.__connections[task] | |
68 | ||
69 | # Return the connection back into the pool | |
70 | asyncio.run_coroutine_threadsafe(conn.close(), loop) | |
71 | ||
72 | async def _run(self, command, *args, **kwargs): | |
73 | # Fetch our connection | |
74 | conn = await self.connection() | |
75 | ||
76 | # Get the function | |
77 | func = getattr(conn, command) | |
78 | ||
79 | # Call the function | |
80 | return await func(*args, **kwargs) | |
81 | ||
340f6f0a | 82 | async def get(self, *args, **kwargs): |
f0a8b392 MT |
83 | """ |
84 | Fetches the value of a cached key | |
85 | """ | |
86 | return await self._run("get", *args, **kwargs) | |
87 | ||
88 | async def set(self, *args, **kwargs): | |
89 | """ | |
90 | Puts something into the cache | |
91 | """ | |
92 | return await self._run("set", *args, **kwargs) | |
93 | ||
94 | async def delete(self, *args, **kwargs): | |
95 | """ | |
96 | Deletes the key from the cache | |
97 | """ | |
98 | return await self._run("delete", *args, **kwargs) | |
99 | ||
100 | async def transaction(self, *args, **kwargs): | |
101 | """ | |
102 | Returns a new transaction | |
103 | """ | |
104 | conn = await self.connection() | |
105 | ||
106 | return await conn.transaction(*args, **kwargs) | |
107 | ||
108 | async def pipeline(self, *args, **kwargs): | |
109 | """ | |
110 | Returns a new pipeline | |
111 | """ | |
112 | conn = await self.connection() | |
113 | ||
114 | return conn.pipeline(*args, **kwargs) |