flink学习笔记
学习方式为观看尚硅谷出品的flink教学视频,视频地址:https://www.bilibili.com/video/BV1qy4y1q728
1.flink的特点
分层API1. 事件驱动
2. 基于流处理,一切皆是流,离线数据是有界的流,实时数据是无界的流
3.分层API,越顶层越抽象,表达含义越简明,使用越方便;越底层越具体,表达能力越丰富,使用越灵活
4.支持有状态计算
5.支持exactly-once语义,at most once ,at least once ,exactly once
6.支持事件时间
2.flink部署
2.1 standalone模式
1.Flink 中每一个 TaskManager 都是一个JVM进程,它可能会在独立的线程上执行一个或多个 subtask
2.为了控制一个 TaskManager 能接收多少个 task, TaskManager 通过 task slot 来进行控制(一个 TaskManager 至少有一个 slot)
3.每个task slot表示TaskManager拥有资源的一个固定大小的子集。假如一个TaskManager有三个slot,那么它会将其管理的内存分成三份给各个slot(注:这里不会涉及CPU的隔离,slot仅仅用来隔离task的受管理内存)
4.可以通过调整task slot的数量去自定义subtask之间的隔离方式。如一个TaskManager一个slot时,那么每个task group运行在独立的JVM中。而当一个TaskManager多个slot时,多个subtask可以共同享有一个JVM,而在同一个JVM进程中的task将共享TCP连接和心跳消息,也可能共享数据集和数据结构,从而减少每个task的负载
TaskManager5.默认情况下,Flink 允许子任务共享 slot,即使它们是不同任务的子任务(前提是它们来自同一个job)。 这样的结果是,一个 slot 可以保存作业的整个管道。
6.Task Slot 是静态的概念,是指 TaskManager 具有的并发执行能力,可以通过参数taskmanager.numberOfTaskSlots进行配置;而并行度parallelism是动态概念,即TaskManager运行程序时实际使用的并发能力,可以通过参数parallelism.default进行配置。 举例:如果总共有3个TaskManager,每一个TaskManager中分配了3个TaskSlot,也就是每个TaskManager可以接收3个task,这样我们总共可以接收9个TaskSot。但是如果我们设置parallelism.default=1,那么当程序运行时9个TaskSlot将只有1个运行,8个都会处于空闲状态,所以要学会合理设置并行度!具体图解如下:
slot2.1.1 Web UI提交job
启动Flink后,可以在Web UI的Submit New Job提交jar包,然后指定Job参数。
主要参数如下:
# 程序的入口,指定入口类(类的全限制名)
Entry Class
# 程序启动参数,例如--host localhost --port 7777
Program Arguments
# 设置Job并行度
Parallelism
# savepoint是通过checkpoint机制为streaming job创建的一致性快照,比如数据源offset,状态等。
Savepoint Path
2.1.2 命令行提交job
1. 查看已提交的所有job
$ bin/flink list
2. 提交job
$ bin/flink run -c <入口类> -p <并行度> <jar包路径> <启动参数>
3. 取消job
$ bin/flink cancel <Job的ID>
4. 停止job
$ bin/flink stop <Job的ID>
2.2 yarn模式
以Yarn模式部署Flink任务时,要求Flink是有 Hadoop 支持的版本,Hadoop 环境需要保证版本在 2.2 以上,并且集群中安装有 HDFS 服务。
Flink提供了两种在yarn上运行的模式,分别为Session-Cluster和Per-Job-Cluster模式。
2.2.1 Session-Cluster模式
Session-Cluster 模式需要先启动集群,然后再提交作业,接着会向 yarn 申请一块空间后,资源永远保持不变。如果资源满了,下一个作业就无法提交,只能等到 yarn 中的其中一个作业执行完成后,释放了资源,下个作业才会正常提交。所有作业共享 Dispatcher 和 ResourceManager;共享资源;适合规模小执行时间短的作业。在 yarn 中初始化一个 flink 集群,开辟指定的资源,以后提交任务都向这里提交。这个 flink 集群会常驻在 yarn 集群中,除非手工停止。
session-cluster模式2.2.2 Per-Job-Cluster模式(虎符在使用的模式)
一个 Job 会对应一个集群,每提交一个作业会根据自身的情况,都会单独向 yarn 申请资源,直到作业执行完成,一个作业的失败与否并不会影响下一个作业的正常提交和运行。独享 Dispatcher 和 ResourceManager,按需接受资源申请;适合规模大长时间运行的作业。每次提交都会创建一个新的 flink 集群,任务之间互相独立,互不影响,方便管理。任务执行完成之后创建的集群也会消失。
per-job-cluster模式2.3 kubernetes模式(略)
3.flink快速上手
3.1 批处理实现WordCount
pom依赖
```xml
<?xml version="1.0" encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>Flink_Tutorial</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <flink.version>1.12.1</flink.version> <scala.binary.version>2.12</scala.binary.version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> </dependencies></project>
```
代码实现
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; public class WordCount { public static void main(String[] args) throws Exception { // 创建执行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 从文件中读取数据 String inputPath = "/tmp/Flink_Tutorial/src/main/resources/hello.txt"; DataSet<String> inputDataSet = env.readTextFile(inputPath); // 对数据集进行处理,按空格分词展开,转换成(word, 1)二元组进行统计 // 按照第一个位置的word分组 // 按照第二个位置上的数据求和 DataSet<Tuple2<String, Integer>> resultSet = inputDataSet.flatMap(new MyFlatMapper()) .groupBy(0) .sum(1); resultSet.print(); } // 自定义类,实现FlatMapFunction接口 public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String s, Collector<Tuple2<String, Integer>> out) throws Exception { // 按空格分词 String[] words = s.split(" "); // 遍历所有word,包成二元组输出 for (String str : words) { out.collect(new Tuple2<>(str, 1)); } } } }
3.2 流处理实现WordCount
在3.1批处理的基础上,新建一个类进行改动。
1.批处理=>几组或所有数据到达后才处理;流处理=>有数据来就直接处理,不等数据堆叠到一定数量级
2.这里不像批处理有groupBy => 所有数据统一处理,而是用流处理的keyBy => 每一个数据都对key进行hash计算,进行类似分区的操作,来一个数据就处理一次,所有中间过程都有输出!
3.并行度:开发环境的并行度默认就是计算机的CPU逻辑核数
代码实现:
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.client.program.StreamContextEnvironment; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class StreamWordCount { public static void main(String[] args) throws Exception { // 创建流处理执行环境 StreamExecutionEnvironment env = StreamContextEnvironment.getExecutionEnvironment(); // 设置并行度,默认值 = 当前计算机的CPU逻辑核数(设置成1即单线程处理) // env.setMaxParallelism(32); // 从文件中读取数据 String inputPath = "/tmp/Flink_Tutorial/src/main/resources/hello.txt"; DataStream<String> inputDataStream = env.readTextFile(inputPath); // 基于数据流进行转换计算 DataStream<Tuple2<String,Integer>> resultStream = inputDataStream.flatMap(new WordCount.MyFlatMapper()) .keyBy(item->item.f0) .sum(1); resultStream.print(); // 执行任务 env.execute(); } }