Last active
April 21, 2016 16:37
-
-
Save zamzterz/d425b5f8a7e4159a77c0f33107659cbc to your computer and use it in GitHub Desktop.
Retry jobs with back off when using RQ
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64 | |
import datetime as dt | |
from time import sleep | |
from redis.client import StrictRedis | |
from rq.job import Job, JobStatus, requeue_job | |
from rq.queue import Queue, get_failed_queue | |
from rq.utils import utcnow | |
from rq.worker import SimpleWorker | |
class JobWithBackOffRetry(Job):
    """
    Custom job class with support for keeping track of when a retry should be made.
    Requires https://github.com/nvie/rq/pull/694

    A "back off scheme", which is a series of integers defining the number of seconds before the next retry should
    be attempted, is used as a basis for defining a 'next_retry' attribute.
    The 'next_retry' if set, will be a datetime.datetime instance defining the earliest point in time which
    the job should be retried. If it is not set, it means the task should not be retried again.

    A use case for this would be a reoccurring script fetching all jobs from RQ's fail queue, finding all
    jobs which have 'next_retry' < now(). This can then be passed to rq.job.requeue or to the command line tool
    `rq requeue`. Any JobWithBackOffRetry where next_retry is None should be removed from the failed queue
    with FailedQueue.remove, and can be removed from Redis with Job.remove.
    """
    # Keys used to persist the retry state in the job's 'meta' dict.
    KEY_BACK_OFF_INDEX = 'back_off_index'
    KEY_BACK_OFF_SCHEME = 'back_off_scheme'
    KEY_NEXT_RETRY = 'next_retry'

    # Fallback delays (in seconds) used when no explicit scheme is supplied:
    # 1s, 10s, 5min, 10min, 30min, 1h.
    DEFAULT_BACK_OFF_SCHEME = (1, 10, 300, 600, 1800, 3600)

    # Job construction
    @classmethod
    def create(cls, func, args=None, kwargs=None, connection=None,
               result_ttl=None, ttl=None, status=None, description=None,
               depends_on=None, timeout=None, id=None, origin=None,
               back_off_scheme=None):
        """Create a job, optionally with a custom back-off scheme.

        :param back_off_scheme: sequence of integers, the number of seconds to
            wait before each successive retry; defaults to
            ``DEFAULT_BACK_OFF_SCHEME``. All other parameters are forwarded
            unchanged to ``Job.create``.
        :return: the newly created job instance.
        """
        job = super().create(func, args, kwargs, connection,
                             result_ttl, ttl, status, description,
                             depends_on, timeout, id, origin)
        job.back_off_scheme = back_off_scheme or cls.DEFAULT_BACK_OFF_SCHEME
        return job

    def __init__(self, id=None, connection=None):
        super().__init__(id, connection)
        self._back_off_index = 0
        # Ensure the attribute always exists, even if the job fails before
        # create()/refresh() had a chance to set a scheme (the original code
        # raised AttributeError in _update_next_retry in that case).
        self.back_off_scheme = self.DEFAULT_BACK_OFF_SCHEME
        self.next_retry = None

    def set_status(self, status, pipeline=None):
        """Set the job status; on failure, schedule (or exhaust) the next retry."""
        super().set_status(status, pipeline)
        if status == JobStatus.FAILED:
            self._update_next_retry()

    def refresh(self):
        """Reload the job from Redis, restoring the retry state from 'meta'.

        Uses defaults for missing keys so refreshing a job whose meta was
        never written by to_dict() (e.g. a plain rq job) does not raise
        KeyError, as the original implementation did.
        """
        super().refresh()
        self._back_off_index = self.meta.get(self.KEY_BACK_OFF_INDEX, 0)
        self.back_off_scheme = self.meta.get(self.KEY_BACK_OFF_SCHEME,
                                             self.DEFAULT_BACK_OFF_SCHEME)
        self.next_retry = self.meta.get(self.KEY_NEXT_RETRY, None)

    def to_dict(self):
        """Serialize the job, stashing the retry state in 'meta' first."""
        self.meta[self.KEY_BACK_OFF_INDEX] = self._back_off_index
        self.meta[self.KEY_BACK_OFF_SCHEME] = self.back_off_scheme
        self.meta[self.KEY_NEXT_RETRY] = self.next_retry
        return super().to_dict()

    def _update_next_retry(self):
        """Advance through the back-off scheme.

        Sets 'next_retry' to now + the current delay and moves to the next
        entry; once the scheme is exhausted, 'next_retry' becomes None,
        meaning the job should not be retried again.
        """
        if self._back_off_index < len(self.back_off_scheme):
            back_off_value = self.back_off_scheme[self._back_off_index]
            self.next_retry = utcnow() + dt.timedelta(seconds=back_off_value)
            self._back_off_index += 1
        else:
            self.next_retry = None

    def __repr__(self):
        return 'Job({0!r}, next_retry={1!r})'.format(self._id, self.next_retry)
# Demo: enqueue a job that always fails and drive it through the whole
# back-off scheme, then verify it is not scheduled for any further retries.
conn = StrictRedis()
q = Queue(connection=conn, job_class=JobWithBackOffRetry)
fq = get_failed_queue(conn)

# CAUTION: if you have any queued jobs before running this script, these will be removed
q.empty()
fq.empty()

# create job that will always fail, due to an exception
# (b64encode expects bytes; passing a str raises TypeError every run)
back_off_scheme = (1, 2, 3)
job = JobWithBackOffRetry.create(func=base64.b64encode, args=('a',),
                                 connection=conn, back_off_scheme=back_off_scheme)
q.enqueue_job(job)

worker = SimpleWorker([q], connection=q.connection, job_class=JobWithBackOffRetry)
worker.work(burst=True)

for wait_time in back_off_scheme:
    # wait until the job's scheduled retry time has passed
    sleep(wait_time)
    jobs_to_retry = [j for j in fq.jobs
                     if isinstance(j, JobWithBackOffRetry) and
                     j.next_retry is not None and
                     utcnow() > j.next_retry]
    assert len(jobs_to_retry) == 1
    print('Retrying..')
    requeue_job(jobs_to_retry[0].id, conn)
    worker.work(burst=True)

# Grab a reference BEFORE removing it from the failed queue: the original
# script did fq.jobs[0].delete() after fq.remove(...), which raises
# IndexError because the queue is empty at that point.
exhausted_job = fq.jobs[0]
assert exhausted_job.next_retry is None  # this job should not be retried again
fq.remove(exhausted_job)
assert fq.count == 0
exhausted_job.delete()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment