基于NW全局比对算法制作的引物查询工具

2021-09-11  本文已影响0人  无话_

写在前面

首先,此工具关于比对算法源码部分来自https://zhuanlan.zhihu.com/p/78027062,感谢知乎作者 我不是大熊 的开源分享。
本人对算法的理解比较浅薄(上班也一直用不上......学校学的点啥都不剩了),此工具只是对其源码的简单修改与封装,想要详细了解算法原理,可移步该作者的知乎专栏。

处理情景

工作中,有时会出现这种尴尬的情况,已有Fastq文件,即使已经知道了扩展区域,却能确定是所用的是啥引物(同一区域引物种类很多),有时甚至不知道是什么区域。客户只有一句话:反正是在你们公司测的序.....
公司测序所有的引物加起来上百种,肉眼看?去除引物慢慢试?整合所有引物信息后直接拿来与想知道引物的Fastq文件比对显然直接有效的多。

工具源码

import sys
import gzip

degenerate = {
    'A': ["A"],
    'T': ["T"],
    'C': ["C"],
    'G': ["G"],
    'R': ["A", "G"],
    'Y': ["C", "T"],
    'M': ["A", "C"],
    'K': ["G", "T"],
    'S': ["G", "C"],
    'W': ["A", "T"],
    'H': ["A", "T", "C"],
    'B': ["G", "T", "C"],
    'V': ["G", "A", "C"],
    'D': ["G", "A", "T"],
    'N': ["A", "T", "C", "G"]
}

def theta(a, b): # degenerate bases in b
    if a == '-' or b == '-' :   # gap or mismatch
        return -1
    elif not degenerate.get(b):          # invalid character
        return -2
    elif a not in degenerate.get(b):     # not match
        return -2
    elif a in degenerate.get(b):         # match 
        return 2

def make_score_matrix(seq1, seq2):
    """
    return score matrix and map(each score from which direction)
    0: diagnosis
    1: up
    2: left
    """
    seq1 = '-' + seq1
    seq2 = '-' + seq2
    score_mat = {}
    trace_mat = {}
    max_score=0
    for i,p in enumerate(seq1):
        score_mat[i] = {}
        trace_mat[i] = {}
        
        for j,q in enumerate(seq2):
            if i == 0:                    # first row, gap in seq1
                score_mat[i][j] = -j
                trace_mat[i][j] = 1
                continue
            if j == 0:                    # first column, gap in seq2
                score_mat[i][j] = -i
                trace_mat[i][j] = 2
                continue
            ul = score_mat[i-1][j-1] + theta(p, q)     # from up-left, mark 0
            l  = score_mat[i][j-1]   + theta('-', q)   # from left, mark 1, gap in seq1
            u  = score_mat[i-1][j]   + theta(p, '-')   # from up, mark 2, gap in seq2
            picked = max([ul,l,u])
            score_mat[i][j] = picked
            trace_mat[i][j] = [ul, l, u].index(picked)   # record which direction
            if (picked > max_score):
                max_score=picked
    return score_mat, trace_mat,max_score

def traceback(seq1, seq2, trace_mat):
    '''
    find one optimal traceback path from trace matrix, return path code
    -!- CAUTIOUS: if multiple equally possible path exits, only return one of them -!-
    '''
    seq1, seq2 = '-' + seq1, '-' + seq2
    i, j = len(seq1) - 1, len(seq2) - 1
    path_code = ''
    while i > 0 or j > 0:
        direction = trace_mat[i][j]
        if direction == 0:                    # from up-left direction
            i = i-1
            j = j-1
            path_code = '0' + path_code
        elif direction == 1:                  # from left
            j = j-1
            path_code = '1' + path_code
        elif direction == 2:                  # from up
            i = i-1
            path_code = '2' + path_code
    return path_code

def print_m(seq1, seq2, m):
    """print score matrix or trace matrix"""
    seq1 = '-' + seq1; seq2 = '-' + seq2
    print()
    print(' '.join(['%3s' % i for i in ' '+seq2]))
    for i, p in enumerate(seq1):
        line = [p] + [m[i][j] for j in range(len(seq2))]
        print(' '.join(['%3s' % i for i in line]))
    print()
    return

def pretty_print_align(seq1, seq2, path_code):
    '''
    return pair alignment result string from
    path code: 0 for match, 1 for gap in seq1, 2 for gap in seq2
    '''
    align1 = ''
    middle = ''
    align2 = ''
    for p in path_code:
        if p == '0':
            align1 = align1 + seq1[0]
            align2 = align2 + seq2[0]
            if not degenerate.get(seq2[0]):
                middle = middle + ' '
            elif seq1[0] in degenerate.get(seq2[0]):
                middle = middle + '|'
            else:
                middle = middle + ' '
            seq1 = seq1[1:]
            seq2 = seq2[1:]
        elif p == '1':
            align1 = align1 + '-'
            align2 = align2 + seq2[0]
            middle = middle + ' '
            seq2 = seq2[1:]
        elif p == '2':
            align1 = align1 + seq1[0]
            align2 = align2 + '-'
            middle = middle + ' '
            seq1 = seq1[1:]

    print('Alignment:\n\n   ' + align1 + '\n   ' + middle + '\n   ' + align2 + '\n')
    return

def usage():
    print('Usage:\n\tpython nwAligner.py xx.fq.gz xx.fasta\n')
    return

def main():
    try:
        fq_gz,fasta = map(str, sys.argv[1:3])
        with gzip.open(fq_gz,'rt') as fz:
            i = 0
            for line in fz :
                i += 1
                if i == 2 :
                    seq1 = line[0:28]
                    break       
        seq2s={}
        with open (fasta,'rt') as fa :
            for line in fa:
                if line.startswith(">"):
                    name = line
                    seq2s[name] = ''
                else:
                    seq2s[name]+=line   
    except:
        usage()
        return
    dir_sq_score_path={} #字典
    for name,seq2 in seq2s.items() :
        score_mat , trace_mat , max_score = make_score_matrix(seq1, seq2)
        path_code = traceback(seq1, seq2, trace_mat)
        dir_sq_score_path[name]=[path_code,seq2,max_score]

    sortdssp=sorted(dir_sq_score_path.items(),reverse=True,key = lambda kv:(kv[1][2]))[0:4]#字典按最大得分排序,取前五
    #sortdssp成了list
    for x in sortdssp:
        print (x[0]+" max score "+str(x[1][2]))
        pretty_print_align(seq1,x[1][1], x[1][0])
    print("source code of blast from https://zhuanlan.zhihu.com/p/78027062")
if __name__ == '__main__':
    main()

使用方法:

第0步:准备引物序列的fasta文件

如: image.png
使用与结果: image.png

基本原理

其实原理很简单,就是将Fastq文件中的第一条序列的前28个bp截取,与fasta文件中所有的引物序列比对打分,最终输出得分前五的引物序列(注意:比对时未考虑反向互补序列情况)。
值得注意的是由于引物中有简并碱基存在,故打分时不能简单地判断相等,这点尤其要感谢B哥的改良。

上一篇 下一篇

猜你喜欢

热点阅读