digdag

Installation
SKILL.md

Treasure Workflow (Digdag)

Basic Structure

timezone: Asia/Tokyo

schedule:
  daily>: 02:00:00

_export:
  td:
    database: my_database
    engine: presto

+extract:
  td>: queries/extract.sql
  create_table: raw_data

+transform:
  td>: queries/transform.sql
  create_table: results

Key points:

  • .dig extension required; filename becomes workflow name
  • Tasks run sequentially with +task_name: prefix
  • foo>: bar is sugar for _type: foo and _command: bar

Session Variables

Variable Example
${session_time} 2024-01-30T00:00:00+09:00
${session_date} 2024-01-30
${session_date_compact} 20240130
${session_unixtime} 1706540400
${last_session_date} Previous scheduled date
${next_session_date} Next scheduled date

Moment.js available:

+tomorrow:
  echo>: ${moment(session_time).add(1, 'days').format("YYYY-MM-DD")}

TD Operator

+query:
  td>: queries/analysis.sql
  database: analytics
  engine: presto
  create_table: results      # or insert_into: existing_table

Inline SQL:

+inline:
  td>:
    query: |
      SELECT * FROM events
      WHERE TD_TIME_RANGE(time, '${session_date}', TD_TIME_ADD('${session_date}', '1d'))

Parallel Execution

+parallel_tasks:
  _parallel: true

  +task_a:
    td>: queries/a.sql

  +task_b:
    td>: queries/b.sql

+after_parallel:
  echo>: "Runs after all parallel tasks"

Limited concurrency:

+limited:
  _parallel:
    limit: 2

Error Handling

+task:
  td>: queries/important.sql
  _retry: 3

  _error:
    +alert:
      sh>: python scripts/alert.py "Task failed"

Retry with backoff:

+task:
  _retry:
    limit: 3
    interval: 10
    interval_type: exponential  # or constant

Variables

_export:
  td:
    database: production
  my_param: value
  api_key: ${secret:api_credentials.key}  # TD parameter store

+task:
  py>: scripts.process.main
  param: ${my_param}

Conditional & Loops

+check:
  td>: queries/count.sql
  store_last_results: true

+if_data:
  if>: ${td.last_results.cnt > 0}
  _do:
    +process:
      td>: queries/process.sql

+loop:
  for_each>:
    region: [US, EU, ASIA]
  _do:
    +process:
      td>: queries/by_region.sql

Event Triggers

# Runs after another workflow succeeds
trigger:
  attempt>:
  dependent_workflow_name: segment_refresh
  dependent_project_name: customer_segments

+activate:
  td>: queries/activate.sql

tdx wf Commands

For full CLI reference, see tdx-skills/workflow. Key commands:

tdx wf pull my_project               # Pull project to local folder
tdx wf push                          # Push local changes with diff preview
tdx wf run my_project.my_workflow    # Run specific workflow
tdx wf sessions --status error       # Find failed sessions
tdx wf timeline my_project.workflow  # Visual task execution timeline
tdx wf attempt <id> logs +task_name  # View task logs

Project Structure

workflows/
└── my_project/              # Created by tdx wf pull
    ├── tdx.json             # Sync tracking (auto-generated)
    ├── main.dig             # Workflow definition
    ├── queries/
    │   └── analysis.sql
    └── scripts/
        └── process.py

Schedule Options

schedule:
  daily>: 02:00:00
  # hourly>: 00:00
  # cron>: "0 */4 * * *"
  # weekly>: "Mon,00:00:00"

Resources

Related skills

More from treasure-data/td-skills

Installs
1
GitHub Stars
18
First Seen
Mar 31, 2026