Created October 25, 2017 02:02
Celery Autoscaler Based on Memory and System Load
import logging
import multiprocessing
import os
import re

from celery import Celery
from celery.worker.autoscale import Autoscaler as CeleryAutoscaler

class DAAutoscaler(CeleryAutoscaler):
    # Try to keep the load above this point.
    LOAD_MIN = .8
    # Try to keep the load below this.
    LOAD_MAX = 1.1
    # We need this percentage of free memory to scale up.
    MEM_FREE_SCALE_UP = .3
    # Any less than this memory and we scale down.
    MEM_FREE_SCALE_DOWN = .2

    def __init__(self, *args, **kwargs):
        self.num_cpus = multiprocessing.cpu_count()
        logging.info("DAAutoscaler: Num CPUs %s", self.num_cpus)
        super(DAAutoscaler, self).__init__(*args, **kwargs)

    def _maybe_scale(self, req=None):
        '''Scale up or down if we have too much/too little load or memory.'''
        cur_load = self._get_load()
        mem_free = self._get_free_mem()
        if cur_load < self.LOAD_MIN and mem_free > self.MEM_FREE_SCALE_UP:
            mul = int(self.LOAD_MAX / cur_load)
            logging.info("DAAutoscaler: Scale Up %dX %.2f free=%.2f%%",
                mul, cur_load, 100 * mem_free)
            # Scale up one process at a time (mul is only used for logging).
            self.scale_up(1)
            return True
        if cur_load > self.LOAD_MAX or mem_free < self.MEM_FREE_SCALE_DOWN:
            mul = int(cur_load / self.LOAD_MAX)
            logging.info("DAAutoscaler: Scale Down %dX %.2f free=%.2f%%",
                mul, cur_load, 100 * mem_free)
            self.scale_down(mul)
            return True
        logging.info("DAAutoscaler: Ok %.2f free=%.2f%%", cur_load, 100 * mem_free)

    def _get_load(self):
        '''Return the 1-minute load average normalized by CPU count.'''
        load1min, load5min, load15min = os.getloadavg()
        # Prevent divide by zero.
        if load1min < 0.001:
            load1min = 0.001
        return 1.0 * load1min / self.num_cpus

    re_total = re.compile(r"MemTotal:\s+(?P<total>\d+)\s+kB")
    re_free = re.compile(r"MemFree:\s+(?P<free>\d+)\s+kB")

    def _get_free_mem(self):
        '''Return percentage of free memory 0.0 to 1.0.'''
        try:
            # Try using the cross-platform method.
            import psutil
        except ImportError:
            # If not, make it work for most Linux distros.
            with open('/proc/meminfo', 'r') as f:
                mem = f.read()
            return (1.0 * int(self.re_free.search(mem).group("free")) /
                    int(self.re_total.search(mem).group("total")))
        else:
            return psutil.virtual_memory().percent / 100
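For anyone wanting to try this out, here is roughly how it might be wired into a worker (the module layout, app name and broker URL below are made up): save the class above as, say, autoscaler.py, point the worker_autoscaler setting at it, and start the worker with autoscaling enabled.

# tasks.py -- hypothetical example app; assumes the gist above is saved as autoscaler.py.
from celery import Celery

app = Celery('proj', broker='redis://localhost:6379/0')
# Use the custom autoscaler instead of the default celery.worker.autoscale:Autoscaler.
app.conf.worker_autoscaler = 'autoscaler:DAAutoscaler'

@app.task
def add(x, y):
    return x + y

The worker still has to be started with autoscaling turned on, e.g. celery -A tasks worker --autoscale=10,2, which also sets the max/min concurrency bounds the autoscaler operates within.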
Hey, great work and thanks very much for publishing this!
Am I missing something, or: if the queue is empty, won't this just scale up indefinitely (or until idle workers exhaust resources)? There's no reference to the min/max worker limits (self.max_concurrency / self.min_concurrency) or to the number of reserved tasks (self.qty)?
Thanks again!
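For what it's worth, one way to address that would be to gate scaling on the reserved-task count and the configured concurrency bounds before looking at load and memory. A rough, untested sketch that builds on the DAAutoscaler above; it assumes the base Autoscaler exposes self.qty, self.processes, self.max_concurrency and self.min_concurrency (attribute names may differ across Celery versions):

class BoundedDAAutoscaler(DAAutoscaler):
    '''Hypothetical variant that respects worker limits and queue depth.'''

    def _maybe_scale(self, req=None):
        cur_load = self._get_load()
        mem_free = self._get_free_mem()

        # Only grow if there are reserved tasks waiting for a process and we
        # are still below the --autoscale maximum.
        want_up = (self.qty > self.processes and
                   self.processes < self.max_concurrency)
        # Only shrink if we are above the --autoscale minimum.
        can_down = self.processes > self.min_concurrency

        if want_up and cur_load < self.LOAD_MIN and mem_free > self.MEM_FREE_SCALE_UP:
            self.scale_up(1)
            return True
        if can_down and (cur_load > self.LOAD_MAX or mem_free < self.MEM_FREE_SCALE_DOWN):
            self.scale_down(1)
            return True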
@speedplane great work!
I think there is a mistake in line 60: psutil.virtual_memory().percent doesn't return free memory but total usage, according to the description of virtual_memory.
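If that reading of the psutil docs is right, the psutil branch could return the available fraction instead. A minimal sketch, assuming psutil is installed (the subclass name here is made up):

import psutil

# Hypothetical subclass of the DAAutoscaler defined in the gist above.
class AvailableMemDAAutoscaler(DAAutoscaler):
    def _get_free_mem(self):
        '''Return the fraction of memory still available, 0.0 to 1.0.'''
        vm = psutil.virtual_memory()
        # vm.percent reports memory *used*, so invert it; vm.available / vm.total
        # is another option and also counts reclaimable caches as available.
        return 1.0 - vm.percent / 100.0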