Skip to content

Nextflow Kafka 消息订阅与监控

学习资料

watchTopic

include { watchTopic } from 'plugin/nf-kafka'

// Watch the Kafka topic 'my-topic' and stop once a message whose second
// element equals 'done' arrives. `until` is applied BEFORE `subscribe`:
// `subscribe` is a terminal operator, so a stop condition chained after it
// does not bound what the subscriber receives.
// NOTE(review): each item is indexed as it[0] / it[1] — presumably the Kafka
// record key and value; confirm against the nf-kafka plugin docs.
ch = channel.watchTopic('my-topic')
    .until { it[1] == 'done' }

// Print every message emitted before the stop condition is met.
ch.subscribe { println "new message received ${it[0]} = ${it[1]}" }

watchPath

// Build a channel from the glob '/path/*.fa' via watchPath, then print each
// item it emits.
def fastaFiles = Channel.watchPath( '/path/*.fa' )
fastaFiles.subscribe { println "Fasta file: $it" }

subscribe

// Emit the values 1, 2, 3 and register two handlers on the channel:
// onNext prints each emitted value, onComplete prints 'Done'.
def numbers = Channel.of( 1, 2, 3 )
numbers.subscribe(
    onNext: { println it },
    onComplete: { println 'Done' }
)