从参考基因组快速读取指定序列
2022-01-12 本文已影响0人
茄子_0937
介绍
faidx 是FASTA 和 FASTQ 文件的随机读取索引。
描述
fai index 一般以文件名增加.fai 后缀,作为文件的名称。
fasta 文件 fai index 有5列,fastq 文件会多一列质量信息。
fai index 是纯文本文件,以Tab键分割;
列名 | 描述 |
---|---|
Name | 参考序列的名字,一般是">"后面,空格前的值 |
Length | 这条参考序列的总碱基数量 |
Offset | 这条参考序列的首个碱基距离文件头的偏离值 |
LineBases | 每一行中的碱基数量 |
LineWidth | 这一行的总共字节数(一般比碱基数多一个换行符) |
QualOffset | 这条序列首个质量信息距离文件头的距离值 |
实际展示
image.png如图,hs37d5.fa 文件第一行展示,染色体号为1;对应的fai 文件中标注,1 号 染色体全长 249250621 个碱基,第一个碱基距离头部52个字节,每一行60个碱基,每一行61个字节。
由此我们可以计算,249250621/60 得到 4154177 余1 。所以第 4154179 行是第一个染色体的最后一个碱基所在,下一行是染色体2的开端。
代码实现
python
import os,sys
class chunkParser:
def __init__(self,chunk) -> None:
self.chunk = chunk
def seq(self):
new_chunck = ''
n = 0
m = 0
for i in self.chunk: #这里应该是按字节遍历读取的内存
m += 1
if i == 10: # this is \\n(LF)
n += 1
print(m,";",end="")
continue
new_chunck += chr(i) # 读取的内存默认是8位的数字,要转化成对应的ascii
return new_chunck
class fastaReader:
def __init__(self,fasta) -> None:
if os.path.exists(fasta):
self.file = open(fasta,'rb')
self.index_dic = fastaReader.getIndex(fasta)
else:
raise FileNotFoundError
@staticmethod
def getIndex(fasta):
if os.path.exists(fasta+".fai"):
index_dic = {}
with open(fasta+".fai",'r') as f:
for i in f:
t = i.strip().split('\t')
index_dic[t[0]] = map(lambda x : int(x),t[1:])
return index_dic
else:
raise FileNotFoundError
# 根据index 快速定位到文件的位置
def fech(self,chrom,start,end):
start -= 1
end -= 1 # 索引是以0位开始计算碱基位,即 pos 是0 时对应第一个碱基,但是IGV 中,以及使用习惯上,人们以 1 为第一个碱基。
assert chrom in self.index_dic, "chrom is not in fai index!\n"
seq_len,offset,line_base,line_byte = self.index_dic[chrom]
assert start >= 0 ,"start need bigger than 0 \n"
assert start <= end , "start is bigger than end!\n"
assert seq_len >= end, "end is bigger than seq length!\n"
chunk_size = end - start + 1
lines = int(start/line_base)
pos = start % line_base
shift = offset + lines*line_byte + pos
chunk_size = chunk_size + int((chunk_size+pos-1)/line_base) # 补充pos位,以计算跨越的行,加上每一行的LF
self.file.seek(shift)
chunk = self.file.read(chunk_size)
return chunkParser(chunk)
def close(self):
self.file.close()
if __name__ == "__main__":
from pyfaidx import Faidx
if len(sys.argv) < 3:
print(sys.argv[0],"reference 1:10-100")
sys.exit(1)
fasta = fastaReader(sys.argv[1])
chrom,pos = sys.argv[2].split(':')
start,end = map(lambda x : int(x) ,pos.split('-'))
seq = fasta.fech(chrom,start,end).seq()
fasta.close()
refer = Faidx(sys.argv[1])
seq2 = refer.fetch(chrom,start,end).seq
print("### from my won func")
print(seq)
print("### from pyfaidx ")
print(seq2)
C++
#include <string>
#include <iostream>
#include <fstream>
#include <map>
#include<vector>
#include<sstream>
#include<typeinfo>
using namespace std;
class ChunkParse
{
public:
ChunkParse(char* chunk_in,int len):
chunk{chunk_in},length{len}{}
~ChunkParse(){ delete[] chunk;}
string seq();
private:
char* chunk;
int length;
};
class fastqReader
{
public:
fastqReader(string fasta_name)
:fasta{fasta_name}{
index_dic = get_index(fasta_name);
}
~fastqReader(){fasta.close();}
ChunkParse fetch(string chrom,int start,int end);
private:
ifstream fasta;
map<string,vector<int>> index_dic;
map<string,vector<int>> static get_index(string& fasta_name);
};
map<string,vector<int>> fastqReader::get_index(string& fasta_name){
ifstream fai {fasta_name + ".fai",ifstream::binary};
string chrom;
int seq_len,offset,line_bases,line_bytes;
map<string,vector<int>> index_dic;
for(;fai >> chrom >> seq_len >> offset >> line_bases >> line_bytes;)
index_dic[chrom] = vector<int>{seq_len,offset,line_bases,line_bytes};
fai.close();
return index_dic;
}
/*
def fech(self,chrom,start,end):
start -= 1
end -= 1 # 索引是以0位开始计算碱基位,即 pos 是0 时对应第一个碱基,但是IGV 中,以及使用习惯上,人们以 1 为第一个碱基。
assert chrom in self.index_dic, "chrom is not in fai index!\n"
seq_len,offset,line_base,line_byte = self.index_dic[chrom]
assert start >= 0 ,"start need bigger than 0 \n"
assert start <= end , "start is bigger than end!\n"
assert seq_len >= end, "end is bigger than seq length!\n"
chunk_size = end - start + 1
lines = int(start/line_base)
pos = start % line_base
shift = offset + lines*line_byte + pos
chunk_size = chunk_size + int((chunk_size+pos)/line_byte) # 补充pos位,以计算跨越的行,加上每一行的LF
self.file.seek(shift)
chunk = self.file.read(chunk_size)
return chunkParser(chunk)
*/
ChunkParse fastqReader::fetch(string chrom,int start,int end)
{
int seq_len,offset,line_bases,line_bytes;
vector<int> v = index_dic[chrom];
seq_len = v[0];
offset = v[1];
line_bases = v[2];
line_bytes = v[3];
start--;
end --;
int chunk_size = end - start + 1;
int lines = start/line_bases;
int pos = start % line_bases;
int shift = offset + lines*line_bytes + pos;
chunk_size = chunk_size + (chunk_size+pos-1)/line_bases;
fasta.seekg(shift);
char* buffer = new char[chunk_size];
fasta.read(buffer,chunk_size);
return ChunkParse(buffer,chunk_size);
}
string ChunkParse::seq()
{
string s;
for(int i=0;i<length;i++)
{
if(chunk[i] != '\n') s += chunk[I];
}
return s;
}
// for string delimiter
vector<string> split (string s, string delimiter) {
size_t pos_start = 0, pos_end, delim_len = delimiter.length();
string token;
vector<string> res;
while ((pos_end = s.find (delimiter, pos_start)) != string::npos) {
token = s.substr (pos_start, pos_end - pos_start);
pos_start = pos_end + delim_len;
res.push_back (token);
}
res.push_back (s.substr (pos_start));
return res;
}
int main(int argc,char *argv[])
{
if(argc < 3){
cerr << argv[0] << " reference 1:1-100\n";
return -1;
}
vector<string> regin_v = split(argv[2],":");
string chrome = regin_v[0];
stringstream ss{regin_v[1]};
int start,end;
char delimter;
ss >> start >> delimter >> end;
fastqReader fasta {argv[1]};
string seq = fasta.fetch(chrome,start,end).seq();
cout << seq;
}