Galaxy Workflow Automation with BioBlend and Planemo

Purpose

This skill provides expert knowledge for automating Galaxy workflows using BioBlend (Python Galaxy API library) and Planemo (Galaxy workflow testing and execution tool).

When to Use This Skill

Use this skill when:

  • ✅ Automating Galaxy workflow execution via API
  • ✅ Building batch processing systems for Galaxy
  • ✅ Using BioBlend to interact with Galaxy
  • ✅ Testing workflows with Planemo
  • ✅ Managing Galaxy histories, datasets, and collections programmatically
  • ✅ Polling workflow invocation status
  • ✅ Implementing error handling and retry logic for Galaxy operations
  • ✅ Creating Galaxy automation pipelines
  • ✅ Integrating Galaxy into larger bioinformatics workflows

This skill is NOT project-specific - it's useful for ANY Galaxy automation project.


Core BioBlend Concepts

1. Galaxy Instance Connection

from bioblend.galaxy import GalaxyInstance

# Connect to Galaxy server
gi = GalaxyInstance(url='https://usegalaxy.org', key='your_api_key')

# Verify connection and credentials
print(gi.users.get_current_user())

Best practices:

  • Store API keys in environment variables, never in code
  • Use HTTPS URLs for production
  • Mask API keys in logs: f"{key[:4]}{'*' * (len(key) - 8)}{key[-4:]}"

2. History Management

Create or find history:

def get_or_create_history_id(gi, history_name):
    """Get a history ID by name, creating the history if it doesn't exist"""
    histories = gi.histories.get_histories(name=history_name)

    if histories:
        return histories[0]['id']
    else:
        history = gi.histories.create_history(name=history_name)
        return history['id']

List histories:

histories = gi.histories.get_histories()
for hist in histories:
    print(f"{hist['name']}: {hist['id']}")

Get history contents:

history_id = '...'
contents = gi.histories.show_history(history_id, contents=True)

for item in contents:
    print(f"{item['name']}: {item['state']}")

3. Workflow Invocation

Get workflow by ID:

workflow_id = 'a1b2c3d4e5f67890'
workflow = gi.workflows.show_workflow(workflow_id)
print(f"Workflow: {workflow['name']}")

Invoke workflow:

# Prepare inputs (dataset IDs or dataset collection IDs)
inputs = {
    '0': {'id': dataset_id, 'src': 'hda'},  # hda = history dataset
    '1': {'id': collection_id, 'src': 'hdca'}  # hdca = history dataset collection
}

# Invoke workflow
invocation = gi.workflows.invoke_workflow(
    workflow_id,
    inputs=inputs,
    history_id=history_id,
    import_inputs_to_history=False  # Inputs already in history
)

invocation_id = invocation['id']
print(f"Invocation ID: {invocation_id}")

4. Invocation Status Checking

Poll invocation status:

def check_invocation_complete(gi, invocation_id):
    """
    Return the current state of a workflow invocation.

    Returns:
        str: e.g. 'new', 'ready', 'scheduled', 'failed', 'cancelled'
    """
    invocation = gi.invocations.show_invocation(invocation_id)

    state = invocation['state']

    # Note: 'scheduled' means all steps have been scheduled, not that
    # every job has finished. Job-level states ('ok', 'error', 'running')
    # are reported per step/job, not on the invocation itself.

    return state

Wait for completion:

import time

def wait_for_invocation(gi, invocation_id, poll_interval=30, timeout=3600):
    """Wait until an invocation reaches a terminal scheduling state"""
    start_time = time.time()

    while True:
        state = check_invocation_complete(gi, invocation_id)

        # Terminal invocation states; after 'scheduled', individual jobs
        # may still be running and should be checked separately
        if state in ('scheduled', 'failed', 'cancelled'):
            return state

        if time.time() - start_time > timeout:
            raise TimeoutError(f"Invocation {invocation_id} timed out after {timeout}s")

        time.sleep(poll_interval)
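
BioBlend's InvocationClient also provides a built-in poller in recent releases; if your version has it, prefer it over a hand-rolled loop:

# Blocks until a terminal state; with check=True (the default) it raises
# if the invocation ends 'failed' or 'cancelled'
invocation = gi.invocations.wait_for_invocation(
    invocation_id, maxwait=3600, interval=30
)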

Get invocation details with steps:

invocation = gi.invocations.show_invocation(invocation_id)

# Check individual steps ('steps' is a list of step dicts)
for step in invocation.get('steps', []):
    step_state = step['state']
    job_id = step.get('job_id')
    print(f"Step {step.get('order_index')}: {step_state} (job: {job_id})")

5. Error Handling Patterns

Categorize failures:

def categorize_failure(gi, invocation_id):
    """Determine if a failure is retriable"""
    invocation = gi.invocations.show_invocation(invocation_id)

    if invocation['state'] != 'failed':
        return None

    # Collect failed steps ('steps' is a list of step dicts)
    failed_steps = []
    for step in invocation.get('steps', []):
        if step['state'] == 'error':
            failed_steps.append({
                'order_index': step.get('order_index'),
                'job_id': step.get('job_id')
            })

    # Analyze job failures
    for step in failed_steps:
        if step['job_id']:
            # full_details=True is required to retrieve stderr
            job = gi.jobs.show_job(step['job_id'], full_details=True)
            stderr = job.get('stderr') or ''

            # Check for specific error patterns
            if 'out of memory' in stderr.lower():
                return 'retriable_memory'
            elif 'timeout' in stderr.lower():
                return 'retriable_timeout'
            elif 'network' in stderr.lower():
                return 'retriable_network'

    return 'permanent_failure'
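
These categories can drive a simple retry loop. A minimal sketch (the backoff schedule and max_retries are arbitrary choices; rerun_failed_invocation is defined in the next section):

import time

def run_with_retries(gi, invocation_id, max_retries=2, backoff=60):
    """Retry retriable failures; give up on permanent ones"""
    for attempt in range(max_retries + 1):
        state = wait_for_invocation(gi, invocation_id)
        if state != 'failed':
            return invocation_id, state

        category = categorize_failure(gi, invocation_id)
        if category in (None, 'permanent_failure'):
            return invocation_id, 'failed'

        time.sleep(backoff * (attempt + 1))  # linear backoff
        invocation_id = rerun_failed_invocation(gi, invocation_id)

    return invocation_id, 'failed'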

6. Rerun Failed Invocations

Galaxy rerun API:

def rerun_failed_invocation(gi, invocation_id, use_cached_job=True,
                           replacement_params=None):
    """
    Rerun a failed invocation using Galaxy's native rerun API.

    Args:
        gi: GalaxyInstance
        invocation_id: Failed invocation ID
        use_cached_job: Reuse successful job results
        replacement_params: Dict of parameter changes

    Returns:
        New invocation ID
    """
    rerun_payload = {
        'use_cached_job': use_cached_job
    }

    if replacement_params:
        rerun_payload['replacement_params'] = replacement_params

    # Call Galaxy rerun API
    response = gi.invocations.rerun_invocation(
        invocation_id,
        **rerun_payload
    )

    new_invocation_id = response['id']
    return new_invocation_id

Detect parameter changes from YAML:

import yaml

def build_replacement_params_from_yaml(gi, invocation_id, job_yaml_path):
    """
    Compare YAML parameters with invocation parameters.

    Returns dict of changed parameters for rerun.
    """
    # Read new parameters from YAML
    with open(job_yaml_path, 'r') as f:
        new_params = yaml.safe_load(f)

    # Get original invocation parameters
    invocation = gi.invocations.show_invocation(invocation_id)
    orig_params = invocation.get('inputs', {})

    # Find differences
    replacement_params = {}
    for key, new_value in new_params.items():
        if key in orig_params:
            if orig_params[key] != new_value:
                replacement_params[key] = new_value
        else:
            replacement_params[key] = new_value

    return replacement_params
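
Hypothetical usage tying the two helpers together (failed_id and the YAML path are illustrative):

changes = build_replacement_params_from_yaml(gi, failed_id, 'job_v2.yml')
new_id = rerun_failed_invocation(
    gi, failed_id,
    use_cached_job=True,
    replacement_params=changes or None
)
print(f"Rerun launched as invocation {new_id}")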

7. Dataset Operations

Upload dataset:

file_path = '/path/to/file.fastq.gz'

dataset = gi.tools.upload_file(
    file_path,
    history_id,
    file_type='fastqsanger.gz'
)

dataset_id = dataset['outputs'][0]['id']

Get dataset details:

dataset = gi.datasets.show_dataset(dataset_id)
print(f"Name: {dataset['name']}")
print(f"State: {dataset['state']}")  # 'ok', 'queued', 'running', 'error'
print(f"Size: {dataset.get('file_size', 0)} bytes")

Wait for dataset upload:

def wait_for_dataset(gi, dataset_id, poll_interval=5, timeout=600):
    """Wait for dataset to finish uploading"""
    start_time = time.time()

    while True:
        dataset = gi.datasets.show_dataset(dataset_id)
        state = dataset['state']

        if state == 'ok':
            return True
        elif state == 'error':
            raise RuntimeError(f"Dataset {dataset_id} failed to upload")

        if time.time() - start_time > timeout:
            raise TimeoutError(f"Dataset upload timeout after {timeout}s")

        time.sleep(poll_interval)
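
BioBlend also ships a built-in equivalent worth preferring when available:

# Blocks until the dataset reaches a terminal state and raises if it
# is not 'ok' (check=True is the default)
dataset = gi.datasets.wait_for_dataset(dataset_id, maxwait=600, interval=5)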

8. Collections (Paired, List, List:Paired)

Create dataset collection:

# List collection (multiple files)
collection_description = {
    'collection_type': 'list',
    'element_identifiers': [
        {'id': dataset_id1, 'name': 'sample1', 'src': 'hda'},
        {'id': dataset_id2, 'name': 'sample2', 'src': 'hda'},
    ]
}

collection = gi.histories.create_dataset_collection(
    history_id,
    collection_description
)

collection_id = collection['id']

Paired collection (forward/reverse reads):

collection_description = {
    'collection_type': 'paired',
    'element_identifiers': [
        {'id': forward_dataset_id, 'name': 'forward', 'src': 'hda'},
        {'id': reverse_dataset_id, 'name': 'reverse', 'src': 'hda'},
    ]
}

List:Paired (multiple paired-end samples):

collection_description = {
    'collection_type': 'list:paired',
    'element_identifiers': [
        {
            'name': 'sample1',
            'collection_type': 'paired',
            'element_identifiers': [
                {'id': sample1_fwd, 'name': 'forward', 'src': 'hda'},
                {'id': sample1_rev, 'name': 'reverse', 'src': 'hda'},
            ]
        },
        {
            'name': 'sample2',
            'collection_type': 'paired',
            'element_identifiers': [
                {'id': sample2_fwd, 'name': 'forward', 'src': 'hda'},
                {'id': sample2_rev, 'name': 'reverse', 'src': 'hda'},
            ]
        }
    ]
}
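
For more than a couple of samples, writing the nested structure by hand is error-prone; generating it from a mapping is safer. A sketch, where pairs maps sample names to already-uploaded forward/reverse dataset IDs:

pairs = {
    'sample1': ('fwd_id_1', 'rev_id_1'),
    'sample2': ('fwd_id_2', 'rev_id_2'),
}

collection_description = {
    'collection_type': 'list:paired',
    'element_identifiers': [
        {
            'name': name,
            'collection_type': 'paired',
            'element_identifiers': [
                {'id': fwd, 'name': 'forward', 'src': 'hda'},
                {'id': rev, 'name': 'reverse', 'src': 'hda'},
            ]
        }
        for name, (fwd, rev) in pairs.items()
    ]
}

collection = gi.histories.create_dataset_collection(history_id, collection_description)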

Core Planemo Concepts

1. Planemo Command Structure

Basic syntax:

planemo run <workflow_file> <job_yaml> \
    --engine external_galaxy \
    --galaxy_url "https://usegalaxy.org" \
    --galaxy_user_key "your_api_key" \
    --history_name "My Analysis" \
    --test_output_json "invocation.json"

Common options:

  • --engine external_galaxy: Use external Galaxy server (not local)
  • --simultaneous_uploads: Upload all files simultaneously (faster but more resource-intensive)
  • --check_uploads_ok: Verify uploads completed successfully
  • --test_output_json: Save invocation details to JSON file

2. Job YAML Format

Example job.yml:

# Inputs
input_reads:
  class: File
  path: /path/to/reads.fastq.gz

# Collections
paired_reads:
  class: Collection
  collection_type: paired
  elements:
    - identifier: forward
      class: File
      path: /path/to/forward.fastq.gz
    - identifier: reverse
      class: File
      path: /path/to/reverse.fastq.gz

# Parameters
kmer_size: 21
coverage_threshold: 30
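
For batch runs, job YAML can be generated programmatically instead of hand-edited (a sketch using PyYAML; the keys mirror the example above):

import yaml

job = {
    'input_reads': {'class': 'File', 'path': '/path/to/reads.fastq.gz'},
    'kmer_size': 21,
    'coverage_threshold': 30,
}

with open('job.yml', 'w') as f:
    yaml.safe_dump(job, f, default_flow_style=False)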

3. Generating Planemo Commands Programmatically

def build_planemo_command(workflow_path, job_yaml, galaxy_url, api_key,
                          history_name, output_json, log_file):
    """
    Build planemo run command.

    Security: Mask API key in display, but use full key in command.
    """
    command = (
        f'planemo run "{workflow_path}" "{job_yaml}" '
        f'--engine external_galaxy '
        f'--galaxy_url "{galaxy_url}" '
        f'--simultaneous_uploads '
        f'--check_uploads_ok '
        f'--galaxy_user_key "{api_key}" '
        f'--history_name "{history_name}" '
        f'--test_output_json "{output_json}" '
        f'> "{log_file}" 2>&1'
    )

    return command

Execute with error handling:

import os

return_code = os.system(planemo_command)

if return_code != 0:
    # Planemo failed - the workflow was most likely never launched and
    # no invocation ID exists (confirm via the output JSON; without
    # --no_wait, a nonzero exit can also mean a launched workflow failed)
    print(f"ERROR: Planemo failed with return code {return_code}")
    print(f"Check log: {log_file}")
    # DO NOT mark an invocation as failed if it was never created
else:
    # Planemo succeeded - workflow launched
    # Invocation ID is in the output JSON
    print("SUCCESS: Workflow launched")

CRITICAL: on POSIX systems, os.system() returns the raw wait status, in which the child's exit code occupies the high byte:

  • Exit code 1 becomes return code 256
  • Exit code 2 becomes return code 512
  • To get the actual exit code: actual_exit = return_code >> 8 (or os.waitstatus_to_exitcode(return_code) on Python 3.9+); a subprocess-based alternative follows
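
A subprocess-based runner avoids both the wait-status arithmetic and shell quoting entirely (a sketch; run_planemo is a hypothetical helper and the argument values are illustrative):

import subprocess

def run_planemo(args, log_file):
    """Run planemo without a shell: no quoting issues, real exit codes"""
    with open(log_file, 'w') as log:
        result = subprocess.run(['planemo', 'run', *args],
                                stdout=log, stderr=subprocess.STDOUT)
    return result.returncode  # the actual exit code, not a wait status

rc = run_planemo(
    [workflow_path, job_yaml,
     '--engine', 'external_galaxy',
     '--galaxy_url', galaxy_url,
     '--galaxy_user_key', api_key,
     '--test_output_json', output_json],
    log_file
)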

4. Parsing Planemo Output

Extract invocation ID from JSON:

import json

def extract_invocation_id(output_json_path):
    """Extract invocation ID from planemo test output"""
    with open(output_json_path, 'r') as f:
        data = json.load(f)

    # Planemo output structure: a list of test records
    tests = data.get('tests', [])
    if tests:
        return tests[0]['data'].get('invocation_id')

    return None

Common Automation Patterns

1. Thread-Safe Galaxy Operations

Use locks for concurrent API calls:

import threading

galaxy_lock = threading.Lock()

def thread_safe_invoke_workflow(gi, workflow_id, inputs, history_id):
    """Invoke workflow with thread safety"""
    with galaxy_lock:
        invocation = gi.workflows.invoke_workflow(
            workflow_id,
            inputs=inputs,
            history_id=history_id
        )
        return invocation['id']

Why: concurrent uploads and invocations from the same API key can hit server-side throttling or race conditions on some Galaxy servers; a lock (or a small semaphore) serializes the sensitive calls.


2. Batch Processing Pattern

def process_samples_batch(gi, workflow_id, samples, max_concurrent=3):
    """
    Process multiple samples with concurrency limit.

    Args:
        gi: GalaxyInstance
        workflow_id: Workflow to run
        samples: List of sample dicts with 'name' and 'files'
        max_concurrent: Max parallel invocations
    """
    from concurrent.futures import ThreadPoolExecutor, as_completed

    def process_one_sample(sample):
        # Create history
        history_id = get_or_create_history_id(gi, sample['name'])

        # Upload files and wait for each to reach 'ok' (see Common Pitfalls)
        dataset_ids = []
        for file_path in sample['files']:
            ds = gi.tools.upload_file(file_path, history_id)
            dataset_id = ds['outputs'][0]['id']
            wait_for_dataset(gi, dataset_id)
            dataset_ids.append(dataset_id)

        # Invoke workflow
        inputs = {'0': {'id': dataset_ids[0], 'src': 'hda'}}
        invocation_id = thread_safe_invoke_workflow(
            gi, workflow_id, inputs, history_id
        )

        # Wait for completion
        state = wait_for_invocation(gi, invocation_id)

        return {
            'sample': sample['name'],
            'invocation_id': invocation_id,
            'state': state
        }

    # Process with limited concurrency
    with ThreadPoolExecutor(max_workers=max_concurrent) as executor:
        futures = {executor.submit(process_one_sample, s): s for s in samples}

        results = []
        for future in as_completed(futures):
            result = future.result()
            results.append(result)
            print(f"Completed: {result['sample']} - {result['state']}")

        return results
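
Hypothetical usage (sample names and file paths are illustrative):

samples = [
    {'name': 'sample1', 'files': ['/data/sample1.fastq.gz']},
    {'name': 'sample2', 'files': ['/data/sample2.fastq.gz']},
]

results = process_samples_batch(gi, workflow_id, samples, max_concurrent=3)
failed = [r for r in results if r['state'] == 'failed']
print(f"{len(results) - len(failed)} succeeded, {len(failed)} failed")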

3. Resume Capability Pattern

Track processed samples:

import json
import os

STATE_FILE = 'processing_state.json'

def load_state():
    """Load processing state"""
    if os.path.exists(STATE_FILE):
        with open(STATE_FILE, 'r') as f:
            return json.load(f)
    return {'completed': [], 'failed': []}

def save_state(state):
    """Save processing state"""
    with open(STATE_FILE, 'w') as f:
        json.dump(state, f, indent=2)

def process_with_resume(samples):
    """Process samples with resume capability"""
    state = load_state()

    for sample in samples:
        sample_name = sample['name']

        # Skip if already completed
        if sample_name in state['completed']:
            print(f"Skipping {sample_name} (already completed)")
            continue

        try:
            # Process sample
            result = process_one_sample(sample)

            if result['state'] in ('ok', 'scheduled'):  # terminal success states
                state['completed'].append(sample_name)
            else:
                state['failed'].append(sample_name)

            save_state(state)

        except Exception as e:
            print(f"Error processing {sample_name}: {e}")
            state['failed'].append(sample_name)
            save_state(state)
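
If the process can die mid-write, the state file can be left truncated. An atomic variant of save_state (same layout; write to a temp file, then rename into place) avoids that:

import json
import os
import tempfile

def save_state_atomic(state, path=STATE_FILE):
    """Write state to a temp file, then atomically rename it into place"""
    fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(path) or '.')
    try:
        with os.fdopen(fd, 'w') as f:
            json.dump(state, f, indent=2)
        os.replace(tmp_path, path)  # atomic on POSIX
    except BaseException:
        os.unlink(tmp_path)
        raise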

Security Best Practices

1. API Key Management

Store in environment variables:

import os

api_key = os.environ.get('GALAXY_API_KEY')
if not api_key:
    raise ValueError("GALAXY_API_KEY environment variable not set")

gi = GalaxyInstance(url, api_key)

Mask in logs:

def mask_api_key(key):
    """Mask API key for display"""
    if len(key) <= 8:
        return '*' * len(key)
    return f"{key[:4]}{'*' * (len(key) - 8)}{key[-4:]}"

masked_key = mask_api_key(api_key)
print(f"Using API key: {masked_key}")

2. Path Handling

Always quote paths in shell commands:

# ✅ Good - handles spaces
command = f'planemo run "{workflow_path}" "{job_yaml}"'

# ❌ Bad - breaks with spaces
command = f'planemo run {workflow_path} {job_yaml}'
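
When a command must go through a shell, shlex.quote is more robust than hand-placed double quotes (it also handles embedded quotes and shell metacharacters):

import shlex

command = f'planemo run {shlex.quote(workflow_path)} {shlex.quote(job_yaml)}'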

Debugging

1. Galaxy History Inspection

def inspect_history(gi, history_id):
    """Print detailed history information"""
    history = gi.histories.show_history(history_id)
    print(f"History: {history['name']} ({history['id']})")
    print(f"State: {history['state']}")

    contents = gi.histories.show_history(history_id, contents=True)

    for item in contents:
        print(f"  [{item['state']}] {item['name']} (type: {item['history_content_type']})")

2. Invocation Step Analysis

def analyze_failed_invocation(gi, invocation_id):
    """Analyze why an invocation failed"""
    invocation = gi.invocations.show_invocation(invocation_id)

    print(f"Invocation: {invocation_id}")
    print(f"State: {invocation['state']}")

    # 'steps' is a list of step dicts
    for step in invocation.get('steps', []):
        if step['state'] != 'error':
            continue

        print(f"\nFailed Step {step.get('order_index')}:")

        job_id = step.get('job_id')
        if job_id:
            # full_details=True is required to retrieve stderr
            job = gi.jobs.show_job(job_id, full_details=True)
            print(f"  Tool: {job.get('tool_id')}")
            print(f"  Exit code: {job.get('exit_code')}")
            print(f"  Stderr:\n{job.get('stderr', 'N/A')}")

Common Pitfalls

  1. Planemo failures vs Galaxy failures

    • Planemo exit code != 0: the workflow was most likely never launched, so no invocation exists (confirm via --test_output_json)
    • Invocation state = 'failed': the workflow was launched but a Galaxy job failed
    • Don't confuse these two failure modes
  2. Concurrent uploads

    • Too many simultaneous uploads can overwhelm Galaxy
    • Use max_concurrent limits (typically 3-5)
    • Consider --simultaneous_uploads vs sequential
  3. Dataset state checking

    • Don't invoke workflows before uploads complete
    • Always wait for dataset state = 'ok'
  4. History name conflicts

    • Use unique history names (add timestamps or suffixes)
    • Check for existing histories before creating
  5. Return code interpretation

    • On POSIX, os.system() returns a wait status, not the exit code (exit 1 → return 256)
    • Use return_code >> 8 (or os.waitstatus_to_exitcode()) to recover it, or prefer subprocess.run
  6. Invocation ID recovery

    • Terminal disconnection loses invocation ID
    • Always save invocation IDs to file immediately
    • Use --test_output_json with planemo

Best Practices Summary

  1. ✅ Use environment variables for API keys
  2. ✅ Mask API keys in logs and output
  3. ✅ Quote all file paths in shell commands
  4. ✅ Implement thread-safety for concurrent operations
  5. ✅ Save state frequently for resume capability
  6. ✅ Wait for dataset uploads before invoking workflows
  7. ✅ Poll invocation status with reasonable intervals (30-60s)
  8. ✅ Distinguish planemo failures from Galaxy failures
  9. ✅ Implement proper error handling and retry logic
  10. ✅ Use unique history names to avoid conflicts

Related Skills

  • galaxy-tool-wrapping: For creating Galaxy tool wrappers
  • galaxy-workflow-development: For creating Galaxy workflows
  • vgp-pipeline: VGP-specific orchestration (uses this skill as dependency)
