Created
October 21, 2024 15:18
-
-
Save mperham/c622ace4596105d5df48eb8f3d422c5a to your computer and use it in GitHub Desktop.
sidekiq clustering and memory monitoring
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby | |
require 'sidekiq' | |
require 'sidekiq/cli' | |
# Number of Sidekiq child processes the supervisor should run.
#
# Reads SIDEKIQ_COUNT from the environment. `to_i` yields 0 for an unset,
# empty or non-numeric value, so anything that is not a positive integer
# (including a negative number) falls back to one process per core.
#
# @return [Integer] the configured count, or Etc.nprocessors when none is usable
def process_count
  requested = ENV['SIDEKIQ_COUNT'].to_i
  return requested if requested > 0

  # Loaded lazily: only needed when no explicit count is configured.
  require 'etc'
  Etc.nprocessors
end
# Emit one supervisor log line on standard error.
#
# The line is the tag "Supervisor[<pid>] " followed by whatever the block
# returns. The tag is written before the block is evaluated.
#
# @yieldreturn [Object] the message, printed with IO#puts
def log
  $stderr << "Supervisor[#{Process.pid}] "
  $stderr.puts(yield)
end
# Forward a signal received by the supervisor to every tracked child.
#
# INT and TERM mean the cluster is shutting down, so respawning is switched
# off first by clearing @watch_children.
#
# @param sig [String] signal name without the SIG prefix, e.g. "TERM"
# @return [void]
def handle_sig(sig)
  # If we're shutting down, we don't need to respawn child processes that die
  @watch_children = false if sig == "INT" || sig == "TERM"

  # Work on a snapshot: @pids may be modified elsewhere while we iterate.
  @pids.dup.each do |pid|
    begin
      Process.kill(sig, pid)
    rescue Errno::ESRCH
      # That child has already exited; keep signalling the remaining ones
      # instead of aborting the handler.
    end
  end
end
# Spawn one Sidekiq worker as a child of the supervisor.
#
# Inside the child, the Sidekiq CLI singleton parses its options and runs.
# A StandardError escaping the CLI is re-raised when $DEBUG is set; otherwise
# its message and backtrace are logged and the child exits with status 1.
#
# @return [Integer] pid of the forked child, as seen by the parent
def fork_child
  Process.fork do
    begin
      Sidekiq::CLI.instance.tap(&:parse).run
    rescue => error
      raise error if $DEBUG

      log { error.message }
      log { error.backtrace.join("\n") }
      exit 1
    end
  end
end
# Launch the initial set of workers: one fork_child call per configured
# process.
#
# @return [Array] the fork_child results, in launch order
def start_children
  (1..process_count).map { |_slot| fork_child }
end
# Supervisor main script: start the workers, monitor their memory in a
# background thread, forward signals, and wait for shutdown.

# Limit each child process to a chunk of RAM. SIDEKIQ_MB_LIMIT is the maximum
# resident set size per child, in megabytes. An unset, empty, non-numeric or
# non-positive value falls back to 2048.
configured_limit = ENV['SIDEKIQ_MB_LIMIT'].to_i
@memory_mb_limit = configured_limit > 0 ? configured_limit : 2048

count = process_count
log { "Starting Mastodon Sidekiq cluster with #{count} process#{count > 1 ? "es" : ""}" }

@pids = start_children
@watch_children = true
sleep 1

Thread.new do
  log { "Monitoring child PIDs: #{@pids}" }
  while @watch_children
    # Walk a snapshot, because entries are removed from and appended to @pids
    # during the pass.
    @pids.dup.each do |pid|
      # A shutdown signal may arrive in the middle of a pass; never respawn
      # once @watch_children has been cleared.
      break unless @watch_children

      # `ps -o rss=` prints the resident set size in kilobytes. Empty output
      # (pid no longer known to ps) converts to 0.
      rss_kb = `ps -o rss= -p #{pid}`.to_i
      rss_mb = rss_kb / 1024.0

      if rss_kb == 0 # child died
        @pids.delete(pid)
        new_pid = fork_child
        @pids << new_pid
        log { "Replaced lost PID #{pid} with #{new_pid}" }
      elsif rss_mb > @memory_mb_limit
        log { "PID #{pid} crossed memory threshold -- replacing" }
        @pids.delete(pid)
        begin
          # TSTP first, TERM five seconds later -- presumably Sidekiq's
          # quiet-then-shutdown sequence; confirm against the Sidekiq docs.
          Process.kill("TSTP", pid)
          sleep 5
          Process.kill("TERM", pid)
        rescue Errno::ESRCH
          # The child exited on its own in the meantime; nothing left to stop.
        end
        # Shutdown may have started during the five second pause.
        @pids << fork_child if @watch_children
      elsif $DEBUG
        log { "#{pid}: #{rss_mb.round(2)}" }
      end
    end
    puts "" if $DEBUG
    sleep 10
  end
  log { "Child supervisor stopping" }
end

%w(INT TSTP TERM).each do |sig|
  Signal.trap(sig) { handle_sig(sig) }
end

# Process.waitall returns as soon as no children remain. That also happens
# when every worker has died and the monitor thread has not respawned one yet,
# so keep waiting until a shutdown signal has cleared @watch_children.
loop do
  Process.waitall
  break unless @watch_children

  sleep 1
end
log { "Stopping Sidekiq cluster" }
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment