Source code for impscan.share.multiproc_utils

from __future__ import annotations

import multiprocessing as mp
from functools import partial
from multiprocessing import Pool, Process
from typing import Callable

from more_itertools import chunked
from tqdm import tqdm

__all__ = ["batch_multiprocess", "batch_multiprocess_with_return"]

[docs]def batch_multiprocess( funcs: list[Callable], n_cores: int = mp.cpu_count(), show_progress: bool = True, tqdm_desc: str | None = None, ) -> None: """ Run a list of functions on `n_cores` (default: all CPU cores), with the option to show a progress bar using tqdm (default: shown). """ iterator = [*chunked(funcs, n_cores)] if show_progress: iterator = tqdm(iterator, desc=tqdm_desc) for func_batch in iterator: procs = [] for f in func_batch: procs.append(Process(target=f)) for p in procs: p.start() for p in procs: p.join()
[docs]def batch_multiprocess_with_return( funcs: list[Callable], pool_results: list | None = None, n_cores: int = mp.cpu_count(), show_progress: bool = True, tqdm_desc: str | None = None, ) -> list: """ Run a list of functions on `n_cores` (default: all CPU cores), with the option to show a progress bar using tqdm (default: shown). """ iterator = [*chunked(funcs, n_cores)] pool_results = pool_results if pool_results else [] pool = Pool(processes=n_cores) if show_progress: iterator = tqdm(iterator, desc=tqdm_desc) for func_batch in iterator: procs = [] for f in func_batch: pool.apply_async(func=f, callback=pool_results.append) pool.close() pool.join() return pool_results