flink学习笔记

2021-07-14  本文已影响0人  whiletrue_aed4

学习方式为观看尚硅谷出品的flink教学视频,视频地址:https://www.bilibili.com/video/BV1qy4y1q728

1.flink的特点

1. 事件驱动

2. 基于流处理,一切皆是流,离线数据是有界的流,实时数据是无界的流

3.分层API,越顶层越抽象,表达含义越简明,使用越方便;越底层越具体,表达能力越丰富,使用越灵活

4.支持有状态计算

5.支持exactly-once语义,at most once ,at least once ,exactly once

6.支持事件时间

分层API

2.flink部署

2.1 standalone模式

1.Flink 中每一个 TaskManager 都是一个JVM进程,它可能会在独立的线程上执行一个或多个 subtask

2.为了控制一个 TaskManager 能接收多少个 task, TaskManager 通过 task slot 来进行控制(一个 TaskManager 至少有一个 slot)

3.每个task slot表示TaskManager拥有资源的一个固定大小的子集。假如一个TaskManager有三个slot,那么它会将其管理的内存分成三份给各个slot(注:这里不会涉及CPU的隔离,slot仅仅用来隔离task的受管理内存)

4.可以通过调整task slot的数量去自定义subtask之间的隔离方式。如一个TaskManager一个slot时,那么每个task group运行在独立的JVM中。而当一个TaskManager多个slot时,多个subtask可以共同享有一个JVM,而在同一个JVM进程中的task将共享TCP连接和心跳消息,也可能共享数据集和数据结构,从而减少每个task的负载

TaskManager

5.默认情况下,Flink 允许子任务共享 slot,即使它们是不同任务的子任务(前提是它们来自同一个job)。 这样的结果是,一个 slot 可以保存作业的整个管道。 

6.Task Slot 是静态的概念,是指 TaskManager 具有的并发执行能力,可以通过参数taskmanager.numberOfTaskSlots进行配置;而并行度parallelism是动态概念,即TaskManager运行程序时实际使用的并发能力,可以通过参数parallelism.default进行配置。 举例:如果总共有3个TaskManager,每一个TaskManager中分配了3个TaskSlot,也就是每个TaskManager可以接收3个task,这样我们总共可以接收9个TaskSot。但是如果我们设置parallelism.default=1,那么当程序运行时9个TaskSlot将只有1个运行,8个都会处于空闲状态,所以要学会合理设置并行度!具体图解如下:

slot

2.1.1 Web UI提交job

启动Flink后,可以在Web UI的Submit New Job提交jar包,然后指定Job参数。

主要参数如下:

# 程序的入口,指定入口类(类的全限制名)

Entry Class

# 程序启动参数,例如--host localhost --port 7777

Program Arguments

# 设置Job并行度

Parallelism

# savepoint是通过checkpoint机制为streaming job创建的一致性快照,比如数据源offset,状态等。

Savepoint Path

2.1.2 命令行提交job

1. 查看已提交的所有job

$ bin/flink list

2. 提交job

$ bin/flink run -c <入口类> -p <并行度> <jar包路径> <启动参数>

3. 取消job

$  bin/flink cancel <Job的ID>

4. 停止job

$  bin/flink stop <Job的ID>

2.2 yarn模式

以Yarn模式部署Flink任务时,要求Flink是有 Hadoop 支持的版本,Hadoop 环境需要保证版本在 2.2 以上,并且集群中安装有 HDFS 服务。

Flink提供了两种在yarn上运行的模式,分别为Session-Cluster和Per-Job-Cluster模式。

2.2.1 Session-Cluster模式

Session-Cluster 模式需要先启动集群,然后再提交作业,接着会向 yarn 申请一块空间后,资源永远保持不变。如果资源满了,下一个作业就无法提交,只能等到 yarn 中的其中一个作业执行完成后,释放了资源,下个作业才会正常提交。所有作业共享 Dispatcher 和 ResourceManager共享资源;适合规模小执行时间短的作业。在 yarn 中初始化一个 flink 集群,开辟指定的资源,以后提交任务都向这里提交。这个 flink 集群会常驻在 yarn 集群中,除非手工停止。

session-cluster模式

2.2.2  Per-Job-Cluster模式(虎符在使用的模式)

一个 Job 会对应一个集群,每提交一个作业会根据自身的情况,都会单独向 yarn 申请资源,直到作业执行完成,一个作业的失败与否并不会影响下一个作业的正常提交和运行。独享 Dispatcher 和 ResourceManager,按需接受资源申请;适合规模大长时间运行的作业。每次提交都会创建一个新的 flink 集群,任务之间互相独立,互不影响,方便管理。任务执行完成之后创建的集群也会消失。

per-job-cluster模式

2.3 kubernetes模式(略)

3.flink快速上手

3.1 批处理实现WordCount

pom依赖

```xml

<?xml version="1.0" encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>Flink_Tutorial</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <flink.version>1.12.1</flink.version> <scala.binary.version>2.12</scala.binary.version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> </dependencies></project>

```

代码实现

import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; public class WordCount { public static void main(String[] args) throws Exception { // 创建执行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 从文件中读取数据 String inputPath = "/tmp/Flink_Tutorial/src/main/resources/hello.txt"; DataSet<String> inputDataSet = env.readTextFile(inputPath); // 对数据集进行处理,按空格分词展开,转换成(word, 1)二元组进行统计 // 按照第一个位置的word分组 // 按照第二个位置上的数据求和 DataSet<Tuple2<String, Integer>> resultSet = inputDataSet.flatMap(new MyFlatMapper()) .groupBy(0) .sum(1); resultSet.print(); } // 自定义类,实现FlatMapFunction接口 public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String s, Collector<Tuple2<String, Integer>> out) throws Exception { // 按空格分词 String[] words = s.split(" "); // 遍历所有word,包成二元组输出 for (String str : words) { out.collect(new Tuple2<>(str, 1)); } } } }

3.2 流处理实现WordCount

在3.1批处理的基础上,新建一个类进行改动。

1.批处理=>几组或所有数据到达后才处理;流处理=>有数据来就直接处理,不等数据堆叠到一定数量级

2.这里不像批处理有groupBy => 所有数据统一处理,而是用流处理的keyBy => 每一个数据都对key进行hash计算,进行类似分区的操作,来一个数据就处理一次,所有中间过程都有输出!

3.并行度:开发环境的并行度默认就是计算机的CPU逻辑核数

代码实现:

import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.client.program.StreamContextEnvironment; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class StreamWordCount { public static void main(String[] args) throws Exception { // 创建流处理执行环境 StreamExecutionEnvironment env = StreamContextEnvironment.getExecutionEnvironment(); // 设置并行度,默认值 = 当前计算机的CPU逻辑核数(设置成1即单线程处理) // env.setMaxParallelism(32); // 从文件中读取数据 String inputPath = "/tmp/Flink_Tutorial/src/main/resources/hello.txt"; DataStream<String> inputDataStream = env.readTextFile(inputPath); // 基于数据流进行转换计算 DataStream<Tuple2<String,Integer>> resultStream = inputDataStream.flatMap(new WordCount.MyFlatMapper()) .keyBy(item->item.f0) .sum(1); resultStream.print(); // 执行任务 env.execute(); } }

4.flink运行时架构

4.1 Flink运行时的组件

4.2 任务提交流程

4.3 任务调度原理

上一篇下一篇

猜你喜欢

热点阅读