Kafka入门之安装试用
前言
本文主要介绍 Kafka 在 Windows8.1,CentOS Linux release 7.4.1708 (Core)(云服务器)上安装(单机安装),以及用 java 写一个 HelloWorld 项目测试Kafka是否安装配置成功。
正文
Kafka 安装很简单,只需要去 Apache 官网下载对应的压缩文件,解压即可。但是 Kafka 依赖 Zookeeper,安装 Kafka 之前需要安装 Zookeeper。
一:Windows8.1 安装 Kafka
Zookeeper 安装
不熟悉 Zookeeper 的同学可以 点击了解下
- Zookeeper下载地址:http://ftp.wayne.edu/apache/zookeeper/zookeeper-3.4.13/
- 下载完成后解压到对应的目录,我解压到
D:\Software
,根目录是D:\Software\zookeeper-3.4.13
- 切换到
D:\Software\zookeeper-3.4.13\conf
目录,把zoo_sample.cfg
复制一份,重命名为zoo.cfg
- 编辑
zoo.cfg
文件,修改dataDir=/Software/zookeeper-3.4.13/data
- 打开cmd窗口,切换到
D:\Software\zookeeper-3.4.13\bin
目录,输入:zkServer.cmd
启动 Zookeeper
启动结果如图:
Zookeeper运行结果可以看到 Zookeeper 已经在 2181
端口监听了
Kafka 安装
- Kafka下载地址:https://www.apache.org/dyn/closer.cgi?path=/kafka/2.1.1/kafka_2.12-2.1.1.tgz
- 下载完成后解压到对应的目录,我解压到
D:\Software
,根目录是D:\Software\kafka_2.12-2.1.1
- 打开cmd窗口切换到 Kafka 安装根目录,输入:
"bin/windows/kafka-server-start.bat" "config/server.properties"
启动 Kafka
运行Kafka
启动结果如图:
Kafka运行结果可以看到,已经成功运行了,这样在 Windows 上面安装 Kafka 就到此结束了,下面我们再来启动一个 Producer 和一个 Customer 看看效果,打开两个cmd窗口都切换到 Kafka 安装根目录:
- 运行
"bin/windows/kafka-topics.bat" --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
来创建一个 topic,topic 名称是 test - 运行
"bin/windows/kafka-console-producer.bat" --broker-list localhost:9092 --topic test
启动 Producer 服务来 生产消息 - 运行
"bin/windows/kafka-console-consumer.bat" --bootstrap-server localhost:9092 --topic test --from-beginning
启动 Customer服务来 消费消息
二:云服务器Centos7.4 安装 Kafka
下载参考上面Windows,Centos7.4 不需要下载 Zookeeper,只需要下载Kafka,下载完成上传到Linux云服务器,我上传到 /root/software 目录下面
- 解压
tar zxvf kafka_2.12-2.1.1.tgz
- 我准备把Kafka安装到
/usr/kafka
目录下,如果没有创建,请创建对应的目录,执行mv kafka_2.12-2.1.1 /usr/kafka
- 如果需要在本地写Deom测试云服务器上面的Kafka,还需要改下
config/server.properties
文件,修改如下:
advertised.listeners=PLAINTEXT://云服务器外网IP:9092
对,这就安装完成了,就是这么简单
现在我们来启动 Kafka 试试,启动 Kafka 之前需要启动下 Zookeeper,切换到 /usr/kafka
目录下
-
启动Zookeeper:
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
-
启动Kafka:
bin/kafka-server-start.sh config/server.properties
同样我们启动一个 Producer 和一个 Customer 看看效果
-
创建Topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 -replication-factor 1 --partitions 1 --topic test
-
查看Topic:
bin/kafka-topics.sh --list --zookeeper localhost:2181
-
启动Producer:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
-
启动Consumer:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
可以看到Linux上面安装也没问题了,下面我们来用Java语言写一个Demo试试
KafkaProducerTest.java
public class KafkaProducerTest implements Runnable {
private final KafkaProducer<String, String> producer;
private final String topic;
public KafkaProducerTest(String topicName) {
Properties props = new Properties();
props.put("bootstrap.servers", BROKER_LIST);
// props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
this.producer = new KafkaProducer<String, String>(props);
this.topic = topicName;
}
@Override
public void run() {
int messageNo = 1;
try {
for (; ; ) {
String messageStr = "hello, this is " + messageNo + " data";
producer.send(new ProducerRecord<String, String>(topic, "Message", messageStr));
if (messageNo % 10 == 0) {
System.out.println("send success " + messageNo + " data");
break;
}
messageNo++;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
}
public static void main(String args[]) {
KafkaProducerTest test = new KafkaProducerTest(TOPIC);
Thread thread = new Thread(test);
thread.start();
}
KafkaConsumerTest.java
public class KafkaProducerTest implements Runnable {
private final KafkaProducer<String, String> producer;
private final String topic;
public KafkaProducerTest(String topicName) {
Properties props = new Properties();
props.put("bootstrap.servers", BROKER_LIST);
// props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
this.producer = new KafkaProducer<String, String>(props);
this.topic = topicName;
}
@Override
public void run() {
int messageNo = 1;
try {
for (; ; ) {
String messageStr = "hello, this is " + messageNo + " data";
producer.send(new ProducerRecord<String, String>(topic, "Message", messageStr));
if (messageNo % 10 == 0) {
System.out.println("send success " + messageNo + " data");
break;
}
messageNo++;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
}
public static void main(String args[]) {
KafkaProducerTest test = new KafkaProducerTest(TOPIC);
Thread thread = new Thread(test);
thread.start();
}
}
public static final String TOPIC = "mr_topic_test";
public static final String BROKER_LIST = "云服务器IP:9092";
public static final String GROUP_ID = "test_group1";
各位看官动动小手试试吧,有问题欢迎评论下面留言