Deployment Paradigms

Understanding ML deployment patterns and trade-offs.

Deployment Modes

Batch Inference

# Process all accumulated records in one scheduled run
import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def batch_inference(model, data_path, output_path):
    data = pd.read_parquet(data_path)
    predictions = model.predict(data)
    # Wrap the raw predictions so they can be written back out as Parquet
    pd.DataFrame({"prediction": predictions}).to_parquet(output_path)

# Schedule: daily or hourly via an Airflow DAG
with DAG("batch_inference", start_date=datetime(2024, 1, 1),
         schedule_interval="@daily", catchup=False) as dag:
    inference_task = PythonOperator(
        task_id="run_inference",
        python_callable=batch_inference,
        # the model object and file paths are passed in as keyword arguments
        op_kwargs={"model": model,
                   "data_path": "/data/input.parquet",
                   "output_path": "/data/predictions.parquet"},
    )

Real-time Inference

from fastapi import FastAPI
from pydantic import BaseModel
import torch

app = FastAPI()
model = torch.jit.load("model.pt")
model.eval()

class PredictRequest(BaseModel):
    data: list[float]  # raw feature values sent by the client

def preprocess(raw):
    # Convert the raw feature list into a batched float tensor
    return torch.tensor(raw, dtype=torch.float32).unsqueeze(0)

@app.post("/predict")
async def predict(request: PredictRequest):
    features = preprocess(request.data)
    with torch.no_grad():
        prediction = model(features)
    return {"prediction": prediction.tolist()}

Streaming Inference

import json
from kafka import KafkaConsumer, KafkaProducer

# Consume events, score each one, and publish predictions downstream
consumer = KafkaConsumer("input-topic",
                         bootstrap_servers="localhost:9092",
                         value_deserializer=lambda v: json.loads(v.decode("utf-8")))
producer = KafkaProducer(bootstrap_servers="localhost:9092",
                         value_serializer=lambda v: json.dumps(v).encode("utf-8"))

for message in consumer:
    prediction = model.predict(message.value)
    # tolist() keeps the payload JSON-serializable for the producer
    producer.send("output-topic", prediction.tolist())

Serving Patterns

Online Serving

  • Sub-second latency
  • Features fetched from a feature store at request time (see the sketch after this list)
  • Model caching
  • Auto-scaling
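
A minimal sketch of request-time feature lookup, assuming a Feast feature store and reusing the app and model objects from the real-time example above; the repo path, feature names, and entity key are placeholders.

from feast import FeatureStore
import torch

store = FeatureStore(repo_path=".")  # placeholder path to the Feast repository

@app.post("/predict/{user_id}")
async def predict_online(user_id: int):
    # Fetch the freshest feature values for this entity at request time
    feature_vector = store.get_online_features(
        features=["user_stats:avg_session_length", "user_stats:purchase_count"],
        entity_rows=[{"user_id": user_id}],
    ).to_dict()
    features = torch.tensor([feature_vector["avg_session_length"][0],
                             feature_vector["purchase_count"][0]],
                            dtype=torch.float32).unsqueeze(0)
    with torch.no_grad():
        prediction = model(features)
    return {"prediction": prediction.tolist()}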

Offline Serving

  • Batch processing
  • Precomputed predictions served from a lookup store (see the sketch after this list)
  • Lower cost
  • Higher throughput
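
A minimal sketch of serving precomputed predictions, assuming a batch job like the one above has written a Parquet file with user_id and prediction columns; Redis and the pred:{user_id} key scheme are illustrative choices.

import json
import pandas as pd
import redis

r = redis.Redis(host="localhost", port=6379)

def load_precomputed(predictions_path):
    # Load the batch job's output and push it into a low-latency lookup store
    predictions = pd.read_parquet(predictions_path)
    for row in predictions.itertuples():
        r.set(f"pred:{row.user_id}", json.dumps(row.prediction))

def get_prediction(user_id):
    # Serving is a constant-time lookup; no model runs at request time
    cached = r.get(f"pred:{user_id}")
    return json.loads(cached) if cached else None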

Hybrid Serving

import time

class HybridPredictor:
    def __init__(self, cache_ttl=3600):
        self.cache = {}            # cache_key -> (prediction, timestamp)
        self.cache_ttl = cache_ttl
        self.model = load_model()

    def predict(self, user_id, context):
        # Serve the precomputed/cached result while it is still fresh
        cache_key = f"{user_id}:{hash(str(context))}"
        cached = self.cache.get(cache_key)
        if cached and time.time() - cached[1] < self.cache_ttl:
            return cached[0]

        # Cache miss or stale entry: compute in real time and refresh the cache
        prediction = self.model.predict(context)
        self.cache[cache_key] = (prediction, time.time())
        return prediction

Edge Deployment

# TFLite for mobile/embedded
import tensorflow as tf

converter = tf.lite.TFLiteConverter.from_saved_model(saved_model_dir)
converter.optimizations = [tf.lite.Optimize.DEFAULT]  # enables post-training quantization
tflite_model = converter.convert()

# ONNX for cross-platform runtimes
import torch

dummy_input = torch.randn(1, 3, 224, 224)  # example input matching the model's expected shape
torch.onnx.export(model, dummy_input, "model.onnx")

# Core ML for iOS (expects a traced or scripted PyTorch model)
import coremltools as ct

traced_model = torch.jit.trace(model, dummy_input)
mlmodel = ct.convert(traced_model, inputs=[ct.TensorType(shape=(1, 3, 224, 224))])
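
To complete the TFLite path, a sketch of running the converted model on-device with the TFLite interpreter; the random input is a placeholder shaped from the model's own input signature.

import numpy as np

# Load the converted model once and allocate its tensors
interpreter = tf.lite.Interpreter(model_content=tflite_model)
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

# Placeholder input shaped to whatever the converted model expects
dummy = np.random.rand(*input_details[0]["shape"]).astype(np.float32)
interpreter.set_tensor(input_details[0]["index"], dummy)
interpreter.invoke()
prediction = interpreter.get_tensor(output_details[0]["index"])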

Serverless ML

# AWS Lambda handler that proxies requests to a SageMaker endpoint
import json
import boto3

# Create the client once per container so warm invocations reuse the connection
runtime = boto3.client("sagemaker-runtime")

def lambda_handler(event, context):
    response = runtime.invoke_endpoint(
        EndpointName="my-model",
        ContentType="application/json",
        Body=event["body"],  # API Gateway delivers the request body as a JSON string
    )
    return json.loads(response["Body"].read())

Deployment Comparison

Pattern      Latency    Cost          Complexity   Use Case
Batch        High       Low           Low          Reports, ETL
Real-time    Low        High          Medium       User-facing
Streaming    Medium     Medium        High         Event-driven
Edge         Very low   Low           High         Offline, IoT
Serverless   Variable   Pay-per-use   Low          Sporadic traffic

Commands

  • /omgdeploy:serve - Deploy model serving
  • /omgdeploy:edge - Edge deployment

Best Practices

  1. Match paradigm to requirements
  2. Consider latency vs cost trade-offs
  3. Plan for scaling
  4. Test under realistic conditions
  5. Monitor deployed models (see the monitoring sketch after this list)
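
For the last point, a minimal sketch of request-level monitoring with prometheus_client, wrapping the model call from the real-time example above; the metric names and scrape port are illustrative.

import time
from prometheus_client import Counter, Histogram, start_http_server

start_http_server(9100)  # expose /metrics on a separate port for Prometheus to scrape
PREDICTIONS = Counter("predictions_total", "Number of predictions served")
LATENCY = Histogram("prediction_latency_seconds", "Model inference latency in seconds")

def predict_with_metrics(features):
    start = time.time()
    prediction = model(features)  # model from the real-time serving example
    LATENCY.observe(time.time() - start)
    PREDICTIONS.inc()
    return prediction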