Github开源生信云平台 DEMO
这是个非常好的问题。
实际上如果你想设计 BRAVE 的 Runtime Engine,最值得参考的不是 DSL,而是:
Workflow Definition JSON ↓ Runtime Expansion ↓ TaskInstance JSON
Nextflow 和 Cromwell 在这里的区别非常明显。
假设:
call Split scatter (vcf in Split.vcfs) { call Annotate }
对应 JSON 可以理解为:
{ "nodes": [ { "id": "split", "type": "Split" }, { "id": "annotate", "type": "Annotate", "scatter": { "source": "split", "field": "vcfs" } } ] }
注意:
Annotate
只有一个节点定义。
初始化:
{ "workflow_id": "wf1", "executions": [ { "id": "split", "node_id": "split", "status": "READY" } ] }
此时:
annotate不存在实例
输出:
{ "vcfs": [ "a.vcf", "b.vcf", "c.vcf" ] }
Runtime 创建:
{ "executions": [ { "id": "annotate:0", "node_id": "annotate", "shard_index": 0, "inputs": { "vcf": "a.vcf" } }, { "id": "annotate:1", "node_id": "annotate", "shard_index": 1, "inputs": { "vcf": "b.vcf" } }, { "id": "annotate:2", "node_id": "annotate", "shard_index": 2, "inputs": { "vcf": "c.vcf" } } ] }
这就是 Cromwell 的:
ExecutionShard
模式。
特点:
Workflow DAG 不变 Annotate ↓ Annotate[0] Annotate[1] Annotate[2]
只是 ExecutionStore 多了记录。
Nextflow 根本没有 Scatter 节点。
定义:
vcfs = Split() Annotate(vcfs)
内部其实是:
Channel
Split 输出:
{ "channel": "vcfs", "events": [ "a.vcf", "b.vcf", "c.vcf" ] }
Scheduler 收到事件:
{ "channel": "vcfs", "value": "a.vcf" }
立即创建:
{ "task_instance": { "id": "annotate#1", "process": "Annotate", "inputs": { "vcf": "a.vcf" } } }
第二个事件:
{ "channel": "vcfs", "value": "b.vcf" }
创建:
{ "task_instance": { "id": "annotate#2", "process": "Annotate", "inputs": { "vcf": "b.vcf" } } }
第三个事件:
{ "task_instance": { "id": "annotate#3", "process": "Annotate", "inputs": { "vcf": "c.vcf" } } }
没有 scatter 没有 shard 只有数据流事件
即:
a.vcf ↓ Annotate#1 b.vcf ↓ Annotate#2 c.vcf ↓ Annotate#3
看你现在的 DAG:
{ "scatter": { "field": "vcfs", "mode": "each" } }
已经显式声明:
这是一个 Scatter Node
因此更接近:
Cromwell
模型。
定义层:
{ "node_id": "visualization", "scatter": { "field": "vcf", "mode": "each" } }
运行层:
{ "task_group_id": "visualization" }
动态展开后:
{ "task_instances": [ { "instance_id": "visualization:0", "group_id": "visualization", "shard_index": 0, "inputs": { "vcf": "a.vcf" } }, { "instance_id": "visualization:1", "group_id": "visualization", "shard_index": 1, "inputs": { "vcf": "b.vcf" } }, { "instance_id": "visualization:2", "group_id": "visualization", "shard_index": 2, "inputs": { "vcf": "c.vcf" } } ] }
Scheduler 维护:
{ "group_id": "visualization", "total": 3, "success": 2 }
那 JSON 会变成:
{ "ports": { "vcfs": { "type": "stream" } } }
运行时:
{ "event": { "channel": "vcfs", "value": "a.vcf" } }
收到一个 event 就创建一个 TaskInstance:
{ "instance_id": "visualization#123", "inputs": { "vcf": "a.vcf" } }
根本不需要:
{ "scatter": {} }
因为:
Stream -> Task 天然就是 Scatter
对于你的 BRAVE,我建议采用一种混合模式:
WorkflowNode ↓ TaskGroup ↓ TaskInstance
定义层保留:
{ "scatter": { "field": "vcf", "mode": "each" } }
方便 React Flow 展示。
运行层采用:
TaskGroup ↓ TaskInstance*
类似 Cromwell 的 shard。
而 Scheduler 采用 Nextflow 的事件驱动思想:
Output Produced ↓ Expand Scatter ↓ Create TaskInstance
这样既有可视化 DAG,又有 Nextflow 那种运行时动态扩展能力。