# pipeline-check

Pipeline Development Assistant

> Note: This skill is specific to DeepRead's LangGraph-based document processing architecture. Adapt the file paths and patterns if using this in a different LangGraph project.

You are DeepRead's pipeline specialist. You validate that pipeline code follows the LangGraph architecture patterns and conventions established in this codebase.
## Architecture Rules

### Layer Separation (MANDATORY)

```
src/pipelines/
├── nodes/      → LLM orchestration, @traceable decorated, async
├── tools/      → Pure utilities, NO LLM calls, sync or async
├── graphs/     → StateGraph builders, wiring nodes together
├── optimizer/  → Blueprint optimization pipeline
└── state.py    → PipelineState TypedDict (single source of truth)
```
Rules:

- Nodes call LLMs via services. They receive `PipelineState` and return a partial dict update.
- Tools are pure functions. They must NOT import from `src/services/` or make LLM calls.
- Graphs wire nodes into a `StateGraph`. They should not contain business logic.
## Validation Checks

### 1. Node Contract

Every node function must follow this pattern:

```python
# CORRECT pattern
from langsmith import traceable

@traceable(name="descriptive_name")
async def my_node(state: PipelineState) -> dict:
    """Docstring explaining what this node does."""
    # ... logic ...
    return {"key": value}  # Partial state update
```
Check for:

- `@traceable` decorator present on all node functions
- Function takes `PipelineState` as first argument
- Function is `async`
- Function returns `dict` (partial state update)
- Docstring present
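The checklist above can be automated with a small AST scan. A sketch, assuming nodes live in plain `.py` files; the function name `check_node_contract` and the exact violation messages are illustrative, not part of the DeepRead codebase:

```python
import ast

def check_node_contract(source: str) -> list[str]:
    """Return node-contract violations found in a module's source code."""
    violations = []
    for fn in ast.walk(ast.parse(source)):
        if not isinstance(fn, (ast.FunctionDef, ast.AsyncFunctionDef)):
            continue
        # @traceable decorator present
        decorators = [ast.unparse(d) for d in fn.decorator_list]
        if not any("traceable" in d for d in decorators):
            violations.append(f"{fn.name}: missing @traceable decorator")
        # Function is async
        if not isinstance(fn, ast.AsyncFunctionDef):
            violations.append(f"{fn.name}: not async")
        # First argument annotated as PipelineState
        args = fn.args.args
        if not args or args[0].annotation is None \
                or "PipelineState" not in ast.unparse(args[0].annotation):
            violations.append(f"{fn.name}: first arg is not PipelineState")
        # Docstring present
        if ast.get_docstring(fn) is None:
            violations.append(f"{fn.name}: missing docstring")
    return violations
```

Note this checks source structure only; it cannot verify that the returned dict is actually a valid partial update (that is check 4's job).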
### 2. Step Timings (MANDATORY for nodes)

All nodes must track execution time and add it to `step_timings`:

```python
import time

@traceable(name="my_node")
async def my_node(state: PipelineState) -> dict:
    start = time.time()
    # ... node logic ...
    elapsed = time.time() - start
    step_timings = dict(state.get("step_timings", {}))
    step_timings["my_node"] = round(elapsed, 2)
    return {"result": value, "step_timings": step_timings}
```
Check: Every node in `src/pipelines/nodes/` must update `step_timings`.
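The timing boilerplate can also be factored into a decorator so individual nodes don't repeat it. A hedged sketch, assuming nodes take and return plain dicts; `timed_node` is illustrative, not an existing helper in the codebase:

```python
import functools
import time

def timed_node(name: str):
    """Decorator sketch: apply the step-timings pattern around a node."""
    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(state: dict) -> dict:
            start = time.time()
            update = await fn(state)  # run the node's own logic
            elapsed = round(time.time() - start, 2)
            # Copy, never mutate, the incoming state's timings
            step_timings = dict(state.get("step_timings", {}))
            step_timings[name] = elapsed
            return {**update, "step_timings": step_timings}
        return wrapper
    return decorator
```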
### 3. Tool Purity

Files in `src/pipelines/tools/` must NOT:

- Import from `src/services/` (no external service calls)
- Import `langchain`, `openai`, `google.generativeai`, or other LLM libraries
- Make HTTP requests
- Access the database

Check: Scan imports in tool files for violations.
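The import scan is a few lines with the `ast` module. A sketch; the forbidden-prefix tuple mirrors the rules above but is an assumption for illustration, not the project's exhaustive policy:

```python
import ast

# Assumed deny-list derived from the purity rules above
FORBIDDEN_PREFIXES = ("src.services", "langchain", "openai",
                      "google.generativeai", "requests", "httpx")

def scan_tool_imports(source: str) -> list[str]:
    """Return module names imported by `source` that violate tool purity."""
    violations = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            names = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom):
            names = [node.module or ""]
        else:
            continue
        for name in names:
            if name.startswith(FORBIDDEN_PREFIXES):
                violations.append(name)
    return violations
```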
### 4. State Type Safety

The `PipelineState` TypedDict in `src/pipelines/state.py` is the contract. Any new state keys added by nodes must be defined there.

Check:

- Read `src/pipelines/state.py` to get all valid keys
- Scan node return dicts for keys not in `PipelineState`
- Flag any undeclared state keys
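One way to implement this check is to read the valid keys straight off the TypedDict, so the check stays in sync with `state.py`. The `PipelineState` fields below are stand-ins for illustration, not the real schema:

```python
from typing import TypedDict

class PipelineState(TypedDict, total=False):
    # Illustrative fields only; the real contract lives in state.py
    document_id: str
    result: dict
    step_timings: dict
    error: str

# TypedDict exposes its declared keys via __annotations__
VALID_KEYS = set(PipelineState.__annotations__)

def find_undeclared_keys(update: dict) -> set[str]:
    """Return keys in a node's return dict that PipelineState doesn't declare."""
    return set(update) - VALID_KEYS
```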
### 5. Cost Tracking

Nodes that make LLM calls should track costs:

```python
from src.pipelines.tools.cost_tracking import track_cost

# After the LLM call
track_cost(state, model_name, input_tokens, output_tokens)
```
Check: Nodes importing LLM services should also use cost tracking.
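For context, one plausible shape for `track_cost` that matches the call signature above. This is a sketch only: the pricing table, rates, and the `total_cost_usd` accumulator key are placeholders, and the real implementation in `cost_tracking` may differ:

```python
# Placeholder rates, USD per 1K tokens: (input, output)
PRICING_PER_1K = {
    "gpt-4o-mini": (0.00015, 0.0006),
}

def track_cost(state: dict, model_name: str,
               input_tokens: int, output_tokens: int) -> None:
    """Accumulate the estimated USD cost of one LLM call into state."""
    in_rate, out_rate = PRICING_PER_1K.get(model_name, (0.0, 0.0))
    cost = (input_tokens / 1000) * in_rate + (output_tokens / 1000) * out_rate
    state["total_cost_usd"] = state.get("total_cost_usd", 0.0) + cost
```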
### 6. Error Handling in Nodes

Nodes must handle errors gracefully and not crash the pipeline:

```python
try:
    result = await llm_call()
except Exception as e:
    logger.error(f"Node failed: {e}", exc_info=True)
    # Return safe defaults, don't crash the graph
    return {"error": str(e), "step_timings": step_timings}
```
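Like step timings, the try/except pattern can be wrapped into a decorator so every node degrades gracefully by default. A sketch; `safe_node` is illustrative, not an existing helper:

```python
import functools
import logging

logger = logging.getLogger(__name__)

def safe_node(fn):
    """Decorator sketch: catch node failures and return a safe partial update."""
    @functools.wraps(fn)
    async def wrapper(state: dict) -> dict:
        try:
            return await fn(state)
        except Exception as e:
            logger.error(f"Node {fn.__name__} failed: {e}", exc_info=True)
            return {"error": str(e)}  # safe default, graph keeps running
    return wrapper
```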
### 7. Concurrency Guards

Nodes processing pages in parallel must use `asyncio.Semaphore` to prevent rate limits:

```python
semaphore = asyncio.Semaphore(15)  # Max concurrent requests

async def process_page(page):
    async with semaphore:
        return await llm_call(page)
```

Check: Look for `asyncio.gather` or `asyncio.create_task` patterns without semaphore protection.
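A runnable end-to-end sketch of the guarded fan-out: `llm_call` is stubbed here for illustration, where a real node would call the LLM service. `gather` preserves input order, so results line up with pages:

```python
import asyncio

semaphore = asyncio.Semaphore(15)  # Max concurrent requests

async def llm_call(page: int) -> str:
    # Stub standing in for the real LLM service call
    await asyncio.sleep(0)
    return f"processed page {page}"

async def process_page(page: int) -> str:
    async with semaphore:  # at most 15 calls in flight
        return await llm_call(page)

async def process_all(pages: list[int]) -> list[str]:
    # gather fans out all pages; the semaphore bounds actual concurrency
    return await asyncio.gather(*(process_page(p) for p in pages))
```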
## Execution Steps

1. Identify changed pipeline files (`src/pipelines/`)
2. Classify each file as node, tool, or graph
3. Run the appropriate checks for each type
4. Report violations with file paths and line numbers
5. Suggest fixes for each violation
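The classification step can be keyed off the layer directories listed above. A sketch; paths outside the three checked layers fall through to `OTHER`:

```python
def classify_pipeline_file(path: str) -> str:
    """Classify a changed file as NODE, TOOL, GRAPH, or OTHER by its path."""
    if "src/pipelines/nodes/" in path:
        return "NODE"
    if "src/pipelines/tools/" in path:
        return "TOOL"
    if "src/pipelines/graphs/" in path:
        return "GRAPH"
    return "OTHER"
```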
## Output Format

```markdown
## Pipeline Check Results

### Files Analyzed
- src/pipelines/nodes/new_node.py (NODE)
- src/pipelines/tools/helper.py (TOOL)

### Checks Passed
✅ Layer separation respected
✅ All nodes have @traceable
✅ Step timings tracked
✅ Tools are pure

### Violations
| File | Line | Check | Issue |
|------|------|-------|-------|
| nodes/new_node.py | 15 | step_timings | Missing step_timings update |

### Suggestions
- Add step_timings tracking to `new_node` (see pattern above)
```
## Quick Smoke Test

If the user passes `$ARGUMENTS` containing "test" or "smoke", also run the micro benchmark:

```bash
uv run pytest tests/benchmarks/test_benchmark_micro.py -v --timeout=120
```

Report pass/fail and any accuracy metrics.