2020年美国新冠肺炎疫情数据分析案例总结

2020-08-05  本文已影响0人  胖波波玻璃球

本案例出自于厦门大学数据库实验室,原采用的方法是PySpark, 在此基础之上,我们通过spark-sql、zeppelin及可视化的方式加以改进。

一 数据集说明

数据集来自数据网站Kaggle的美国新冠肺炎疫情数据集,该数据集以数据表us-counties.csv组织,其中包含了美国发现首例新冠肺炎确诊病例至今(2020-05-19)的相关数据。数据字典如下:

字段名称          字段含义                             例子
date        日期                            2020/1/21;2020/1/22;etc
county      区县(州的下一级单位)               Snohomish;
state       州                                  Washington
cases       截止该日期该区县的累计确诊人数        1,2,3…
deaths      截止该日期该区县的累计死亡人数        1,2,3…

二 使用Zeppelin对数据进行分析

1 导入数据并注册为临时表

把数据存放到HDFS文件系统中,用到的方法是:

./bin/hdfs dfs -put /home/hadoop/us-counties.txt /user/hadoop

这里,我们可以把文件放到本地(本地指Linux)或者上传到hdfs皆可,为了表现不同,我们从本地直接导入文件,代码如下:

import org.apache.spark.sql.types._
## 定义schema
val fields = Array(StructField("date", StringType,true), 
                   StructField("county", StringType,true), 
                   StructField("state", StringType,true), 
                   StructField("cases", IntegerType,true), 
                   StructField("deaths", IntegerType,true))      
val schema = StructType(fields)
val file = ""file:///home..." ## 本地使用file:///
val df = spark.read.option("header", "true").schema(schema).csv(file)
df.show(5)
# 注册为临时表
df.createOrReplaceTempView("usinfo")

2 计算每日的累计确诊病例数和死亡数

我们这里直接运用了spark-sql的方法,代码如下:

import org.apache.spark.sql.functions._
%sql
select date, sum(cases) as `累积确诊`, sum(deaths) as `累积死亡`
from usinfo
group by date
order by date asc

结果导入HDFS数据库存储,方法如下:

val df1 = result1.withColumnRenamed("sum(cases)","cases").withColumnRenamed("sum(deaths)","deaths")
df1.printSchema
df1.repartition(1).write.json("file:///.../result1.json") 

具体结果如下图:


累积.png

3 计算每日较昨日的新增确诊病例数和死亡病例数

这里注意,我们需要把上面的结果再注册一下临时表,方便下面使用spark-sql,在sql中我们使用了自连接的方法,最后查询的结果保存了到HDFS上,如下:

## 注册为临时表供下一步使用
df1.createOrReplaceTempView("ustotal")
## sql自连接
val df2 = spark.sql("""
        select t1.date,t1.cases-t2.cases as caseIncrease,t1.deaths-t2.deaths as deathIncrease 
        from ustotal t1,ustotal t2 
        where t1.date = date_add(t2.date,1)
    """)
# 再保存到HDFS
df2.repartition(1).write.json("file:///.../result2.json"

结果如下图:


新增.png

4 统计截止5.19日 美国各州的累计确诊人数和死亡人数

操作类似:

val df3 = spark.sql("""
             select date,state,sum(cases) as totalCases,sum(deaths) as totalDeaths,round(sum(deaths)/sum(cases),4) as deathRate 
             from usinfo  
             where date = '2020-05-19'
             group by date,state
           """)
# 再保存到HDFS
df3.repartition(1).write.json("file:///.../result3.json"

展示如下:


累计人数.png

5 统计截止5.19全美各州的病死率

val df4 = spark.sql("""
        select 1 as sign,date,'USA' as state,round(sum(totalDeaths)/sum(totalCases),4) as deathRate 
        from eachStateInfo 
        group by date 
        union 
        select 2 as sign,date,state,deathRate from eachStateInfo
    """)
# 再保存到HDFS
df4.repartition(1).write.json("file:///.../result4.json"

展示结果:


各州死亡率.png
上一篇下一篇

猜你喜欢

热点阅读