python自学

传染病传播模拟(Python实现)

2020-03-27  本文已影响0人  烂泥先生

COVID-19流行期间,很多博主提出了各种各样的传播模型,并利用数据可视化直观地展示了传染病传播情况,收到点赞无数。近期因为工作安排变化,最近时间比较充裕,自学了Python语言,并使用Python语言建立了一个传染病传播模型。此模型用到的技术有:

模型原理

我们建立一个社区(community),社区具有固定面积,在这个社区中随机分布固定的人口,初始时每个人都具有易感性,开始随机产生第一个感染者(零号感染者),他可以感染距离他固定距离内的所有人。当一个人被感染后,经历一段时间潜伏期,潜伏期内没有症状但具有传染性,当潜伏期过了后,立即住院隔离治疗一段时间,住院期间按轻症和重症不同,具有不同的死亡率,度过治疗时间后,存活者回归社区并获得永久免疫,视为治愈人群,死亡者退出社区。

模型分析

具体实现

1. 明确所需变量

#导入库
import math
import numpy as np
import matplotlib.pyplot as plt
#基本参数
itohday=3           #感染到住院天数
obday=14            #住院观察天数
ri=0.12             #感染范围
mildrate=0.96       #轻症比例
severerate=1-mildrate   #重症比例
milddeath=0.001     #轻症每日死亡率
severedeath=0.05    #重症每日死亡率
persons=10000       #社区总人口
days=100            #历史最大天数
#容器
community=[]        #社区人口
latent=[]           #潜伏期人群
hospital=[]         #住院人群
death=[]            #死亡人群

在以上代码中,我们可以通过调整基本变量改变模型参数,容器中保存时时人口分布列表

2. 建立person类

person类是本程序的关键类,包含了一个人的位置、状态信息以及各种状态的转变,该类的每一个实例就代表了一个人。我们现在来整理一下person类一个实例的生命周期:

#pos类作为保存person坐标的辅助类
class pos:
    x=0
    y=0
    def __init__(self):
        pass
    def setpos(self,x0,y0):
        self.x=x0
        self.y=y0
#person类
class person:
    p=pos() #位置坐标
    s=1     #是否易感,1=易感,0=免疫
    i=0     #是否患病/传染,1=患病,0=未患病
    c=0     #是否重症,1=重症,0=轻症,只有在i=1时有效
    ind=-1   #感染日
    add=-1   #住院日
    index=0 #索引号,调试时用
    #构造方法,idx为索引号,x0、y0为位置横纵坐标
    def __init__(self,idx,x0,y0):
        self.p=pos()
        self.index=idx
        self.p.setpos(x0,y0)
    #获得横坐标
    def getx(self):
        return self.p.x
    #获得纵坐标
    def gety(self):
        return self.p.y
    #求感染,未感染者计算与潜伏期患者的距离,若小于最大感染距离则被感染
    def beinfected(self,day):
        if self.s==1:
            for item in latent:
                r=caldistance(self,item)
                if r<ri:
                    self.infected(day)
    #被感染
    def infected(self,day):
        if self.i==0:
            self.i=1
            self.s=0
            self.ind=day
            self.severity()
            latent.append(self)
    #病情分级:轻症、重症
    def severity(self):
        r=np.random.rand()
        if r<mildrate:
            self.c=0
        else:
            self.c=1
    #住院
    def hospital(self,day):
        self.add=day
        hospital.append(self)
        latent.remove(self)
        community.remove(self)
    #在院
    def inhospital(self,day):
        if self.c==0:
            if np.random.rand()<milddeath:
                self.die(day)
        else:
            if np.random.rand()<severedeath:
                self.die(day)
    #治愈出院,获得永久免疫
    def cured(self,day):
        self.i=0
        community.append(self)
        hospital.remove(self)
    #死亡
    def die(self,day):
        death.append(self)
        hospital.remove(self)
    #接口函数
    def tick(self,day):
        if self.i==1:   #若被感染
            d=day-self.ind
            if d<itohday:   #潜伏期
                pass
            elif d==itohday:   #住院日,潜伏期过
                self.hospital(day)
            elif d<obday+itohday:    #在住院
                self.inhospital(day)
            elif d==obday+itohday:    #出院日,被治愈
                self.cured(day)
            else:
                pass
        else:   #若没有被感染
            self.beinfected(day)
#计算两人之间的距离,参数为两person类实例,返回两者距离
def caldistance(p1,p2):
    x1=p1.p.x
    y1=p1.p.y
    x2=p2.p.x
    y2=p2.p.y
    dis=math.sqrt((x1-x2)**2+(y1-y2)**2)
    return dis

3.主程序

在主程序中有4个事件:

#产生一个社区,横纵坐标分布为标准正态分布扩大3倍
Xc=np.random.randn(persons)*3
Yc=np.random.randn(persons)*3
for i in range(persons):
    temp=person(i,Xc[i],Yc[i])
    community.append(temp)
#时间开始
day=0
#随机产生零号感染者
community[int(np.random.rand()*persons)].infected(day)
#绘制Day 0图像
plot()
#流行开始
while day<=days:
    day+=1
    #先处理在院人群
    for i in range(-len(hospital)+1,1):     #注意此处为从后往前处理列表中数据
        hospital[-i].tick(day)
    #再处理社区人群
    for i in range(-len(community)+1,1):    #注意此处为从后往前处理列表中数据
        community[-i].tick(day)
    #画图
    plot()
print("END")

需要注意的是处理在院人群和社区人群的两个for循环都是从列表后面向前处理的,这是因为如果从前往后按顺序处理,如果一个数据需要从列表中移除,则列表后面的变量就会前移,而计数变量i的值是不变的,因此可能漏掉列表当前位置后面一个实例的操作,从而引发错误。这是本程序编制过程中的一个坑。给我们的提示是,如果操作一个列表过程中,可能要对列表进行元素的插入append或删除remove操作,最好能从列表尾部向前进行,以免新增或新删除的元素影响到当前位置以后元素的索引值
这里还差一个plot()函数,我们不妨先做散点图再做另外三个统计图。

4.动态散点图绘制

def plot():
    plt.ion()   #打开动态模式
    X=[]        #横坐标列表
    Y=[]        #纵坐标列表
    C=[]        #散点颜色列表
    i=0
    for i in range(len(community)):
        X.append(community[i].getx())
        Y.append(community[i].gety())
        if community[i].i==0:
            if community[i].s==1:
                C.append('b')   #未感染者:蓝色
            else:
                C.append('k')   #治愈者:黑色
        else:
            C.append('r')       #潜伏期:红色
    plt.clf()   #清除图像
    plt.title("%d day" %(day))  #标题
    plt.scatter(X,Y,c=C,s=1)    #绘制散点图,s参数为散点大小,最小值为1
    plt.draw()      #绘图
    plt.pause(1)    #暂停

效果如下:


效果图

5.绘制统计图

为了绘制统计图,我们需要引入4个统计变量,用来保存每天的数据。

#统计数据
infectedpersonbyday=[]  #每日累计感染人数
admittedpersonbyday=[]  #每日现存在院人数
deadpersonbyday=[]      #每日累计死亡人数
curedpersonbyday=[]     #每日累计治愈人数

为此,我们需要修改person类成员函数的相关实现,同时需要在主程序开始时进行初始化。修改后的person类代码如下:

#person类
class person:
    p=pos() #位置坐标
    s=1     #是否易感,1=易感,0=免疫
    i=0     #是否患病/传染,1=患病,0=未患病
    c=0     #是否重症,1=重症,0=轻症,只有在i=1时有效
    ind=-1   #感染日
    add=-1   #住院日
    index=0 #索引号,调试时用
    #构造方法,idx为索引号,x0、y0为位置横纵坐标
    def __init__(self,idx,x0,y0):
        self.p=pos()
        self.index=idx
        self.p.setpos(x0,y0)
    #获得横坐标
    def getx(self):
        return self.p.x
    #获得纵坐标
    def gety(self):
        return self.p.y
    #求感染,未感染者计算与潜伏期患者的距离,若小于最大感染距离则被感染
    def beinfected(self,day):
        if self.s==1:
            for item in latent:
                r=caldistance(self,item)
                if r<ri:
                    self.infected(day)
    #被感染
    def infected(self,day):
        if self.i==0:
            self.i=1
            self.s=0
            self.ind=day
            self.severity()
            infectedpersonbyday[day]+=1      #统计变量
            latent.append(self)
    #病情分级:轻症、重症
    def severity(self):
        r=np.random.rand()
        if r<mildrate:
            self.c=0
        else:
            self.c=1
    #住院
    def hospital(self,day):
        self.add=day
        admittedpersonbyday[day]+=1      #统计变量
        hospital.append(self)
        latent.remove(self)
        community.remove(self)
    #在院
    def inhospital(self,day):
        if self.c==0:
            if np.random.rand()<milddeath:
                self.die(day)
        else:
            if np.random.rand()<severedeath:
                self.die(day)
    #治愈出院,获得永久免疫
    def cured(self,day):
        self.i=0
        curedpersonbyday[day]+=1         #统计变量
        admittedpersonbyday[day]-=1      #统计变量
        community.append(self)
        hospital.remove(self)
    #死亡
    def die(self,day):
        deadpersonbyday[day]+=1          #统计变量
        admittedpersonbyday[day]-=1      #统计变量
        death.append(self)
        hospital.remove(self)
    #接口函数
    def tick(self,day):
        if self.i==1:   #若被感染
            d=day-self.ind
            if d<itohday:   #潜伏期
                pass
            elif d==itohday:   #住院日,潜伏期过
                self.hospital(day)
            elif d<obday+itohday:    #在住院
                self.inhospital(day)
            elif d==obday+itohday:    #出院日,被治愈
                self.cured(day)
            else:
                pass
        else:   #若没有被感染
            self.beinfected(day)

