Last active
October 14, 2023 13:36
-
-
Save joezuntz/e7e7764e5b591ed519cfd488e20311f1 to your computer and use it in GitHub Desktop.
Python decorator to let you run a function that may kill/hang in a subprocess, isolating it.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import multiprocessing as mp | |
import os | |
import functools | |
from queue import Empty | |
class TimeoutError(Exception): | |
pass | |
def optionally_run_in_subprocess(f): | |
"""A decorator adding a kwarg to a function that makes it run in a subprocess. | |
This can be useful when you have a function that may segfault. | |
""" | |
# This functools.wraps command makes this whole thing work | |
# as a well-behaved decorator, maintaining the original function | |
# name and docstring. | |
@functools.wraps(f) | |
def wrapper(*args, **kwargs): | |
# Two extra kwargs, one public, are used for implementation. | |
# One indicates that the user wants to run in a subprocess, the other | |
# that the functtion is being called in the subprocess alreay. | |
# The latter is the queue where the result gets posted to. | |
run_in_subprocess = kwargs.pop('run_in_subprocess', False) | |
subprocess_timeout = kwargs.pop('subprocess_timeout', None) | |
queue = kwargs.pop('__queue', None) | |
# create the machinery python uses to fork a subprocess | |
# and run a function in it. | |
if run_in_subprocess: | |
q = mp.Queue() | |
p = mp.Process(target=wrapper, args=args, kwargs={"run_in_subprocess":False, "__queue":q, **kwargs}) | |
# Because the use of this is avoiding crashes, rather than performance / parallelization | |
# we wait for the subproces result immediately. | |
p.start() | |
try: | |
result = q.get(timeout=subprocess_timeout) | |
p.join() | |
except Empty: | |
p.terminate() | |
raise TimeoutError("Function {} timed out with args: {}, {}".format(f, args, kwargs)) | |
# Pass on any exception raised in the subprocess | |
if isinstance(result, BaseException): | |
raise result | |
return result | |
else: | |
# Run the function. Eiher we are in the subprocess already or the user | |
# does not want to run in the subproc. | |
try: | |
result = f(*args, **kwargs) | |
except BaseException as error: | |
# If running in standard mode just raise exceptions | |
# as normal | |
if queue is None: | |
raise | |
# Otherwise we pass the exception back to the caller | |
# in place of the result | |
result = error | |
# This is optional, so the function can still just be called | |
# normally with no effect. | |
if queue is not None: | |
queue.put(result) | |
return result | |
return wrapper | |
# Example | |
class Poly: | |
def __init__(self, a, b, c): | |
self.a = a | |
self.b = b | |
self.c = c | |
@optionally_run_in_subprocess | |
def __call__(self, x): | |
# we introduce a fatal flaw in this function | |
# that hangs is forever in some cases | |
if x>10: | |
while True: | |
pass | |
if x<0: | |
raise ValueError("Bad input.") | |
return self.a*x**2 + self.b*x + self.c | |
if __name__ == '__main__': | |
P = Poly(1,2,1) | |
x = 9 | |
x = P(9, run_in_subprocess=False) | |
y = P(9, run_in_subprocess=True) | |
print(x) | |
print(y) | |
w = P(-1, run_in_subprocess=True) | |
try: | |
x = P(11, run_in_subprocess=True, subprocess_timeout=3.0) | |
except TimeoutError: | |
x = 0.0 | |
print(x) | |
@optionally_run_in_subprocess | |
def segfaults(): | |
import ctypes;ctypes.string_at(0) | |
try: | |
z = segfaults(run_in_subprocess=True, subprocess_timeout=3.0) | |
except TimeoutError: | |
print("Looks like the function 'segfaults' has some kind of problem. Weird.") | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment