Flink2:Flink快速上手
2020-04-24 本文已影响0人
勇于自信
1.搭建maven工程 flink-2019
1.1 pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>flink-study</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Centralize versions so a Flink/Scala upgrade touches a single place. -->
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.7.2</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
    </properties>

    <dependencies>
        <!-- Flink Scala APIs (batch + streaming) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink Java APIs (batch + streaming) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Required to actually compile the Scala sources from the command
                 line; without it `mvn package` builds only the Java classes. -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Pin the compiler plugin version for reproducible builds. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <!-- Thin jar with a Class-Path manifest pointing at lib/. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <useUniqueVersions>false</useUniqueVersions>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass>com.gzjy.wordcount.StreamWordCount</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <!-- Fat jar (jar-with-dependencies) for standalone submission. -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.5.3</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.gzjy.wordcount.StreamWordCount</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
以上的 pom 配置同时支持 Scala 和 Java 两种语言的编程。
1.2 添加scala框架 和 scala文件夹
![](https://img.haomeiwen.com/i18154391/ddcfe5b2e7af7f5c.png)
2. 批处理wordcount
输入文件内容:
hello zhang
hello li
hello me
hello zhang
Java实现:
需要先新建一个类实现FlatMapFunction接口
package com.gzjy.wordcount;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class LineSplitter implements FlatMapFunction<String, Tuple2<String,Integer>> {
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[]tokens = value.toLowerCase().split(" ");
for (String token:tokens){
if(token.length()>0){
out.collect(new Tuple2<String, Integer>(token,1));
}
}
}
}
wordcount程序:
package com.gzjy.wordcount;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.*;
public class WordCount {

    /**
     * Batch word count: reads a text file, splits each line into words via
     * {@link LineSplitter}, and prints every word with its total count.
     *
     * @param args optional; args[0] is the input file path. Defaults to
     *             "data/hello.txt" so existing invocations keep working.
     */
    public static void main(String[] args) throws Exception {
        // Set up the Flink batch execution environment.
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Allow the input path on the command line, with a backward-compatible
        // default instead of the previous hard-coded value.
        final String inputPath = args.length > 0 ? args[0] : "data/hello.txt";

        DataSet<String> text = env.readTextFile(inputPath);

        // (word, 1) pairs, grouped by the word (tuple field 0) and summed on field 1.
        DataSet<Tuple2<String, Integer>> counts = text.flatMap(new LineSplitter())
                .groupBy(0)
                .sum(1);

        // print() triggers execution for batch jobs; no explicit execute() needed.
        counts.print();
    }
}
运行代码,输出如下:
![](https://img.haomeiwen.com/i18154391/b21f50c0b4e528c2.png)
scala版本
import org.apache.flink.api.scala._
object WordCount2 {

  /**
   * Batch word count in Scala: reads a text file and prints (word, count) pairs.
   *
   * @param args optional; args(0) is the input file path. Defaults to
   *             "data/hello.txt" so existing invocations keep working.
   */
  def main(args: Array[String]): Unit = {
    // Batch execution environment.
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Input path from the command line, with a backward-compatible default.
    val inputPath = if (args.nonEmpty) args(0) else "data/hello.txt"
    val inputDataSet = env.readTextFile(inputPath)

    // flatMap/map require the implicit TypeInformation brought in by
    // `import org.apache.flink.api.scala._` at the top of the file.
    // filter(_.nonEmpty) matches the Java LineSplitter, which also drops
    // the empty tokens produced by consecutive spaces.
    val wordcounts = inputDataSet
      .flatMap(_.split(" "))
      .filter(_.nonEmpty)
      .map((_, 1))
      .groupBy(0)
      .sum(1)

    wordcounts.print()
  }
}
3. 流处理 wordcount
java实现:
package com.gzjy.wordcount;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class StreamWordCount {

    /**
     * Streaming word count: reads lines from a socket and prints a running
     * (word, count) total for every word seen so far.
     *
     * @param args optional; args[0] = host, args[1] = port. Defaults to
     *             "192.168.36.10":9999, the previously hard-coded values,
     *             so existing invocations keep working.
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Host/port can be overridden from the command line (this revives the
        // args parsing that was previously commented out) with backward-compatible defaults.
        final String host = args.length > 0 ? args[0] : "192.168.36.10";
        final int port = args.length > 1 ? Integer.parseInt(args[1]) : 9999;

        DataStream<String> dataStream = env.socketTextStream(host, port);

        // Reuse the batch LineSplitter: emit (word, 1), key by the word, keep a running sum.
        DataStream<Tuple2<String, Integer>> wordCounts = dataStream
                .flatMap(new LineSplitter())
                .keyBy(0)
                .sum(1);

        // Parallelism 1 on the sink so console output is not interleaved across subtasks.
        wordCounts.print().setParallelism(1);

        // Streaming jobs only start running when execute() is called.
        env.execute();
    }
}
在linux系统中用
nc -lk 9999
进行发送测试
![](https://img.haomeiwen.com/i18154391/7b7a33181860a658.png)
控制台输出
![](https://img.haomeiwen.com/i18154391/23dacb09033fb12c.png)
文件输出内容:
![](https://img.haomeiwen.com/i18154391/6eea5e0ec24b58fb.png)
scala实现:
import org.apache.flink.streaming.api.scala._
object StreamWordCount2 {

  /**
   * Streaming word count: consumes lines from localhost:9999 and prints
   * running (word, count) totals as new data arrives.
   */
  def main(args: Array[String]): Unit = {
    // Streaming execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Source: one record per line received on the socket.
    val lines = env.socketTextStream("localhost", 9999)

    // Tokenize, drop empty tokens, pair each word with 1,
    // key by the word (tuple field 0) and keep a running sum of field 1.
    val counts = lines
      .flatMap(line => line.split(" "))
      .filter(word => word.nonEmpty)
      .map(word => (word, 1))
      .keyBy(0)
      .sum(1)

    // Two print subtasks, exactly as in the original.
    counts.print().setParallelism(2)

    // The job only starts running once execute() is called.
    env.execute()
  }
}