A low-impact profiler to figure out how much memory each task in Dask is using

Overview

dask-memusage

If you're using Dask with tasks that use a lot of memory, RAM is your bottleneck for parallelism. That means you want to know how much memory each task uses:

  1. So you can set the highest parallelism level (process or threads) for each machine, given available to RAM.
  2. In order to know where to focus memory optimization efforts.

dask-memusage is an MIT-licensed statistical memory profiler for Dask's Distributed scheduler that can help you with both these problems.

dask-memusage polls your processes for memory usage and records the minimum and maximum usage for each task in the Dask execution graph in a CSV:

task_key,min_memory_mb,max_memory_mb
"('from_sequence-map-sum-part-e15703211a549e75b11c63e0054b53e5', 0)",44.84765625,96.98046875
"('from_sequence-map-sum-part-e15703211a549e75b11c63e0054b53e5', 1)",47.015625,97.015625
"('sum-part-e15703211a549e75b11c63e0054b53e5', 0)",0,0
"('sum-part-e15703211a549e75b11c63e0054b53e5', 1)",0,0
sum-aggregate-apply-no_allocate-4c30eb545d4c778f0320d973d9fc8ea6,0,0
apply-no_allocate-4c30eb545d4c778f0320d973d9fc8ea6,47.265625,47.265625
task_key,min_memory_mb,max_memory_mb
"('from_sequence-map-sum-part-e15703211a549e75b11c63e0054b53e5', 0)",44.84765625,96.98046875
"('from_sequence-map-sum-part-e15703211a549e75b11c63e0054b53e5', 1)",47.015625,97.015625
"('sum-part-e15703211a549e75b11c63e0054b53e5', 0)",0,0
"('sum-part-e15703211a549e75b11c63e0054b53e5', 1)",0,0
sum-aggregate-apply-no_allocate-4c30eb545d4c778f0320d973d9fc8ea6,0,0
apply-no_allocate-4c30eb545d4c778f0320d973d9fc8ea6,47.265625,47.265625

You may also find the Fil memory profiler useful in tracking down which specific parts of your code are responsible for peak memory allocations.

Example

Here's a working standalone program using dask-memusage; notice you just need to add two lines of code:

from time import sleep
import numpy as np
from dask.bag import from_sequence
from dask import compute
from dask.distributed import Client, LocalCluster

from dask_memusage import install  # <-- IMPORT

def allocate_50mb(x):
    """Allocate 50MB of RAM."""
    sleep(1)
    arr = np.ones((50, 1024, 1024), dtype=np.uint8)
    sleep(1)
    return x * 2

def no_allocate(y):
    """Don't allocate any memory."""
    return y * 2

def make_bag():
    """Create a bag."""
    return from_sequence(
        [1, 2], npartitions=2
    ).map(allocate_50mb).sum().apply(no_allocate)

def main():
    cluster = LocalCluster(n_workers=2, threads_per_worker=1,
                           memory_limit=None)
    install(cluster.scheduler, "memusage.csv")  # <-- INSTALL
    client = Client(cluster)
    compute(make_bag())

if __name__ == '__main__':
    main()

Usage

Important: Make sure your workers only have a single thread! Otherwise the results will be wrong.

Installation

On the machine where you are running the Distributed scheduler, run:

$ pip install dask_memusage

Or if you're using Conda:

$ conda install -c conda-forge dask-memusage

API usage

# Add to your Scheduler object, which is e.g. your LocalCluster's scheduler
# attribute:
from dask_memoryusage import install
install(scheduler, "/tmp/memusage.csv")

CLI usage

$ dask-scheduler --preload dask_memusage --memusage.csv /tmp/memusage.csv

Limitations

  • Again, make sure you only have one thread per worker process.
  • This is statistical profiling, running every 10ms. Tasks that take less than that won't have accurate information.

Help

Need help? File a ticket at https://github.com/itamarst/dask-memusage/issues/new

You might also like...
Inkscape extensions for figure resizing and editing
Inkscape extensions for figure resizing and editing

Academic-Inkscape: Extensions for figure resizing and editing This repository contains several Inkscape extensions designed for editing plots. Scale P

Make a Turtlebot3 follow a figure 8 trajectory and create a robot arm and make it follow a trajectory
Make a Turtlebot3 follow a figure 8 trajectory and create a robot arm and make it follow a trajectory

