大数据:用ApacheKafka和Python来实时提取新冠数据

2020-05-07  本文已影响0人  Detian_e8ab

Apache Kafka 是分布式的流处理平台, 能够发布消息和订阅消息, 并且能够以容错的持久的方式来存储记录数据流, 作为大数据生态的重要组成部分, Apache Kafka主要应用在构建实时的流数据管道,在系统和应用间得到可靠的数据, 并且能够构建转换或响应实时数据流的应用。这里通过用一个小demo展示如何使用 Apache Kafka producer和consumer 来实时发布和订阅数据。

数据的来源是https://covid19api.com/。网站提供完全免费的rest api 新冠数据。如通过以下的Api call 可以获得如下的json.

(https://api.covid19api.com/country/germany/status/confirmed/live?from=2020-03-01T00:00:00Z&to=2020-04-01T00:00:00Z)
  {
    "Country": "Germany",
    "CountryCode": "DE",
    "Province": "",
    "City": "",
    "CityCode": "",
    "Lat": "51.17",
    "Lon": "10.45",
    "Cases": 130,
    "Status": "confirmed",
    "Date": "2020-03-01T00:00:00Z"
  },
  {
    "Country": "Germany",
    "CountryCode": "DE",
    "Province": "",
    "City": "",
    "CityCode": "",
    "Lat": "51.17",
    "Lon": "10.45",
    "Cases": 159,
    "Status": "confirmed",
    "Date": "2020-03-02T00:00:00Z"
  },
  {
    "Country": "Germany",
    "CountryCode": "DE",
    "Province": "",
    "City": "",
    "CityCode": "",
    "Lat": "51.17",
    "Lon": "10.45",
    "Cases": 196,
    "Status": "confirmed",
    "Date": "2020-03-03T00:00:00Z"
  }


在开始数据的发布和订阅之前,首先要开始Kafka 服务。代码如下

(base) cloud_user@yin2c:~$ sudo systemctl start confluent-zookeeper
(base) cloud_user@yin2c:~$ sudo systemctl enable confluent-zookeeper
(base) cloud_user@yin2c:~$ sudo systemctl start confluent-kafka
(base) cloud_user@yin2c:~$ sudo systemctl enable confluent-kafka

之后查看kafka broker是否在运行。

这样Kafka就设置好了,下一步要创建一个话题topic

kafka-topics --bootstrap-server localhost:9092 --create --topic py --partitions 1 --replication-factor 1

接下来用python 来创建消息发布者和订阅者。消息的来源是新冠数据, 通过api call来获取数据, 是德国从4月20号以来每天的现存病例数量, 先创建一个发布者实例, 设置好服务器,然后通过loop 把得到的json数据字典中的每天的病例数量发布到topic 里面。当启动发布者之后, 订阅者就会逐行打印得到的信息。

from kafka import KafkaProducer
from json import loads
import json
import requests
from time import sleep

#list of all data from first date
#URL = "https://api.covid19api.com/total/dayone/country/germany/status/confirmed"
URL ="https://api.covid19api.com/live/country/germany/status/confirmed/date/2020-04-20T13:13:30Z"
req = requests.get(url = URL)
data = req.json()
producer = KafkaProducer(bootstrap_servers = ['localhost:9092'], value_serializer=lambda v: json.dumps(v).encode('utf-8'))

for i in range (len(data)):
    file = data[i]
    sleep(1)
    producer.send('py', value=str(file["Date"].split("T")[0])+':'+str(file["Active"]))
   

消息的订阅者很简单就是一个监听topic 的订阅者。首先开始订阅者, 由于还没有消息发布, 所以没有信息。当发布者启动之后, 就可以看到信息被逐行打印出来。


image.png
image.png

代码可以通过我的github 分叉:https://github.com/dtdetianyin/ApacheKafka/tree/master/Corona19%20Data%20processed%20with%20ApacheKafka%20and%20Python
_

上一篇 下一篇

猜你喜欢

热点阅读