Skip to content

Instantly share code, notes, and snippets.

@wakingyeung
Last active September 5, 2021 03:55
Show Gist options
  • Save wakingyeung/369f3db4e1630009d28776c49c5dfabe to your computer and use it in GitHub Desktop.
Save wakingyeung/369f3db4e1630009d28776c49c5dfabe to your computer and use it in GitHub Desktop.
Distributed locks with Redis for Python.
#!/usr/bin/env python
# -*- coding=utf-8 -*-
import math
import time
import uuid
import redis
def acquire(
    conn: redis.client.Redis,
    lockname: str,
    acquire_timeout: int = 10,
    lock_timeout: int = 10,
) -> str:
    """Try to take the lock ``lockname`` and return its ownership token.

    A fresh UUID is stored under the key ``lock:<lockname>`` together with an
    expiry of ``lock_timeout`` seconds (rounded up to a whole second). The
    attempt is repeated roughly every millisecond until ``acquire_timeout``
    seconds have passed.

    Args:
        conn: Redis client used to issue the commands.
        lockname: Logical lock name; ``lock:`` is prepended to build the key.
        acquire_timeout: Maximum number of seconds to keep retrying.
        lock_timeout: Lifetime of the lock key in seconds.

    Returns:
        The identifier to hand to ``release`` on success, or ``""`` when the
        lock could not be obtained before the deadline.
    """
    identifier = str(uuid.uuid4())
    lockname = f"lock:{lockname}"
    lock_timeout = int(math.ceil(lock_timeout))
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + acquire_timeout
    while time.monotonic() < deadline:
        # SET with NX and EX writes the value and its expiry in one command,
        # so a crash can no longer leave a lock behind without a TTL (which
        # the previous SETNX + EXPIRE pair allowed).
        if conn.set(lockname, identifier, nx=True, ex=lock_timeout):
            return identifier
        # Repair a key that has no expiry (e.g. written by older code) so it
        # cannot block every caller forever.
        if conn.ttl(lockname) < 0:
            conn.expire(lockname, lock_timeout)
        time.sleep(0.001)
    return ""
def release(conn: redis.client.Redis, lockname: str, identifier: str) -> bool:
    """Release the lock ``lockname`` if it is still held with ``identifier``.

    The key ``lock:<lockname>`` is watched, its value is compared with
    ``identifier`` and, on a match, deleted inside a MULTI/EXEC transaction.
    If the key changes between the check and the delete, the WatchError
    triggers another attempt.

    Args:
        conn: Redis client used to issue the commands.
        lockname: Logical lock name; ``lock:`` is prepended to build the key.
        identifier: Token previously returned by ``acquire``.

    Returns:
        True when the key was deleted by this call, False when the key is
        missing or holds a different value.
    """
    lockname = f"lock:{lockname}"
    # The context manager resets the pipeline on every exit path, handing its
    # connection back instead of keeping it bound to the WATCH.
    with conn.pipeline(True) as pipe:
        while True:
            try:
                pipe.watch(lockname)
                holder = pipe.get(lockname)
                # Clients that do not decode responses return bytes; compare
                # like with like so a str identifier can match the stored one.
                if isinstance(holder, bytes):
                    expected = identifier.encode("utf-8")
                else:
                    expected = identifier
                if holder != expected:
                    pipe.unwatch()
                    return False
                pipe.multi()
                pipe.delete(lockname)
                pipe.execute()
                return True
            except redis.exceptions.WatchError:
                # The key was modified after WATCH; check it again.
                continue
