kafka简介

2017-05-17  本文已影响167人  紫玥迩

简介

消息中间件
Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。
broker
topic
partition
producer
consumer
consumer group

安装

  1. 下载解压
  2. 启动zookeeper
  3. 修改config/server.proprties的:
log.dirs=E:\data\kafka-logs
zookeeper.connect=10.129.83.213:2181
listeners=PLAINTEXT://10.143.47.32:9092
  1. 启动命令
    kafka_2.12-0.10.2.1 需要jdk1.8
    kafka_2.11-0.10.0.1 需要jdk1.7
    Kafka Shell基本命令(包括topic的增删改查)
#win
.\bin\windows\kafka-server-start.bat .\config\server.properties
#linux
 bin/kafka-server-start.sh config/server.properties
#创建一个名为“test”的Topic,只有一个分区和一个备份
bin/kafka-topics.sh --create --zookeeper 10.129.83.213:2181 --replication-factor 1 --partitions 1 --topic test
#查看topic
bin/kafka-topics.sh --list --zookeeper 10.129.83.213:2181
#查看consumer-groups
bin/kafka-consumer-groups.sh --list --bootstrap-server 10.143.47.32:9092
bin/kafka-consumer-groups.sh --bootstrap-server 10.143.47.32:9092 --describe --group myGroup
#查看消费了多少数据
 bin/kafka-run-class.sh  kafka.tools.ConsumerOffsetChecker --group myGroup  --topic test  --zookeeper 10.129.83.213:2181
#查看test详细信息
bin/kafka-topics.sh --describe --topic test --zookeeper 10.129.83.213:2181
#发送消息
bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test 
#发送消息
bin/kafka-console-producer.sh --broker-list 10.143.47.32:9092 --topic test 
#消息内容
This is a message
This is another message
Hello World
#消费消息
#消费者线程数必须是小等于topic的partition分区数
bin/kafka-console-consumer.sh --zookeeper 10.129.83.213:2181 --topic test --from-beginning
#消费消息
bin/kafka-console-consumer.sh --bootstrap-server 10.143.47.32:9092 --topic test --from-beginning
Paste_Image.png

每个 consumer 都属于一个 consumer group,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费。
无论消息是否被消费,kafka 都会保留所有消息。有两种策略可以删除旧数据:

基于时间:log.retention.hours=168
基于大小:log.retention.bytes=1073741824

producer 发送消息到 broker 时,会根据分区算法选择将其存储到哪一个 partition。其路由机制为:

指定了 patition,则直接使用;
未指定 patition 但指定 key,通过对 key 的 value 进行hash 选出一个 patition
patition 和 key 都未指定,使用轮询选出一个 patition。
  1. kafka_2.10-0.10.2.0 需要jdk1.7(springboot 1.5.3集成)
    pom.xml
<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <version>1.5.3.RELEASE</version>
        <exclusions>  
                    <exclusion>  
                        <groupId>org.springframework.boot</groupId>  
                        <artifactId>spring-boot-starter-logging</artifactId>  
                    </exclusion>  
                </exclusions> 
    </dependency>
    <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <version>1.5.3.RELEASE</version>
        </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>1.2.1.RELEASE</version>
    </dependency>
    <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j2</artifactId>
            <version>1.5.3.RELEASE</version>
        </dependency>

application.properties

spring.application.name=springboot-kafka-test
#kafka
spring.kafka.bootstrap-servers=10.143.47.32:9092
spring.kafka.consumer.group-id=myGroup
#charset
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.template.default-topic=test
spring.kafka.listener.concurrency=1
spring.kafka.producer.batch-size=1000
#log4j2
logging.config=classpath:log4j2.xml

配置

@Configuration
@EnableKafka
public class KafkaConfig {
}

生产者

@Component
public class MsgProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    public void send(String value){
        System.out.println("send start-----------");
        kafkaTemplate.send("test", value+"1");
        kafkaTemplate.send("test", value+"2");
        System.out.println("send end-----------");
    }
}

消费者

@Component
public class MsgConsumer {
    static Logger subscribelogger = LoggerFactory.getLogger("subscribelogger"); 
    
    @KafkaListener(topics="test")
    public void processMsg1(String s){
        subscribelogger.info("{}|{}","myGroup",s); 
    }
    
    /*@KafkaListener(topics="test")
    public void processMsg(ConsumerRecord<?, ?> record){
        subscribelogger.info("{}|{}","myGroup1",record.value());    
    }*/
}

参考

kafka入门经典教程
kafka教程

上一篇下一篇

猜你喜欢

热点阅读