分布式流式计算-Kafka部署
Kafka是一个高性能的流式消息队列,适用于大数据场景下的消息传输、消息处理和消息存储。在学习过程中,我们通常使用部署单节点,或通过Docker部署,生成环境一般使用多Broker组成的集群。
1.单节点部署
1.1 二进制包部署
首先,从官网下载http://kafka.apache.org/点击下载,选kafka_2.11-2.1.0.tgz下载到本地,然后解压。
在启动kafka之前,需要首先启动zookeeper,zookeeper可以是单节点,也可以是多节点。kafka自带了一个zookeeper,可以通过下面的命令启动:
bin/zookeeper-server-start.sh config/zookeeper.properties
也可以使用docker启动zookeeper
docker run -d -p 2181:2181 wurstmeister/zookeeper
启动之后,我们就是启动kafka了,在kafka_2.11-2.1.0/bin/
下面都是一些脚本,用来启动,停止kafka,创建topic,发送消息和消费消息。
启动命令
# 不需要修改配置,默认端口是9092
bin/kafka-server-start.sh config/server.properties
创建topic
# 指定了副本数量,分区数和名称
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
# 列出topic列表
bin/kafka-topics.sh --list --zookeeper localhost:2181
发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
消费消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
1.2 docker部署
首先我们需要安装docker和docker-compose,然后编写docker-compose.yaml文件
version: '2'
services:
zoo1:
image: wurstmeister/zookeeper
restart: unless-stopped
hostname: zoo1
ports:
- "2181:2181"
container_name: zookeeper
# kafka version: 1.1.0
# scala version: 2.12
kafka1:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CREATE_TOPICS: "stream-in:1:1,stream-out:1:1"
depends_on:
- zoo1
container_name: kafka
之后,输入命令即可启动
docker-compose up -d
需要结束时,可以输入
docker-compose down
使用docker启动后,可以使用本地的kafka中的脚本创建topic,也可以进入docker的kafka容器内部执行上面的脚本。
2.集群部署
2.1 规划
Broker
我们创建3个节点,端口分别为9091,9092,9093;broker.id分别为1,2和3;log.dirs也分别添加后缀端口内容,创建一个topic,指定副本数量为3,分区为6
broker.id=1
listeners=PLAINTEXT://localhost9091:9091
log.dirs=/tmp/kafka-logs-9091
advertised.host.name=localhost9091
advertised.listeners=PLAINTEXT://localhost9091:9091
需要注意,如果需要外部访问,需要修改/etc/hosts中的内容
0.0.0.0 localhost9091
0.0.0.0 localhost9091
0.0.0.0 localhost9091
2.2 配置
启动kafka时,需要一个server.properties配置文件,我们首先创建三个配置文件,分别为server9091.properties,server9092.properties,server9093.properties,之后,使用下面的命令启动三个服务:
./bin/kafka-server-start.sh config/server9091.properties
./bin/kafka-server-start.sh config/server9092.properties
./bin/kafka-server-start.sh config/server9093.properties
创建topic
我们创建一个jianshu-topic
,有6个分区和2个副本
# 指定了副本数量,分区数和名称
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 6 --topic jianshu-topic
# 列出topic列表
bin/kafka-topics.sh --list --zookeeper localhost:2181
topic描述
使用下面的命令可以查看topic的描述信息,可以看到每个分区的Leader所在的节点,和副本保存的节点
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic jianshu-topic
Topic:jianshu-topic PartitionCount:6 ReplicationFactor:3 Configs:
Topic: jianshu-topic Partition: 0 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
Topic: jianshu-topic Partition: 1 Leader: 2 Replicas: 2,1,3 Isr: 2
Topic: jianshu-topic Partition: 2 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
Topic: jianshu-topic Partition: 3 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: jianshu-topic Partition: 4 Leader: 2 Replicas: 2,3,1 Isr: 2
Topic: jianshu-topic Partition: 5 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
这样就集群就启动了,之后,我们使用java客户端向kafka集群发送消息。
3.生产者
使用java向kafka集群发送消息时,需要引入maven依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.0</version>
</dependency>
java生产者的编写,生成者的逻辑是不断的向kafka集群内写入数据
public class DistributeSender {
public static String content []= new String[]{
"The Enterprise Exactly-once Framework has been used to in tons of application, especially in some critical billing system such as Cainiao, Alimama, Aliexpress and so on. it has been approved as stable and correct. What’s more, the performance is better than the old “At-least-once through acker”",
"The producer config block.on.buffer.full has been deprecated and will be removed in future release. Currently its default value has been changed to false. The KafkaProducer will no longer throw BufferExhaustedException but instead will use max.block.ms value to block, after which it will throw a TimeoutException. If block.on.buffer.full property is set to true explicitly, it will set the max.block.ms to Long.MAX_VALUE and metadata.fetch.timeout.ms will not be honoured",
"The Kafka cluster durably persists all published records—whether or not they have been consumed—using a configurable retention period. For example, if the retention policy is set to two days, then for the two days after a record is published, it is available for consumption, after which it will be discarded to free up space. Kafka's performance is effectively constant with respect to data size so storing data for a long time is not a problem.",
"In fact, the only metadata retained on a per-consumer basis is the offset or position of that consumer in the log. This offset is controlled by the consumer: normally a consumer will advance its offset linearly as it reads records, but, in fact, since the position is controlled by the consumer it can consume records in any order it likes. For example a consumer can reset to an older offset to reprocess data from the past or skip ahead to the most recent record and start consuming from \"now\"."
};
public static void main(String args[]){
Properties props = new Properties();
props.put("bootstrap.servers", "106.12.196.74:9091");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class",MyPartition.class.getName());
//生产者发送消息
String topic = "jianshu-topic";
Producer<String, String> procuder = new KafkaProducer<String,String>(props);
int i = 0;
while (true) {
String value = content[i % content.length];
ProducerRecord<String, String> msg = new ProducerRecord<String, String>(topic, String.valueOf("key"+i),value);
procuder.send(msg);
if (i % 10000 == 0) {
try{
Thread.sleep(2 * 1000);
}catch (Exception e){}
}
if (i > 20000) {
break;
}
}
//列出topic的相关信息
List<PartitionInfo> partitions = new ArrayList<PartitionInfo>() ;
partitions = procuder.partitionsFor(topic);
for(PartitionInfo p:partitions)
{
System.out.println(p);
}
System.out.println("send message over.");
procuder.close(100, TimeUnit.MILLISECONDS);
}
}