HW2 - ME 495 Overview Part 1: Makes the robot move in a figure 8 shape. The robot starts moving when launched on a real turtlebot3 and can be paused a

Simple function to plot multiple barplots in the same figure.
Simple function to plot multiple barplots in the same figure.

Simple function to plot multiple barplots in the same figure. Supports padding and custom color.

A simple code for plotting figure, colorbar, and cropping with python
A simple code for plotting figure, colorbar, and cropping with python

Python Plotting Tools This repository provides a python code to generate figures (e.g., curves and barcharts) that can be used in the paper to show th

IPE is a simple tool for analyzing IP addresses. With IPE you can find out the server region, city, country, longitude and latitude and much more in seconds.

IPE is a simple tool for analyzing IP addresses. With IPE you can find out the server region, city, country, longitude and latitude and much more in seconds.

A task scheduler with task scheduling, timing and task completion time tracking functions

A task scheduler with task scheduling, timing and task completion time tracking functions. Could be helpful for time management in daily life.

InfraGenie is allows you to split out your infrastructure project into separate independent pieces, each with its own terraform state.
InfraGenie is allows you to split out your infrastructure project into separate independent pieces, each with its own terraform state.

🧞 InfraGenie InfraGenie is allows you to split out your infrastructure project into separate independent pieces, each with its own terraform state. T

Read and write rasters in parallel using Rasterio and Dask

dask-rasterio dask-rasterio provides some methods for reading and writing rasters in parallel using Rasterio and Dask arrays. Usage Read a multiband r

Demo of using DataLoader to prevent out of memory

Demo of using DataLoader to prevent out of memory

Code for HLA-Face: Joint High-Low Adaptation for Low Light Face Detection (CVPR21)
Code for HLA-Face: Joint High-Low Adaptation for Low Light Face Detection (CVPR21)

HLA-Face: Joint High-Low Adaptation for Low Light Face Detection The official PyTorch implementation for HLA-Face: Joint High-Low Adaptation for Low L

Official code of
Official code of "R2RNet: Low-light Image Enhancement via Real-low to Real-normal Network."

R2RNet Official code of "R2RNet: Low-light Image Enhancement via Real-low to Real-normal Network." Jiang Hai, Zhu Xuan, Ren Yang, Yutong Hao, Fengzhu

A library for low-memory inferencing in PyTorch.

Pylomin Pylomin (PYtorch LOw-Memory INference) is a library for low-memory inferencing in PyTorch. Installation ... Usage For example, the following c

:truck: Agile Data Preparation Workflows made easy with dask, cudf, dask_cudf and pyspark
:truck: Agile Data Preparation Workflows made easy with dask, cudf, dask_cudf and pyspark

To launch a live notebook server to test optimus using binder or Colab, click on one of the following badges: Optimus is the missing framework to prof

Scalable, Portable and Distributed Gradient Boosting (GBDT, GBRT or GBM) Library,  for Python, R, Java, Scala, C++ and more. Runs on single machine, Hadoop, Spark, Dask, Flink and DataFlow
Scalable, Portable and Distributed Gradient Boosting (GBDT, GBRT or GBM) Library, for Python, R, Java, Scala, C++ and more. Runs on single machine, Hadoop, Spark, Dask, Flink and DataFlow

eXtreme Gradient Boosting Community | Documentation | Resources | Contributors | Release Notes XGBoost is an optimized distributed gradient boosting l

Scalable, Portable and Distributed Gradient Boosting (GBDT, GBRT or GBM) Library,  for Python, R, Java, Scala, C++ and more. Runs on single machine, Hadoop, Spark, Dask, Flink and DataFlow
Scalable, Portable and Distributed Gradient Boosting (GBDT, GBRT or GBM) Library, for Python, R, Java, Scala, C++ and more. Runs on single machine, Hadoop, Spark, Dask, Flink and DataFlow

eXtreme Gradient Boosting Community | Documentation | Resources | Contributors | Release Notes XGBoost is an optimized distributed gradient boosting l

A high-level plotting API for pandas, dask, xarray, and networkx built on HoloViews
A high-level plotting API for pandas, dask, xarray, and networkx built on HoloViews

