skills/gptomics/bioskills/bio-workflow-management-nextflow-pipelines

bio-workflow-management-nextflow-pipelines

SKILL.md

Nextflow Pipelines

Basic Pipeline Structure

// main.nf
// Explicitly enable DSL2 syntax for this script.
nextflow.enable.dsl=2

// Default parameter values.
// Glob for paired-end FASTQ files named <sample>_1.fq.gz and <sample>_2.fq.gz.
params.reads = "data/*_{1,2}.fq.gz"
// Output directory parameter.
params.outdir = "results"

process FASTQC {
    // Runs FastQC on the read files of one sample.
    // Input : [sample_id, reads] tuple.
    // Output: the HTML reports and ZIP archives found in the task directory,
    //         exposed as FASTQC.out.html and FASTQC.out.zip.

    input:
    tuple val(sample_id), path(reads)

    output:
    path "*.html", emit: html
    path "*.zip",  emit: zip

    script:
    """
    fastqc ${reads}
    """
}

workflow {
    // Build the paired-read channel, then run FASTQC once per sample.
    read_pairs_ch = Channel.fromFilePairs(params.reads)
    FASTQC(read_pairs_ch)
}

DSL2 Modules

// modules/fastqc.nf
process FASTQC {
    // FastQC module: one task per sample, tagged with the sample id.
    // Results are copied into <params.outdir>/qc.
    publishDir "${params.outdir}/qc", mode: 'copy'
    tag "$sample_id"

    input:
    tuple val(sample_id), path(reads)

    output:
    // Both outputs keep the sample id alongside the files.
    tuple val(sample_id), path("*.html"), emit: html
    tuple val(sample_id), path("*.zip"),  emit: zip

    script:
    // Thread count follows the cpus assigned to the task.
    """
    fastqc -t ${task.cpus} ${reads}
    """
}
// main.nf
include { FASTQC } from './modules/fastqc'
include { ALIGN } from './modules/align'

workflow {
    // The same read-pair channel feeds both FASTQC and ALIGN.
    Channel.fromFilePairs(params.reads).set { reads_ch }

    FASTQC(reads_ch)
    ALIGN(reads_ch)
}

Config File

// nextflow.config
// Default pipeline parameters (dot notation, equivalent to a params { } block).
params.reads  = "data/*_{1,2}.fq.gz"
params.outdir = "results"
params.genome = "ref/genome.fa"

process {
    // Defaults applied to every process.
    time   = '2h'
    memory = '8 GB'
    cpus   = 4

    // Overrides for the process named ALIGN.
    withName: ALIGN {
        memory = '32 GB'
        cpus   = 16
    }
}

// Named configuration profiles, selected at launch with -profile <name>.
profiles {
    // Enable Docker.
    docker {
        docker.enabled = true
    }
    // Enable Singularity.
    singularity {
        singularity.enabled = true
    }
    // Use the SLURM executor for processes.
    slurm {
        process.executor = 'slurm'
    }
}

Container Support

// Quantifies one sample with Salmon inside a pinned BioContainers image.
process SALMON_QUANT {
    // Container image used for every task of this process.
    container 'quay.io/biocontainers/salmon:1.10.0--h7e5ed60_0'

    input:
    // reads is indexed as reads[0] / reads[1] below, i.e. two files are expected
    // (presumably the two mates of a paired-end sample).
    tuple val(sample_id), path(reads)
    // Salmon index, passed to `salmon quant -i`.
    path(index)

    output:
    // The output directory named after the sample (created by `-o ${sample_id}`).
    tuple val(sample_id), path("${sample_id}"), emit: quant

    script:
    // `-l A` asks Salmon to detect the library type automatically.
    """
    salmon quant -i ${index} -l A -1 ${reads[0]} -2 ${reads[1]} \
        -o ${sample_id} --threads ${task.cpus}
    """
}

Channel Operations

// From file pairs: emits [sample_id, [read_1, read_2]]
Channel.fromFilePairs("data/*_{1,2}.fq.gz")
    .set { reads_ch }

// From path: emits [name_without_extension, bam_file]
// (closure parameter is named `bam` so it does not shadow the file() helper)
Channel.fromPath("data/*.bam")
    .map { bam -> tuple(bam.baseName, bam) }
    .set { bam_ch }

// From samplesheet: a CSV with header columns sample, fastq_1, fastq_2.
// The two FASTQ files are grouped into a list so each item has the same
// [sample_id, [read_1, read_2]] shape as fromFilePairs and therefore fits a
// `tuple val(sample_id), path(reads)` process input.
// checkIfExists makes a missing samplesheet or FASTQ file fail immediately.
Channel.fromPath(params.samplesheet, checkIfExists: true)
    .splitCsv(header: true)
    .map { row ->
        def fastqs = [
            file(row.fastq_1, checkIfExists: true),
            file(row.fastq_2, checkIfExists: true)
        ]
        tuple(row.sample, fastqs)
    }
    .set { samples_ch }

// Combine channels: pairs every reads item with every reference item.
// reference_ch is not created in this snippet and must be defined elsewhere.
reads_ch.combine(reference_ch)

Subworkflows

// subworkflows/qc.nf
include { FASTQC } from '../modules/fastqc'
include { MULTIQC } from '../modules/multiqc'

// QC subworkflow: runs FastQC per sample and aggregates the results with MultiQC.
workflow QC {
    take:
    reads   // channel of [sample_id, reads] items, passed unchanged to FASTQC

    main:
    FASTQC(reads)

    // FASTQC.out.zip emits [sample_id, zip files] tuples. Keep only the files
    // before collecting; otherwise collect() flattens the sample_id strings
    // into the list handed to MULTIQC, which expects paths only.
    fastqc_zips = FASTQC.out.zip.map { sample_id, zips -> zips }
    MULTIQC(fastqc_zips.collect())

    emit:
    qc_report = MULTIQC.out.report
}
// main.nf
include { QC } from './subworkflows/qc'
include { ALIGN } from './subworkflows/align'

workflow {
    // One read-pair channel is shared by the QC and ALIGN subworkflows.
    Channel.fromFilePairs(params.reads).set { reads }

    QC(reads)
    ALIGN(reads)
}

Cluster Execution

// nextflow.config for SLURM
// Defaults for all processes when running on SLURM.
process {
    executor = 'slurm'
    // Default queue for submitted jobs.
    queue = 'normal'
    // Extra options passed through to the scheduler at submission.
    clusterOptions = '--account=myproject'

    // Processes labelled 'high_memory' get more memory and a different queue.
    withLabel: 'high_memory' {
        memory = '128 GB'
        queue = 'highmem'
    }
}

// Executor settings (dot notation, equivalent to an executor { } block).
executor.name            = 'slurm'
executor.queueSize       = 100
executor.submitRateLimit = '10 sec'

AWS/Cloud Execution

// nextflow.config for AWS Batch
// The settings live in a profile named 'awsbatch' so that
// `nextflow run ... -profile awsbatch` selects them; requesting a profile
// that is not defined in the configuration is rejected by Nextflow.
profiles {
    awsbatch {
        process {
            // Submit every process to AWS Batch.
            executor = 'awsbatch'
            // AWS Batch job queue that receives the tasks.
            queue = 'my-batch-queue'
        }

        aws {
            region = 'us-east-1'
            batch {
                // Location of the AWS CLI used by the tasks.
                cliPath = '/usr/local/bin/aws'
            }
        }
    }
}
# Run on AWS Batch using the 'awsbatch' profile; -bucket-dir sets the S3
# location used for the run's intermediate files.
nextflow run main.nf -profile awsbatch -bucket-dir s3://my-bucket/work

Resource Labels

// Resource tiers selected by process label.
process {
    withLabel: process_low {
        time   = '1h'
        memory = '4 GB'
        cpus   = 2
    }

    withLabel: process_medium {
        time   = '4h'
        memory = '16 GB'
        cpus   = 8
    }

    withLabel: process_high {
        time   = '12h'
        memory = '64 GB'
        cpus   = 16
    }
}
// Example process that picks a resource tier through its label.
process ALIGN {
    // Matches a `withLabel: 'process_high'` selector in the configuration.
    label 'process_high'
    // ...
}

Error Handling

process RISKY_PROCESS {
    // Memory grows with each attempt: 8 GB, then 16 GB, 24 GB, ...
    memory { 8.GB * task.attempt }
    // A failed task is re-submitted, up to 3 retries.
    maxRetries 3
    errorStrategy 'retry'

    script:
    """
    memory_intensive_command
    """
}

// Example process whose failures do not stop the pipeline.
process OPTIONAL_PROCESS {
    // 'ignore': a failing task is skipped and the run continues.
    errorStrategy 'ignore'
    // ...
}

Caching and Resume

# Resume from the last run, reusing cached results of tasks that have not changed
nextflow run main.nf -resume

# Remove the work files of a previous run (-f forces the deletion)
nextflow clean -f

# List the execution history of previous runs launched from this directory
nextflow log

Complete RNA-seq Pipeline

// Explicitly enable DSL2 syntax for this script.
nextflow.enable.dsl=2

// Default parameter values.
// Glob for paired-end FASTQ files named <sample>_1.fq.gz and <sample>_2.fq.gz.
params.reads = "data/*_{1,2}.fq.gz"
// Location of the Salmon index handed to SALMON_QUANT.
params.salmon_index = "ref/salmon_index"
// Base directory used by the publishDir directives below.
params.outdir = "results"

process FASTP {
    // Trims one paired-end sample with fastp.
    // Trimmed reads and the JSON report are copied to <params.outdir>/trimmed.
    publishDir "${params.outdir}/trimmed", mode: 'copy'
    tag "$sample_id"

    input:
    tuple val(sample_id), path(reads)

    output:
    // Trimmed mates keep the sample id; the JSON report is emitted on its own.
    tuple val(sample_id), path("${sample_id}_{1,2}.trimmed.fq.gz"), emit: reads
    path "${sample_id}.json", emit: json

    script:
    // reads[0] / reads[1] are the two mates of the pair.
    """
    fastp -i ${reads[0]} -I ${reads[1]} \
        -o ${sample_id}_1.trimmed.fq.gz -O ${sample_id}_2.trimmed.fq.gz \
        --json ${sample_id}.json --thread ${task.cpus}
    """
}

process SALMON_QUANT {
    // Quantifies one paired-end sample against a Salmon index.
    // The per-sample output directory is copied to <params.outdir>/salmon.
    publishDir "${params.outdir}/salmon", mode: 'copy'
    tag "$sample_id"

    input:
    tuple val(sample_id), path(reads)
    path index

    output:
    // Directory created by `-o ${sample_id}`.
    tuple val(sample_id), path("${sample_id}"), emit: quant

    script:
    """
    salmon quant -i ${index} -l A -1 ${reads[0]} -2 ${reads[1]} \
        -o ${sample_id} --threads ${task.cpus}
    """
}

process MULTIQC {
    // Aggregates every staged QC file or directory into one MultiQC report,
    // which is copied to <params.outdir>.
    publishDir "${params.outdir}", mode: 'copy'

    input:
    // All collected QC inputs, staged into the task directory.
    path('*')

    output:
    // Named so the report can be referenced as MULTIQC.out.report.
    path("multiqc_report.html"), emit: report

    script:
    // MultiQC scans the task directory for recognised tool outputs.
    """
    multiqc .
    """
}

// RNA-seq entry workflow: trim reads, quantify with Salmon, summarise with MultiQC.
workflow {
    reads_ch = Channel.fromFilePairs(params.reads)

    // A plain file value is reused by every SALMON_QUANT task;
    // checkIfExists aborts early when the index path is wrong.
    salmon_index = file(params.salmon_index, checkIfExists: true)

    FASTP(reads_ch)
    SALMON_QUANT(FASTP.out.reads, salmon_index)

    // SALMON_QUANT.out.quant emits [sample_id, quant_dir] tuples. Keep only the
    // directory; otherwise collect() flattens the sample_id strings into the
    // list passed to MULTIQC, whose input accepts paths only.
    quant_dirs = SALMON_QUANT.out.quant.map { sample_id, quant_dir -> quant_dir }

    // Merge both QC sources and hand them to MULTIQC as a single list.
    qc_files = FASTP.out.json.mix(quant_dirs)
    MULTIQC(qc_files.collect())
}

Related Skills

  • workflow-management/snakemake-workflows - Snakemake alternative
  • workflows/rnaseq-to-de - End-to-end RNA-seq
  • read-qc/fastp-workflow - QC processes
Weekly Installs
3
GitHub Stars
339
First Seen
Jan 24, 2026
Installed on
trae: 2
windsurf: 1
opencode: 1
codex: 1
claude-code: 1
antigravity: 1