A Python package for easy multiprocessing, but faster than multiprocessing

Overview

MPIRE (MultiProcessing Is Really Easy)

Build status Docs status

MPIRE, short for MultiProcessing Is Really Easy, is a Python package for multiprocessing, but faster and more user-friendly than the default multiprocessing package. It combines the convenient map like functions of multiprocessing.Pool with the benefits of using copy-on-write shared objects of multiprocessing.Process, together with easy-to-use worker state, worker insights, and progress bar functionality.

Full documentation is available at https://slimmer-ai.github.io/mpire/.

Features

  • Faster execution than other multiprocessing libraries. See benchmarks.
  • Intuitive, Pythonic syntax
  • Multiprocessing with map/map_unordered/imap/imap_unordered functions
  • Easy use of copy-on-write shared objects with a pool of workers
  • Each worker can have its own state and with convenient worker init and exit functionality this state can be easily manipulated (e.g., to load a memory-intensive model only once for each worker without the need of sending it through a queue)
  • Progress bar support using tqdm
  • Progress dashboard support
  • Worker insights to provide insight into your multiprocessing efficiency
  • Graceful and user-friendly exception handling
  • Automatic task chunking for all available map functions to speed up processing of small task queues (including numpy arrays)
  • Adjustable maximum number of active tasks to avoid memory problems
  • Automatic restarting of workers after a specified number of tasks to reduce memory footprint
  • Nested pool of workers are allowed when setting the daemon option
  • Child processes can be pinned to specific or a range of CPUs
  • Optionally utilizes dill as serialization backend through multiprocess, enabling parallelizing more exotic objects, lambdas, and functions in iPython and Jupyter notebooks.

Installation

Note

MPIRE currently only supports Linux based operating systems that support 'fork' as start method. Support for Windows is coming soon.

Through pip (PyPi):

pip install mpire

From source:

python setup.py install

Getting started

Suppose you have a time consuming function that receives some input and returns its results. Simple functions like these are known as embarrassingly parellel problems, functions that require little to no effort to turn into a parellel task. Parallelizing a simple function as this can be as easy as importing multiprocessing and using the multiprocessing.Pool class:

import time
from multiprocessing import Pool

def time_consuming_function(x):
    time.sleep(1)  # Simulate that this function takes long to complete
    return ...

with Pool(processes=5) as pool:
    results = pool.map(time_consuming_function, range(10))

MPIRE can be used almost as a drop-in replacement to multiprocessing. We use the mpire.WorkerPool class and call one of the available map functions:

from mpire import WorkerPool

with WorkerPool(n_jobs=5) as pool:
    results = pool.map(time_consuming_function, range(10))

The differences in code are small: there's no need to learn a completely new multiprocessing syntax, if you're used to vanilla multiprocessing. The additional available functionality, though, is what sets MPIRE apart.

Progress bar

Suppose we want to know the status of the current task: how many tasks are completed, how long before the work is ready? It's as simple as setting the progress_bar parameter to True:

with WorkerPool(n_jobs=5) as pool:
    results = pool.map(time_consuming_function, range(10), progress_bar=True)

And it will output a nicely formatted tqdm progress bar. In case you're running your code inside a notebook it will automatically switch to a widget.

MPIRE also offers a dashboard, for which you need to install additional dependencies. See Dashboard for more information.

Shared objects

If you have one or more objects that you want to share between all workers you can make use of the copy-on-write shared_objects option of MPIRE. MPIRE will pass on these objects only once for each worker without copying/serialization. Only when you alter the object in the worker function it will start copying it for that worker.

def time_consuming_function(some_object, x):
    time.sleep(1)  # Simulate that this function takes long to complete
    return ...

def main():
    some_object = ...
    with WorkerPool(n_jobs=5, shared_objects=some_object) as pool:
        results = pool.map(time_consuming_function, range(10), progress_bar=True)

See shared_objects for more details.

Worker initialization

Workers can be initialized using the worker_init feature. Together with worker_state you can load a model, or set up a database connection, etc.:

def init(worker_state):
    # Load a big dataset or model and store it in a worker specific worker_state
    worker_state['dataset'] = ...
    worker_state['model'] = ...

def task(worker_state, idx):
    # Let the model predict a specific instance of the dataset
    return worker_state['model'].predict(worker_state['dataset'][idx])

with WorkerPool(n_jobs=5, use_worker_state=True) as pool:
    results = pool.map(task, range(10), worker_init=init)

Similarly, you can use the worker_exit feature to let MPIRE call a function whenever a worker terminates. You can even let this exit function return results, which can be obtained later on. See the worker_init and worker_exit section for more information.

Worker insights

When you're multiprocessing setup isn't performing as you want it to and you have no clue what's causing it, there's the worker insights functionality. This will give you insight in your setup, but it will not profile the function you're running (there are other libraries for that). Instead, it profiles the worker start up time, waiting time and working time. When worker init and exit functions are provided it will time those as well.

Perhaps you're sending a lot of data over the task queue, which makes the waiting time go up. Whatever the case, you can enable and grab the insights using the enable_insights flag and mpire.WorkerPool.get_insights function, respectively:

with WorkerPool(n_jobs=5) as pool:
    results = pool.map(time_consuming_function, range(10), enable_insights=True)
    insights = pool.get_insights()

See worker insights for a more detailed example and expected output.

Documentation

See the full documentation at https://slimmer-ai.github.io/mpire/ for information on all the other features of MPIRE.

If you want to build the documentation yourself, please install the documentation dependencies by executing:

pip install mpire[docs]

or

pip install .[docs]

Documentation can then be build by executing:

python setup.py build_docs

Documentation can also be build from the docs folder directly. In that case MPIRE should be installed and available in your current working environment. Then execute:

make html

in the docs folder.

Backport of the concurrent.futures package to Python 2.6 and 2.7

This is a backport of the concurrent.futures standard library module to Python 2. It does not work on Python 3 due to Python 2 syntax being used in th

Alex Grönholm 224 Nov 07, 2022
A concurrent sync tool which works with multiple sources and targets.

Concurrent Sync A concurrent sync tool which works similar to rsync. It supports syncing given sources with multiple targets concurrently. Requirement

Halit Şimşek 2 Jan 11, 2022
AnyIO is an asynchronous networking and concurrency library that works on top of either asyncio or trio.

AnyIO is an asynchronous networking and concurrency library that works on top of either asyncio or trio. It implements trio-like structured concurrenc

Alex Grönholm 1.1k Jan 06, 2023
A lightweight (serverless) native python parallel processing framework based on simple decorators and call graphs.

A lightweight (serverless) native python parallel processing framework based on simple decorators and call graphs, supporting both control flow and dataflow execution paradigms as well as de-centrali

102 Jan 06, 2023
SCOOP (Scalable COncurrent Operations in Python)

SCOOP (Scalable COncurrent Operations in Python) is a distributed task module allowing concurrent parallel programming on various environments, from h

Yannick Hold 573 Dec 27, 2022
A Python package for easy multiprocessing, but faster than multiprocessing

MPIRE, short for MultiProcessing Is Really Easy, is a Python package for multiprocessing, but faster and more user-friendly than the default multiprocessing package.

753 Dec 29, 2022
Parallelformers: An Efficient Model Parallelization Toolkit for Deployment

Parallelformers: An Efficient Model Parallelization Toolkit for Deployment

TUNiB 559 Dec 28, 2022
Unsynchronize asyncio by using an ambient event loop, or executing in separate threads or processes.

unsync Unsynchronize asyncio by using an ambient event loop, or executing in separate threads or processes. Quick Overview Functions marked with the @

Alex Sherman 802 Dec 28, 2022
Raise asynchronous exceptions in other thread, control the timeout of blocks or callables with a context manager or a decorator

stopit Raise asynchronous exceptions in other threads, control the timeout of blocks or callables with two context managers and two decorators. Attent

Gilles Lenfant 97 Oct 12, 2022
Thread-safe asyncio-aware queue for Python

Mixed sync-async queue, supposed to be used for communicating between classic synchronous (threaded) code and asynchronous

aio-libs 665 Jan 03, 2023
Python Multithreading without GIL

Multithreaded Python without the GIL

Sam Gross 2.3k Jan 05, 2023
aiomisc - miscellaneous utils for asyncio

aiomisc - miscellaneous utils for asyncio Miscellaneous utils for asyncio. The complete documentation is available in the following languages: English

aiokitchen 295 Jan 08, 2023
rosny is a lightweight library for building concurrent systems.

rosny is a lightweight library for building concurrent systems. Installation Tested on: Linux Python = 3.6 From pip: pip install rosny From source: p

Ruslan Baikulov 6 Oct 05, 2021
Trio – a friendly Python library for async concurrency and I/O

Trio – a friendly Python library for async concurrency and I/O The Trio project aims to produce a production-quality, permissively licensed, async/awa

5k Jan 07, 2023
Jug: A Task-Based Parallelization Framework

Jug: A Task-Based Parallelization Framework Jug allows you to write code that is broken up into tasks and run different tasks on different processors.

Luis Pedro Coelho 387 Dec 21, 2022
🌀 Pykka makes it easier to build concurrent applications.

🌀 Pykka Pykka makes it easier to build concurrent applications. Pykka is a Python implementation of the actor model. The actor model introduces some

Stein Magnus Jodal 1.1k Dec 30, 2022
A curated list of awesome Python asyncio frameworks, libraries, software and resources

Awesome asyncio A carefully curated list of awesome Python asyncio frameworks, libraries, software and resources. The Python asyncio module introduced

Timo Furrer 3.8k Jan 08, 2023
Simple package to enhance Python's concurrent.futures for memory efficiency

future-map is a Python library to use together with the official concurrent.futures module.

Arai Hiroki 2 Nov 15, 2022
Ultra fast asyncio event loop.

uvloop is a fast, drop-in replacement of the built-in asyncio event loop. uvloop is implemented in Cython and uses libuv under the hood. The project d

magicstack 9.1k Jan 07, 2023