Cromwell RNA-seq 分析

最后发布时间 : 2026-04-01 18:16:08 浏览量 :

下面给你一个 从零到可直接运行Cromwell + WDL RNA-seq 分析完整方案,包括:

  • 流程设计(Trimming → Alignment → Quantification → QC → 汇总)
  • 可直接运行的 WDL 工作流示例
  • Cromwell 运行命令
  • 输入 JSON 模板
  • 输出结构说明
  • 可扩展到 FastAPI/BRAVE 的方式(你当前平台可直接复用)

内容尽量靠近你之前开发 BRAVE/Nextflow 的逻辑。


⭐ 1. Cromwell 如何用于 RNA-seq 分析?

RNA-seq 是典型的 多步骤管线型任务,Cromwell(WDL)非常适合,因为:

  • 每个步骤是独立 task
  • 输入/输出自动连接
  • 支持 scatter(并行处理多个样本)
  • 支持 Docker(保证可复现)
  • Metadata API 可监控每个 step(你非常需要)
  • 支持本地、Slurm、K8s、Google、AWS 等后端

⭐ 2. RNA-seq WDL 标准流程结构

典型 RNA-seq 流程:

  1. FastQC(原始数据 QC)
  2. Trimming(如 TrimGalore/fastp,可选)
  3. 比对(STAR / HISAT2)
  4. 转录本定量(featureCounts / salmon / kallisto)
  5. 统计 QC/Summary(multiqc)

下面给你一个全流程、可直接用的 WDL。


⭐ 3. 可运行的 WDL:rnaseq_pipeline.wdl

支持:FASTQ → STAR → featureCounts → MultiQC
Docker 已配置,可直接运行

version 1.0

workflow RNASeqPipeline {
    input {
        Array[File] fastq_r1
        Array[File] fastq_r2
        File star_index_tar
        File gtf
        Int threads = 8
    }

    scatter(i in range(length(fastq_r1))) {
        call FastQC as RawQC {
            input:
                r1 = fastq_r1[i],
                r2 = fastq_r2[i]
        }

        call Trim {
            input:
                r1 = fastq_r1[i],
                r2 = fastq_r2[i]
        }

        call STARAlign {
            input:
                trimmed_r1 = Trim.trim_r1,
                trimmed_r2 = Trim.trim_r2,
                star_index_tar = star_index_tar,
                threads = threads
        }

        call FeatureCounts {
            input:
                bam = STARAlign.aligned_bam,
                gtf = gtf,
                threads = threads
        }

        call FastQC as MappedQC {
            input:
                r1 = STARAlign.aligned_bam
        }
    }

    call MultiQC {
        input:
            dirs = RawQC.qc_dir + MappedQC.qc_dir
    }

    output {
        Array[File] aligned_bam = STARAlign.aligned_bam
        Array[File] count_file = FeatureCounts.count_file
        File multiqc_report = MultiQC.report
    }
}

task FastQC {
    input {
        File r1
        File? r2
    }

    command <<<
        fastqc ${r1} ${if defined(r2) then r2 else ''}
    >>>

    output {
        File qc_dir = glob("*_fastqc.zip")[0]
    }

    runtime {
        docker: "biocontainers/fastqc:v0.12.1_cv8"
    }
}

task Trim {
    input:
        File r1
        File r2

    command <<<
        fastp \
            -i ${r1} -I ${r2} \
            -o trimmed_r1.fq.gz -O trimmed_r2.fq.gz \
            --detect_adapter_for_pe
    >>>

    output {
        File trim_r1 = "trimmed_r1.fq.gz"
        File trim_r2 = "trimmed_r2.fq.gz"
    }

    runtime {
        docker: "quay.io/biocontainers/fastp:0.23.4--h5f740d0_1"
    }
}

task STARAlign {
    input:
        File trimmed_r1
        File trimmed_r2
        File star_index_tar
        Int threads

    command <<<
        tar -xvf ${star_index_tar}
        INDEX=$(find . -type d -name "STARindex")
        
        STAR \
            --runThreadN ${threads} \
            --genomeDir $INDEX \
            --readFilesIn ${trimmed_r1} ${trimmed_r2} \
            --readFilesCommand zcat \
            --outSAMtype BAM SortedByCoordinate \
            --outFileNamePrefix out_
    >>>

    output {
        File aligned_bam = "out_Aligned.sortedByCoord.out.bam"
    }

    runtime {
        docker: "quay.io/biocontainers/star:2.7.10b--h9ee0642_0"
        memory: "20G"
    }
}

task FeatureCounts {
    input:
        File bam
        File gtf
        Int threads

    command <<<
        featureCounts \
            -T ${threads} \
            -a ${gtf} \
            -o counts.txt \
            ${bam}
    >>>

    output {
        File count_file = "counts.txt"
    }

    runtime {
        docker: "quay.io/biocontainers/subread:2.0.6--hed695b0_0"
    }
}

task MultiQC {
    input:
        Array[File] dirs

    command <<<
        multiqc ${sep=' ' dirs}
    >>>

    output {
        File report = "multiqc_report.html"
    }

    runtime {
        docker: "ewels/multiqc:1.19"
    }
}

⭐ 4. 输入 JSON 文件模板:rnaseq_input.json

{
  "RNASeqPipeline.fastq_r1": [
    "samples/S1_R1.fq.gz",
    "samples/S2_R1.fq.gz"
  ],
  "RNASeqPipeline.fastq_r2": [
    "samples/S1_R2.fq.gz",
    "samples/S2_R2.fq.gz"
  ],
  "RNASeqPipeline.star_index_tar": "ref/star_index.tar.gz",
  "RNASeqPipeline.gtf": "ref/genes.gtf",
  "RNASeqPipeline.threads": 8
}

⭐ 5. Cromwell 运行命令

java -jar cromwell.jar run rnaseq_pipeline.wdl -i rnaseq_input.json

如果你想与 FastAPI 后端集成(你现在常做的):

java -jar cromwell.jar server

然后用 REST API 调度:

POST /api/workflows/v1
Content-Type: multipart/form-data

⭐ 6. 输出文件结构(典型)

cromwell-executions/
  RNASeqPipeline/
    xxx/
      call-RawQC/
      call-Trim/
      call-STARAlign/
      call-FeatureCounts/
      call-MappedQC/
    multiqc/
      multiqc_report.html

你可以通过 Cromwell metadata API 获取:

  • 每个样本状态(Running/Done/Failed)
  • BAM 文件路径
  • count matrix 路径
  • 日志(stdout/stderr)
  • 开始结束时间

这也正好适用于你的 SSE 实时监控系统


⭐ 7. 如果你想要更多,我可以提供:

✔ 完整的 BRAVE → WDL 转换方案

你的 Nextflow 流程可以无损迁移到 WDL。

✔ Cromwell + FastAPI 调度系统模板

包括:

  • 提交任务
  • analysis_id 标识
  • SSE 推送 metadata 状态
  • 实时日志流式读取
  • 前端 React 显示 DAG/日志/状态

✔ 更高级的 RNA-seq pipeline

  • 支持 salmon/kallisto 无比对定量
  • DESeq2 差异分析(Rscript WDL task)
  • 合并 count matrix
  • 基因注释(biomaRt/annovar)