Last active
September 5, 2021 03:55
-
-
Save wakingyeung/369f3db4e1630009d28776c49c5dfabe to your computer and use it in GitHub Desktop.
Distributed locks with Redis for Python.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding=utf-8 -*- | |
import math | |
import time | |
import uuid | |
import redis | |
def acquire(
    conn: redis.client.Redis,
    lockname: str,
    acquire_timeout: int = 10,
    lock_timeout: int = 10,
) -> str:
    """Try to take the lock ``lockname``, retrying until ``acquire_timeout`` elapses.

    Args:
        conn: Redis client used to issue the commands.
        lockname: Logical lock name; the Redis key used is ``lock:<lockname>``.
        acquire_timeout: Seconds to keep retrying before giving up.
        lock_timeout: Lifetime of the lock in seconds, rounded up to a whole
            second. Redis requires a positive expiry for ``SET ... EX``.

    Returns:
        The random identifier stored in the lock key when the lock was taken,
        or ``""`` when it could not be taken within ``acquire_timeout``.
    """
    identifier = str(uuid.uuid4())
    lockname = f"lock:{lockname}"
    lock_timeout = int(math.ceil(lock_timeout))

    # Monotonic clock: the retry window is not affected by wall-clock changes.
    deadline = time.monotonic() + acquire_timeout
    while time.monotonic() < deadline:
        # SET with NX and EX creates the key and its expiry in one command, so
        # a crash between "create" and "expire" can no longer leave a lock
        # that never times out.
        if conn.set(lockname, identifier, nx=True, ex=lock_timeout):
            return identifier
        # A negative TTL means the existing key has no expiry (or vanished in
        # the meantime, in which case EXPIRE is a no-op); give it one so a
        # lock written without a timeout cannot block everyone forever.
        if conn.ttl(lockname) < 0:
            conn.expire(lockname, lock_timeout)
        time.sleep(0.001)
    return ""
def release(conn: redis.client.Redis, lockname: str, identifier: str) -> bool:
    """Delete the lock ``lockname`` if it is still held under ``identifier``.

    Args:
        conn: Redis client used to issue the commands.
        lockname: Logical lock name; the Redis key used is ``lock:<lockname>``.
        identifier: Value that was stored in the lock key when it was taken.

    Returns:
        ``True`` when the key held ``identifier`` and was deleted, ``False``
        when the key is missing or holds a different value.
    """
    lockname = f"lock:{lockname}"
    # GET returns bytes on a default client and str when the client decodes
    # responses, so accept either representation of the identifier.
    if isinstance(identifier, str):
        expected = (identifier, identifier.encode("utf-8"))
    else:
        expected = (identifier,)

    # The context manager resets the pipeline on every exit path, so the
    # watched connection is always handed back to the pool.
    with conn.pipeline(True) as pipe:
        while True:
            try:
                pipe.watch(lockname)
                if pipe.get(lockname) not in expected:
                    # Not our lock (or already gone): leave it untouched.
                    pipe.unwatch()
                    return False
                pipe.multi()
                pipe.delete(lockname)
                pipe.execute()
                return True
            except redis.exceptions.WatchError:
                # The key changed between WATCH and EXEC; check it again.
                continue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding=utf-8 -*- | |
import math | |
import time | |
import uuid | |
from typing import Any, List | |
import redis | |
def script_load(script: str):
    """Wrap a Lua script in a callable that runs it with EVALSHA or EVAL.

    The returned callable has the signature
    ``call(conn, keys=None, args=None, force_eval=False)``. On first use it
    sends ``SCRIPT LOAD`` and remembers the returned SHA, then runs the script
    with ``EVALSHA``. If the server reports that it does not know the script,
    or ``force_eval`` is true, the full source is sent with ``EVAL`` instead.

    Args:
        script: Lua source code of the script.

    Returns:
        The callable described above; it returns whatever the Redis command
        returns.
    """
    sha = None

    def call(
        conn: redis.client.Redis,
        keys: List[str] = None,
        args: List[Any] = None,
        force_eval: bool = False,
    ):
        nonlocal sha
        keys = [] if keys is None else keys
        args = [] if args is None else args

        if not force_eval:
            if not sha:
                sha = conn.execute_command("SCRIPT", "LOAD", script, parse="LOAD")
            try:
                return conn.execute_command("EVALSHA", sha, len(keys), *keys, *args)
            except redis.exceptions.NoScriptError:
                # redis-py maps NOSCRIPT replies to this ResponseError subclass
                # and strips the "NOSCRIPT" prefix from the message, so the
                # prefix test below would miss it; fall through to EVAL.
                pass
            except redis.exceptions.ResponseError as exc:
                # Plain ResponseError that still carries the raw prefix.
                if not str(exc).startswith("NOSCRIPT"):
                    raise

        return conn.execute_command("EVAL", script, len(keys), *keys, *args)

    return call
def acquire(
    conn: redis.client.Redis,
    lockname: str,
    acquire_timeout: int = 10,
    lock_timeout: int = 10,
) -> str:
    """Try to take the lock ``lockname`` via the Lua script, retrying until timeout.

    Args:
        conn: Redis client used to run the script.
        lockname: Logical lock name; the Redis key used is ``lock:<lockname>``.
        acquire_timeout: Seconds to keep retrying before giving up.
        lock_timeout: Lifetime of the lock in seconds, rounded up to a whole
            second.

    Returns:
        The random identifier stored in the lock key when the lock was taken,
        or ``""`` when it could not be taken within ``acquire_timeout``.
    """
    identifier = str(uuid.uuid4())
    lockname = f"lock:{lockname}"
    lock_timeout = int(math.ceil(lock_timeout))

    # Monotonic clock: the retry window is not affected by wall-clock changes.
    deadline = time.monotonic() + acquire_timeout
    while time.monotonic() < deadline:
        reply = acquire_with_lua(conn, [lockname], [lock_timeout, identifier])
        # The status reply is bytes on a default client and str when the
        # client decodes responses. Checking only b"OK" would report a lock
        # that was just set as "not acquired" on a decoding client.
        if reply in (b"OK", "OK"):
            return identifier
        time.sleep(0.001)
    return ""
# Lua source for taking a lock: when KEYS[1] does not exist, SETEX it with the
# values in ARGV (expiry in seconds, then the value to store) and return the
# reply; when the key already exists the script returns nothing.
_ACQUIRE_LOCK_LUA = """
if redis.call('exists', KEYS[1]) == 0 then
return redis.call('setex', KEYS[1], unpack(ARGV))
end
"""

# Callable form of the script: acquire_with_lua(conn, keys, args).
acquire_with_lua = script_load(_ACQUIRE_LOCK_LUA)
def release(conn: redis.client.Redis, lockname: str, identifier: str) -> bool:
    """Release the lock ``lockname`` if it is still held under ``identifier``.

    Runs the release Lua script against the key ``lock:<lockname>`` and
    returns the truthiness of its reply: ``True`` when the script reports a
    deletion, ``False`` otherwise.
    """
    key = f"lock:{lockname}"
    reply = release_with_lua(conn, [key], [identifier])
    return bool(reply)
# Lua source for releasing a lock: delete KEYS[1] only when its current value
# equals ARGV[1] and return the DEL reply; return 0 when the value differs or
# the key is missing.
_RELEASE_LOCK_LUA = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1]) or 1
else
return 0
end
"""

# Callable form of the script: release_with_lua(conn, keys, args).
release_with_lua = script_load(_RELEASE_LOCK_LUA)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment