nextflow基本概念
process < name > {
[ directives ]
input:
< process inputs >
output:
< process outputs >
when:
< condition >
[script|shell|exec]:
< user script to be executed >
}
process simpleSum {
input:
val x
exec:
println "Hello Mr. $x"
}
workflow {
def a = Channel.of('a', 'b', 'c')
simpleSum(a)
}
当进程只声明一个输入时,可以使用管道 | 操作符为进程提供输入,而不是将其作为参数传递。这两种方法具有相同的语义:
process simpleSum {
input:
val x
exec:
println "Hello Mr. $x"
}
workflow {
Channel.of('a', 'b', 'c') | simpleSum
}
groovy 语法
如果没有明确使用->来定义参数列表,那么这个闭包就定义了一个隐式的参数,这个隐式参数名叫it。
def greeting = { println "Hello ${it}"}
greeting('man') //结果:Hello man
def greeting = {it-> println "Hello ${it}"}
greeting('man') //结果:Hello man
一个输入一个输出的channel
process foo{
publishDir "test_res", mode:'copy'
input:
val x
output:
path 'x.txt'
"""
echo $x > x.txt
"""
}
workflow{
result = foo(1)
result.view { "Result: ${it}" }
}
(base) wy@master:~/workspace/nextflow$ /ssd2/application/nextflow/build/releases/nextflow-22.11.0-edge-all run a.nf
N E X T F L O W ~ version 22.11.0-edge
Launching `a.nf` [hungry_almeida] DSL2 - revision: 3f23c7e232
executor > local (1)
[b7/64b38d] process > foo [100%] 1 of 1 ✔
Result: /data/wangyang/nextflow/work/b7/64b38d3d335a522e01ffcbf4bb4142/x.txt
result.view { "Result: ${it}" }
详细解释
def view(Closure closure){
def path = '/data/wangyang/nextflow/work/b7/64b38d3d335a522e01ffcbf4bb4142/x.txt'
closure(path)
}
view{
"Result: ${it}"
}
Result: Result: /data/wangyang/nextflow/work/b7/64b38d3d335a522e01ffcbf4bb4142/x.txt
输入类型为tuple
process tupleExample {
input:
tuple val(x), val(reads)
exec:
println x+"---"+reads
}
workflow {
Channel.of( [1, 'alpha'], [3, 'delta'] ) | tupleExample
}
(base) wy@master:~/workspace/nextflow$ /ssd2/application/nextflow/build/releases/nextflow-22.11.0-edge-all run a1.nf
N E X T F L O W ~ version 22.11.0-edge
Launching `a1.nf` [agitated_ptolemy] DSL2 - revision: ed2d0d4960
executor > local (2)
[cd/86f00a] process > tupleExample (1) [100%] 2 of 2 ✔
3---delta
1---alpha
处理RNA-seq数据
process INDEX {
publishDir "output", mode:'copy'
input:
path transcriptome
output:
path 'index'
script:
"""
salmon index --threads $task.cpus -t $transcriptome -i index
"""
stub:
"""
mkdir index
touch index/seq.bin
touch index/info.json
touch index/refseq.bin
"""
}
process QUANT {
publishDir "output", mode:'copy'
input:
path index
tuple val(pair_id), path(reads)
output:
path pair_id
script:
"""
salmon quant --threads $task.cpus --libType=U -i $index -1 ${reads[0]} -2 ${reads[1]} -o $pair_id
"""
}
workflow {
INDEX("/ssd1/wy/workspace/nextflow/rnaseq-nf/data/ggal/ggal_1_48850000_49020000.Ggal71.500bpflank.fa")
read_pairs_ch = channel.fromFilePairs("/ssd1/wy/workspace/nextflow/rnaseq-nf/data/ggal/ggal_gut_{1,2}.fq", checkIfExists: true )
QUANT(INDEX.out, read_pairs_ch )
}
channel.fromFilePairs
的输出结果是一个数组
workflow {
read_pairs_ch = channel.fromFilePairs( "/ssd1/wy/workspace/nextflow/rnaseq-nf/data/ggal/ggal_gut_{1,2}.fq", checkIfExists: true )
read_pairs_ch.view()
}
[ggal_gut, [/ssd1/wy/workspace/nextflow/rnaseq-nf/data/ggal/ggal_gut_1.fq, /ssd1/wy/workspace/nextflow/rnaseq-nf/data/ggal/ggal_gut_2.fq]]
workflow {
INDEX("/ssd1/wy/workspace/nextflow/rnaseq-nf/data/ggal/ggal_1_48850000_49020000.Ggal71.500bpflank.fa")
read_pairs_ch = Channel.of(['ggal_gut', ['/ssd1/wy/workspace/nextflow/rnaseq-nf/data/ggal/ggal_gut_1.fq',' /ssd1/wy/workspace/nextflow/rnaseq-nf/data/ggal/ggal_gut_2.fq']] ,
['ggal_liver', ['/ssd1/wy/workspace/nextflow/rnaseq-nf/data/ggal/ggal_liver_1.fq',' /ssd1/wy/workspace/nextflow/rnaseq-nf/data/ggal/ggal_liver_2.fq']])
QUANT(INDEX.out, read_pairs_ch )
}
nf-core rnaseq
nf run main.nf -profile test,docker --outdir output
output
------------------------------------------------------ ,--./,-. ___ __ __ __ ___ /,-._.--~' |\ | |__ __ / ` / \ |__) |__ } { | | | __, __/ | \ |___ `-._,-`-, `._,._,' nf-core/rnaseq v3.10.1 ------------------------------------------------------ Core Nextflow options runName : intergalactic_gates containerEngine : docker launchDir : /data/wangyang/nextflow/rnaseq-master workDir : /data/wangyang/nextflow/rnaseq-master/work projectDir : /data/wangyang/nextflow/rnaseq-master userName : wy profile : test,docker configFiles : /data/wangyang/nextflow/rnaseq-master/nextflow.config Input/output options input : https://raw.githubusercontent.com/nf-core/test-datasets/rnaseq/samplesheet/v3.10/samplesheet_test.csv outdir : output UMI options umitools_bc_pattern : NNNN Read filtering options bbsplit_fasta_list : https://github.com/nf-core/test-datasets/raw/rnaseq/reference/bbsplit_fasta_list.txt skip_bbsplit : false Reference genome options fasta : https://github.com/nf-core/test-datasets/raw/rnaseq/reference/genome.fasta gtf : https://github.com/nf-core/test-datasets/raw/rnaseq/reference/genes.gtf.gz gff : https://github.com/nf-core/test-datasets/raw/rnaseq/reference/genes.gff.gz transcript_fasta : https://github.com/nf-core/test-datasets/raw/rnaseq/reference/transcriptome.fasta additional_fasta : https://github.com/nf-core/test-datasets/raw/rnaseq/reference/gfp.fa.gz hisat2_index : https://github.com/nf-core/test-datasets/raw/rnaseq/reference/hisat2.tar.gz rsem_index : https://github.com/nf-core/test-datasets/raw/rnaseq/reference/rsem.tar.gz salmon_index : https://github.com/nf-core/test-datasets/raw/rnaseq/reference/salmon.tar.gz Alignment options pseudo_aligner : salmon Institutional config options config_profile_name : Test profile config_profile_description: Minimal test dataset to check pipeline function Max job request options max_cpus : 2 max_memory : 6.GB max_time : 6.h !! Only displaying parameters that differ from the pipeline defaults !! ------------------------------------------------------ If you use nf-core/rnaseq for your analysis please cite: * The pipeline https://doi.org/10.5281/zenodo.1400710 * The nf-core framework https://doi.org/10.1038/s41587-020-0439-x * Software dependencies https://github.com/nf-core/rnaseq/blob/master/CITATIONS.md ------------------------------------------------------
gtf = WorkflowMain.getGenomeAttribute(params, 'gtf')
ch_gtf = Channel.fromPath(gtf)
ch_splicesites = HISAT2_EXTRACTSPLICESITES ( ch_gtf ).txt
DataflowBroadcast around DataflowStream[?]
gtf = WorkflowMain.getGenomeAttribute(params, 'gtf')
ch_splicesites = HISAT2_EXTRACTSPLICESITES ( gtf ).txt
DataflowVariable(value=null)
ch_acc = Channel.fromPath( 'download.csv' )
.splitCsv(skip: 1)
.map {it[1] }
// ch_acc.view()
println ch_acc
println Channel
.of( 'SRR8924749')
.toList()
DataflowBroadcast around DataflowStream[?]
DataflowVariable(value=null)
https://www.nextflow.io/blog/2021/5_tips_for_hpc_users.html
https://training.seqera.io/#_your_first_script
https://www.nextflow.io/blog/2020/dsl2-is-here.html