Created
October 19, 2014 23:00
-
-
Save tamoyal/33df0699327e503d3cf9 to your computer and use it in GitHub Desktop.
Some Resque helpers
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# There are Minitest tests for reliably_enqueue below the module.
# Note that this isn't well tested: it was used briefly for maintenance, and we ended up killing Resque.
# I apologize in advance for any off-by-one errors.
module Resque
  # Seconds to pause before retrying an enqueue that failed on a Redis error.
  SLEEP_BETWEEN_RETRY = 0.1
  # Total number of enqueue attempts reliably_enqueue makes before giving up.
  RETRY = 3
  # Payload lists shorter than this are enqueued inline by fast_enqueue;
  # longer lists are spread over at most this many forked processes.
  NUM_PROCS_FOR_FAST_ENQUEUE = 10

  class << self
    # Enqueues a job, retrying when Redis cannot be reached or times out.
    #
    # klass   - job class handed to Resque.enqueue.
    # payload - job arguments, forwarded unchanged to Resque.enqueue.
    #
    # Returns whatever Resque.enqueue returns.
    # Re-raises Redis::CannotConnectError / Redis::TimeoutError once RETRY
    # attempts have failed; any other exception propagates immediately.
    def reliably_enqueue(klass, *payload)
      attempts_left = RETRY
      begin
        Sidewalk.log "Enqueuing #{klass}: #{payload.inspect}"
        Resque.enqueue(klass, *payload)
      rescue Redis::CannotConnectError, Redis::TimeoutError
        attempts_left -= 1
        raise unless attempts_left > 0
        sleep SLEEP_BETWEEN_RETRY
        retry
      end
    end

    # Enqueues many jobs of one class, forking worker processes for big lists.
    #
    # Note: Spawning multiple threads did not speed this up at all ...maybe a
    # ruby Resque client library shortcoming? For now, forks.
    #
    # klass    - job class handed to reliably_enqueue.
    # payloads - list of argument lists; each entry is splatted into
    #            reliably_enqueue, so an entry should be an Array of arguments.
    #
    # Returns payloads when enqueuing inline, otherwise the child pids.
    # NOTE(review): child exit statuses are not inspected, so a child that
    # gives up after its retries fails without the caller being told.
    def fast_enqueue(klass, payloads)
      if payloads.length < NUM_PROCS_FOR_FAST_ENQUEUE
        payloads.each { |payload| Resque.reliably_enqueue(klass, *payload) }
      else
        # Ceiling division: rounding down would produce more than
        # NUM_PROCS_FOR_FAST_ENQUEUE slices (e.g. 11 payloads -> 11 forks).
        slice_size = (payloads.length + NUM_PROCS_FOR_FAST_ENQUEUE - 1) / NUM_PROCS_FOR_FAST_ENQUEUE

        pids = payloads.each_slice(slice_size).map do |batch|
          # Drop the ActiveRecord connection before forking; parent and child
          # each establish their own connection from the saved config.
          config = ActiveRecord::Base.remove_connection
          pid = fork do
            REDIS.client.reconnect
            ActiveRecord::Base.establish_connection(config)
            puts "Created process for #{batch.length} payloads"
            batch.each { |payload| Resque.reliably_enqueue(klass, *payload) }
            ActiveRecord::Base.remove_connection
          end
          ActiveRecord::Base.establish_connection(config)
          pid
        end

        # Block until every child has finished.
        pids.each { |pid| Process.waitpid2(pid) }
      end
    end

    # Requeues every failed job (last index first), then clears the failed list.
    def requeue_and_clear_all_failures
      (Resque::Failure.count - 1).downto(0) { |index| Resque::Failure.requeue(index) }
      Resque::Failure.clear
    end

    # Removes every failed job whose payload class equals klass.to_s.
    def clear_jobs_of_class(klass)
      class_name = klass.to_s
      remove_failures_where { |job| job['payload']['class'] == class_name }
    end

    # Removes every failed job whose 'exception' field equals s.
    def remove_jobs_with_exception_string(s)
      remove_failures_where { |job| job['exception'] == s }
    end

    # Requeues and removes up to n failed jobs from the front of the failed list.
    def requeue_and_clear_n_jobs(n)
      requeued = 0
      # Always work on index 0: removing the head shifts the next failure into
      # its place, so advancing the index would skip every other job.
      while requeued < n && (job = Resque::Failure.all(0))
        puts "Requeuing #{job.inspect}"
        Resque::Failure.requeue(0)
        Resque::Failure.remove(0)
        requeued += 1
      end
    end

    # Requeues and removes up to n failed jobs whose payload class equals
    # klass.to_s, leaving failures of other classes in place.
    def requeue_and_clear_n_jobs_of_class(n, klass)
      class_name = klass.to_s
      index = 0
      cleared = 0
      while cleared < n && (job = Resque::Failure.all(index))
        if job['payload']['class'] == class_name
          puts "Requeuing #{job.inspect}"
          Resque::Failure.requeue(index)
          Resque::Failure.remove(index)
          cleared += 1
        else
          puts "Skipping #{job['payload']['class']}"
          # Only advance past jobs that stay; a removal shifts the rest down.
          index += 1
        end
      end
      puts "Requeued #{cleared} jobs of class #{klass}"
    end

    private

    # Walks the failed list and removes each job for which the block is truthy.
    # Assumes Resque::Failure.all(index) yields one failure hash, or nil once
    # index is past the end of the list.
    def remove_failures_where
      index = 0
      while (job = Resque::Failure.all(index))
        if yield(job)
          Resque::Failure.remove(index)
        else
          index += 1
        end
      end
    end
  end
end
# Minitest tests for reliably_enqueue
describe Resque do
  # Stand-ins so the specs need neither a real job class nor a real error type.
  class FakeJob; end
  class FakeException < StandardError; end

  describe "reliably_enqueue" do
    it "should fail after exception is raised max times" do
      Resque.stubs(:enqueue).raises(Redis::CannotConnectError.new)
      ->{ Resque.reliably_enqueue FakeJob, 1 }.must_raise Redis::CannotConnectError
    end

    it "should try 3 times" do
      # A fourth attempt would surface FakeException, so seeing
      # CannotConnectError shows that exactly three attempts were made.
      Resque.stubs(:enqueue)
        .raises(Redis::CannotConnectError)
        .then.raises(Redis::CannotConnectError)
        .then.raises(Redis::CannotConnectError)
        .then.raises(FakeException.new)
      ->{ Resque.reliably_enqueue FakeJob, 1 }.must_raise Redis::CannotConnectError
    end

    it "should fail if uncaught exception is raised" do
      Resque.stubs(:enqueue).raises(FakeException.new)
      ->{ Resque.reliably_enqueue FakeJob, 1 }.must_raise FakeException
    end

    it "should execute once if a job is successfully enqueued" do
      # Use a real expectation: a counter bumped inside `returns(...)` is
      # evaluated while the stub is being set up, so it reads 1 whether or not
      # enqueue is ever called.
      Resque.expects(:enqueue).with(FakeJob, 1).once.returns(true)
      Resque.reliably_enqueue(FakeJob, 1).must_equal true
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment