程序员

【原创】用xapian打造一个文件“行”索引系统

2019-02-21  本文已影响0人  嗒嗒噜拉
#coding:utf8
#######################################################################################################
#                             自动索引系统 by DaDaLuLa                                                #
#######################################################################################################
#   程序功能:自动对txt,xls,xlsx,csv,sql,html格式的文本文件按行建立索引                                   #
#   程序要求:1.需索引的文件要放到一个固定的文件夹内,注意文件扩展名要符合要求                              #                        
#            2.文件编码方式须为utf8                                                                    #
#            3.文件名不能有中文                                                                        #
#   执行流程:实例化filesIndex对象-->从上次断点处继续索引-->遍历文件目录检查未索引的文件-->逐个建立索引      #
#   其他:    1.默认的日志文件为/var/log/auto_index.log(自动建立)                                       #
#            2.每个待索引文件的名字,md5值,断点位置等信息保存在mongodb数据库中                             #
#            3.索引文件所在目录默认为/var/lib/xapian/                                                  #
######################################################################################################

#!/usr/bin/env python
# NOTE: a shebang only takes effect when it is the very first line of the file.
import os
import sys
import time
import datetime
import hashlib
import string
import re
import io
import logging
import pymongo
import xapian
import jieba
import chardet
#from multiprocessing import Pool
import types
from pyexcel_xls import XLBook
reload(sys)
sys.setdefaultencoding("utf-8")

SG_FILES    = "/var/lib/files/"                                 #存放待索引文件的目录
INDEX_DIR   = "/var/lib/xapian/"                                #存放索引文件的目录
LOG_FILE    = "/var/log/auto_index.log"                         #日志文件
POOL_SIZE   = 4                                                 #进程池大小

