process < name > {
    [ directives ]

    input:
    < process inputs >

    output:
    < process outputs >

    when:
    < condition >

    [script|shell|exec]:
    < user script to be executed >
}

process simpleSum {
    input:
    val x

    exec:
    println "Hello Mr. $x"
}

workflow {
    def a = Channel.of('a', 'b', 'c')
    simpleSum(a)
}
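When a process declares only one input, the pipe operator | can be used to supply that input instead of passing it as an argument. The two forms have the same semantics: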
process simpleSum {
    input:
    val x

    exec:
    println "Hello Mr. $x"
}

workflow {
    Channel.of('a', 'b', 'c') | simpleSum
}
If a closure does not explicitly define a parameter list with ->, it has a single implicit parameter, named it.
def greeting = { println "Hello ${it}" }
greeting('man')  // Output: Hello man

def greeting = { it -> println "Hello ${it}" }
greeting('man')  // Output: Hello man
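As a further illustration of the rule above, here is a minimal plain-Groovy sketch that contrasts the implicit parameter with explicit parameter lists. The closure names (implicitGreeting, explicitGreeting, fullGreeting) are hypothetical and chosen only for this example; the closures return strings instead of printing them so that the results are easy to inspect.

```groovy
// Implicit parameter: no "->", so the single argument is available as "it".
def implicitGreeting = { "Hello ${it}" }

// Explicit parameter list: the parameter is named before "->".
def explicitGreeting = { name -> "Hello ${name}" }

// Two explicit parameters; once a parameter list is declared,
// the implicit "it" is no longer defined for this closure.
def fullGreeting = { title, name -> "Hello ${title} ${name}" }

println implicitGreeting('man')
println explicitGreeting('man')
println fullGreeting('Mr.', 'x')
```

The first two closures are interchangeable for a single argument; an explicit list becomes necessary as soon as a closure takes more than one argument.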
process foo {
    publishDir "test_res", mode: 'copy'

    input:
    val x

    output:
    path 'x.txt'

    script:
    """
    echo $x > x.txt
    """
}

workflow {
    result = foo(1)
    result.view { "Result: ${it}" }
}
(base) wy@master:~/workspace/nextflow$ /ssd2/application/nextflow/build/releases/nextflow-22.11.0-edge-all run a.nf
N E X T F L O W  ~  version 22.11.0-edge
Launching `a.nf` [hungry_almeida] DSL2 - revision: 3f23c7e232
executor >  local (1)
[b7/64b38d] process > foo [100%] 1 of 1 ✔
Result: /data/wangyang/nextflow/work/b7/64b38d3d335a522e01ffcbf4bb4142/x.txt
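A closer look at result.view { "Result: ${it}" }

For each item the channel emits, view calls the closure with that item and prints the value the closure returns. In plain Groovy, the call behaves roughly like this:

def view(Closure closure) {
    def path = '/data/wangyang/nextflow/work/b7/64b38d3d335a522e01ffcbf4bb4142/x.txt'
    // Print whatever the closure returns for the emitted item
    println closure(path)
}

view { "Result: ${it}" }

Output:

Result: /data/wangyang/nextflow/work/b7/64b38d3d335a522e01ffcbf4bb4142/x.txt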
process tupleExample {
    input:
    tuple val(x), val(reads)

    exec:
    println x + "---" + reads
}

workflow {
    Channel.of( [1, 'alpha'], [3, 'delta'] ) | tupleExample
}
(base) wy@master:~/workspace/nextflow$ /ssd2/application/nextflow/build/releases/nextflow-22.11.0-edge-all run a1.nf
N E X T F L O W  ~  version 22.11.0-edge
Launching `a1.nf` [agitated_ptolemy] DSL2 - revision: ed2d0d4960
executor >  local (2)
[cd/86f00a] process > tupleExample (1) [100%] 2 of 2 ✔
3---delta
1---alpha
process INDEX {
    publishDir "output", mode: 'copy'

    input:
    path transcriptome

    output:
    path 'index'

    script:
    """
    salmon index --threads $task.cpus -t $transcriptome -i index
    """

    stub:
    """
    mkdir index
    touch index/seq.bin
    touch index/info.json
    touch index/refseq.bin
    """
}

process QUANT {
    publishDir "output", mode: 'copy'

    input:
    path index
    tuple val(pair_id), path(reads)

    output:
    path pair_id

    script:
    """
    salmon quant --threads $task.cpus --libType=U -i $index -1 ${reads[0]} -2 ${reads[1]} -o $pair_id
    """
}

workflow {
    INDEX("/ssd1/wy/workspace/nextflow/rnaseq-nf/data/ggal/ggal_1_48850000_49020000.Ggal71.500bpflank.fa")
    read_pairs_ch = channel.fromFilePairs(
        "/ssd1/wy/workspace/nextflow/rnaseq-nf/data/ggal/ggal_gut_{1,2}.fq",
        checkIfExists: true
    )
    QUANT(INDEX.out, read_pairs_ch)
}
Each item emitted by channel.fromFilePairs is a list of the form [key, [file1, file2]]
workflow {
    read_pairs_ch = channel.fromFilePairs(
        "/ssd1/wy/workspace/nextflow/rnaseq-nf/data/ggal/ggal_gut_{1,2}.fq",
        checkIfExists: true
    )
    read_pairs_ch.view()
}
[ggal_gut, [/ssd1/wy/workspace/nextflow/rnaseq-nf/data/ggal/ggal_gut_1.fq, /ssd1/wy/workspace/nextflow/rnaseq-nf/data/ggal/ggal_gut_2.fq]]
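To make the shape of such an item concrete, the following plain-Groovy sketch models one emitted item and splits it the same way the tuple val(pair_id), path(reads) input of QUANT does. It is only an illustration: the item is written with plain strings (an assumption made here; the real channel carries file path objects), and the variable names item and pairId are hypothetical.

```groovy
// One item shaped like the output above, modeled with plain strings
// (assumption: the real item holds path objects rather than strings).
def item = [
    'ggal_gut',
    ['/ssd1/wy/workspace/nextflow/rnaseq-nf/data/ggal/ggal_gut_1.fq',
     '/ssd1/wy/workspace/nextflow/rnaseq-nf/data/ggal/ggal_gut_2.fq']
]

// Multiple assignment splits the list into the key and the list of files,
// mirroring how the tuple input binds pair_id and reads.
def (pairId, reads) = item

println pairId     // the grouping key
println reads[0]   // first read file, used as ${reads[0]} in QUANT
println reads[1]   // second read file, used as ${reads[1]} in QUANT
```

The first element is the part of the file name matched outside the {1,2} pattern, and the second element is the list of files sharing that key.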
workflow {
    INDEX("/ssd1/wy/workspace/nextflow/rnaseq-nf/data/ggal/ggal_1_48850000_49020000.Ggal71.500bpflank.fa")
    // Build the [key, [file1, file2]] items by hand instead of using fromFilePairs
    read_pairs_ch = Channel.of(
        ['ggal_gut', ['/ssd1/wy/workspace/nextflow/rnaseq-nf/data/ggal/ggal_gut_1.fq',
                      '/ssd1/wy/workspace/nextflow/rnaseq-nf/data/ggal/ggal_gut_2.fq']],
        ['ggal_liver', ['/ssd1/wy/workspace/nextflow/rnaseq-nf/data/ggal/ggal_liver_1.fq',
                        '/ssd1/wy/workspace/nextflow/rnaseq-nf/data/ggal/ggal_liver_2.fq']]
    )
    QUANT(INDEX.out, read_pairs_ch)
}
nf run main.nf -profile test,docker --outdir output
------------------------------------------------------
                                        ,--./,-.
        ___     __   __   __   ___     /,-._.--~'
  |\ | |__  __ /  ` /  \ |__) |__         }  {
  | \| |       \__, \__/ |  \ |___     \`-._,-`-,
                                        `._,._,'
  nf-core/rnaseq v3.10.1
------------------------------------------------------
Core Nextflow options
  runName                   : intergalactic_gates
  containerEngine           : docker
  launchDir                 : /data/wangyang/nextflow/rnaseq-master
  workDir                   : /data/wangyang/nextflow/rnaseq-master/work
  projectDir                : /data/wangyang/nextflow/rnaseq-master
  userName                  : wy
  profile                   : test,docker
  configFiles               : /data/wangyang/nextflow/rnaseq-master/nextflow.config

Input/output options
  input                     : https://raw.githubusercontent.com/nf-core/test-datasets/rnaseq/samplesheet/v3.10/samplesheet_test.csv
  outdir                    : output

UMI options
  umitools_bc_pattern       : NNNN

Read filtering options
  bbsplit_fasta_list        : https://github.com/nf-core/test-datasets/raw/rnaseq/reference/bbsplit_fasta_list.txt
  skip_bbsplit              : false

Reference genome options
  fasta                     : https://github.com/nf-core/test-datasets/raw/rnaseq/reference/genome.fasta
  gtf                       : https://github.com/nf-core/test-datasets/raw/rnaseq/reference/genes.gtf.gz
  gff                       : https://github.com/nf-core/test-datasets/raw/rnaseq/reference/genes.gff.gz
  transcript_fasta          : https://github.com/nf-core/test-datasets/raw/rnaseq/reference/transcriptome.fasta
  additional_fasta          : https://github.com/nf-core/test-datasets/raw/rnaseq/reference/gfp.fa.gz
  hisat2_index              : https://github.com/nf-core/test-datasets/raw/rnaseq/reference/hisat2.tar.gz
  rsem_index                : https://github.com/nf-core/test-datasets/raw/rnaseq/reference/rsem.tar.gz
  salmon_index              : https://github.com/nf-core/test-datasets/raw/rnaseq/reference/salmon.tar.gz

Alignment options
  pseudo_aligner            : salmon

Institutional config options
  config_profile_name       : Test profile
  config_profile_description: Minimal test dataset to check pipeline function

Max job request options
  max_cpus                  : 2
  max_memory                : 6.GB
  max_time                  : 6.h

!! Only displaying parameters that differ from the pipeline defaults !!
------------------------------------------------------
If you use nf-core/rnaseq for your analysis please cite:

* The pipeline
  https://doi.org/10.5281/zenodo.1400710

* The nf-core framework
  https://doi.org/10.1038/s41587-020-0439-x

* Software dependencies
  https://github.com/nf-core/rnaseq/blob/master/CITATIONS.md
------------------------------------------------------
gtf = WorkflowMain.getGenomeAttribute(params, 'gtf')
ch_gtf = Channel.fromPath(gtf)
ch_splicesites = HISAT2_EXTRACTSPLICESITES ( ch_gtf ).txt
DataflowBroadcast around DataflowStream[?]
gtf = WorkflowMain.getGenomeAttribute(params, 'gtf')
ch_splicesites = HISAT2_EXTRACTSPLICESITES ( gtf ).txt
DataflowVariable(value=null)
ch_acc = Channel.fromPath( 'download.csv' )
    .splitCsv(skip: 1)
    .map { it[1] }
// ch_acc.view()
println ch_acc
println Channel
    .of( 'SRR8924749')
    .toList()
DataflowBroadcast around DataflowStream[?]
DataflowVariable(value=null)
https://www.nextflow.io/blog/2021/5_tips_for_hpc_users.html
https://training.seqera.io/#_your_first_script
https://www.nextflow.io/blog/2020/dsl2-is-here.html