NextFlow用法4--Channel (1)
Nextflow基于Dataflow编程模型,其中进程通过通道(channel )进行通信。
channel 有两个主要属性:
(1)发送消息是一种异步操作,它立即完成,而不必等待接收过程。
(2)接收数据是一种阻止操作,它会在消息到达之前停止接收过程。
1.1 channel 类型
Nextflow区分两种不同的channel :队列通道(queue channel) 和值通道(value channel)。
1.1.1 队列 channel
队列channel 是连接两个 processes or operators的非阻挡单向FIFO队列。
队列channel 通常使用方法(如 from ,fromPath 等)来创建,或者结合使用channel 运算符(如 map , flatMap 等)。
队列channel 也可以通过process 的输出声明使用into子句创建。
注意:
(1)相同的队列channel 不能2次和2次以上用作process输出,也不能2次和2次以上用作process输入。
(2) 如果需要将一个process的输出channel 连接到多个process or operator,使用into运算符创建这个channel 的两个(或更多)副本,并使用每个副本channel连接单独的process。
1.1.2 值 channel
根据定义,值channel (也称为单channel )是单个变量值绑定的,并且可以无限次读取和引用。
注意
值channel 的内容可以多次用于process的input。
使用 value factory method方法或返回单个值的运算符创建值channel ,例如 first ,last, collect,count,min,max,reduce,sum。
注意:
当在input的from 中指定简单值时,process 将隐式创建值channel 。此外,当一个process的inputs只是value channels时,也将隐式地为该channel创建一个output的value channel 。
例如:
process foo {
input:
val x from 1
output:
file 'x.txt' into result
""" echo $x > x.txt """}
上面代码段中的process 声明了一个input,是一个value channel。因此,result中的output的channel也是一个value channel,可以被多个process读取。
1.2 channel factory
channel 可以由process 的output声明隐式创建,也可以使用以下channel 工厂方法显式创建。
可用的方法是:
· create
· empty
· from
· fromPath
· fromSRA
· value
1.2.1 create
使用create方法创建新channel ,如下所示:
channelObj = Channel.create()
1.2.2 from
用 from方法创建channel,该channel可以输出from方法中指定的参数的内容,例如:
ch = Channel.from( 1, 3, 5, 7 )
ch.subscribe { println "value: $it" }
此示例中的第一行创建了一个包含channel 对象的ch变量。此channel 输出的值被指定为from方法的参数。因此第二行将打印以下内容:
value: 1
value: 3
value: 5
value: 7
以下示例显示如何从一系列数字或字符串创建channel :
zeroToNine = Channel.from( 0..9 )
strings = Channel.from( 'A'..'Z' )
注意:Note that when the from argument is an object implementing the (Java) Collection interface, the resulting channel emits the collection entries as individual emissions.
因此,以下两个声明生成的结果是一样的:
Channel.from( 1, 3, 5, 7, 9 )
Channel.from( [1, 3, 5, 7, 9] )
但是,当提供不止一个参数时,它们总是作为单个内容进行释放。因此,以下示例创建的是一个释放3次内容的channel ,每次的内容是包含两个元素的列表:
Channel.from( [1, 2], [5,6], [7,9] )
1.2.3 value
value 方法用于创建一个值channel。传入参数可以将channel与参数值绑定。例如:
expl1 = Channel.value()
expl2 = Channel.value( 'Hello there' )
expl3 = Channel.value( [1,2,3,4,5] )
示例中的第一行创建一个“空”变量。第二行创建一个channel 并将字符串绑定到channel 。最后一个创建一个channel 并将一个列表对象绑定到channel ,该list将作为一个整体进行释放。
1.2.4 fromPath
可以使用该fromPath方法创建channel,该channel可以发出一个或多个文件的路径,只需将路径字符串指定为参数。例如:
myFileChannel = Channel.fromPath( '/data/some/bigfile.txt' )
上面的行创建了一个channel ,并将一个文件的Path项绑定到该channel 。
注意:它不检查文件是否存在,如果文件不存在可能会出错。
每当fromPath参数包含一个*或?通配符时,它都被解释为glob路径匹配器。例如:
myFileChannel = Channel.fromPath( '/data/big/*.txt' )
此示例创建一个channel ,并释放 /data/big 路径下所有以 .txt 文件名结果的文件。
注意:
两个星号,即 ** 类似 * 但跨越目录边界。此语法通常用于匹配完整路径。圆括号指定子模式的集合。
例如:
files = Channel.fromPath( 'data/**.fa' )
moreFiles = Channel.fromPath( 'data/**/*.fa' )
pairFiles = Channel.fromPath( 'data/file_{1,2}.fq' )
第一行返回一个channel ,该channel 发出data文件夹中以后缀.fa结尾的文件,并递归其所有子文件夹。而第二个只释放 /data/ 所有子文件夹中以 .fa 结果的文件。最后一个示例释放两个文件:data/file_1.fq和data/file_2.fq。
注意:与在Linux Bash中一样,*通配符不匹配隐藏文件(即名称以.字符开头的文件)。
要包含隐藏文件,您需要使用句点字符启动模式或指定选项。例如:hidden: true
expl1 = Channel.fromPath( '/path/.*' )
expl2 = Channel.fromPath( '/path/.*.fa' )
expl3 = Channel.fromPath( '/path/*', hidden: true )
第一个示例返回指定路径中的所有隐藏文件。第二个返回以.fa后缀结尾的所有隐藏文件。最后,最后一个示例返回该路径中的所有文件(隐藏和非隐藏)。
默认情况下,glob模式仅查找与指定条件匹配的常规文件路径,即它不会返回目录路径。
您可以使用type指定值的参数file,dir或者any定义所需的路径类型。例如:
myFileChannel = Channel.fromPath( '/path/*b', type: 'dir' )
myFileChannel = Channel.fromPath( '/path/a*', type: 'any' )
第一个示例将返回以b为后缀结尾的所有目录路径,而第二个示例将返回以a前缀开头的任何文件和目录。
Name 描述
glob 当true时,可以把 *,?,[]和{}作为通配符,否则处理它们的正常字符(默认值:true)
type 待返回路径的类型,file,dir或any(默认值:file)
hidden 当true时可以查找隐藏文件(默认值:false)
maxDepth 要访问的最大目录级别数(默认值:无限制)
followLinks 当true它可以在目录树的遍历链接,否则会被作为文件处理(默认:true)
relative 当true时,返回的路径是相对于最顶层的公共目录的相对路径(默认值:false)
checkIfExists 当true时,如果指定路径不在文件中,会抛出异常(默认值:false)
注意:可以使用列表作为参数指定多个路径的glob模式:
Channel.fromPath( ['/some/path/*.fq', '/other/path/*.fastq'] )
1.2.5 fromFilePairs
该fromFilePairs方法创建的channel,可以释放与用户提供的glob模式匹配的文件对的元组对象。在该元组中,第一个元素是匹配文件对的共有名称,第二个元素是文件列表(按字典顺序排序)。例如:
Channel
.fromFilePairs('/my/data/SRR*_{1,2}.fastq')
.println()
它将产生类似于以下的输出:
[SRR493366, [/my/data/SRR493366_1.fastq, /my/data/SRR493366_2.fastq]]
[SRR493367, [/my/data/SRR493367_1.fastq, /my/data/SRR493367_2.fastq]]
[SRR493368, [/my/data/SRR493368_1.fastq, /my/data/SRR493368_2.fastq]]
[SRR493369, [/my/data/SRR493369_1.fastq, /my/data/SRR493369_2.fastq]]
[SRR493370, [/my/data/SRR493370_1.fastq, /my/data/SRR493370_2.fastq]]
[SRR493371, [/my/data/SRR493371_1.fastq, /my/data/SRR493371_2.fastq]]
注意:glob模式必须至少包含一个星形通配符。
1.2.6 fromSRA
该fromSRA方法查询NCBI SRA数据库,并返回符合指定标准(即项目或登录号)的FASTQ文件的channel 。例如:
Channel
.fromSRA('SRP043510')
.println()
#结果:
[SRR1448794, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR144/004/SRR1448794/SRR1448794.fastq.gz]
[SRR1448795, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR144/005/SRR1448795/SRR1448795.fastq.gz]
[SRR1448792, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR144/002/SRR1448792/SRR1448792.fastq.gz]
[SRR1448793, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR144/003/SRR1448793/SRR1448793.fastq.gz]
[SRR1910483, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR191/003/SRR1910483/SRR1910483.fastq.gz]
[SRR1910482, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR191/002/SRR1910482/SRR1910482.fastq.gz]
(remaining omitted)
可以使用列表对象指定多个ID:
ids = ['ERR908507', 'ERR908506', 'ERR908505']Channel
.fromSRA(ids)
.println()
#结果:
[ERR908507,
[ftp://ftp.sra.ebi.ac.uk/vol1/fastq/ERR908/ERR908507/ERR908507_1.fastq.gz,
ftp://ftp.sra.ebi.ac.uk/vol1/fastq/ERR908/ERR908507/ERR908507_2.fastq.gz]]
[ERR908506,
[ftp://ftp.sra.ebi.ac.uk/vol1/fastq/ERR908/ERR908506/ERR908506_1.fastq.gz,
ftp://ftp.sra.ebi.ac.uk/vol1/fastq/ERR908/ERR908506/ERR908506_2.fastq.gz]]
[ERR908505,
[ftp://ftp.sra.ebi.ac.uk/vol1/fastq/ERR908/ERR908505/ERR908505_1.fastq.gz,
ftp://ftp.sra.ebi.ac.uk/vol1/fastq/ERR908/ERR908505/ERR908505_2.fastq.gz]]
可用参数表:
Name : 描述
apiKey : NCBI用户API密钥。
cache : 启用/禁用缓存API请求(默认值:true)
max : 可以重试的最大条目数(默认值:无限制)。
要访问NCBI搜索服务,应提供NCBI API密钥:
(1)使用apiKey可选参数,例如。Channel.fromSRA(ids, apiKey:'0123456789abcdef')
(2)在您的环境中导出变量NCBI_API_KEY,例如。export NCBI_API_KEY=0123456789abcdef
注意:此功能需要Nextflow版本19.04.0或更高版本。
1.2.7 watchPath
该watchPath方法监视文件夹以查找与指定模式匹配的一个或多个文件。只要存在满足指定条件的文件,就会通过该watchPath 方法返回的channel 发出该文件。监视条件可以通过使用*或?通配符来指定,即通过指定glob路径匹配条件。
例如:
Channel
.watchPath( '/path/*.fa' )
.subscribe { println "Fasta file: $it" }
默认情况下,它仅监视在指定文件夹中创建的新文件。此外,可以提供第二个参数来指定要监视的事件。支持的事件是:
Name 描述
create 创建了一个新文件(默认)
modify 文件被修改
delete 文件被删除
可以使用逗号分隔的字符串指定多个这些事件,如下所示:
Channel
.watchPath( '/path/*.fa', 'create,modify' )
.subscribe { println "File created or modified: $it" }
警告:该watchPath方法无休止地执行匹配符合指定模式(一个或多个)的文件。因此,无论何时在脚本中使用它,生成的管道都将永远不会完成。
1.2.8 empty
根据empty定义,该方法创建一个空channel ,不发出任何值。