Kafka Standalone Deployment

2020-11-13  飞鹰雪玉

Part 1: Deploying the Service

1. Download (be sure to download the binary package, not the source package)
https://www.apache.org/dyn/closer.cgi?path=/kafka/2.5.0/kafka_2.13-2.5.0.tgz
2. Install

mv kafka_2.13-2.5.0.tgz /usr/local
cd /usr/local
tar -zxvf kafka_2.13-2.5.0.tgz
cd kafka_2.13-2.5.0

3. Start ZooKeeper (the Kafka binary distribution bundles ZooKeeper)

> bin/zookeeper-server-start.sh config/zookeeper.properties
INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
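ZooKeeper's behavior here is controlled by config/zookeeper.properties. For a single-node setup the bundled defaults are usually sufficient; the key settings look roughly like this (paths and values may differ slightly between Kafka versions):

```properties
# Directory where ZooKeeper stores snapshot data (under /tmp by default; move it for real use)
dataDir=/tmp/zookeeper
# Port the Kafka broker connects to
clientPort=2181
# 0 disables the per-IP connection limit
maxClientCnxns=0
```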

4. Start Kafka
Open another terminal window.

> bin/kafka-server-start.sh config/server.properties
INFO Verifying properties (kafka.utils.VerifiableProperties)
INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...
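The broker reads config/server.properties. The stock file works for a single local broker; the settings most worth knowing are roughly these (defaults as shipped with Kafka 2.5.0; adjust the paths for anything beyond a quick test):

```properties
# Unique id of this broker; a one-node cluster just uses 0
broker.id=0
# Where the broker stores its log segments (also under /tmp by default)
log.dirs=/tmp/kafka-logs
# How the broker finds ZooKeeper
zookeeper.connect=localhost:2181
# Uncomment to bind a specific listener address
#listeners=PLAINTEXT://:9092
```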

At this point both ZooKeeper and Kafka are running.

5. Create a topic

Open another terminal window.
Create a topic named "test" with one partition and one replica:

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Now we can run the list command to see this topic:

> bin/kafka-topics.sh --list --zookeeper localhost:2181
test
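Besides --list, the same tool can show a topic's layout: --describe prints the partition count, the replication factor, and the leader/replica assignment for each partition, which is a quick way to confirm the topic was created as intended.

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test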

6. Send some messages
Open another terminal window.

Kafka ships with a command-line client that takes input from a file or from standard input and sends it as messages to the Kafka cluster. By default, each line is sent as a separate message.

Run the producer, then type some messages into the console to send them to the server.

[root@localhost kafka_2.13-2.5.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>this is a message from liupeng
>this is another message for test
>liuminbo is a good boy

7. Start a consumer
Open another terminal window.
Kafka also provides a command-line consumer that dumps messages to standard output.

[root@localhost kafka_2.13-2.5.0]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
this is a message from liupeng
this is another message for test
liuminbo is a good boy

All of the command-line tools accept additional options; running a command with no arguments displays detailed usage information.

Part 2: Embedding in a Java Program


1. Add the dependency
pom.xml

        <!-- kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.5.0</version>
        </dependency>

2. Write the producer

package org.springblade.common.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

/**
 * Kafka Producer
 *
 * @author liupeng
 * @date 2020-11-12
 */
@Slf4j
public class ProduceKafka {

    private static final String TOPIC = "test";
    private static final String BROKER_LIST = "localhost:9092";
    private static KafkaProducer<String,String> producer = null;

    // Initialize the producer
    static {
        Properties configs = initConfig();
        producer = new KafkaProducer<>(configs);
    }

    /**
     * Initialize the producer configuration
     */
    private static Properties initConfig(){
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,BROKER_LIST);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        return properties;
    }

    public static void main(String[] args) {
        // Message record: key "message", value "this is a message"
        // (the original two-arg constructor concatenated both strings into the value)
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "message", "this is a message");
        // Send asynchronously and log the result
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                log.error("send failed", exception);
            } else {
                log.info("sent to {}-{} at offset {}", metadata.topic(), metadata.partition(), metadata.offset());
            }
        });
        producer.close();
    }

}

3. Write the consumer

package org.springblade.common.kafka;


import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Kafka Consumer
 *
 * @author liupeng
 * @date 2020-11-12
 */
@Slf4j
public class ConsumerKafka {


    private static final String BROKER_LIST = "localhost:9092";
    private static KafkaConsumer<String,String> consumer = null;

    static {
        Properties configs = initConfig();
        consumer = new KafkaConsumer<>(configs);
    }

    private static Properties initConfig(){
        Properties properties = new Properties();
        properties.put("bootstrap.servers",BROKER_LIST);
        properties.put("group.id","0");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("enable.auto.commit", "true");
        properties.setProperty("auto.offset.reset", "earliest");
        return properties;
    }


    public static void main(String[] args) {
        // Subscribe before polling; without this, poll() never returns any records
        consumer.subscribe(Collections.singletonList("test"));
        while (true) {
            // poll(long) is deprecated; pass a Duration instead
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                log.info(String.valueOf(record));
            }
        }
    }

}

Run the producer's main method.

The messages it sends appear in the consumer's terminal window.



Chinese translation of the official documentation:
https://kafka.apachecn.org/uses.html
