Flink_Connector 连接器_kafka

2022-08-20  本文已影响0人  Eqo
image.png
Flink API 提供的专门访问其他 存储系统的一套api 只需要创建对象.调用API内的方法就可以实现 对各种数据库的访问
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/overview/

Kafka Connector

image.png

1. 从kafka中读取数据

2. 向Kafka中写入数据

-FlinkKakfaProducer


Kafka 分布式消息队列 ,分布式消息中间件
Topic 主题队列 partition 分区 ( Sagment 片段) offset 偏移量
主从架构 broker节点 副本机制 主备副本
基于liux 磁盘 页缓存 零拷贝


Kafka消费数据需要

kafka集群地址
消费者组
topic / partition 一个topic 或者一个列表 多个topic
反序列化化类 字符数组->字符串

FlinkKafkaConsumer 可以指定消费位置

        // todo: 1. 最早偏移量位置消费数据
         flinkKafkaConsumer.setStartFromEarliest();

        // todo: 2. 最新偏移量位置消费数据, 不设置时,默认值
         flinkKafkaConsumer.setStartFromLatest() ;

        // todo: 3. 从消费组上次消费的位置开始消费数据
        flinkKafkaConsumer.setStartFromGroupOffsets();

        // todo: 4. 指定时间戳开始消费数据
         flinkKafkaConsumer.setStartFromTimestamp(1660725338991L) ;

        // todo: 5. 指定具体分配偏移量位置消费数据
        Map<KafkaTopicPartition, Long> specificStartupOffsets = new HashMap<>() ;
        specificStartupOffsets.put(new KafkaTopicPartition("flink-topic", 0), 5L);
        specificStartupOffsets.put(new KafkaTopicPartition("flink-topic", 1), 4L);
        specificStartupOffsets.put(new KafkaTopicPartition("flink-topic", 2), 9L);
        flinkKafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets);

kafka source之 新增Topic 和分区发现

场景一 当我的FlinkKfkaConsumer 程序正在执行的时候,kafka中新增了几个topic
但是不想更改我的job代码
场景二 本来Topic中只有三个分区,然后新增了几个分区,我不想更改我的job代码 怎么办

解决方案

            // 
        FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<String>(
            // todo: topic名称,使用正则表达式,每topic名称设计的时候,最好符合一定规范
            "flink-topic-[0~9]",
            new SimpleStringSchema(),
            props
        );
   // todo: 设置属性,动态分区发现
 props.setProperty("flink.partition-discovery.interval-millis", "5000") ;

Kafka source

1.14版本建议使用Kafkasource

        // 2. 数据源-source
        // 2-1. 创建Source数据源实例对象,设置属性值
        KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers("node1.itcast.cn:9092,node2.itcast.cn:9092,node3.itcast.cn:9092")
            .setTopics("flink-topic")
            .setGroupId("flink-gid-2")
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .setStartingOffsets(OffsetsInitializer.latest())
                //设置分区发现
            .setProperty("partition.discovery.interval.ms", "5000")
            .build();
        // 2-2. 从数据源中获取数据
        DataStreamSource<String> kafkaStream = env.fromSource(
            source, WatermarkStrategy.noWatermarks(), "Kafka Source"
        );


        // 3. 数据转换-transformation

        // 4. 数据终端-sink
        kafkaStream.printToErr();

        // 5. 触发执行-execute
        env.execute("ConnectorKafkaSourceDemo");
    }
上一篇下一篇

猜你喜欢

热点阅读