Created
August 20, 2021 14:10
-
-
Save Sinkler/0b6389ff8ced9e1c0d82e2c478862a08 to your computer and use it in GitHub Desktop.
The emcache backend for the aiocache library
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio
import typing

import emcache
from aiocache.base import BaseCache
from aiocache.serializers import BaseSerializer

import settings
class EmcacheCache(BaseCache):
    """aiocache backend that talks to Memcached through the ``emcache`` client.

    The emcache client is created lazily on first use (see ``_load_client``)
    and re-created if it has been closed in the meantime. Keys handed to the
    client are bytes; ``_build_key`` performs that conversion.
    """

    NAME = "memcached"

    def __init__(self, serializer=None, endpoint="127.0.0.1", port=11211, pool_size=2, loop=None, **kwargs):
        """Store connection settings; no network I/O happens here.

        Args:
            serializer: serializer to use for values. When ``None`` the
                serializer configured by ``BaseCache.__init__`` is kept.
            endpoint: Memcached host name or address.
            port: Memcached port.
            pool_size: maximum number of connections (coerced with ``int``).
            loop: stored on ``self._loop``; not otherwise used by this class.
            **kwargs: forwarded to ``BaseCache.__init__``.
        """
        super().__init__(**kwargs)
        self.endpoint = endpoint
        self.port = port
        self.pool_size = int(pool_size)
        self._loop = loop
        self.client: typing.Optional[typing.Union[emcache.Client, emcache.client._Client]] = None
        self.hosts = [emcache.MemcachedHostAddress(self.endpoint, self.port)]
        # Only override the serializer when one was actually supplied;
        # assigning ``None`` would wipe out whatever the base class set up.
        if serializer is not None:
            self.serializer = serializer

    def __repr__(self):  # pragma: no cover
        return f"EmcacheCache ({self.endpoint}:{self.port})"

    @staticmethod
    def _to_bytes(data):
        """Return ``data`` encoded to bytes when it is a ``str``, else unchanged."""
        return data.encode() if isinstance(data, str) else data

    def _build_key(self, key, namespace=None):
        """Build the namespaced key, replace spaces with ``_`` and encode it to bytes."""
        ns_key = super()._build_key(key, namespace=namespace).replace(" ", "_")
        return str.encode(ns_key)

    async def _load_client(self):
        """Create the emcache client if there is none yet or it has been closed."""
        if self.client is None or self.client._closed:
            self.client = await emcache.create_client(
                self.hosts,
                timeout=settings.CACHE_TIMEOUT,
                max_connections=self.pool_size,
                autobatching=True,
            )

    async def _get(self, key, encoding=BaseSerializer.DEFAULT_ENCODING, _conn=None):
        """Return the value stored under ``key`` or ``None`` when it is missing.

        The raw bytes are returned when ``encoding`` is ``None``; otherwise the
        value is decoded with ``encoding``.
        """
        await self._load_client()
        # Accept str keys as well, like _gets and _multi_get do.
        item = await self.client.get(self._to_bytes(key))
        if item is None:
            return None
        if encoding is None:
            return item.value
        return item.value.decode(encoding)

    async def _gets(self, key, encoding=BaseSerializer.DEFAULT_ENCODING, _conn=None):
        """Return the CAS token of ``key`` or ``None`` when the key is missing."""
        await self._load_client()
        item = await self.client.gets(self._to_bytes(key))
        return item and item.cas

    async def _multi_get(self, keys, encoding=BaseSerializer.DEFAULT_ENCODING, _conn=None):
        """Return the values for ``keys``, in the same order as ``keys``.

        Positions whose key has no stored item hold ``None``. Values are raw
        bytes when ``encoding`` is ``None`` and decoded strings otherwise.
        """
        await self._load_client()
        keys = [self._to_bytes(key) for key in keys]
        found = await self.client.get_many(keys)
        values = []
        # Walk the *requested* keys rather than the returned mapping so the
        # result lines up with the input and misses show up as None.
        for key in keys:
            item = found.get(key)
            if item is None:
                values.append(None)
            elif encoding is None:
                values.append(item.value)
            else:
                values.append(item.value.decode(encoding))
        return values

    async def _set(self, key, value, ttl=0, _cas_token=None, _conn=None):
        """Store ``value`` under ``key``.

        With ``_cas_token`` the write is a compare-and-swap and the return
        value reports whether it succeeded; otherwise ``True`` is returned.
        """
        await self._load_client()
        value = self._to_bytes(value)
        if _cas_token is not None:
            # Propagate the CAS outcome instead of always claiming success.
            return await self._cas(key, value, _cas_token, ttl=ttl, _conn=_conn)
        await self.client.set(key, value, exptime=ttl or 0)
        return True

    async def _cas(self, key, value, token, ttl=None, _conn=None):
        """Compare-and-swap ``key``; ``False`` when emcache reports it was not stored."""
        await self._load_client()
        try:
            await self.client.cas(key, value, token, exptime=ttl or 0)
        except emcache.NotStoredStorageCommandError:
            return False
        return True

    async def _multi_set(self, pairs, ttl=0, _conn=None):
        """Store every ``(key, value)`` pair concurrently and return ``True``."""
        await self._load_client()
        exptime = ttl or 0
        tasks = [self.client.set(key, self._to_bytes(value), exptime=exptime) for key, value in pairs]
        await asyncio.gather(*tasks)
        return True

    async def _add(self, key, value, ttl=0, _conn=None):
        """Store ``value`` with the client's ``add`` command and return ``True``.

        NOTE(review): errors raised by emcache (presumably including "key
        already exists") propagate unchanged; confirm whether callers expect
        them to be translated into ``ValueError``.
        """
        await self._load_client()
        await self.client.add(key, self._to_bytes(value), exptime=ttl or 0)
        return True

    async def _exists(self, key, _conn=None):
        """Return whether ``key`` exists.

        Implemented by appending an empty payload: the append is treated as
        proof of existence unless emcache raises ``NotStoredStorageCommandError``.
        """
        await self._load_client()
        try:
            await self.client.append(key, b"")
        except emcache.NotStoredStorageCommandError:
            return False
        return True

    async def _increment(self, key, delta, _conn=None):
        """Add ``delta`` (which may be negative) to the counter at ``key``.

        When the key does not exist it is initialised to ``delta``. Returns the
        new counter value.
        """
        await self._load_client()
        incremented = None
        try:
            if delta > 0:
                incremented = await self.client.increment(key, delta)
            else:
                incremented = await self.client.decrement(key, abs(delta))
        except emcache.NotFoundCommandError:
            await self._set(key, str(delta))
        # Compare against None explicitly: a counter that reaches 0 is a valid
        # result and must not be replaced by ``delta``.
        return delta if incremented is None else incremented

    async def _expire(self, key, ttl, _conn=None):
        """Update the expiry of ``key`` and return whatever the client's ``touch`` returns.

        NOTE(review): verify the return value / not-found behaviour of
        ``touch`` against what callers of ``expire`` expect (a bool).
        """
        await self._load_client()
        return await self.client.touch(key, ttl)

    async def _delete(self, key, _conn=None):
        """Delete ``key``; ``False`` when emcache reports it was not found."""
        await self._load_client()
        try:
            await self.client.delete(key)
        except emcache.NotFoundCommandError:
            return False
        return True

    async def _clear(self, namespace=None, _conn=None):
        """Flush every configured host and return ``True``.

        Raises:
            ValueError: if ``namespace`` is given; per-namespace flushing is
                not supported.
        """
        # Validate first so an unsupported call does not open a connection.
        if namespace:
            raise ValueError("MemcachedBackend doesnt support flushing by namespace")
        await self._load_client()
        for idx, host in enumerate(self.hosts):
            # NOTE(review): the flush is requested with a delay of 10, 20, ...
            # per host, so data is presumably still readable right after this
            # returns - confirm this staggering is intentional.
            await self.client.flush_all(host, delay=10 + (10 * idx))
        return True

    async def _redlock_release(self, key, _):
        """Release a lock by deleting ``key``; the second argument is ignored."""
        # _delete loads the client itself, no need to do it twice.
        return await self._delete(key)

    async def _close(self, *args, _conn=None, **kwargs):
        """Close the client if one is currently open."""
        # Do not create a brand-new client just to close it immediately.
        if self.client is not None and not self.client._closed:
            await self.client.close()

    @classmethod
    def parse_uri_path(cls, path):
        """Return the options derived from a URI path; always an empty dict here."""
        return {}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment