Flink (1.13) Quick Start


Batch and Streaming

In Spark's world view, everything is made of batches: offline data is one large batch, and real-time data is an endless sequence of small batches.
In Flink's world view, everything is made of streams: offline data is a bounded stream, and real-time data is a stream with no end. These are the so-called bounded and unbounded streams.

Bounded and Unbounded Streams

A bounded data stream has a well-defined start and end. It can be processed by ingesting all of the data before performing any computation. Ordered ingestion is not required, because a bounded data set can always be sorted afterwards. Processing a bounded stream is also known as batch processing. A typical example:

  1. Reading a file

An unbounded data stream has a start but no end. It does not terminate, and it keeps delivering data as it is generated, so it must be processed continuously: each event has to be handled as soon as it is ingested. It is impossible to wait for all of the data to arrive, because the input is unbounded and will never be complete at any point in time. Processing unbounded data usually requires ingesting events in a specific order (for example, the order in which they occurred) so that the completeness of the results can be reasoned about. Typical examples:

  1. Reading socket data
  2. Reading Kafka data (a minimal source sketch follows this list)
  3. Reading Flume data
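The examples in this article read from a file and from a socket. For Kafka, a minimal sketch of a source is shown below; it assumes the flink-connector-kafka_${scala.binary.version} dependency has been added to the pom, and the broker address (hadoop102:9092), consumer group, and topic name (wordcount) are placeholders, not values from this article:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

/**
 * Sketch: consuming an unbounded stream from Kafka.
 */
public class KafkaSourceSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka connection settings; broker address and group id are placeholders
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "hadoop102:9092");
        props.setProperty("group.id", "flink-wordcount");

        // Read the (hypothetical) "wordcount" topic as an unbounded stream of strings
        DataStreamSource<String> source = env.addSource(
                new FlinkKafkaConsumer<>("wordcount", new SimpleStringSchema(), props));

        // The stream can be fed into the same flatMap/keyBy/sum pipeline used below
        source.print();

        env.execute();
    }
}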

Creating a Maven Project

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <flink.version>1.13.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <slf4j.version>1.7.30</slf4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>


    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers combine.children="append">
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
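Note that every dependency is declared with provided scope so that the shaded jar submitted to a cluster stays small. When running the examples from the IDE, provided dependencies are not on the runtime classpath by default; in IntelliJ IDEA, tick "Include dependencies with 'Provided' scope" in the run configuration (or temporarily drop the provided scope while developing), otherwise the job fails to start with a NoClassDefFoundError.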

log4j.properties

log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

Preparing wordcount.txt

java python hello
pon xml log batch
python log java word
count xml python hello
exe txt log xml pon java

Batch Processing

package com.admin.flink.demo01;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.tuple.Tuple2;

import java.util.Arrays;

/**
 * Batch processing
 * @author admin
 * @date 2021/8/6
 */
public class BatchStream {


    public static void main(String[] args) throws Exception {
        // Obtain the execution environment
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        // Read the source file
        DataSource<String> dataSource = executionEnvironment.readTextFile("D:\\project\\idea\\flink\\input\\wordcount.txt");

        // Split each line and flatten it into (word, 1) tuples
        FlatMapOperator<String, Tuple2<String, Integer>> flatMap = dataSource.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (lines, out) -> {
            // Split, iterate, and collect
            Arrays.stream(lines.split(" ")).forEach(s -> out.collect(Tuple2.of(s, 1)));
            // When a lambda uses Java generics, the type information must be declared explicitly because of type erasure
        }).returns(Types.TUPLE(Types.STRING,Types.INT));

        // Group by word and sum the counts
        AggregateOperator<Tuple2<String, Integer>> sum = flatMap.groupBy(0).sum(1);
        // Print the result
        sum.print();

    }
}
(pon,2)
(hello,2)
(log,3)
(xml,3)
(exe,1)
(java,3)
(python,3)
(txt,1)
(batch,1)
(count,1)
(word,1)
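The example above uses the DataSet API (ExecutionEnvironment). Since Flink 1.12 the DataStream API can also execute bounded inputs with batch semantics, so the same word count can be written once against StreamExecutionEnvironment and switched to batch mode. A minimal sketch, assuming the same input path as above:

import java.util.Arrays;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Sketch: the same word count on the DataStream API, executed with batch semantics.
 */
public class BatchModeWordCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The input is bounded, so it can run in BATCH execution mode (available since Flink 1.12)
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        env.readTextFile("D:\\project\\idea\\flink\\input\\wordcount.txt")
                // Split each line and flatten it into (word, 1) tuples
                .flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (lines, out) ->
                        Arrays.stream(lines.split(" ")).forEach(s -> out.collect(Tuple2.of(s, 1))))
                // Declare the type information explicitly because of type erasure
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                // Group by word and sum the counts
                .keyBy(0)
                .sum(1)
                .print();

        env.execute();
    }
}

In BATCH execution mode the print() sink emits only the final count per word, instead of one update per incoming record as in the streaming examples below.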

Bounded Stream

package com.admin.flink.demo01;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

/**
 * Bounded stream
 * @author admin
 * @date 2021/8/6
 */
public class BoundedStream {

    public static void main(String[] args) throws Exception {
        // Create the streaming execution environment
        StreamExecutionEnvironment boundedStream = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read the source file
        DataStreamSource<String> streamSource = boundedStream.readTextFile("D:\\project\\idea\\flink\\input\\wordcount.txt");

        // Flatten the data into (word, 1) tuples
        SingleOutputStreamOperator<Tuple2<String, Integer>> flatMap = streamSource.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (lines, out) -> {
            // Split, iterate, and collect
            Arrays.stream(lines.split(" ")).forEach(s -> out.collect(Tuple2.of(s, 1)));
        }).returns(Types.TUPLE(Types.STRING, Types.INT));

        // Key by word and sum the counts
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = flatMap.keyBy(0).sum(1);

        // Print the result
        sum.print("test");

        // Start the job
        boundedStream.execute();
    }
}
test:14> (batch,1)
test:12> (word,1)
test:3> (java,1)
test:14> (count,1)
test:13> (xml,1)
test:5> (python,1)
test:1> (pon,1)
test:3> (java,2)
test:5> (hello,1)
test:1> (pon,2)
test:3> (java,3)
test:13> (xml,2)
test:11> (log,1)
test:5> (python,2)
test:5> (python,3)
test:7> (txt,1)
test:11> (log,2)
test:5> (hello,2)
test:11> (exe,1)
test:11> (log,3)
test:13> (xml,3)
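In streaming mode every incoming record triggers an updated partial aggregate, which is why each word is printed once per occurrence and the counts grow line by line. The test prefix is the name passed to print("test"), and the number before > is the index of the parallel subtask that emitted the record.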

Unbounded Stream

package com.atguigu.flink.demo01;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

/**
 *
 * Unbounded stream
 * @author admin
 * @date 2021/8/6
 */
public class UnboundedStream {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Listen on a socket port
        DataStreamSource<String> source = env.socketTextStream("hadoop102", 9999);


        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = source.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (lines, out) -> {
            // Split each line and collect the words into the Collector
            Arrays.stream(lines.split(" ")).forEach(s -> out.collect(Tuple2.of(s, 1)));
        }).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(0).sum(1);

        sum.print("test");

        env.execute();

    }
}
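Start a netcat server on hadoop102 before submitting the job, then type a few lines into it; the counts are updated as each line arrives: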
[admin@hadoop102 flink]$ nc -lk 9999
java hello
hello java python
hello java scala
test:3> (java,1)
test:5> (hello,1)
test:3> (java,2)
test:5> (hello,2)
test:5> (python,1)
test:3> (java,3)
test:1> (scala,1)
test:5> (hello,3)

If the returns(...) call is omitted from the flatMap above, the job fails with the following exception:

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(BoundedStream.java:27)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
    at org.apache.flink.api.dag.Transformation.getOutputType(Transformation.java:479)
    at org.apache.flink.streaming.api.datastream.DataStream.getType(DataStream.java:193)
    at org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:319)
    at com.atguigu.flink.demo01.BoundedStream.main(BoundedStream.java:33)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface. Otherwise the type has to be specified explicitly using type information.
    at org.apache.flink.api.java.typeutils.TypeExtractionUtils.validateLambdaType(TypeExtractionUtils.java:371)
    at org.apache.flink.api.java.typeutils.TypeExtractionUtils.extractTypeFromLambda(TypeExtractionUtils.java:188)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:557)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getFlatMapReturnTypes(TypeExtractor.java:174)
    at org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:612)
    at com.atguigu.flink.demo01.BoundedStream.main(BoundedStream.java:27)

When a lambda expression uses Java generics, the type information has to be declared explicitly, because of type erasure, by calling returns(...) on the result of the transformation:

// Flatten the data into (word, 1) tuples
SingleOutputStreamOperator<Tuple2<String, Integer>> flatMap = streamSource.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (lines, out) -> {
    // Split, iterate, and collect
    Arrays.stream(lines.split(" ")).forEach(s -> out.collect(Tuple2.of(s, 1)));
}).returns(Types.TUPLE(Types.STRING, Types.INT));
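Alternatively, as the exception message suggests, the lambda can be replaced by an anonymous class implementing FlatMapFunction; the generic type information is then preserved and no returns(...) call is needed. A minimal sketch (it additionally needs an import of org.apache.flink.util.Collector):

SingleOutputStreamOperator<Tuple2<String, Integer>> flatMap = streamSource
        .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String lines, Collector<Tuple2<String, Integer>> out) {
                // The anonymous class keeps the generic signature, so Flink can extract the output type
                Arrays.stream(lines.split(" ")).forEach(s -> out.collect(Tuple2.of(s, 1)));
            }
        });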