Retry jobs with back-off when using RQ
import base64
import datetime as dt
from time import sleep
from redis.client import StrictRedis
from rq.job import Job, JobStatus, requeue_job
from rq.queue import Queue, get_failed_queue
from rq.utils import utcnow
from rq.worker import SimpleWorker

class JobWithBackOffRetry(Job):
    """
    Custom job class that keeps track of when a failed job should next be retried.

    Requires https://github.com/nvie/rq/pull/694

    A "back-off scheme", a series of integers giving the number of seconds to wait before the
    next retry attempt, is used as the basis for a 'next_retry' attribute. If set, 'next_retry'
    is a datetime.datetime instance giving the earliest point in time at which the job should
    be retried; if it is None, the job should not be retried again.

    A use case for this is a recurring script that fetches all jobs from RQ's failed queue and
    finds those with 'next_retry' < now(). These can then be passed to rq.job.requeue_job or to
    the command line tool `rq requeue`. Any JobWithBackOffRetry whose next_retry is None should
    be removed from the failed queue with FailedQueue.remove and can be deleted from Redis with
    Job.delete. A sketch of such a script appears at the end of this file.
    """
    # Keys under which the retry state is persisted in job.meta.
    KEY_BACK_OFF_INDEX = 'back_off_index'
    KEY_BACK_OFF_SCHEME = 'back_off_scheme'
    KEY_NEXT_RETRY = 'next_retry'

    # Job construction
    @classmethod
    def create(cls, func, args=None, kwargs=None, connection=None,
               result_ttl=None, ttl=None, status=None, description=None,
               depends_on=None, timeout=None, id=None, origin=None,
               back_off_scheme=None):
        job = super().create(func, args, kwargs, connection,
                             result_ttl, ttl, status, description,
                             depends_on, timeout, id, origin)
        # Default back-off scheme: 1s, 10s, 5min, 10min, 30min, 1h.
        job.back_off_scheme = back_off_scheme or (1, 10, 300, 600, 1800, 3600)
        return job

    def __init__(self, id=None, connection=None):
        super().__init__(id, connection)
        self._back_off_index = 0
        self.next_retry = None

    def set_status(self, status, pipeline=None):
        super().set_status(status, pipeline)
        if status == JobStatus.FAILED:
            # Schedule the next retry every time the job fails.
            self._update_next_retry()

    # The retry state is stored in job.meta so that it survives the round-trip through Redis.
    def refresh(self):
        super().refresh()
        self._back_off_index = self.meta[self.KEY_BACK_OFF_INDEX]
        self.back_off_scheme = self.meta[self.KEY_BACK_OFF_SCHEME]
        self.next_retry = self.meta.get(self.KEY_NEXT_RETRY, None)

    def to_dict(self):
        self.meta[self.KEY_BACK_OFF_INDEX] = self._back_off_index
        self.meta[self.KEY_BACK_OFF_SCHEME] = self.back_off_scheme
        self.meta[self.KEY_NEXT_RETRY] = self.next_retry
        return super().to_dict()

    def _update_next_retry(self):
        if self._back_off_index < len(self.back_off_scheme):
            # Schedule the next retry according to the back-off scheme.
            back_off_value = self.back_off_scheme[self._back_off_index]
            self.next_retry = utcnow() + dt.timedelta(seconds=back_off_value)
            self._back_off_index += 1
        else:
            # Scheme exhausted: signal that the job should not be retried again.
            self.next_retry = None

    def __repr__(self):
        return 'Job({0!r}, next_retry={1!r})'.format(self._id, self.next_retry)
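
# Retry timeline for the default scheme (1, 10, 300, 600, 1800, 3600):
# failure #1 -> next_retry = now + 1s    (back_off_index 0 -> 1)
# failure #2 -> next_retry = now + 10s   (back_off_index 1 -> 2)
# ...
# failure #6 -> next_retry = now + 1h    (back_off_index 5 -> 6)
# failure #7 -> next_retry = None        (scheme exhausted; give up)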

# --- Demo ---
conn = StrictRedis()
q = Queue(connection=conn, job_class=JobWithBackOffRetry)
fq = get_failed_queue(conn)

# CAUTION: any jobs already queued before running this script will be removed.
q.empty()
fq.empty()

# Create a job that will always fail: base64.b64encode expects bytes, so the str argument raises a TypeError.
back_off_scheme = (1, 2, 3)
job = JobWithBackOffRetry.create(func=base64.b64encode, args=('a',),
                                 connection=conn, back_off_scheme=back_off_scheme)
q.enqueue_job(job)

worker = SimpleWorker([q], connection=q.connection, job_class=JobWithBackOffRetry)
worker.work(burst=True)  # the first run fails and schedules the first retry

for wait_time in back_off_scheme:
    sleep(wait_time)
    jobs_to_retry = [job for job in fq.jobs if
                     isinstance(job, JobWithBackOffRetry) and
                     job.next_retry is not None and
                     utcnow() > job.next_retry]
    assert len(jobs_to_retry) == 1
    print('Retrying...')
    requeue_job(jobs_to_retry[0].id, conn)
    worker.work(burst=True)  # fails again, advancing to the next back-off step

# After the scheme is exhausted the job should not be retried again.
failed_job = fq.jobs[0]
assert failed_job.next_retry is None
fq.remove(failed_job)
assert fq.count == 0
failed_job.delete()  # remove the job hash from Redis
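
# A minimal sketch of the recurring maintenance script described in the class docstring.
# requeue_due_jobs is a hypothetical helper (not part of RQ): it requeues every failed
# JobWithBackOffRetry whose next_retry has passed, and cleans up jobs whose back-off
# scheme is exhausted. It could be run periodically, e.g. from cron.
def requeue_due_jobs(connection):
    failed = get_failed_queue(connection)
    for failed_job in failed.jobs:
        if not isinstance(failed_job, JobWithBackOffRetry):
            continue
        if failed_job.next_retry is None:
            # No more retries: drop the job from the failed queue and from Redis.
            failed.remove(failed_job)
            failed_job.delete()
        elif utcnow() > failed_job.next_retry:
            requeue_job(failed_job.id, connection)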