"""
I don't want to be reinventing the wheel but I can't find a satisfying implementation.
I want to be able to execute arbitrary functions asynchronously on a different process.
Any ongoing subprocesses must immediately be able to be terminated without errors.
Results of cancelled subprocesses may be ignored.
`concurrent.futures.ProcessPoolExecutor` gets very close to the desired implementation,
but it has issues:
- by default it waits for subprocesses to close on __exit__.
Unfortunately it is possible the subprocesses can be running non-Python code,
e.g. a C implementation of SVC whose subprocess won't end until fit is complete.
- even if that is overwritten and no wait is performed,
the subprocess will raise an error when it is done.
Though that does not hinder the execution of the program,
I don't want errors for expected behavior.
"""
import datetime
import gc
import logging
import multiprocessing
import os
import psutil
import queue
import struct
import time
import traceback
from typing import Optional, Callable, Dict, List
import uuid
from psutil import NoSuchProcess
try:
import resource
except ModuleNotFoundError:
resource = None # type: ignore
log = logging.getLogger(__name__)
class AsyncFuture:
""" Reference to a function call executed on a different process. """
def __init__(self, fn, *args, **kwargs):
self.id = uuid.uuid4()
self.fn = fn
self.args = args
self.kwargs = kwargs
self.result = None
self.exception = None
self.traceback = None
def execute(self, extra_kwargs):
""" Execute the function call `fn(*args, **kwargs)` and record results. """
try:
# Don't update self.kwargs, as it will be pickled back to the main process
kwargs = {**self.kwargs, **extra_kwargs}
self.result = self.fn(*self.args, **kwargs)
except Exception as e:
self.exception = e
self.traceback = traceback.format_exc()
[docs]class AsyncEvaluator:
""" Manages subprocesses on which arbitrary functions can be evaluated.
The function and all its arguments must be picklable.
Using the same AsyncEvaluator in two different contexts raises a `RuntimeError`.
defaults: Dict, optional (default=None)
Default parameter values shared between all submit calls.
This allows these defaults to be transferred only once per process,
instead of twice per call (to and from the subprocess).
Only supports keyword arguments.
"""
defaults: Dict = {}
def __init__(
self,
n_workers: Optional[int] = None,
memory_limit_mb: Optional[int] = None,
logfile: Optional[str] = None,
wait_time_before_forced_shutdown: int = 10,
):
"""
Parameters
----------
n_workers : int, optional (default=None)
Maximum number of subprocesses to run for parallel evaluations.
Defaults to `AsyncEvaluator.n_jobs`, using all cores unless overwritten.
memory_limit_mb : int, optional (default=None)
The maximum number of megabytes that this process and its subprocesses
may use in total. If None, no limit is enforced.
There is no guarantee the limit is not violated.
logfile : str, optional (default=None)
If set, recorded resource usage will be written to this file.
wait_time_before_forced_shutdown : int (default=10)
Number of seconds to wait between asking the worker processes to shut down
and terminating them forcefully if they failed to do so.
"""
self._has_entered = False
self.futures: Dict[uuid.UUID, AsyncFuture] = {}
self._processes: List[psutil.Process] = []
self._n_jobs = n_workers
self._memory_limit_mb = memory_limit_mb
self._mem_violations = 0
self._mem_behaved = 0
self._logfile = logfile
self._wait_time_before_forced_shutdown = wait_time_before_forced_shutdown
self._input: multiprocessing.Queue = multiprocessing.Queue()
self._output: multiprocessing.Queue = multiprocessing.Queue()
self._command: multiprocessing.Queue = multiprocessing.Queue()
pid = os.getpid()
self._main_process = psutil.Process(pid)
def __enter__(self):
if self._has_entered:
raise RuntimeError(
"You can not use the same AsyncEvaluator in two different contexts."
)
self._has_entered = True
self._input = multiprocessing.Queue()
self._output = multiprocessing.Queue()
log.debug(
f"Process {self._main_process.pid} starting {self._n_jobs} subprocesses."
)
for _ in range(self._n_jobs):
self._start_worker_process()
self._log_memory_usage()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
log.debug(f"Signaling {len(self._processes)} subprocesses to stop.")
for _ in self._processes:
self._command.put("stop")
for i in range(self._wait_time_before_forced_shutdown + 1):
if self._command.empty():
break
time.sleep(1)
else:
# A non-empty command queue indicates a process(es) was unable to shut down.
# All processes need to be terminated to free resources.
for process in self._processes:
try:
process.terminate()
except psutil.NoSuchProcess:
pass
return False
def submit(self, fn: Callable, *args, **kwargs) -> AsyncFuture:
""" Submit fn(*args, **kwargs) to be evaluated on a subprocess.
Parameters
----------
fn: Callable
Function to call on a subprocess.
args
Positional arguments to call `fn` with.
kwargs
Keyword arguments to call `fn` with.
Returns
-------
AsyncFuture
A Future of which the `result` or `exception` field will be populated
once evaluation is finished.
"""
future = AsyncFuture(fn, *args, **kwargs)
self.futures[future.id] = future
self._input.put(future)
return future
def wait_next(self, poll_time: float = 0.05) -> AsyncFuture:
""" Wait until an AsyncFuture has been completed and return it.
Parameters
----------
poll_time: float (default=0.05)
Time to sleep between checking if a future has been completed.
Returns
-------
AsyncFuture
The completed future that completed first.
Raises
------
RuntimeError
If all futures have already been completed and returned.
"""
if len(self.futures) == 0:
raise RuntimeError("No Futures queued, must call `submit` first.")
while True:
self._control_memory_usage()
self._log_memory_usage()
try:
completed_future = self._output.get(block=False)
except queue.Empty:
time.sleep(poll_time)
continue
match = self.futures.pop(completed_future.id)
match.result, match.exception, match.traceback = (
completed_future.result,
completed_future.exception,
completed_future.traceback,
)
self._mem_behaved += 1
return match
def _start_worker_process(self) -> psutil.Process:
""" Start a new worker node and add it to the process pool. """
mp_process = multiprocessing.Process(
target=evaluator_daemon,
args=(self._input, self._output, self._command, AsyncEvaluator.defaults),
daemon=True,
)
mp_process.start()
subprocess = psutil.Process(mp_process.pid)
self._processes.append(subprocess)
return subprocess
def _stop_worker_process(self, process: psutil.Process):
""" Terminate a new worker node and remove it from the process pool. """
process.terminate()
self._processes.remove(process)
def _control_memory_usage(self, threshold=0.05):
""" Dynamically restarts or kills processes to adhere to memory constraints. """
if self._memory_limit_mb is None:
return
# If the memory usage of all processes (the main process, and the evaluation
# subprocesses) exceeds the maximum allowed memory usage, we have to terminate
# one of them.
# If we were never to start new processes, eventually all subprocesses would
# likely be killed due to 'silly' pipelines (e.g. multiple polynomial feature
# steps).
# On the other hand if there is e.g. a big dataset, by always restarting we
# will set up the same scenario for failure over and over again.
# So we want to dynamically find the right amount of evaluation processes, such
# that the total memory usage is not exceeded "too often".
# Here `threshold` defines the ratio of processes that should be allowed to
# fail due to memory constraints. Setting it too high might lead to aggressive
# subprocess killing and underutilizing compute resources. If it is too low,
# the number of concurrent jobs might shrink too slowly inducing a lot of
# loss in compute time due to interrupted evaluations.
# ! Like the rest of this module, I hate to use custom code with this,
# in particular there is a risk that terminating the process might leave
# the multiprocess queue broken.
mem_proc = list(self._get_memory_usage())
if sum(map(lambda x: x[1], mem_proc)) > self._memory_limit_mb:
log.info(
f"GAMA exceeded memory usage "
f"({self._mem_violations}, {self._mem_behaved})."
)
self._log_memory_usage()
self._mem_violations += 1
# Find the process with the most memory usage, that is not the main process
proc, _ = max(mem_proc[1:], key=lambda t: t[1])
n_evaluations = self._mem_violations + self._mem_behaved
fail_ratio = self._mem_violations / n_evaluations
if fail_ratio < threshold or len(self._processes) == 1:
# restart `pid`
log.info(f"Terminating {proc.pid} due to memory usage.")
self._stop_worker_process(proc)
log.info("Starting new evaluations process.")
self._start_worker_process()
else:
# More than one process left alive and a violation of the threshold,
# requires killing a subprocess.
self._mem_behaved = 0
self._mem_violations = 0
log.info(f"Terminating {proc.pid} due to memory usage.")
self._stop_worker_process(proc)
# todo: update the Future of the evaluation that was terminated.
def _log_memory_usage(self):
if not self._logfile:
return
mem_by_pid = self._get_memory_usage()
mem_str = ",".join([f"{proc.pid},{mem_mb}" for (proc, mem_mb) in mem_by_pid])
timestamp = datetime.datetime.now().isoformat()
with open(self._logfile, "a") as memory_log:
memory_log.write(f"{timestamp},{mem_str}\n")
def _get_memory_usage(self):
processes = [self._main_process] + self._processes
for process in processes:
try:
yield process, process.memory_info()[0] / (2 ** 20)
except NoSuchProcess:
# can never be the main process anyway
self._processes = [p for p in self._processes if p.pid != process.pid]
self._start_worker_process()
def evaluator_daemon(
input_queue: queue.Queue,
output_queue: queue.Queue,
command_queue: queue.Queue,
default_parameters: Optional[Dict] = None,
):
""" Function for daemon subprocess that evaluates functions from AsyncFutures.
Parameters
----------
input_queue: queue.Queue[AsyncFuture]
Queue to get AsyncFuture from.
Queue should be managed by multiprocessing.manager.
output_queue: queue.Queue[AsyncFuture]
Queue to put AsyncFuture to.
Queue should be managed by multiprocessing.manager.
command_queue: queue.Queue[Str]
Queue to put commands for the subprocess.
Queue should be managed by multiprocessing.manager.
default_parameters: Dict, optional (default=None)
Additional parameters to pass to AsyncFuture.Execute.
This is useful to avoid passing lots of repetitive data through AsyncFuture.
"""
try:
while True:
try:
command_queue.get(block=False)
break
except queue.Empty:
pass
try:
future = input_queue.get(block=False)
future.execute(default_parameters)
if future.result:
if isinstance(future.result, tuple):
result = future.result[0]
else:
result = future.result
if isinstance(result.error, MemoryError):
# Can't pickle MemoryErrors. Should work around this later.
result.error = "MemoryError"
gc.collect()
output_queue.put(future)
except (MemoryError, struct.error) as e:
future.result = None
future.exception = str(type(e))
gc.collect()
output_queue.put(future)
except queue.Empty:
pass
except Exception as e:
# There are no plans currently for recovering from any exception:
print(f"Stopping daemon:{type(e)}:{str(e)}")
traceback.print_exc()