# Flow Management Patterns

Reference patterns for building and orchestrating Dataiku flows via the Python API.

## When to Use This Skill

- Building datasets that depend on upstream datasets
- Running multiple recipes in the correct order
- Creating multi-step pipelines (e.g., aggregate then join then train)
- Checking job status and handling failures mid-pipeline

## Build a Single Dataset

```python
recipe = project.get_recipe("my_recipe")
job = recipe.run(job_type='NON_RECURSIVE_FORCED_BUILD', partitions=None, wait=True, no_fail=False)
state = job.get_status()["baseStatus"]["state"]  # "DONE" or "FAILED"
```

`recipe.run()` waits for completion by default (`wait=True`). Pass `no_fail=True` to prevent an exception from being raised when the job fails.

Note: `job_type` options are `NON_RECURSIVE_FORCED_BUILD` (default), `RECURSIVE_BUILD`, and `RECURSIVE_FORCED_BUILD`.

## Build Multiple Datasets in Dependency Order

When downstream datasets depend on upstream ones, build them sequentially:

```python
def build_recipe(project, recipe_name):
    """Build a recipe and return success status."""
    print(f"Building {recipe_name}...")
    recipe = project.get_recipe(recipe_name)
    job = recipe.run(no_fail=True)
    status = job.get_status()
    state = status.get("baseStatus", {}).get("state")

    if state == "DONE":
        print(f"  {recipe_name}: success")
        return True
    else:
        # Extract error details
        activities = status.get("baseStatus", {}).get("activities", {})
        for name, info in activities.items():
            if info.get("firstFailure"):
                print(f"  {recipe_name} error: {info['firstFailure'].get('message')}")
        return False


# Build in dependency order: upstream first, then downstream
pipeline = [
    "group_LAB_RESULTS_AGG",       # Step 1: aggregate
    "group_CLINICAL_NOTES_AGG",    # Step 2: aggregate (independent of step 1)
    "join_ML_TRAINING_DATA",       # Step 3: join (depends on steps 1 & 2)
]

for recipe_name in pipeline:
    success = build_recipe(project, recipe_name)
    if not success:
        print(f"Pipeline failed at {recipe_name}. Fix and retry.")
        break
```
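If a step fails transiently (a busy connection, a timeout), a thin retry wrapper around `build_recipe` saves a manual rerun. A minimal sketch; the wrapper name and attempt count are our assumptions, not part of the Dataiku API:

```python
def run_with_retry(build_fn, recipe_name, max_attempts=2):
    """Call build_fn(recipe_name) up to max_attempts times; True on first success."""
    for attempt in range(1, max_attempts + 1):
        if build_fn(recipe_name):
            return True
        print(f"Attempt {attempt}/{max_attempts} failed for {recipe_name}")
    return False
```

Usage: `run_with_retry(lambda name: build_recipe(project, name), "join_ML_TRAINING_DATA")`.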

## Build Independent Recipes in Parallel

Because `run()` blocks by default, back-to-back calls execute sequentially. For recipes with no dependency between them, start the jobs with `wait=False` and poll until both reach a terminal state:

```python
import time

# These two aggregations are independent — start both without waiting
job1 = project.get_recipe("group_LAB_RESULTS_AGG").run(wait=False)
job2 = project.get_recipe("group_CLINICAL_NOTES_AGG").run(wait=False)

# Poll until both jobs reach a terminal state
while True:
    states = [j.get_status()["baseStatus"]["state"] for j in (job1, job2)]
    if all(s in ("DONE", "FAILED", "ABORTED") for s in states):
        break
    time.sleep(5)

# Then build the dependent join
job3 = project.get_recipe("join_ML_TRAINING_DATA").run(no_fail=True)
```

## Check What Exists Before Creating

Before creating recipes or datasets, check if they already exist to make scripts idempotent:

```python
existing_datasets = [d.get("name") for d in project.list_datasets()]
existing_recipes = [r.get("name") for r in project.list_recipes()]

if "MY_OUTPUT" not in existing_datasets:
    # Create the dataset...
    pass

if "my_recipe" not in existing_recipes:
    # Create the recipe...
    pass
```

## Verify Pipeline Results

After building a pipeline, verify the final output:

```python
ds = project.get_dataset("ML_TRAINING_DATA")
schema = ds.get_settings().get_raw().get("schema", {}).get("columns", [])

print(f"Output has {len(schema)} columns:")
for col in schema:
    print(f"  - {col['name']} ({col.get('type', 'unknown')})")
```
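To turn that printout into a hard check, compare the schema against the columns the pipeline is expected to produce. A sketch; the expected column names in the example are hypothetical:

```python
def missing_columns(schema_columns, expected_names):
    """Return expected column names absent from a Dataiku schema column list."""
    actual = {col["name"] for col in schema_columns}
    return sorted(set(expected_names) - actual)

# e.g. missing = missing_columns(schema, ["patient_id", "lab_count"])
# if missing: raise RuntimeError(f"Output is missing columns: {missing}")
```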

## Common Pipeline Patterns

| Pattern | Steps | Key Concern |
| --- | --- | --- |
| Aggregate + Join | Group inputs → Join aggregated outputs | Build aggregations before the join |
| Clean + Transform | Prepare recipe → Group/Join | Schema updates after each step |
| ETL to Warehouse | Prepare → Sync to SQL connection | Set SQL schema before sync |
| ML Pipeline | Prep → Aggregate → Join → Train | Full dependency chain; verify schema at each step |
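For longer flows, a hand-ordered pipeline list gets fragile. The build order can instead be derived from a dependency map using the standard library's `graphlib`; a sketch reusing the recipe names from the earlier example:

```python
from graphlib import TopologicalSorter

def dependency_order(deps):
    """deps maps each recipe to the recipes it depends on.
    Returns a valid build order, upstream recipes first."""
    return list(TopologicalSorter(deps).static_order())

# The join depends on both aggregations
deps = {
    "group_LAB_RESULTS_AGG": [],
    "group_CLINICAL_NOTES_AGG": [],
    "join_ML_TRAINING_DATA": ["group_LAB_RESULTS_AGG", "group_CLINICAL_NOTES_AGG"],
}
pipeline = dependency_order(deps)
```

The resulting `pipeline` list can be fed straight into the sequential build loop above.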

## Handling Failures Mid-Pipeline

```python
for recipe_name in pipeline:
    success = build_recipe(project, recipe_name)
    if not success:
        # Get detailed error info; list_jobs() returns the most recent jobs first
        jobs = project.list_jobs()
        job = project.get_job(jobs[0]['def']['id'])
        print(job.get_log()[-2000:])  # Last 2000 chars of log
        break
```
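Scanning the tail for error markers is often faster than reading 2000 raw characters. A small helper to pair with `job.get_log()`; the keyword list is an assumption about what typically appears in Dataiku job logs, not a documented format:

```python
def error_lines(log_text, keywords=("ERROR", "Exception", "FAILED"), limit=5):
    """Return up to `limit` log lines containing any keyword, newest first."""
    hits = [line for line in log_text.splitlines()
            if any(k in line for k in keywords)]
    return hits[-limit:][::-1]

# e.g. for line in error_lines(job.get_log()): print(line)
```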

See `skills/troubleshooting/` for detailed error diagnosis patterns.

## Schema Propagation

When an upstream schema changes, propagate it through the flow:

```python
flow = project.get_flow()
propagation = flow.new_schema_propagation("source_dataset")
future = propagation.start()
future.wait_for_result()
```

For more control, use the builder options:

```python
propagation.set_auto_rebuild(True)
propagation.stop_at("recipe_name")
propagation.mark_recipe_as_ok("recipe_name")
```

## Build Datasets via Project API

For more control than `recipe.run()`, use the project-level job builder:

```python
job = project.new_job('RECURSIVE_FORCED_BUILD')
job.with_output('dataset_name', object_type='DATASET')
result = job.start_and_wait(no_fail=True)
```

This supports building multiple outputs in one job:

```python
job = project.new_job('NON_RECURSIVE_FORCED_BUILD')
job.with_output('dataset_1', object_type='DATASET')
job.with_output('dataset_2', object_type='DATASET')
job.start_and_wait()
```
