Kafka

2022-04-11  shoyu666

Kafka is a distributed streaming platform, commonly used as a message queue (MQ).

Kafka components and concepts

Partitions and replica partitions

Each partition has multiple replica partitions, spread across different brokers.

Partition storage

Both topic and partition are logical concepts.
Each partition corresponds to a directory named {topic}-{partition}.
For example, for topic car_data, partition 0's directory is /car_data-0.
/car_data-0 stores segments (themselves logical); each segment is backed by an actual .index file and .log file.
The index file is kept in memory via mmap.
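Because the index file is memory-mapped, offset lookups read through the page cache rather than issuing a read() syscall per access. A minimal sketch of memory-mapping a file in Java (the file name and contents here are illustrative, not a real Kafka index):

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class MmapDemo {
    // Memory-map a file read-only and return first byte + last byte.
    // Accesses go through the mapped region (page cache), not per-read syscalls.
    static int sumFirstAndLast(Path file) throws IOException {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
            MappedByteBuffer buf = ch.map(FileChannel.MapMode.READ_ONLY, 0, ch.size());
            return buf.get(0) + buf.get(buf.limit() - 1);
        }
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for a segment index file (name is illustrative only).
        Path index = Files.createTempFile("00000000000000000000", ".index");
        Files.write(index, new byte[]{1, 2, 3, 4});
        System.out.println(sumFirstAndLast(index)); // prints 5
        Files.delete(index);
    }
}
```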

Index and log files

The index file stores a sparse index (a sparse index takes little space).
Each index/log file is named after the first offset it contains.
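As a hedged illustration of both points: segment files are named by zero-padding the base offset to 20 digits, and a lookup first binary-searches the sparse index for the greatest indexed offset at or below the target, then scans the log forward from that file position. A self-contained sketch (the index entries are made up for the example):

```java
import java.util.Arrays;

public class SegmentLookup {
    // Segment files are named by the 20-digit, zero-padded base offset,
    // e.g. base offset 368769 -> "00000000000000368769.log".
    static String segmentFileName(long baseOffset, String suffix) {
        return String.format("%020d", baseOffset) + suffix;
    }

    // Sparse index: sorted parallel arrays of (offset -> file position).
    // Return the position of the greatest entry whose offset <= target;
    // the reader then scans the log forward from there.
    static long floorPosition(long[] offsets, long[] positions, long target) {
        int i = Arrays.binarySearch(offsets, target);
        if (i < 0) i = -i - 2;           // insertion point - 1 = floor entry
        return i < 0 ? 0 : positions[i]; // target before first entry: scan from start
    }

    public static void main(String[] args) {
        System.out.println(segmentFileName(368769, ".log")); // 00000000000000368769.log
        long[] offs = {0, 100, 200, 300};
        long[] pos  = {0, 4096, 8192, 12288};
        // Offset 250 is not indexed; fall back to the entry for 200 and scan.
        System.out.println(floorPosition(offs, pos, 250)); // 8192
    }
}
```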

Consumer groups

Within a consumer group, each partition is consumed by at most one consumer.

kafka消费者Offset保存

新版本中offset由broker维护,offset信息由一个特殊的topic “ __consumer_offsets”来保存,offset以消息形式发送到该topic并保存在broker中。这样consumer提交offset时,只需连接到broker,不用访问zk,避免了zk节点更新瓶颈。
The broker's message storage directory is configured in server.properties:

# A comma separated list of directories under which to store log files
log.dirs=/usr/local/var/lib/kafka-logs
# ls /usr/local/var/lib/kafka-logs
__consumer_offsets-0            __consumer_offsets-22           __consumer_offsets-36           __consumer_offsets-5
__consumer_offsets-1            __consumer_offsets-23           __consumer_offsets-37

The number of partitions of __consumer_offsets is controlled by offsets.topic.num.partitions, which defaults to 50.
Each message's key is group.id + topic + partition number, and its value is the committed offset.
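Which __consumer_offsets partition a group lands in is derived from hashing the group.id; a sketch of that mapping, assuming Kafka's abs(hashCode) % numPartitions scheme with the default of 50 partitions:

```java
public class OffsetsPartition {
    // A group's offsets go to partition abs(group.id.hashCode()) % numPartitions,
    // where abs masks the sign bit (so Integer.MIN_VALUE is handled safely).
    static int partitionFor(String groupId, int numPartitions) {
        return (groupId.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // All commits for one group map to one partition, so that group's
        // offset history is totally ordered. Group name is made up here.
        System.out.println(partitionFor("car-consumers", 50));
    }
}
```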

Message delivery semantics

Kafka supports three delivery semantics: at most once (commit offsets before processing; records may be lost but never redelivered), at least once (process before committing; records may be redelivered but never lost), and exactly once (built on idempotent producers and transactions).
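To make the at-least-once / at-most-once trade-off concrete, here is a small simulation (not Kafka client code): a consumer crashes mid-record and restarts from its last committed offset. Committing after processing yields a duplicate; committing before processing loses a record.

```java
import java.util.ArrayList;
import java.util.List;

public class DeliverySemantics {
    // Simulate a consumer that crashes while handling record index `crashAt`,
    // then restarts and resumes from its last committed offset.
    // commitFirst = true  -> at-most-once  (commit, then process)
    // commitFirst = false -> at-least-once (process, then commit)
    static List<Integer> consume(int[] records, int crashAt, boolean commitFirst) {
        List<Integer> processed = new ArrayList<>();
        int committed = 0; // offset to resume from after a restart

        // First run, up to and including the crash point.
        for (int i = 0; i <= crashAt; i++) {
            boolean crashHere = (i == crashAt);
            if (commitFirst) {
                committed = i + 1;         // committed...
                if (crashHere) break;      // ...crash before processing: record lost
                processed.add(records[i]);
            } else {
                processed.add(records[i]); // processed...
                if (crashHere) break;      // ...crash before commit: will be redelivered
                committed = i + 1;
            }
        }

        // Restart: replay from the last committed offset.
        for (int i = committed; i < records.length; i++) {
            processed.add(records[i]);
        }
        return processed;
    }

    public static void main(String[] args) {
        int[] recs = {10, 11, 12};
        System.out.println(consume(recs, 0, false)); // [10, 10, 11, 12]: 10 duplicated
        System.out.println(consume(recs, 0, true));  // [11, 12]: 10 lost
    }
}
```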

Kafka performance

A simple producer-side optimization is batch compression; the snippet below enables GZIP so batches are compressed before being sent over the network:

 Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("acks", "all");
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 // enable GZIP compression
 props.put("compression.type", "gzip");
 Producer<String, String> producer = new KafkaProducer<>(props);
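Why this helps: repetitive message payloads shrink substantially under GZIP, so fewer bytes cross the network and hit disk. A stand-alone illustration using only the JDK (the telemetry payload is made up):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class GzipDemo {
    // Compress a byte array with GZIP, as the producer would per batch.
    static byte[] gzip(byte[] raw) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(raw);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        // JSON-ish telemetry records are highly repetitive and compress well.
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 1000; i++) {
            sb.append("{\"car\":\"A-").append(i).append("\",\"speed\":60}");
        }
        byte[] raw = sb.toString().getBytes(StandardCharsets.UTF_8);
        System.out.println(raw.length + " -> " + gzip(raw).length);
    }
}
```

Compression trades producer/consumer CPU for bandwidth and storage; for already-compressed payloads (images, encrypted data) the gain disappears.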

Producer and consumer samples

Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, message);
producer.send(record);

   private static final String TOPIC_NAME = "car";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, TOPIC_NAME);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(TOPIC_NAME));

        String message = null;
        try {
            do {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100000));
                for (ConsumerRecord<String, String> record : records) {
                    message = record.value();
                    System.out.println(message);
                }
            } while (true);
        } catch (Exception e) {
            // exception
        } finally {
            consumer.close();
        }
    }

Reference:
https://www.infoq.cn/article/ukoqjkuwr0v0cs7u6p8o
