# Deployment Paradigms

Understanding ML deployment patterns and trade-offs.
## Deployment Modes
### Batch Inference
```python
import pandas as pd
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

# Process all data at once on a schedule (e.g. daily or hourly)
def batch_inference(model, data_path, output_path):
    data = pd.read_parquet(data_path)
    data["prediction"] = model.predict(data)
    data.to_parquet(output_path)

# Airflow DAG example
with DAG(
    "batch_inference",
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
) as dag:
    inference_task = PythonOperator(
        task_id="run_inference",
        python_callable=batch_inference,
        # op_kwargs would supply model, data_path, and output_path
    )
```
### Real-time Inference
```python
from fastapi import FastAPI
from pydantic import BaseModel
import torch

app = FastAPI()
model = torch.jit.load("model.pt")

class PredictRequest(BaseModel):
    data: list[float]

def preprocess(data: list[float]) -> torch.Tensor:
    # Application-specific in practice; here raw floats become a 1-example batch
    return torch.tensor([data])

@app.post("/predict")
async def predict(request: PredictRequest):
    features = preprocess(request.data)
    with torch.no_grad():
        prediction = model(features)
    return {"prediction": prediction.tolist()}
```
### Streaming Inference
```python
import json
from kafka import KafkaConsumer, KafkaProducer

consumer = KafkaConsumer("input-topic")
producer = KafkaProducer()

# Consume events, score each one, and publish predictions downstream;
# json stands in for whatever (de)serialization the pipeline uses
for message in consumer:
    data = json.loads(message.value)
    prediction = model.predict(data)  # model assumed loaded at startup
    producer.send("output-topic", json.dumps(prediction).encode("utf-8"))
```
## Serving Patterns
### Online Serving

- Sub-second latency
- Feature store for low-latency feature retrieval (see the sketch after this list)
- Model caching
- Auto-scaling
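
A minimal sketch of the online path, assuming a hypothetical `FeatureStoreClient` (real deployments would use something like Feast or a Redis-backed store) and the TorchScript model from the real-time example:

```python
import torch

class FeatureStoreClient:
    """Hypothetical stand-in for a real feature store client (e.g. Feast)."""
    def get_features(self, entity_id: str) -> list[float]:
        raise NotImplementedError

store = FeatureStoreClient()
model = torch.jit.load("model.pt")  # cached in memory at startup

def predict_online(user_id: str):
    # Look up precomputed features by entity key, then score synchronously
    features = torch.tensor([store.get_features(user_id)])
    with torch.no_grad():
        return model(features).tolist()
```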
### Offline Serving

- Batch processing
- Precomputed predictions (see the sketch after this list)
- Lower cost
- Higher throughput
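
To illustrate precomputed predictions, here is a sketch of a batch job that scores every entity in one pass and writes results to a lookup store; the `user_id` column, the Redis sink, and the scalar predictions are all assumptions made for the example:

```python
import pandas as pd
import redis

def precompute_predictions(model, features_path: str) -> None:
    # Score every entity in one pass, then write results to a fast lookup store
    df = pd.read_parquet(features_path)
    feature_cols = [c for c in df.columns if c != "user_id"]
    df["prediction"] = model.predict(df[feature_cols])
    store = redis.Redis()  # example sink; any key-value store works
    for row in df.itertuples():
        store.set(f"pred:{row.user_id}", float(row.prediction))
```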
### Hybrid Serving
```python
import time

class HybridPredictor:
    def __init__(self, cache_ttl=3600):
        self.cache = {}            # cache_key -> (prediction, timestamp)
        self.cache_ttl = cache_ttl
        self.model = load_model()  # application-specific loader

    def predict(self, user_id, context):
        # Serve from the precomputed cache when a fresh entry exists
        cache_key = f"{user_id}:{hash(context)}"  # context must be hashable
        cached = self.cache.get(cache_key)
        if cached is not None:
            prediction, ts = cached
            if time.time() - ts < self.cache_ttl:
                return prediction
        # Cache miss or stale entry: compute in real time and refresh
        prediction = self.model.predict(context)
        self.cache[cache_key] = (prediction, time.time())
        return prediction
```
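
A quick usage sketch, assuming `load_model` returns an object with a `predict` method and that `context` is hashable (e.g. a tuple of feature values):

```python
predictor = HybridPredictor(cache_ttl=600)
context = ("premium_tier", 42)                   # hashable feature tuple
first = predictor.predict("user-123", context)   # computed and cached
second = predictor.predict("user-123", context)  # served from cache for 600s
```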
## Edge Deployment
```python
import tensorflow as tf
import torch
import coremltools as ct

# TFLite for mobile/embedded (saved_model_dir points to a TF SavedModel)
converter = tf.lite.TFLiteConverter.from_saved_model(saved_model_dir)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_model = converter.convert()

# ONNX for cross-platform (model is a torch.nn.Module, dummy_input a sample batch)
torch.onnx.export(model, dummy_input, "model.onnx")

# CoreML for iOS (expects a traced or scripted PyTorch model)
mlmodel = ct.convert(model, inputs=[ct.TensorType(shape=(1, 3, 224, 224))])
```
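
As a usage example, the converted TFLite flatbuffer can be run on-device with the standard `tf.lite.Interpreter` API; the zero-filled input below is a placeholder shaped from the model's own input signature:

```python
import numpy as np
import tensorflow as tf

# Load the converted flatbuffer and allocate tensors once at startup
interpreter = tf.lite.Interpreter(model_content=tflite_model)
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

# Run a single inference (input shape and dtype taken from the model)
sample = np.zeros(input_details[0]["shape"], dtype=input_details[0]["dtype"])
interpreter.set_tensor(input_details[0]["index"], sample)
interpreter.invoke()
prediction = interpreter.get_tensor(output_details[0]["index"])
```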
## Serverless ML
```python
# AWS Lambda handler that proxies requests to a SageMaker endpoint
import json
import boto3

# Create the client outside the handler so warm invocations reuse it
runtime = boto3.client("sagemaker-runtime")

def lambda_handler(event, context):
    response = runtime.invoke_endpoint(
        EndpointName="my-model",
        ContentType="application/json",
        Body=json.dumps(event["body"]),
    )
    return json.loads(response["Body"].read())
```
## Deployment Comparison
| Pattern | Latency | Cost | Complexity | Use Case |
|---|---|---|---|---|
| Batch | High | Low | Low | Reports, ETL |
| Real-time | Low | High | Medium | User-facing |
| Streaming | Medium | Medium | High | Event-driven |
| Edge | Very Low | Low | High | Offline, IoT |
| Serverless | Variable | Pay-per-use | Low | Sporadic traffic |
## Commands

- `/omgdeploy:serve` - Deploy model serving
- `/omgdeploy:edge` - Edge deployment
## Best Practices

- Match the paradigm to your requirements
- Consider latency vs. cost trade-offs
- Plan for scaling
- Test under realistic conditions
- Monitor deployed models (a minimal sketch follows this list)
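
As a starting point for the monitoring bullet, here is a minimal sketch that wraps any predictor and logs per-request latency and output; the wrapped `model` and its `predict` method are assumptions, and a production setup would export these measurements to a metrics system rather than plain logs:

```python
import logging
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("model_monitor")

class MonitoredModel:
    def __init__(self, model):
        self.model = model  # any object with a predict() method

    def predict(self, features):
        start = time.perf_counter()
        prediction = self.model.predict(features)
        latency_ms = (time.perf_counter() - start) * 1000
        # Log latency and the raw output for drift spot-checks
        logger.info("latency_ms=%.2f prediction=%s", latency_ms, prediction)
        return prediction
```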