实时数据相关flink

Flink使用广播实现配置动态更新

2019-04-01  本文已影响97人  猫留下你走吧

本着开源的精神将学习成果分享,转载请注明出处。

问题复现

场景

对每条流数据进行关键字检测,对符合条件的消息进行拦截。例如关键字是 java,则消息 java是世界上最优秀的语言就会被拦截。

需求

拦截的关键字不一定是 java,可能需要变更拦截关键词,例如: php。因此关键字必须做到是可配置的。

问题

我们首先想到的是存在数据库或外部传入参数。但又因为该关键词是在算子中作为一个变量,一旦作业启动,想修改关键字不得不停掉作业,然后再重新启动作业。繁琐且不友好,有没有什么可以动态的修改算子里变量的方法?

解决方案

没错,使用广播的方式去解决。画了2张图表示了他们的区别


算子处理广播流方式 算子处理数据流方式

广播和普通的流数据不同的是:广播流的1条流数据能够被算子的所有分区所处理,而数据流的1条流数据只能够被算子的某一分区处理。因此广播流的特点也决定适合做配置的动态更新

源码

package com.example.flink;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;

import java.util.concurrent.TimeUnit;

/**
 * 作者(author):miao
 * 时间(date): 2019-04-01 15:17
 * 功能描述(description):使用广播流实现配置的动态更新
 */
public class BroadcastStreamDemo {

    public static void main(String[] args) throws Exception {

        // 构建流处理环境
        final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        // 配置处理环境的并发度为4
        environment.setParallelism(4);

        final MapStateDescriptor<String, String> CONFIG_KEYWORDS = new MapStateDescriptor<>(
                "config-keywords",
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO);

        // 自定义广播流(单例)
        BroadcastStream<String> broadcastStream = environment.addSource(new RichSourceFunction<String>() {

            private volatile boolean isRunning = true;
            //测试数据集
            private String[] dataSet = new String[] {
                    "java",
                    "swift",
                    "php",
                    "go",
                    "python"
            };

            /**
             * 数据源:模拟每30秒随机更新一次拦截的关键字
             * @param ctx
             * @throws Exception
             */
            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                int size = dataSet.length;
                while (isRunning) {
                    TimeUnit.SECONDS.sleep(30);
                    int seed = (int) (Math.random() * size);
                    //随机选择关键字发送
                    ctx.collect(dataSet[seed]);
                    System.out.println("读取到上游发送的关键字:" + dataSet[seed]);
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        }).setParallelism(1).broadcast(CONFIG_KEYWORDS);

        // 自定义数据流(单例)
        DataStream<String> dataStream = environment.addSource(new RichSourceFunction<String>() {

            private volatile boolean isRunning = true;

            //测试数据集
            private String[] dataSet = new String[] {
                    "java是世界上最优秀的语言",
                    "swift是世界上最优秀的语言",
                    "php是世界上最优秀的语言",
                    "go是世界上最优秀的语言",
                    "python是世界上最优秀的语言"
            };

            /**
             * 模拟每3秒随机产生1条消息
             * @param ctx
             * @throws Exception
             */
            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                int size = dataSet.length;
                while (isRunning) {
                    TimeUnit.SECONDS.sleep(3);
                    int seed = (int) (Math.random() * size);
                    ctx.collect(dataSet[seed]);
                    System.out.println("读取到上游发送的消息:" + dataSet[seed]);
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }

        }).setParallelism(1);

        // 数据流和广播流连接处理并将拦截结果打印
        dataStream.connect(broadcastStream).process(new BroadcastProcessFunction<String, String, String>() {

            //拦截的关键字
            private String keywords = null;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                keywords = "java";
                System.out.println("初始化模拟连接数据库读取拦截关键字:java");
            }

            @Override
            public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                if (value.contains(keywords)) {
                    out.collect("拦截消息:" + value + ", 原因:包含拦截关键字:" + keywords);
                }
            }

            @Override
            public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
                keywords = value;
                System.out.println("关键字更新成功,更新拦截关键字:" + value);
            }
        }).print();

        // 懒加载执行
        environment.execute();
    }

}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example.flink</groupId>
    <artifactId>broadcast-stream-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.7.1</flink.version>
        <java.version>1.8</java.version>
        <scala.version>2.11</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-nop</artifactId>
            <version>1.7.15</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- 指定jdk版本 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <!-- 源码的编译器版本 -->
                    <source>${java.version}</source>
                    <!-- class的编译器版本 -->
                    <target>${java.version}</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.example.flink.BroadcastStreamDemo</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

运行结果

初始化模拟连接数据库读取拦截关键字:java
初始化模拟连接数据库读取拦截关键字:java
初始化模拟连接数据库读取拦截关键字:java
初始化模拟连接数据库读取拦截关键字:java
读取到上游发送的消息:java是世界上最优秀的语言
2> 拦截消息:java是世界上最优秀的语言, 原因:包含拦截关键字:java
读取到上游发送的消息:go是世界上最优秀的语言
读取到上游发送的消息:php是世界上最优秀的语言
读取到上游发送的消息:php是世界上最优秀的语言
读取到上游发送的消息:swift是世界上最优秀的语言
读取到上游发送的消息:php是世界上最优秀的语言
读取到上游发送的消息:java是世界上最优秀的语言
4> 拦截消息:java是世界上最优秀的语言, 原因:包含拦截关键字:java
读取到上游发送的消息:python是世界上最优秀的语言
读取到上游发送的消息:java是世界上最优秀的语言
2> 拦截消息:java是世界上最优秀的语言, 原因:包含拦截关键字:java
读取到上游发送的关键字:php
读取到上游发送的消息:php是世界上最优秀的语言
关键字更新成功,更新拦截关键字:php
关键字更新成功,更新拦截关键字:php
关键字更新成功,更新拦截关键字:php
关键字更新成功,更新拦截关键字:php
3> 拦截消息:php是世界上最优秀的语言, 原因:包含拦截关键字:php
读取到上游发送的消息:go是世界上最优秀的语言
读取到上游发送的消息:go是世界上最优秀的语言
读取到上游发送的消息:python是世界上最优秀的语言
读取到上游发送的消息:php是世界上最优秀的语言
3> 拦截消息:php是世界上最优秀的语言, 原因:包含拦截关键字:php
读取到上游发送的消息:python是世界上最优秀的语言
读取到上游发送的消息:python是世界上最优秀的语言
读取到上游发送的消息:python是世界上最优秀的语言
读取到上游发送的消息:go是世界上最优秀的语言
读取到上游发送的消息:python是世界上最优秀的语言
读取到上游发送的关键字:swift
关键字更新成功,更新拦截关键字:swift
关键字更新成功,更新拦截关键字:swift
关键字更新成功,更新拦截关键字:swift
关键字更新成功,更新拦截关键字:swift
读取到上游发送的消息:python是世界上最优秀的语言
读取到上游发送的消息:php是世界上最优秀的语言
读取到上游发送的消息:go是世界上最优秀的语言
读取到上游发送的消息:python是世界上最优秀的语言
读取到上游发送的消息:php是世界上最优秀的语言
读取到上游发送的消息:python是世界上最优秀的语言
读取到上游发送的消息:java是世界上最优秀的语言
读取到上游发送的消息:python是世界上最优秀的语言
读取到上游发送的消息:swift是世界上最优秀的语言
1> 拦截消息:swift是世界上最优秀的语言, 原因:包含拦截关键字:swift
读取到上游发送的消息:python是世界上最优秀的语言

参考资料

聊聊flink的Broadcast State

上一篇下一篇

猜你喜欢

热点阅读