python 提升生活和办公效率

让繁琐的工作自动化

2019-06-09  本文已影响0人  随想的翅膀

背景

最近琢磨着对全网流量的协议进行一次统计分析,主要是从网络五元素(源地址、源端口、目的地址、目的端口、协议号)来分析服务器开放了哪些服务,以此为参考对防火墙策略进行调整。数据分析流程概括起来就是:数据读写、处理计算、分析建模,可视化。为求分析的准确性,需要对网络上的流量进行一段时间的采样,例如每间隔10分钟采集一次会话表,按此频率采集2-3天,然后把采集到的文件进行合并,对合并后的数据进行筛选和清洗,最后把处理好的数据导入数据库分析。这些繁琐的工作如果交给人工去做显然不太现实,自动化和大数据的分析当然是Python语言的强项。
工欲善其事,必先利其器,首先下载python编译器,安装jupyter--python的交互式编程环境,导入SSH运维库paramiko,excel操作库openpyxl,大数据分析库pandas,剩下的工作就是如何利用这些利器。

自动化的实现

1.从防火墙采集数据,输出到外部文件。

import paramiko,time,sys,shelve
#模块功能:采集数据,输出文件
#创建SSH对象
def ssh_con(ip,port,username,password):
    ssh=paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(ip,port,username,password)
    shell=ssh.invoke_shell()
    return shell
#创建SSHShell对象
def exec_cmd(shell,cmd,period=10):
    shell.send(cmd)
    time.sleep(period)
    result=shell.recv(3000000)
    result=result.decode('utf-8')
    result1=result
#shell每次接收到的最大缓存是2097152
    while len(result1)==2097152:
        result1=shell.recv(3000000)
        print('缓存不够,继续读取exec_cmd: '+str(len(result1)))
        time.sleep(4)
        result=result+result1.decode('utf-8')
    return result
def main():
    connStr=shelve.open('connStr')
    ip,port,username,password,command=connStr['parameter']
    connStr.close()
    shell=ssh_con(ip,port,username,password)
    exec_cmd(shell,"enable\n")
    result=exec_cmd(shell,password+'\n')
    print(result)
    i=0
    while True:
        i+=1
#判断ssh会话是否退出,如果退出重新建立会话
        if shell.exit_status_ready():
            shell = ssh_con(ip, port, username, password)
            exec_cmd(shell, "enable\n")
            exec_cmd(shell, password + '\n')
        if len(sys.argv)>1:
            result=exec_cmd(shell,"show conn \n",int(sys.argv[1]))
        else:
            result = exec_cmd(shell, command)
        print("session%s.txt总长度:"%i+str(len(result))+'bytes')
        file = open("d:\\output_conn\\sessions%s.txt" % i, 'w')
        file.write(result)
        file.close()
if __name__=="__main__":
    main()

2.文件合并,经过几天的数据采集,产生了上百个文本文件,利用多线程把这几百个文件合并成一个excel文件,在插入excel的过程中需要用到正则表达式对数据进行格式化处理。

import openpyxl,os,re,sys,time,threading
#模块功能:数据预处理
#多线程读写文件
def session_process(session):
    ipRegex=re.compile(r'\d+\.\d+\.\d+\.\d+')
    singleSession=session.split(' ')
    newSession=''
    i=0
    for field in singleSession:
        if ipRegex.match(field)!=None:
            field=re.sub(':',' ',field)
        newSession+=field+' ' 
    newSession=re.sub(',','',newSession) 
    return newSession.split(' ')
def loadFile(fileName):
    print(f"线程名称:{threading.current_thread().name}文件名称:{fileName}开始读取时间:{time.strftime('%Y-%m-%d %H:%M:%S')}")
    proRegex=re.compile(r'^TCP|^UDP')
    session=[]
    file=open(path+fileName)
    rows=0
    for line in file.readlines():
        proMo=proRegex.search(line)
        if proMo is not None:
            session=session_process(line)
        rows+=1
        lock.acquire()
        ws.append(session)
        lock.release()
    file.close()
    print(f"线程名称:{threading.current_thread().name}文件名称:{fileName}读取{rows}行,读取结束时间:{time.strftime('%Y-%m-%d %H:%M:%S')}")
#创建一个线程类
class loadFileThread(threading.Thread):
    def __init__(self,target,args):
        super().__init__()
        self.target=target
        self.args=args
    def run(self):
        self.target(*self.args)
#创建一个线程锁,由于excel文件写入不能并行写入,所以在写入excel的过程中启用线程锁
lock=threading.Lock()
path="f:\\sessions\\"  
wb=openpyxl.Workbook()
ws=wb.active
def main():
    print(f"主线程开始时间:{time.strftime('%Y-%m-%d %H:%M:%S')}")
    startTime=time.time()
    thread_list=[] 
    for fileName in os.listdir(path):
        t=loadFileThread(target=loadFile,args=(fileName,))
#         t=threading.Thread(target=loadFile,args=(fileName,))
        thread_list.append(t)
    for thread in thread_list:
          thread.start()
    for thread in thread_list:
          thread.join()
    wb.save("f:\\sessions\\output.xlsx")
    print(f"主线程结束时间:{time.strftime('%Y-%m-%d %H:%M:%S')}")   
if __name__=='__main__':
    main()

在这种I/O密集型的任务中,采用多线程进行并发操作,读写文件的时间相当于读写最长文件的耗时,程序的执行效率瞬间有了质的提升

线程名称:Thread-66文件名称:sessions59.txt开始读取时间:2019-06-06 16:58:30
线程名称:Thread-67文件名称:sessions60.txt开始读取时间:2019-06-06 16:58:30
线程名称:Thread-68文件名称:sessions61.txt开始读取时间:2019-06-06 16:58:30
线程名称:Thread-55文件名称:sessions48.txt读取5390行,读取结束时间:2019-06-06 16:58:40
线程名称:Thread-57文件名称:sessions50.txt读取5058行,读取结束时间:2019-06-06 16:58:40
线程名称:Thread-59文件名称:sessions52.txt读取6050行,读取结束时间:2019-06-06 16:58:41

3.利用pandas对海量的数据(637906条数据)进行分析仅需几秒的时间,最后把分析好的数据输出到excel文件,整个流程结束,除了采集数据需要几天的时间,分析数据,计算处理,数据建模只花了几分钟的时间。

import pandas as pd
import os
import time
#模块功能:分析数据,处理计算,数据建模
#1.数据处理:打开数据文件
print(f'数据分析开始时间:{time.strftime("%Y-%m-%d %H:%M:%S")}')
with pd.ExcelFile('H:\\output_0606\\output1.xlsx') as xlsx1:
    df1=pd.read_excel(xlsx1)
with pd.ExcelFile('H:\\output_0606\\output2.xlsx') as xlsx2:
    df2=pd.read_excel(xlsx2)
with pd.ExcelFile('H:\\output_0606\\output3.xlsx') as xlsx3:
    df3=pd.read_excel(xlsx3)
#1.数据处理:合并数据文件
frames=[df1,df2,df3]
result=pd.concat(frames)
#数据清洗:移除数据头和尾的空字符
for column in result.columns:
    result[column] =[str(value).strip() for value in result[column]] 
#数据建模:按照源IP,源端口,目的IP,目的端口统计建模
portList=[]
portFile=open('h:\\output\\trustPortList.txt')
portList=portFile.read().split('\n')   
reDstPortList=[]
reSrcPortList=[]
for port in portList:
    reDstPortList.append(result[result['dst_port'] == port])
    reSrcPortList.append(result[result['src_port'] == port])
reDstPort=pd.concat(reDstPortList)
reSrcPort=pd.concat(reSrcPortList)
reDstPort.groupby(['src_area','src_ip','dst_ip','dst_port',]).count().flag.to_excel('H:\\output_0606\\0604_0606Dst.xlsx')
reSrcPort.groupby(['src_area','src_ip','src_port','dst_ip',]).count().flag.to_excel('H:\\output_0606\\0604_0606Src.xlsx')
print(f'数据分析结束时间:{time.strftime("%Y-%m-%d %H:%M:%S")}')

结语

本文通过一个具体的案例实践介绍了一些python的用法,由于作者水平有限,代码并非最优的解决方案,但有一点Python让我更加关注问题的解决方案而非语言本身。本文作者是搞运维的,所以更在乎的是如何利用python提高工作效率,实现自动化运维。

上一篇 下一篇

猜你喜欢

热点阅读