在nextflow与Cromwell的架构中,如何分别用json表示scatter 动态创建执行节点

最后发布时间 : 2026-06-22 22:30:06 浏览量 :

这是个非常好的问题。

实际上如果你想设计 BRAVE 的 Runtime Engine,最值得参考的不是 DSL,而是:

Workflow Definition JSON
        ↓
Runtime Expansion
        ↓
TaskInstance JSON

Nextflow 和 Cromwell 在这里的区别非常明显。


1 Cromwell 模型

Workflow Definition

假设:

call Split

scatter (vcf in Split.vcfs) {
    call Annotate
}

对应 JSON 可以理解为:

{
  "nodes": [
    {
      "id": "split",
      "type": "Split"
    },
    {
      "id": "annotate",
      "type": "Annotate",
      "scatter": {
        "source": "split",
        "field": "vcfs"
      }
    }
  ]
}

注意:

Annotate

只有一个节点定义。


Workflow Start

初始化:

{
  "workflow_id": "wf1",
  "executions": [
    {
      "id": "split",
      "node_id": "split",
      "status": "READY"
    }
  ]
}

此时:

annotate不存在实例

Split 完成

输出:

{
  "vcfs": [
    "a.vcf",
    "b.vcf",
    "c.vcf"
  ]
}

Deferred Scatter Expansion

Runtime 创建:

{
  "executions": [
    {
      "id": "annotate:0",
      "node_id": "annotate",
      "shard_index": 0,
      "inputs": {
        "vcf": "a.vcf"
      }
    },
    {
      "id": "annotate:1",
      "node_id": "annotate",
      "shard_index": 1,
      "inputs": {
        "vcf": "b.vcf"
      }
    },
    {
      "id": "annotate:2",
      "node_id": "annotate",
      "shard_index": 2,
      "inputs": {
        "vcf": "c.vcf"
      }
    }
  ]
}

这就是 Cromwell 的:

ExecutionShard

模式。

特点:

Workflow DAG 不变

Annotate
    ↓
Annotate[0]
Annotate[1]
Annotate[2]

只是 ExecutionStore 多了记录。


2 Nextflow 模型

Nextflow 根本没有 Scatter 节点。


定义:

vcfs = Split()

Annotate(vcfs)

内部其实是:

Channel

Runtime

Split 输出:

{
  "channel": "vcfs",
  "events": [
    "a.vcf",
    "b.vcf",
    "c.vcf"
  ]
}

Scheduler 收到事件:

{
  "channel": "vcfs",
  "value": "a.vcf"
}

立即创建:

{
  "task_instance": {
    "id": "annotate#1",
    "process": "Annotate",
    "inputs": {
      "vcf": "a.vcf"
    }
  }
}

第二个事件:

{
  "channel": "vcfs",
  "value": "b.vcf"
}

创建:

{
  "task_instance": {
    "id": "annotate#2",
    "process": "Annotate",
    "inputs": {
      "vcf": "b.vcf"
    }
  }
}

第三个事件:

{
  "task_instance": {
    "id": "annotate#3",
    "process": "Annotate",
    "inputs": {
      "vcf": "c.vcf"
    }
  }
}

特点:

没有 scatter

没有 shard

只有数据流事件

即:

a.vcf
   ↓
Annotate#1

b.vcf
   ↓
Annotate#2

c.vcf
   ↓
Annotate#3

你的 BRAVE 应该参考哪个?

看你现在的 DAG:

{
  "scatter": {
      "field": "vcfs",
      "mode": "each"
  }
}

已经显式声明:

这是一个 Scatter Node

因此更接近:

Cromwell

模型。


我推荐的 BRAVE JSON

定义层:

{
  "node_id": "visualization",

  "scatter": {
    "field": "vcf",
    "mode": "each"
  }
}

运行层:

{
  "task_group_id": "visualization"
}

动态展开后:

{
  "task_instances": [
    {
      "instance_id": "visualization:0",
      "group_id": "visualization",
      "shard_index": 0,
      "inputs": {
        "vcf": "a.vcf"
      }
    },
    {
      "instance_id": "visualization:1",
      "group_id": "visualization",
      "shard_index": 1,
      "inputs": {
        "vcf": "b.vcf"
      }
    },
    {
      "instance_id": "visualization:2",
      "group_id": "visualization",
      "shard_index": 2,
      "inputs": {
        "vcf": "c.vcf"
      }
    }
  ]
}

Scheduler 维护:

{
  "group_id": "visualization",
  "total": 3,
  "success": 2
}

如果完全参考 Nextflow

那 JSON 会变成:

{
  "ports": {
    "vcfs": {
      "type": "stream"
    }
  }
}

运行时:

{
  "event": {
    "channel": "vcfs",
    "value": "a.vcf"
  }
}

收到一个 event 就创建一个 TaskInstance:

{
  "instance_id": "visualization#123",
  "inputs": {
    "vcf": "a.vcf"
  }
}

根本不需要:

{
  "scatter": {}
}

因为:

Stream -> Task
天然就是 Scatter

对于你的 BRAVE,我建议采用一种混合模式:

WorkflowNode
      ↓
TaskGroup
      ↓
TaskInstance

定义层保留:

{
  "scatter": {
      "field": "vcf",
      "mode": "each"
  }
}

方便 React Flow 展示。

运行层采用:

TaskGroup
    ↓
TaskInstance*

类似 Cromwell 的 shard。

而 Scheduler 采用 Nextflow 的事件驱动思想:

Output Produced
      ↓
Expand Scatter
      ↓
Create TaskInstance

这样既有可视化 DAG,又有 Nextflow 那种运行时动态扩展能力。