dask
Dask - Scalable Parallel Computing
Dask provides high-level collections (Arrays, DataFrames, Bags) that mimic the APIs of NumPy and pandas but operate in parallel on data sets that are larger than memory.
When to Use
- Processing datasets that don't fit in RAM (Out-of-core computing).
- Speeding up computations by using all available CPU cores.
- Parallelizing custom Python functions or complex workflows (dask.delayed).
- Scaling machine learning pipelines to large clusters.
- Handling large-scale arrays in physics, climate science, or imaging.
- Analyzing massive log files or unstructured data (dask.bag).
Reference Documentation
Official docs: https://docs.dask.org/
Dask Examples: https://examples.dask.org/
Search patterns: dask.dataframe, dask.array, dask.delayed, client.compute, dask.distributed
Core Principles
Lazy Evaluation
Dask doesn't compute results immediately. Instead, it builds a Task Graph. Actual computation only happens when you explicitly call .compute() or .persist().
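A minimal sketch of this behavior (array size and values are illustrative):

import dask.array as da

x = da.ones((10000, 10000), chunks=(1000, 1000))  # no data allocated yet
total = x.sum()          # still lazy: only a task graph is built
print(type(total))       # a dask array object, not a number
print(total.compute())   # computation happens here -> 100000000.0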
Chunks and Partitions
- Dask Array: Composed of many small NumPy arrays called chunks.
- Dask DataFrame: Composed of many small pandas DataFrames called partitions.
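For illustration, the chunk and partition structure can be inspected directly (the array shape and file pattern below are hypothetical):

import dask.array as da
import dask.dataframe as dd

x = da.zeros((20000, 20000), chunks=(5000, 5000))
print(x.chunks)       # ((5000, 5000, 5000, 5000), (5000, 5000, 5000, 5000))
print(x.numblocks)    # (4, 4) -> 16 underlying NumPy blocks

df = dd.read_csv('data/*.csv')   # hypothetical path
print(df.npartitions)            # one pandas DataFrame per partition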
Use Dask For
| Collection | Analogy | Use Case |
|---|---|---|
| dask.array | NumPy | Large-scale multidimensional math. |
| dask.dataframe | pandas | Large CSV/Parquet/SQL tables. |
| dask.bag | Lists/Toolz | Unstructured data (JSON, logs). |
| dask.delayed | Functions | Custom parallel logic. |
Do NOT Use For
- Data that fits easily in RAM (pandas/NumPy are faster due to lower overhead).
- Simple tasks where multiprocessing or concurrent.futures suffice.
- Situations where low-latency response is required (Dask adds scheduling overhead).
Quick Reference
Installation
pip install "dask[complete]"
Standard Imports
import dask.array as da
import dask.dataframe as dd
from dask import delayed, compute
from dask.distributed import Client
Basic Pattern - Initializing a Local Cluster
from dask.distributed import Client
# Setup local cluster and dashboard
client = Client()
print(client.dashboard_link) # View real-time computation graph
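When the defaults are not appropriate, the local cluster can be sized explicitly; the worker count and memory limit below are illustrative, not recommendations:

from dask.distributed import Client

# Explicitly size the local cluster (example values only)
client = Client(n_workers=4, threads_per_worker=2, memory_limit='4GB')
print(client)    # summary of workers, threads, and memory
client.close()   # shut the cluster down when finished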
Critical Rules
✅ DO
- Use the Dashboard - Always monitor the Dask dashboard to find bottlenecks (red blocks = bad).
- Chunk thoughtfully - Aim for chunk sizes of 100MB to 250MB. Too small = high overhead; too large = memory errors.
- Prefer Parquet - Use Parquet instead of CSV for DataFrames; it supports efficient metadata and partitioning.
- Call .persist() on reused data - If you use the same intermediate result multiple times, persist it in memory.
- Let Dask handle the graph - Avoid calling .compute() too early; keep calculations lazy as long as possible.
- Use map_partitions - For custom logic on DataFrames, apply pandas functions directly to each partition (see the sketch after this list).
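A minimal sketch of the persist and map_partitions habits, assuming a Dask DataFrame ddf with a numeric amount column (all names are illustrative):

import pandas as pd

def add_fee(pdf: pd.DataFrame) -> pd.DataFrame:
    # Runs independently on each pandas partition
    pdf['amount_with_fee'] = pdf['amount'] * 1.03
    return pdf

ddf = ddf.map_partitions(add_fee)   # custom pandas logic, still lazy
ddf = ddf.persist()                 # keep the intermediate result in worker memory
high = ddf[ddf['amount_with_fee'] > 100].compute()
low = ddf[ddf['amount_with_fee'] <= 100].compute()   # reuses persisted data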
❌ DON'T
- Compute too often - Every .compute() triggers the entire graph execution and pulls data into RAM.
- Send large data to workers - Use client.scatter for large objects needed by all workers instead of passing them as arguments (see the sketch after this list).
- Iterate over rows - for row in dask_df is incredibly slow; use vectorized operations.
- Use Dask if pandas is enough - Dask is slower for small data due to scheduling time.
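A minimal sketch of scattering a large object once instead of shipping it with every task; the lookup table, records, and enrich_record function are hypothetical:

from dask.distributed import Client

client = Client()
big_lookup = load_big_lookup_table()   # hypothetical, e.g. hundreds of MB

# Scatter once; tasks receive a lightweight future instead of a copy per call
lookup_future = client.scatter(big_lookup, broadcast=True)

futures = [client.submit(enrich_record, rec, lookup_future) for rec in records]
results = client.gather(futures)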
Anti-Patterns (NEVER)
import dask.dataframe as dd
# ❌ BAD: Computing a large result into a local variable
# This will crash your local machine by filling RAM
result = dd_df.compute()
# ✅ GOOD: Compute only what you need (aggregations)
mean_val = dd_df['column'].mean().compute()
# ❌ BAD: Too many small tasks (Task Overhead)
# result = [delayed(inc)(i) for i in range(1000000)] # 1 million tasks is too much
# ✅ GOOD: Batch tasks together or use Dask Collections
import dask.array as da
x = da.arange(1000000, chunks=10000) # Only 100 tasks
# ❌ BAD: Hardcoding workers' file paths
# dd.read_csv('/Users/me/data.csv') # Workers on other machines can't see this path!
# ✅ GOOD: Use shared storage (S3, HDFS, NFS)
# dd.read_csv('s3://my-bucket/data.csv')
Dask Array (dask.array)
Scaling NumPy
import dask.array as da
# Create a large random array (~80 GB of float64)
x = da.random.random((100000, 100000), chunks=(10000, 10000))
# Perform operations (Lazy)
y = x + x.T
z = y[::2, :5000].mean(axis=0)
# Compute result
result = z.compute()
Dask DataFrame (dask.dataframe)
Scaling pandas
import dask.dataframe as dd
# Load massive dataset
df = dd.read_csv('data/*.csv')
# Filtering and Grouping
result = (df[df['value'] > 0]
          .groupby('category')
          .amount.sum())
# Execute
final_amounts = result.compute()
# Convert from pandas to dask
import pandas as pd
pdf = pd.DataFrame(...)
ddf = dd.from_pandas(pdf, npartitions=10)
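Since Parquet is preferred over CSV (see Critical Rules), a common pattern is to convert once and read Parquet afterwards; the paths and column names below are illustrative:

import dask.dataframe as dd

# One-time conversion: CSV -> partitioned Parquet (example paths)
df = dd.read_csv('data/*.csv')
df.to_parquet('data_parquet/', write_index=False)

# Later reads are faster and can push down column selection
df = dd.read_parquet('data_parquet/', columns=['category', 'amount'])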
Dask Delayed (dask.delayed)
Parallelizing Custom Code
from dask import delayed
@delayed
def load(filename):
    ...
    return data

@delayed
def process(data):
    ...
    return result

@delayed
def summarize(results):
    return sum(results)
# Build graph
filenames = ['file1.csv', 'file2.csv', 'file3.csv']
outputs = [process(load(f)) for f in filenames]
total = summarize(outputs)
# Visualize graph (requires graphviz)
# total.visualize()
# Execute in parallel
final_sum = total.compute()
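If several independent delayed results are needed, dask.compute evaluates them in a single pass so the shared graph is not re-executed; this continues the example above:

from dask import compute

# Evaluate all per-file results in one scheduler pass
per_file_results = compute(*outputs)   # returns a tuple of concrete values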
Machine Learning with Dask (dask-ml)
from dask_ml.preprocessing import StandardScaler
from dask_ml.linear_model import LogisticRegression
from dask_ml.model_selection import train_test_split
# Scaling to large data
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X_dask)
# Training on larger-than-memory datasets in parallel
model = LogisticRegression()
model.fit(X_dask, y_dask)
# Scikit-learn wrapper (for small data, parallelizing search)
from sklearn.ensemble import RandomForestClassifier
from dask_ml.model_selection import GridSearchCV
clf = RandomForestClassifier()
param_grid = {'n_estimators': [100, 200], 'max_depth': [5, 10]}  # illustrative grid
grid = GridSearchCV(clf, param_grid, cv=3)
grid.fit(X, y)  # Grid search runs in parallel across the cluster
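A common alternative for existing scikit-learn code is to route joblib-parallelized estimators through the Dask scheduler; a minimal sketch, assuming X and y fit in memory:

import joblib
from dask.distributed import Client
from sklearn.ensemble import RandomForestClassifier

client = Client()   # an active client makes the 'dask' joblib backend available
clf = RandomForestClassifier(n_estimators=200)
with joblib.parallel_backend('dask'):
    clf.fit(X, y)   # joblib tasks are executed on the Dask cluster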
Practical Workflows
1. Massive Log Processing (Bag)
import dask.bag as db
import json
def analyze_logs(pattern):
    # Read unstructured text files matching the glob pattern, e.g. 'logs/2023-*.log'
    b = db.read_text(pattern)
    # Parse JSON and filter
    records = b.map(json.loads).filter(lambda x: x['level'] == 'ERROR')
    # Extract specific field and count frequencies
    counts = records.pluck('message').frequencies()
    return counts.compute()
2. Large Scale Imaging (Array)
def process_satellite_images(da_stack):
    """Calculate NDVI anomaly across time on 1TB of data."""
    # da_stack is a 3D dask array (time, x, y)
    # Simple vectorized math (parallel across chunks)
    climatology = da_stack.mean(axis=0)
    anomaly = da_stack - climatology
    # Save results directly to disk without loading into RAM
    anomaly.to_zarr('anomalies.zarr')
3. Cleaning Data with Method Chaining
import dask.dataframe as dd

def clean_dataset(ddf):
    return (ddf
            .dropna(subset=['id'])
            .fillna({'status': 'unknown'})
            .assign(timestamp=lambda d: dd.to_datetime(d['time_str']))
            .groupby('user_id')
            .last()
            .persist())  # Keep in memory for fast future use
Performance Optimization
The Dask Dashboard Guide
- Progress Bar: Shows how many tasks are finished.
- Task Stream: Shows which worker is doing what. White space = idle workers (bad).
- Memory Plot: Shows RAM usage. If it turns orange/red, workers are hitting limits.
- Worker Table: Check for skewed data distribution.
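For illustration, the same diagnostics can be captured to a static HTML file when the interactive dashboard is not available; the filename is an example and ddf is assumed to be an existing Dask DataFrame:

from dask.distributed import Client, performance_report

client = Client()
with performance_report(filename='dask-report.html'):
    result = ddf.groupby('category').amount.mean().compute()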
Optimizing Data Storage
- Zarr: Best for N-dimensional arrays.
- Parquet: Best for tabular DataFrames.
- Compression: Use snappy or lz4 for a balance between speed and size.
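A brief sketch of both formats, assuming a Dask DataFrame ddf and a Dask array darr (paths and compression choice are illustrative):

# Tabular data -> Parquet with snappy compression
ddf.to_parquet('output_parquet/', compression='snappy')

# N-dimensional array -> Zarr (chunk-aligned, supports compressed storage)
darr.to_zarr('output.zarr')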
Common Pitfalls and Solutions
The "Worker Lost" Error
Problem: Workers crash because they ran out of RAM.
Solution: Decrease chunk size or use a machine with more memory. Check for data skew.
Serialization Errors (Pickle)
Problem: Dask can't send your custom object to workers.
Solution: Ensure custom classes and functions are defined in an importable module (not only in __main__ or a notebook cell), or ship the module to workers with client.upload_file.
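For example, a single-file module can be shipped to every worker with client.upload_file (the module name below is hypothetical):

from dask.distributed import Client

client = Client()
client.upload_file('my_custom_module.py')   # now importable on every worker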
"Too Many Tasks" Warning
Problem: You created 1,000,000+ tiny tasks.
Solution: Re-chunk your data into larger pieces. Use .rechunk() on arrays or .repartition() on DataFrames, as sketched below.
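A minimal sketch of both calls, assuming a Dask array x and a Dask DataFrame ddf (target sizes are illustrative):

# Arrays: merge many tiny chunks into larger ones
x = x.rechunk((5000, 5000))

# DataFrames: reduce the number of partitions
ddf = ddf.repartition(npartitions=50)
# or target an approximate partition size
ddf = ddf.repartition(partition_size='100MB')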
Best Practices
- Always monitor the Dask dashboard during development to identify bottlenecks.
- Choose chunk sizes carefully - aim for 100-250MB per chunk for optimal performance.
- Use Parquet format for DataFrames instead of CSV for better performance and metadata support.
- Persist intermediate results that are reused multiple times to avoid recomputation.
- Keep computations lazy as long as possible - only call .compute() when you need the final result.
- Use map_partitions for custom pandas operations on Dask DataFrames.
- Avoid iterating over rows in Dask DataFrames - use vectorized operations instead.
- Use shared storage (S3, HDFS, NFS) when working with distributed clusters.
- Batch small tasks together to avoid task overhead.
- Don't use Dask for data that fits in RAM - pandas/NumPy are faster for small datasets.
Dask extends familiar NumPy and pandas idioms to parallel and distributed execution, bridging the gap between a researcher's laptop and a high-performance compute cluster.