Mac kafka安装

2020-11-12  本文已影响0人  matthewfly

brew直接安装,包含了zookeeper:

brew install kafka

安装完成后会提示启动命令:

//zookeeper启动
To have launchd start zookeeper now and restart at login:
  brew services start zookeeper
Or, if you don't want/need a background service you can just run:
  zkServer start
//kafka启动
To have launchd start kafka now and restart at login:
  brew services start kafka
Or, if you don't want/need a background service you can just run:
  zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties

其中安装路径:

==> Summary
🍺  /usr/local/Cellar/kafka/2.6.0

配置路径:

/usr/local/etc/kafka/zookeeper.properties
/usr/local/etc/kafka/server.properties

输入命令brew services start zookeeper先启动zk,brew services start kafka启动kafka。可以查看配置文件,zk默认端口2181,Kafka默认端口9092。
进入kafka安装目录/usr/local/Cellar/kafka/2.6.0/bin,并创建一个topic:

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test1

查看topic创建情况:

bin % kafka-topics --list --zookeeper localhost:2181                                                      
test1

创建生产者:

kafka-console-producer --broker-list localhost:9092 --topic test1

创建消费者:

kafka-console-consumer --bootstrap-server localhost:9092 --topic test1 --from-beginning

生产者端输入并回车,可以看到消费者端收到消息,说明正常能消费。

在springboot中使用。
pom.xml中引入kafka依赖:

       <!-- kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

yml中添加kafka配置:

spring:
    kafka:
        consumer:
            bootstrap-servers: localhost:9092
            client-id: test-client
            group-id: test-consumer
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

添加生产者producer接口:

@Controller
public class TestController {
    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    @GetMapping("/test/kafka")
    public void send(String msg) {
        kafkaTemplate.send("test1", msg);
    }
}

添加消费者监听:

@Component
public class TestKafkaListener {
    @KafkaListener(id = "c_1", topicPartitions = {@TopicPartition(topic = "test1", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))})
    public void partition0(String msgData) {
        System.out.println("demo3 receive : " + msgData + ", partition: 0" );
    }

    @KafkaListener(id = "c2", topicPartitions = {@TopicPartition(topic = "test1", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "0"))})
    public void partition1(String msgData) {
        System.out.println("demo3 receive : " + msgData + ", partition: 1" );
    }

    @KafkaListener(id = "c3", topicPartitions = {@TopicPartition(topic = "test1", partitionOffsets = @PartitionOffset(partition = "2", initialOffset = "0"))})
    public void listenPartitionOnly(String msgData) {
        System.out.println("demo3 receive : " + msgData + ", partition: 2" );
    }
}

注解中必须指定topic,可以指定partition和开始消费的消息offset。
启动项目,并在浏览器访问:http://localhost:8080/test/kafka?msg=123,可以看到消费者收到数据:

demo3 receive : 123, partition: 2
上一篇下一篇

猜你喜欢

热点阅读