hvPlot A high-level plotting API for the PyData ecosystem built on HoloViews. Build Status Coverage Latest dev release Latest release Docs What is it?

A high-level plotting API for pandas, dask, xarray, and networkx built on HoloViews
A high-level plotting API for pandas, dask, xarray, and networkx built on HoloViews

hvPlot A high-level plotting API for the PyData ecosystem built on HoloViews. Build Status Coverage Latest dev release Latest release Docs What is it?

Scalable, Portable and Distributed Gradient Boosting (GBDT, GBRT or GBM) Library,  for Python, R, Java, Scala, C++ and more. Runs on single machine, Hadoop, Spark, Dask, Flink and DataFlow
Scalable, Portable and Distributed Gradient Boosting (GBDT, GBRT or GBM) Library, for Python, R, Java, Scala, C++ and more. Runs on single machine, Hadoop, Spark, Dask, Flink and DataFlow

eXtreme Gradient Boosting Community | Documentation | Resources | Contributors | Release Notes XGBoost is an optimized distributed gradient boosting l

Turn a STAC catalog into a dask-based xarray

StackSTAC Turn a list of STAC items into a 4D xarray DataArray (dims: time, band, y, x), including reprojection to a common grid. The array is a lazy

Comments
  • send_recv_from_rpc() takes 0 positional arguments but 1 was given

    send_recv_from_rpc() takes 0 positional arguments but 1 was given

    Hi,

    I am following the instructions on the github site. First, I installed dask-memusage with pip install dask_memusage. I then create my dask cluster with cluster = LocalCluster(n_workers=3, threads_per_worker=1); client = Client(cluster). When I use install(client.scheduler, "/path/to/csv"), i get the following error:

    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/home/torresba/.pyenv/versions/3.8.4/lib/python3.8/site-packages/dask_memusage.py", line 123, in install
        scheduler.add_plugin(plugin)
    TypeError: send_recv_from_rpc() takes 0 positional arguments but 1 was given  
    

    Am I doing something wrong here?

    P.S: Also, I think there is a typo in the github site. Instead of from dask_memoryusage import install, I had to use from dask_memusage import install

    Thanks

    opened by kristiantorres 5
  • Typo in front page doc CLI usage line.

    Typo in front page doc CLI usage line.

    Change

    $ dask-scheduler --preload dask_memusage --memusage.csv /tmp/memusage.csv
    

    to

    $ dask-scheduler --preload dask_memusage --memusage-csv /tmp/memusage.csv
    
    opened by y-he2 1
  • Add dask_memusage.install introduce

    Add dask_memusage.install introduce "ValueError: Inputs contain futures that were created by another client."

    Thank you for the wonderful tool!

    I would like to profile peak memory of my dask application. I can run it successfully without dask_memusage. However, after I add memusage.install, it causes "ValueError: Inputs contain futures that were created by another client." I use dask-memusage v1.1, dask-core v2021.3.0.

    Attached my chunk of code here:

    import dask_memusage
    import gc
    from utility import get_batch_index
    from dask.distributed import Client, LocalCluster
    from sklearn.neighbors import NearestNeighbors
    
    
    CLUSTER_KWARGS = {
        'n_workers': 4,
        'threads_per_worker': 1,
        'processes': False,
        'memory_limit': '8GB',
    }
    
    cluster = LocalCluster(**CLUSTER_KWARGS)
    dask_memusage.install(cluster.scheduler, 'memory_stats/memusage.csv')
    
    def kNN_graph(X, key_index, ref_index, n_neighbors=10):
        gc.collect()
        nbrs = NearestNeighbors(n_neighbors=n_neighbors).fit(X[ref_index[0]:ref_index[1], :])
        distance, indices = nbrs.kneighbors(X[key_index[0]:key_index[1], :])
        return (distance, indices)
    
    
    contamination = 0.1  # percentage of outliers
    n_train = args.n_train  # number of training points
    n_test = 1000  # number of testing points
    n_features = args.dim
        
    # Generate sample data
    X_train, y_train, X_test, y_test = \
        generate_data(n_train=n_train,
                      n_test=n_test,
                      n_features=n_features,
                      contamination=contamination,
                      random_state=42)
    
    k = 5
    batch_size = 5000
    n_samples = n_train
    
    
    start = time.time()
    batch_index = get_batch_index(n_samples=n_samples, batch_size=batch_size)
    n_batches = len(batch_index)
    
    # save the intermediate results
    full_list = []
    
    # scatter the data
    future_X = client.scatter(X_train)
    
    delayed_knn =  delayed(kNN_graph)
    
    for i, index_A in enumerate(batch_index):
        for j, index_B in enumerate(batch_index):
            full_list.append(delayed_knn(future_X, index_A, index_B, k))
            
    full_list = dask.compute(full_list)
    
    
    opened by CAROLZXYZXY 2
  • Explain requirement for Distributed, and how to use LocalCluster

    Explain requirement for Distributed, and how to use LocalCluster

    If you want 8 worker processes:

    from dask.distributed import Client, LocalCluster
    cluster = LocalCluster(n_workers=8, threads_per_worker=1, memory_limit=None)
    Client(cluster)
    
    opened by itamarst 0