主程序代码修改如下:

#产生一个社区,横纵坐标分布为标准正态分布扩大3倍
Xc=np.random.randn(persons)*3
Yc=np.random.randn(persons)*3
for i in range(persons):
    temp=person(i,Xc[i],Yc[i])
    community.append(temp)
#时间开始
day=0
#初始化统计变量
curedpersonbyday.append(0)
infectedpersonbyday.append(0)
admittedpersonbyday.append(0)
deadpersonbyday.append(0)
#随机产生零号感染者
community[int(np.random.rand()*persons)].infected(day)
#绘制Day 0图像
plots()
#流行开始
#la=1    #潜伏期人数
while day<=days:
    day+=1
    #统计变量列表增加新增1天
    curedpersonbyday.append(curedpersonbyday[day-1])
    infectedpersonbyday.append(infectedpersonbyday[day-1])
    admittedpersonbyday.append(admittedpersonbyday[day-1])
    deadpersonbyday.append(deadpersonbyday[day-1])
    #先处理在院人群
    for i in range(-len(hospital)+1,1):     #注意此处为从后往前处理列表中数据
        hospital[-i].tick(day)
    #再处理社区人群
    for i in range(-len(community)+1,1):    #注意此处为从后往前处理列表中数据
        community[-i].tick(day)
    
    #print((infectedpersonbyday[day]-infectedpersonbyday[day-1])/la)
    #la=len(latent)
    #画图
    plots()
print("END")

6.动态多图绘图代码

我们用plots()函数来同时绘制同一窗口显示四张图表,左上为人群分布散点图,蓝色为易感人群,红色为潜伏期人群,黑色为免疫人群(治愈人群);右上为感染率曲线;左下为住院人数曲线;右下为死亡人数曲线。

def plots():
    X=[]        #横坐标列表
    Y=[]        #纵坐标列表
    C=[]        #散点颜色列表
    Xp=np.arange(0,days)    #统计图横坐标
    for i in range(len(community)):
        X.append(community[i].getx())
        Y.append(community[i].gety())
        if community[i].i==0:
            if community[i].s==1:
                C.append('b')   #未感染者:蓝色
            else:
                C.append('k')   #治愈者:黑色
        else:
            C.append('r')       #潜伏期:红色
    plt.ion()       #打开动态模式
    plt.clf()       #清除图像
    #绘制散点图
    apen=plt.subplot(2,2,1)
    apen.set_title("Day %d" %(day))
    apen.scatter(X,Y,c=C,s=1)
    #绘制累计感染人数图
    bpen=plt.subplot(2,2,2)
    bpen.set_title("Infected %.2f%%" %(infectedpersonbyday[day]/persons*100))
    bpen.plot(Xp[0:day+1],infectedpersonbyday)
    #绘制实时在院人数图
    cpen=plt.subplot(2,2,3)
    cpen.set_title("In hospital %.2f%%" %(admittedpersonbyday[day]/persons*100))
    cpen.plot(Xp[0:day+1],admittedpersonbyday)
    #绘制累计死亡人数图
    dpen=plt.subplot(2,2,4)
    dpen.set_title("Death %.2f%%" %(deadpersonbyday[day]/persons*100))
    dpen.plot(Xp[0:day+1],deadpersonbyday)
    plt.pause(1)    #暂停
    plt.show()      #显示图像
    plt.savefig('Day %d.png'%(day))     #保存图像

7.调试参数

至此,全部代码完成,效果如下:


效果图(节选)

下面可以调节不同的初始参数来改变模型初始值。我们可以根据新闻或官方报道数据调整参数,使模型更具有可信度。

上一篇 下一篇

猜你喜欢

热点阅读