nextflow kafka 消息订阅、监控
学习资料
watchTopic
include { watchTopic } from 'plugin/nf-kafka'
ch = channel.watchTopic('my-topic')
.subscribe { println "new message received ${it[0]} = ${it[1]}" }
.until{ it[1] == 'done' }
watchPath
Channel
.watchPath( '/path/*.fa' )
.subscribe { println "Fasta file: $it" }
subscribe
Channel
.of( 1, 2, 3 )
.subscribe onNext: { println it }, onComplete: { println 'Done' }