Releases(v1.1)
Owner
Itamar Turner-Trauring
Helping software teams using Python to ship features faster.
Itamar Turner-Trauring
Silky smooth profiling for Django

Silk Silk is a live profiling and inspection tool for the Django framework. Silk intercepts and stores HTTP requests and database queries before prese

Jazzband 3.7k Jan 01, 2023
Pyccel stands for Python extension language using accelerators.

Pyccel stands for Python extension language using accelerators.

Pyccel 242 Jan 02, 2023
Shrapnel is a scalable, high-performance cooperative threading library for Python.

This Python library was evolved at IronPort Systems and has been provided as open source by Cisco Systems under an MIT license. Intro Shrapnel is a li

216 Nov 06, 2022
Django query profiler - one profiler to rule them all. Shows queries, detects N+1 and gives recommendations on how to resolve them

Django Query Profiler This is a query profiler for Django applications, for helping developers answer the question "My Django code/page/API is slow, H

Django Query Profiler 116 Dec 15, 2022
guapow is an on-demand and auto performance optimizer for Linux applications.

guapow is an on-demand and auto performance optimizer for Linux applications. This project's name is an abbreviation for Guarana powder (Guaraná is a fruit from the Amazon rainforest with a highly ca

Vinícius Moreira 19 Nov 18, 2022
This tool allows to gather statistical profile of CPU usage of mixed native-Python code.

Sampling Profiler for Python This tool allows to gather statistical profile of CPU usage of mixed native-Python code. Currently supported platforms ar

Intel Corporation 13 Oct 04, 2022
Cinder is Instagram's internal performance-oriented production version of CPython

Cinder is Instagram's internal performance-oriented production version of CPython 3.8. It contains a number of performance optimizations, including bytecode inline caching, eager evaluation of corout

Facebook Incubator 2.2k Dec 30, 2022
PerfSpect is a system performance characterization tool based on linux perf targeting Intel microarchitectures

PerfSpect PerfSpect is a system performance characterization tool based on linux perf targeting Intel microarchitectures. The tool has two parts perf

Intel Corporation 139 Dec 30, 2022
Sampling profiler for Python programs

py-spy: Sampling profiler for Python programs py-spy is a sampling profiler for Python programs. It lets you visualize what your Python program is spe

Ben Frederickson 9.5k Jan 01, 2023
Rip Raw - a small tool to analyse the memory of compromised Linux systems

Rip Raw Rip Raw is a small tool to analyse the memory of compromised Linux systems. It is similar in purpose to Bulk Extractor, but particularly focus

Cado Security 127 Oct 28, 2022
A low-impact profiler to figure out how much memory each task in Dask is using

dask-memusage If you're using Dask with tasks that use a lot of memory, RAM is your bottleneck for parallelism. That means you want to know how much m

Itamar Turner-Trauring 23 Dec 09, 2022
Python compiler that massively increases Python's code performance without code changes.

Flyable - A python compiler for highly performant code Flyable is a Python compiler that generates efficient native code. It uses different techniques

Flyable 35 Dec 16, 2022
Pearpy - a Python package for writing multithreaded code and parallelizing tasks across CPU threads.

Pearpy The Python package for (pear)allelizing your tasks across multiple CPU threads. Installation The latest version of Pearpy can be installed with

MLH Fellowship 5 Nov 01, 2021