10.Spark学习(Python版本):使用Kafka数据源
Step1. Kafka的安装和准备
Apache Kafka 官方下载地址
注意:Kafka_2.11-0.10.1.0.tgz,前面的2.11就是支持的scala版本号,后面的0.10.1.0是Kafka自身的版本号。需留意Spark的scala版本是否与kafka匹配。
安装命令:
cd ~/下载
sudo tar -zxf kafka_2.11-0.10.1.0.tgz -C /usr/local
cd /usr/local
sudo mv kafka_2.11-0.10.1.0/ ./kafka
sudo chown -R mashu ./kafka
Step2. 启动Zookeeper服务
打开1个终端,输入以下命令启动Zookeeper:
cd /usr/local/kafka
./bin/zookeeper-server-start.sh config/zookeeper.properties
注意,执行上面命令以后,终端窗口会返回一堆信息,Zookeeper服务器启动,千万不要关闭这个终端窗口,一旦关闭,zookeeper服务就停止了。
Step3. 启动Kafka服务
打开第2个终端,输入以下命令启动Kafka:
cd /usr/local/kafka
./bin/kafka-server-start.sh config/server.properties
Step4. 创建Kafka Topic
打开第3个终端,输入以下命令创建Topic:
cd /usr/local/kafka
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wordsendertest
//这个topic叫wordsendertest,2181是zookeeper默认的端口号,partition是topic里面的分区数,replication-factor是备份的数量,在kafka集群中使用,这里单机版就不用备份了
//可以用list列出所有创建的topics,来查看上面创建的topic是否存在
./bin/kafka-topics.sh --list --zookeeper localhost:2181
注意:删除Topic前需要把server.properties的delete.topic.enable=true
取消注释让它生效。然后重启zookeeper 和kafka,执行以下语句:
cd /usr/local/kafka
./bin/kafka-topics.sh --delete --topic wordsendertest --zookeeper localhost:2181
./bin/kafka-topics.sh --list --zookeeper localhost:2181
下面,我们需要用producer来产生一些数据,请在当前终端内继续输入下面命令:
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wordsendertest
Step5. Spark准备工作
打开第4个新的终端,然后启动pyspark:
cd /usr/local/spark
./bin/pyspark
Spark如果要使用Kafka,则需要下载spark-streaming-kafka-0-8_2.11相关jar包。
在Maven Repository下载
spark-streaming-kafka-0-8_2.11-2.1.0.jar文件,其中,2.11表示scala的版本,2.1.0表示Spark版本号(我用的是不支持Hive的预编译版本)。
Spark2.3版本中spark-streaming-kafka-0-10不支持python, 所以如果用在pyspark中就会报错。故而我这里使用Spark 2.1.0.

把spark-streaming-kafka-0-8_2.11-2.1.0.jar文件拷贝到了“/usr/local/spark/jars/kafka”目录下。请新打开第5个终端,输入下面命令:
cd /usr/local/spark/jars
mkdir kafka
cd ~/下载
cp ./spark-streaming-kafka-0-8_2.11-2.1.0.jar /usr/local/spark/jars/kafka
同时,修改spark目录下conf/spark-env.sh文件,修改该文件下面的SPARK_DIST_CLASSPATH变量。
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath):$(/usr/local/hbase/bin/hbase classpath):/usr/local/spark/jars/hbase/*:/usr/local/spark/examples/jars/*:/usr/local/spark/jars/kafka/*:/usr/local/kafka/libs/*
Step6. 编写Spark程序使用Kafka数据源
cd /usr/local/spark/python_code
mkdir kafka && cd kafka
vim KafkaWordProducer.py
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonStreamingKafkaWordCount")
ssc = StreamingContext(sc, 1)
zkQuorum, topic = sys.argv[1:]
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
lines = kvs.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)
counts.pprint()
ssc.start()
ssc.awaitTermination()
运行python ./KafkaWordProducer.py localhost:2181 wordsendertest
。
这时候 回到第3个终端,输入一些单词,第5个窗口中就已经在监听并且统计词频了。
