简读笔记-深入理解kafka-第一部分

2019-05-05  本文已影响0人  你的头发真的好长

第一章 初始kafka

参考书籍: 朱小厮--深入理解Kafka 核心设计与实践原理

Kafka体系结构

主题和分区

多副本(Replica机制)

多副本架构

几个重要名词概念

HW和LEO的关系

第二章 生产者

KafkaProducer是线程安全的

public class KafkaProducerAnalysis {
    public static final String brokerList = "localhost:9092";
    public static final String topic = "topic-demo";

    public static Properties initConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("client.id", "producer.client.id.demo");
        return props;
    }
    
    public static void main(String[] args) {
        Properties props = initConfig();
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "hello, Kafka!");
        producer.send(record);
    }
}

发送消息的三种模式

public Future<RecordMetadata> send(ProducerRecord<K, V> record);
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);

序列化器

分区器

分区器 是根据key这个字段来计算partition值。它的作用是为消息分配分区

生产者拦截器

生产者拦截器既可用来在消息发送前做一些准备工作如 按照某个规则过滤掉不符合要求的消息,修改消息内容等。也可以用来在发送回调逻辑前做一些定制化需求,如统计工作。 还可以指定多个拦截器形成拦截器链

生产者整体架构

生产者架构

元数据的更新

元数据是指Kafka集群中的元数据,这些元数据记录了集群中有哪些主题,这些主题有哪些分区,每个分区的leader副本分配在哪个节点上,follwer副本分配在哪些节点上,哪些副本在AR,ISR集合中,集群有哪些节点,控制节点又是哪一个等信息。

元数据更新会挑选 InFlightRequests中当前负载最小的节点发送更新元数据请求。 由于Sender线程需要更新,而主线程需要读取。因此数据同步问题也要考虑。使用synchronized和final保证。

几个重要的参数

第三章 消费者

kafkaConsumer是线程不安全的

消费者和消费者组

消费者客户端开发

KafkaConsumer是非线程安全的

public class KafkaConsumerAnalysis {
    public static final String brokerList = "localhost:9092";
    public static final String topic = "topic-demo";
    public static final String groupId = "group.demo";
    public static final AtomicBoolean isRunning = new AtomicBoolean(true);

    public static Properties initConfig() {
        Properties props = new Properties();
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("bootstrap.servers", brokerList);
        props.put("group.id", groupId);
        props.put("client.id", "consumer.client.id.demo");
        return props;
    }

    public static void main(String[] args) {
        Properties props = initConfig();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));

        try {
            while (isRunning.get()) {
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("topic = " + record.topic()
                            + ", partition = " + record.partition()
                            + ", offset = " + record.offset());
                    System.out.println("key = " + record.key()
                            + ", value = " + record.value());
                    //do something to process record.
                }
            }
        } catch (Exception e) {
            log.error("occur exception ", e);
        } finally {
            consumer.close();
        }
    }
}

反序列化器

将字节数组转化为对象,与生产者的序列化器要一一对应

消息消费

几个常用API

#KafkaConsumer
public ConsumerRecords<K, V> poll(Duration timeout);
#ConsumerRecords
public List<ConsumerRecord<K, V>> records(TopicPartition partition);
public Iterable<ConsumerRecord<K, V>> records(String topic);

位移提交

控制或关闭消费

指定位移消费

再均衡

消费者拦截器

多线程的实现

上一篇下一篇

猜你喜欢

热点阅读