class filesIndex(object):
    def __init__(self):
        self.all_files = set()                             #存放遍历出来的所有文件
        self.need_indexed_files = set()
        self.bad_chars = [' ',',',u',','"',"'","(",")",u"(",u")","/","~","^","-",u".",u"\r",u"\n",u"\t","NULL","null",u'[',u']',u'{',u'}']   #结巴分词需要过滤的坏字符
    
        logging.basicConfig(
                    level = logging.INFO,
                                    format ='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                                    datefmt = '%a, %d %b %Y %H:%M:%S',
                                    filename = LOG_FILE,
                                    filemode = 'a'
                                   )
        self.log = logging                                 #日志模块
    
        #self.pool = Pool(processes = POOL_SIZE)           #多进程
    
        self.conn = pymongo.MongoClient("localhost",27017) #连接mongodb
        self.col = self.conn.sg_db.files                   #打开sg_db中的files集合

        self.ix_dir = INDEX_DIR                            #索引文件存放位置
        if not os.path.exists(self.ix_dir):
            os.mkdir(self.ix_dir)

        self.walkDir()                                 #遍历社工库文件夹中所有文件
        self.check_new_file()                          #检查所有待索引新文件
        self.init_index()
            
    #遍历待索引文件夹中所有文件
    def walkDir(self):
        for root,dirs,files in os.walk(SG_FILES,topdown = True):
            for name in files:
                #print name
                fname = os.path.join(root,name)
                ext = os.path.splitext(fname)[-1].strip('.').lower()
                if ext in ("txt","log","csv","xls","xlsx"):
                    self.all_files.add(fname)
                elif ext == "sql":
                    self.all_files.add(self.__sql2txt(fname))
                elif ext == "old":
                    pass
                else:
                    self.log.info(u"文件 "+fname+u" 格式无法处理")
    
    #sql格式转为txt
    def __sql2txt(self,fname):
        f = open("sql.sed","w")
        f.write("s/),(/)-\\n(/g\ns/);/)-\\n/g")
        f.close()
        self.log.info(u"文件 "+fname+u" 开始转化格式")
        fname_new = os.path.splitext(fname)[0]+"_sql.txt"
        fname_modify = os.path.splitext(fname)[0]+"_sql.old"
        os.system("cat "+fname+"| sed -f 'sql.sed' | grep ')-' | sed 's/)-/)/' > "+fname_new)
        os.rename(fname,fname_modify)   
        self.log.info(u"文件 "+fname+u" 转化格式完成")
        return fname_new

    #检查没有被索引的文件
    def check_new_file(self):
        old_files = set()
        for file_item in self.col.find():
            old_files.add(file_item["file_name"])
        new_files = self.all_files - old_files
        for f in new_files:
            print "[!]"+f 
            md5value = self.__md5_file(f)
            dup_file = self.col.find({"md5value":md5value})
            if dup_file.count():
                self.log.info(u"文件 "+f+u" 与文件 "+dup_file[0]["file_name"]+u" 系同一个文件")
                os.remove(f)
            else:
                charset = chardet.detect(open(f,"r").read(10000))["encoding"]
                self.col.insert_one({"file_name":f,"md5value":md5value,"charset":charset,"is_indexed":0,"line_pointer":0})
            
    #计算文件的md5值
    def __md5_file(self,file_name):
        m = hashlib.md5()   
        f = io.FileIO(file_name,'r')
        bytes = f.read(1024)
        while(bytes != b''):
            m.update(bytes)
            bytes = f.read(1024)
        f.close()
        md5value = m.hexdigest()
        return md5value
    
    #按文件类别找到索引入口
    def __indexFiles(self,fname,start = 0):
        ext = os.path.splitext(fname)[-1].strip('.')
        ext = ext.lower()
        if ext in ("txt","log","html","csv"):
            self.__index_txt(fname,start)
        elif ext in("xls","xlsx"):
            self.__index_excel(fname,start)
        else:
            self.log.info(u"文件"+fname+u"格式无法识别")    
    
    #针对txt,log,csv格式文件索引函数  
    def __index_txt(self,fname,start):
        f = open(fname,"r")
        index_dir = self.ix_dir+os.path.split(fname)[1].replace(".","_")
        index_db = xapian.WritableDatabase(index_dir,xapian.DB_CREATE_OR_OPEN)
        print "开始"  
        if start > 0:                                           #断点索引
            line_num = start
            self.log.info(u"文件"+fname+ u"从第"+str(start)+u"行处开始索引")
            while start > 0:                                #移动文件指针到断点位置
                f.readline()
                #print start
                start = start -1
        else:
            self.log.info(u"文件"+fname+u"开始索引")          #正常索引
            line_num = 0
            
        datas = f.readlines(20000000)                           #一次读取20M加快速度
        try:
            while datas:
                lines = 0
                for data in datas:
                    doc = xapian.Document()
                    doc.set_data(data)
                    seg_list = jieba.cut_for_search(data)
                    seg_list_clean = list(set(seg_list).difference(set(self.bad_chars)))
                    for seg in seg_list_clean:
                        #print seg
                        doc.add_term(seg)
                    index_db.add_document(doc)
                    lines = lines + 1
                    #print lines
                    #index_db.flush()
                
                line_num = line_num + lines
                print line_num
                datas= f.readlines(20000000)
        
        except:
                line_num = line_num + lines
                f.close()
                print "[!]############" 
                info = sys.exc_info()
                print info[0],":",info[1]                           #把错误打印出来,便于调试
                print "[!]***********"
                index_db.flush()
                self.__index_exit(fname,line_num)                   #调用索引中断处理程序
        index_db.flush()
        self.col.update_one({"file_name":fname},{"$set":{"is_indexed":1,"line_pointer":line_num}})
        self.log.info(u"文件"+fname+u"索引成功完成,共"+str(line_num)+u"条记录")                     #执行到这一步说明索引成功完成
        f.close()
    
    #针对excel文件的处理函数     
    def __index_excel(self,fname,start):
        book = XLBook(fname)
        xls_data = dict(book.sheets())
        index_dir = self.ix_dir+os.path.split(fname)[1].replace(".","_")
        index_db = xapian.WritableDatabase(index_dir,xapian.DB_CREATE_OR_OPEN)
        line_num = 0
        if start > 0: 
            self.log.info(u"文件"+fname+ u"从第"+str(start)+u"行处开始索引")
        else:
            self.log.info(u"文件"+fname+u"开始索引")          #正常索引
        try:
            for sheet in xls_data:
                for row in xls_data[sheet]:
                    line_num = line_num + 1
                    if line_num < start:
                        continue
                    data = ""
                    for col in row:
                        if type(col) == float:
                            col = str(int(col))
                        elif type(col) == int or type(col) == datetime.datetime or type(col) == datetime.date:
                            col = str(col)
                        else:
                            col = col.encode("utf-8")
                        data = data+col+ ","
                    data = data.strip(",")
                    doc = xapian.Document()
                    doc.set_data(data)
                    seg_list = jieba.cut_for_search(data)
                    seg_list_clean = list(set(seg_list).difference(set(self.bad_chars)))
                    for seg in seg_list_clean:
                        doc.add_term(seg)
                    index_db.add_document(doc)
        except:
                info = sys.exc_info()
                print info[0],":",info[1]                           #把错误打印出来,便于调试
                index_db.flush()
                self.__index_exit(fname,line_num)                   #调用索引中断处理程序
        index_db.flush()
        self.col.update_one({"file_name":fname},{"$set":{"is_indexed":1,"line_pointer":line_num}})
        self.log.info(u"文件"+fname+u"索引成功完成")                    #执行到这一步说明索引成功完成
        self.log.info(u"文件"+fname+u"索引成功完成,共"+str(line_num)+u"条记录")                     #执行到这一步说明索引成功完成
        
    #索引断点处理程序 (保存断点位置,写入日志)
    def __index_exit(self,fname,line_num):
        #记录索引断点的位置
        self.col.update_one({"file_name":fname},{"$set":{"line_pointer":line_num}})

        self.log.info(u"文件"+fname+u"索引中断发生在第"+str(line_num)+u"行")
        sys.exit()
        
    #索引入口,判断索引模式(断点索引还是正常索引)
    def init_index(self):
        #断点索引
        for doc in self.col.find({"is_indexed":0,"line_pointer":{"$gt":0}}):
            #self.pool.apply_async(self.__indexFiles,args=(doc["file_name"],doc["line_pointer"]))
            self.data_tag = os.path.split(doc["file_name"])[1].split(".")[0]    #数据标签
            self.__indexFiles(doc["file_name"],doc["line_pointer"])             
        #新文件索引      
        for doc in self.col.find({"is_indexed":0,"line_pointer":0}):
            #self.pool.apply_async(self.__indexFiles,args=(doc["file_name"],))
            self.data_tag = os.path.split(doc["file_name"])[1].split(".")[0]
            self.__indexFiles(doc["file_name"])
        #self.pool.close()
        #self.pool.join()       

    #数据检索接口     
    def search(self,keyword):
        db = xapian.Database()
        for d in os.listdir(self.ix_dir):
            print self.ix_dir+d
            db_d = xapian.Database(self.ix_dir+d)
            db.add_database(db_d)
        enquire =xapian.Enquire(db)
        query = xapian.Query(keyword)
        print keyword
        enquire.set_query(query)
        matches = enquire.get_mset(0,50)
        print matches.size()
        for m in matches:
            print "%i %i%% docid=%i [%s]" % (m.rank+1,m.percent,m.docid,m.document.get_data())  
    #i析构函数
    def __del__(self):
        self.conn.close()      #关闭mongodb连接

if __name__ == "__main__":
    # filesIndex.__init__ already runs the whole pipeline (walkDir ->
    # check_new_file -> init_index), so constructing the object is enough;
    # the previous explicit re-invocation of those three methods indexed
    # everything a second time.  reload(sys)/setdefaultencoding is likewise
    # already done at module import time, so it is not repeated here.
    INDEX = filesIndex()
    #start_time = time.time()
    #INDEX.search("123456")                          # search smoke test
    #finsh_time = time.time()
    #print "[+] search took "+str(finsh_time-start_time)+"s"

【原创文章,转载文章和文中代码请注明出处】

上一篇下一篇

猜你喜欢

热点阅读