#!/usr/bin/env python
# -*- coding=utf-8 -*-
import math
import time
import uuid
from typing import Any, List
import redis
def script_load(script: str):
    """Build a callable that runs the Lua ``script`` on a Redis connection.

    The returned function has the signature
    ``call(conn, keys=None, args=None, force_eval=False)``. On first use it
    sends ``SCRIPT LOAD`` and remembers the returned SHA, then invokes the
    script with ``EVALSHA``. If the server reports that it does not know the
    script, or ``force_eval`` is true, the full source is sent with ``EVAL``.

    Args:
        script: Lua source code.

    Returns:
        The ``call`` function described above; it returns whatever the
        underlying ``EVALSHA``/``EVAL`` command returns.

    Raises:
        redis.exceptions.ResponseError: (from ``call``) for any script error
            other than the server not knowing the script.
    """
    sha = None  # SHA returned by SCRIPT LOAD, cached across calls

    def call(
        conn: redis.client.Redis,
        keys: List[str] = None,
        args: List[Any] = None,
        force_eval: bool = False,
    ):
        nonlocal sha
        # Copy into lists so tuples or other sequences are accepted as well.
        keys = [] if keys is None else list(keys)
        args = [] if args is None else list(args)
        if not force_eval:
            if not sha:
                sha = conn.execute_command("SCRIPT", "LOAD", script, parse="LOAD")
            try:
                return conn.execute_command("EVALSHA", sha, len(keys), *keys, *args)
            except redis.exceptions.ResponseError as e:
                # redis-py signals a missing script with NoScriptError, a
                # ResponseError subclass whose message may no longer start
                # with "NOSCRIPT"; the prefix test alone would re-raise and
                # skip the EVAL fallback. The prefix test is kept for client
                # versions that pass the raw server message through.
                no_script_error = getattr(redis.exceptions, "NoScriptError", ())
                if not (
                    isinstance(e, no_script_error)
                    or str(e).startswith("NOSCRIPT")
                ):
                    raise
        return conn.execute_command("EVAL", script, len(keys), *keys, *args)

    return call
def acquire(
    conn: redis.client.Redis,
    lockname: str,
    acquire_timeout: int = 10,
    lock_timeout: int = 10,
) -> str:
    """Try to take the lock ``lockname`` via ``acquire_with_lua``.

    A fresh UUID is passed to the script together with ``lock_timeout``
    (rounded up to a whole second) for the key ``lock:<lockname>``. The script
    is retried roughly every millisecond until it reports success or
    ``acquire_timeout`` seconds have passed.

    Args:
        conn: Redis client used to run the script.
        lockname: Logical lock name; ``lock:`` is prepended to build the key.
        acquire_timeout: Maximum number of seconds to keep retrying.
        lock_timeout: Lifetime of the lock key in seconds.

    Returns:
        The identifier to hand to ``release`` on success, or ``""`` when the
        lock could not be obtained before the deadline.
    """
    identifier = str(uuid.uuid4())
    lockname = f"lock:{lockname}"
    lock_timeout = int(math.ceil(lock_timeout))
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + acquire_timeout
    while time.monotonic() < deadline:
        reply = acquire_with_lua(conn, [lockname], [lock_timeout, identifier])
        # The status reply arrives as bytes or as str depending on whether the
        # client decodes responses; accepting only b"OK" would make a decoding
        # client report failure although the key had just been written.
        if reply in (b"OK", "OK"):
            return identifier
        time.sleep(0.001)
    return ""
# Callable produced by script_load for the lock-acquisition Lua script.
# The script checks whether KEYS[1] exists; only when it does not, it calls
# SETEX on KEYS[1] with the unpacked ARGV values and returns that reply.
# When the key already exists the script returns nothing.
# acquire() calls it with keys=[lock key] and args=[lock_timeout, identifier].
acquire_with_lua = script_load(
"""
if redis.call('exists', KEYS[1]) == 0 then
return redis.call('setex', KEYS[1], unpack(ARGV))
end
"""
)
def release(conn: redis.client.Redis, lockname: str, identifier: str) -> bool:
    """Release the lock ``lockname`` through ``release_with_lua``.

    Args:
        conn: Redis client used to run the script.
        lockname: Logical lock name; ``lock:`` is prepended to build the key.
        identifier: Token previously returned by ``acquire``.

    Returns:
        The truthiness of the script's reply.
    """
    key = f"lock:{lockname}"
    outcome = release_with_lua(conn, [key], [identifier])
    return bool(outcome)
# Callable produced by script_load for the lock-release Lua script.
# The script reads KEYS[1]; when its value equals ARGV[1] it deletes the key
# and returns DEL's reply (falling back to 1), otherwise it returns 0.
# release() calls it with keys=[lock key] and args=[identifier].
release_with_lua = script_load(
"""
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1]) or 1
else
return 0
end
"""
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment