raydata
Ray Data Job Development
Help developers build, test, deploy, and debug Ray Data jobs at SRP. Ray Data is designed for large-scale ML workloads with distributed CPU/GPU coordination.
When to Use This Skill
Use this skill when:
- Writing new Ray Data jobs for batch inference, data preprocessing, or ETL
- Testing Ray Data code locally or on GPU clusters
- Deploying jobs to production RayCluster
- Debugging performance issues or failures
- Setting up scheduled jobs in Airflow
Development Workflow
1. Local Development on A10
Environment Setup:
# Connect to A10 development machine
ssh oci-dev2.ssh.buildagi.us # A102 dev machine
# OR
ssh oci-dev3.ssh.buildagi.us # A103 dev machine
# Create virtual environment
python3 -m venv my_venv
source my_venv/bin/activate
# Install Ray Data and dependencies
pip install ray[data]
pip install transformers Pillow torch # Add other dependencies as needed
Development Tips:
- Use VSCode/Cursor for remote editing
- Always use virtualenv to avoid dependency conflicts
- Store code in
/data/srp/raydata/code/for RayCluster visibility - Test with small datasets first
Running Locally:
# Simple Python execution
python /data/srp/raydata/code/your-job.py
When to Use Local Development:
- Initial code writing and debugging
- Jobs that don't require GPU
- Models that fit in A10 GPU memory (~24GB)
2. Testing on Slurm H100/H200
When models are too large for A10 or require more GPU memory:
# Connect to Slurm cluster
ssh -p 2222 zhuguangbin@129.80.180.16
# Start apptainer with H100/H200 GPU
sapptainer -c 20 -m 200G -g 1 -p h100 -i /data/srp/apptainer/ray_2.52.0-py310-gpu.sif
# Run your job inside apptainer
python /data/srp/raydata/code/your-job.py
Use Slurm Testing For:
- Large models (e.g., Qwen3-VL-8B-Instruct-FP8)
- GPU-intensive workloads
- Final testing before production
3. Production Deployment on RayCluster
Setup Ray Client:
# On your Mac or any machine with Ray installed
conda create -n ray python=3.10 -y
conda activate ray
pip install -U "ray[all]"
# Set RayCluster address
export RAY_ADDRESS="https://ray.g.yesy.online"
Submit Job:
ray job submit \
--runtime-env-json='{
"pip": ["torch", "torchvision", "transformers"],
"env_vars": {
"INPUT_PATH": "s3://bucket/path/to/input",
"OUTPUT_PATH": "/data/srp/project/output"
}
}' \
-- python /data/srp/raydata/code/your-job.py
Runtime Environment Parameters:
pip: List of Python packages to installenv_vars: Environment variables (input/output paths, configs)
Important Notes:
- Code must be in
/data/srp/(mounted via JuiceFS) - RayCluster provides auto-scaling, failover, and job scheduling
- Dashboard: https://ray.g.yesy.online/
4. Scheduled Jobs in Airflow
For periodic execution, use Airflow BashOperator:
operator: airflow.operators.bash.BashOperator
bash_command: |-
echo "Installing ray[data]..."
pip install ray[data]
~/.local/bin/ray job submit \
--runtime-env-json='{"pip": ["torch"], "env_vars": {"INPUT_PATH": "..."}}' \
-- python /data/srp/raydata/code/your-job.py
env:
RAY_ADDRESS: "https://ray.g.yesy.online"
dependencies:
- start
Reference: https://github.com/SerendipityOneInc/favie-data-etl/tree/main/dags/raydata_job_demo
Code Structure
Basic Ray Data Pipeline
import os
import ray
# 1. Initialize Ray session
ray.init()
# 2. Load data
# From S3
ds = ray.data.read_parquet("s3://bucket/path/*.parquet")
# From JuiceFS
ds = ray.data.read_parquet("/data/srp/project/data/*.parquet")
# Images
ds = ray.data.read_images("s3://bucket/images/", mode="RGB")
# 3. Transform data
ds = ds.map_batches(
YourProcessor,
compute=ray.data.ActorPoolStrategy(size=1),
num_gpus=1,
batch_size=16
)
# 4. Save results
ds.write_parquet("/data/srp/project/output/")
Processor Class Pattern
Simple Function UDF:
def process_batch(batch):
# Process batch data
batch["result"] = [compute(item) for item in batch["input"]]
return batch
ds = ds.map_batches(process_batch, batch_size=32)
Model-Based Processor (Recommended):
from typing import Dict
import numpy as np
from transformers import pipeline
from PIL import Image
BATCH_SIZE = 16
class ImageClassifier:
def __init__(self):
# Load model once in __init__
self.classifier = pipeline(
"image-classification",
model="google/vit-base-patch16-224",
device=0 # GPU device
)
def __call__(self, batch: Dict[str, np.ndarray]):
# Convert numpy arrays to PIL Images
images = [Image.fromarray(img) for img in batch["image"]]
# Run inference
outputs = self.classifier(
images,
top_k=1,
batch_size=BATCH_SIZE
)
# Add results to batch
batch["score"] = [out[0]["score"] for out in outputs]
batch["label"] = [out[0]["label"] for out in outputs]
return batch
# Use with ActorPoolStrategy for GPU efficiency
predictions = ds.map_batches(
ImageClassifier,
compute=ray.data.ActorPoolStrategy(size=1), # Number of GPU replicas
num_gpus=1, # GPUs per replica
batch_size=BATCH_SIZE
)
Using vLLM for Batch Inference
from ray.data.llm import LLMPredictor, vLLMEngineProcessorConfig
model_id = "meta-llama/Llama-3.1-8B-Instruct"
processor = vLLMEngineProcessorConfig(
model_id=model_id,
tensor_parallel_size=1,
max_model_len=2048,
max_num_batched_tokens=4096
)
ds = ds.map_batches(
LLMPredictor,
fn_constructor_kwargs={"processor": processor},
num_gpus=1,
batch_size=32
)
Best Practices
Performance Optimization
-
Batch Sizing:
- Use largest batch size that fits in GPU memory
- Larger batches = better GPU utilization
- Start with batch_size=16, increase until OOM
-
Concurrent Processing:
- Separate CPU preprocessing from GPU inference
- Use multiple
map_batchesoperations - Enables parallel preprocessing while inference runs
-
GPU Utilization:
- Set
num_gpus=1per actor - Use
ActorPoolStrategy(size=N)for N GPU replicas - Match replicas to available GPUs in cluster
- Set
-
Memory Management:
- Monitor memory usage in Ray dashboard
- Reduce batch_size if hitting OOM
- Use smaller models or more capable GPUs
Error Handling
# Set AWS region to avoid endpoint resolution issues
import os
if "AWS_DEFAULT_REGION" not in os.environ:
os.environ["AWS_DEFAULT_REGION"] = "us-east-1"
# Use environment variables for paths
input_path = os.environ.get("INPUT_PATH", "default/path")
output_path = os.environ.get("OUTPUT_PATH", "default/output")
# Add error handling in processor
class RobustProcessor:
def __call__(self, batch):
try:
# Process batch
return batch
except Exception as e:
print(f"Error processing batch: {e}")
# Add error column or skip
batch["error"] = str(e)
return batch
Monitoring & Debugging
Ray Dashboard
Access dashboard at: https://ray.g.yesy.online/
Key Metrics:
- Job Status: Running/completed/failed jobs
- Cluster Nodes: Available GPUs, CPU, memory
- Task Timeline: Per-task execution time
- Operator Metrics: Throughput, batch processing time
- Resource Usage: GPU utilization, memory pressure
Viewing Job Logs
# List jobs
ray job list
# Get job logs
ray job logs <job_id>
# Follow logs in real-time
ray job logs <job_id> --follow
Common Issues
| Issue | Cause | Solution |
|---|---|---|
| GPU OOM | Batch too large | Reduce batch_size |
| CPU OOM | Too many concurrent actors | Increase num_cpus per actor |
| Slow preprocessing | Sequential processing | Separate into distinct map operations |
| Low GPU utilization | Batch too small | Increase batch_size |
| Model loading fails | Missing dependencies | Add to runtime-env-json pip list |
| S3 access errors | Missing AWS region | Set AWS_DEFAULT_REGION env var |
Debug Mode
# Run with local mode for debugging
ray.init(local_mode=True) # Single-process execution
# Take small sample for testing
sample = ds.take(10) # Get 10 items
print(sample)
# Limit data for quick tests
ds = ds.limit(100) # Process only 100 items
Example Workflows
1. Image Classification
import os
import ray
from transformers import pipeline
from PIL import Image
ray.init()
ds = ray.data.read_images(
os.environ.get("INPUT_PATH", "s3://bucket/images/"),
mode="RGB"
)
class ImageClassifier:
def __init__(self):
self.classifier = pipeline(
"image-classification",
model="google/vit-base-patch16-224",
device=0
)
def __call__(self, batch):
images = [Image.fromarray(img) for img in batch["image"]]
outputs = self.classifier(images, top_k=1, batch_size=16)
batch["label"] = [out[0]["label"] for out in outputs]
batch["score"] = [out[0]["score"] for out in outputs]
return batch
predictions = ds.map_batches(
ImageClassifier,
compute=ray.data.ActorPoolStrategy(size=1),
num_gpus=1,
batch_size=16
)
predictions.write_parquet(
os.environ.get("OUTPUT_PATH", "/data/srp/output/")
)
2. Vision-Language Model (Qwen-VL)
import os
import ray
import torch
from transformers import AutoProcessor, AutoModelForVision2Seq
from PIL import Image
from qwen_vl_utils import process_vision_info
ray.init()
ds = ray.data.read_images(
os.environ.get("INPUT_PATH", "s3://bucket/images/"),
mode="RGB"
)
PROMPT = os.environ.get(
"PROMPT",
"请详细描述这张图片中的内容,包括主要对象、场景和任何值得注意的细节。"
)
class QwenVLAnalyzer:
def __init__(self):
model_name = "Qwen/Qwen3-VL-8B-Instruct-FP8"
self.processor = AutoProcessor.from_pretrained(model_name)
self.model = AutoModelForVision2Seq.from_pretrained(
model_name,
torch_dtype=torch.float16,
device_map="auto"
)
self.model.eval()
def __call__(self, batch):
images = [Image.fromarray(img) for img in batch["image"]]
messages_list = [
[{
"role": "user",
"content": [
{"type": "image", "image": img},
{"type": "text", "text": PROMPT}
]
}]
for img in images
]
image_inputs, video_inputs = process_vision_info(messages_list)
texts = self.processor.apply_chat_template(
messages_list,
tokenize=False,
add_generation_prompt=True
)
inputs = self.processor(
text=texts,
images=image_inputs,
videos=video_inputs,
padding=True,
return_tensors="pt"
).to(self.model.device)
with torch.no_grad():
generated_ids = self.model.generate(**inputs, max_new_tokens=512)
generated_ids_trimmed = [
out[len(inp):]
for inp, out in zip(inputs.input_ids, generated_ids)
]
output_texts = self.processor.batch_decode(
generated_ids_trimmed,
skip_special_tokens=True,
clean_up_tokenization_spaces=False
)
batch["description"] = output_texts
return batch
predictions = ds.map_batches(
QwenVLAnalyzer,
compute=ray.data.ActorPoolStrategy(size=1),
num_gpus=1,
batch_size=20
)
predictions.write_parquet(
os.environ.get("OUTPUT_PATH", "/data/srp/output/")
)
3. Batch Inference with vLLM
Reference: https://github.com/SerendipityOneInc/ray-data-etl/blob/main/jobs/demo/raydata-demo-qwenvl-vllm.py
4. HTTP API Inference
Reference: https://github.com/SerendipityOneInc/ray-data-etl/blob/main/jobs/demo/raydata-demo-qwenvl-http.py
Resources
Official Documentation
- Ray Data Quickstart: https://docs.ray.io/en/latest/data/quickstart.html
- Working with LLMs: https://docs.ray.io/en/latest/data/working-with-llms.html
- Batch Inference: https://docs.ray.io/en/latest/data/batch_inference.html
- Loading Data: https://docs.ray.io/en/releases-2.52.0/data/loading-data.html
- Transforming Data: https://docs.ray.io/en/releases-2.52.0/data/transforming-data.html
- Working with Images: https://docs.ray.io/en/releases-2.52.0/data/working-with-images.html
- Runtime Environments: https://docs.ray.io/en/latest/ray-core/handling-dependencies.html
SRP Resources
- RayData Wiki: https://starquest.feishu.cn/wiki/Kpb3w8MNZieJGkkMhbqcIkrTnTc
- ray-data-etl Project: https://github.com/SerendipityOneInc/ray-data-etl
- RayCluster Dashboard: https://ray.g.yesy.online/
- Airflow Examples: https://github.com/SerendipityOneInc/favie-data-etl/tree/main/dags/raydata_job_demo
Quick Reference
Common Commands
# Local development
ssh oci-dev2.ssh.buildagi.us
python3 -m venv venv && source venv/bin/activate
pip install ray[data]
# Slurm testing
ssh -p 2222 zhuguangbin@129.80.180.16
sapptainer -c 20 -m 200G -g 1 -p h100 -i /data/srp/apptainer/ray_2.52.0-py310-gpu.sif
# Production submission
export RAY_ADDRESS="https://ray.g.yesy.online"
ray job submit --runtime-env-json='...' -- python /data/srp/raydata/code/job.py
# Monitoring
ray job list
ray job logs <job_id> --follow
File Locations
- Code storage:
/data/srp/raydata/code/ - Output storage:
/data/srp/project/output/ - Apptainer images:
/data/srp/apptainer/
Implementation Steps
When helping users with Ray Data jobs, follow this workflow:
-
Understand Requirements:
- What type of processing? (inference, ETL, preprocessing)
- Input data source and format
- Model requirements (GPU memory, batch size)
- Expected output format
-
Choose Development Environment:
- Start with A10 local development for testing
- Move to Slurm H100/H200 if needed for larger models
- Deploy to RayCluster for production scale
-
Write Code:
- Use the processor class pattern
- Include proper error handling
- Use environment variables for paths
- Start with small batch sizes
-
Test Iteratively:
- Test with
.take(10)or.limit(100)first - Verify output format
- Check resource usage in dashboard
- Optimize batch size and concurrency
- Test with
-
Deploy to Production:
- Submit to RayCluster with proper runtime-env
- Monitor via dashboard
- Set up Airflow scheduling if needed
-
Debug Issues:
- Check Ray dashboard for errors
- Review job logs
- Adjust batch size or resources
- Consult troubleshooting table