Kafka结合SparkStreaming开发
Apache Kafka 是一种分布式流式平台
Kafka基本搭建 :
Step1
kafka下载地址
wget http://mirror.bit.edu.cn/apache/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz
tar zxvf kafka_2.11-0.10.2.0.tgz
cd kafka_2.11-0.10.2.0
Step2: 启动Server
Kafka使用ZooKeeper,所以如果你没有一个ZooKeeper Server你需要首先去启动它。你可以通过一个脚本来获取一个快的单节点的ZooKeeper实例。
bin/zookeeper-server-start.sh config/zookeeper.properties
这时候你就可以启动Kafka server:
bin/kafka-server-start.sh config/server.properties
Step3: 创建一个话题Topic
下面我们创建一个名为test的topic,其 只有一个分区和复制
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
我们来看看我们创建的话题topic
[kason@kason kafka_2.11-0.10.2.0]$ bin/kafka-topics.sh --list --zookeeper localhost:2181
test
Step4 发送消息,生产者
Kafka comes with a command line client that will take input from a file or from standard input and send it out as messages to the Kafka cluster. By default, each line will be sent as a separate message.
比如在centos中发送消息:
[kason@kason kafka_2.11-0.10.2.0]$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Hello
Step5 接收消息,消费者
Kafka also has a command line consumer that will dump out messages to standard output.
[kason@kason kafka_2.11-0.10.2.0]$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
Hello
以上就是Single Broker cluster,但是我们可以开发multi-broker
Step6 设置multi-broker cluster
So far we have been running against a single broker, but that's no fun. For Kafka, a single broker is just a cluster of size one, so nothing much changes other than starting a few more broker instances. But just to get feel for it, let's expand our cluster to three nodes (still all on our local machine).简单看就是在本地主机上设置三个node构成kafka的broker集群
首先我们需要给每个broker生成一个配置文件,简单讲就是复制一份啊
cd /home/kason/kafka/kafka_2.11-0.10.2.0/config/
su
cp server.properties server-1.properties
cp server.properties server-2.properties
现在编辑这些新创建的server-1,server-2文件,设置如下:
server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
broker.id是集群中的每个node的唯一并且持久的标识名字。同时我们为这台机器上的每个node设置了不同的端口监听以及日志路径进而进行区分
万事俱备,启动另外两个broker
bin/kafka-server-start.sh config/server-1.properties
bin/kafka-server-start.sh config/server-2.properties
现在我们创建一个分区三个复制的新的话题topic
[kason@kason kafka_2.11-0.10.2.0]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Created topic "my-replicated-topic".
但是如何知道每一个broker暗杀的呢 可以查看describe topics命令
[kason@kason kafka_2.11-0.10.2.0]$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
[kason@kason kafka_2.11-0.10.2.0]$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
我们解释解释上面的东西的含义:第一行给出分区总和,每一个额外的行给出一个分区的信息,由于我们这个topic只有一个分区因此只有一行
leader 是为指定的分区负责读写的节点node,它是随机选的
replicas 节点node list
isr 同步replicas
在这个broker集群中发送消息 生产者:
[kason@kason kafka_2.11-0.10.2.0]$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
my test message 1
^[^T^[^T
hello world
hello kafka
接收消息 消费者:
[kason@kason kafka_2.11-0.10.2.0]$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
my test message 1
����
hello world
hello kafka
现在因为这是集群我们来测试测试它的容错性,根据上面我们知道Leader是node broker1,现在查到其进程号并手动杀死.。例子就不举了
Step7 使用Kafka来导入或者导出数据
Writing data from the console and writing it back to the console is a convenient place to start, but you'll probably want to use data from other sources or export data from Kafka to other systems. For many systems, instead of writing custom integration code you can use Kafka Connect to import or export data
Kafka Connect is a tool included with Kafka that imports and exports data to Kafka. It is an extensible tool that runs connectors, which implement the custom logic for interacting with an external system. In this quickstart we'll see how to run Kafka Connect with simple connectors that import data from a file to a Kafka topic and export data from a Kafka topic to a file.
首先我们手动创建一个文件并写入几个数据,test.txt要放在kafka目录下
echo -e "foo\nbar" > test.txt
然后我们启动两个连接器运行在standalone模式下(就是单独本地的进程),提供三个配置文件最为入参,第一个是Kafka Connect 进程的配置文件,包含一些普通的配置文件如Kafka brokers以及序列化数据的格式,剩下的配置文件每一个指定了要创建的connector, 这些文件包含一个独一无二的connector名字
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
Kafka启动图片:
这里写图片描述Spark Streaming
Spark Streaming Code
package com.scala.action.streaming
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Created by kason_zhang on 4/11/2017.
*/
object MyKafkaSparkStreaming {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("MyKafkaStreamingDemo").setMaster("local[3]")
val ssc = new StreamingContext(conf,Seconds(5))
val topicLines = KafkaUtils.createStream(ssc,"10.64.24.78:2181"
,"StreamKafkaGroupId",Map("spark" -> 1))
topicLines.map(_._2).flatMap(str => str.split(" ")).print()
ssc.start()
ssc.awaitTermination()
}
}
在这里有几点需要注意的地方
因为我没有在centos kafka server.properties里面设置
listeners = PLAINTEXT://your.host.name:9092,它将采用默认的listeners,这样的话host将获取centos的host名,但是我的SparkStreaming程序是在Windows中开发的,他不能识别host,所以需要在C盘的hosts文件里面加入10.64.24.78 kason让其能够识别host
同时还需要在centos中开放zookeeper的2181端口以及你的kafka的端口。
IDEA的结果如图:
这里写图片描述