Flink in Practice: Flink SQL + Kafka in Streaming
2020-04-12
北邮郭大宝
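Continuing the Flink hands-on series, this time we combine Flink with Kafka to build a streaming application. The full code is available on GitHub.
Versions: Flink 1.9.1, Kafka 2.1.0, developed with Java 8.
This example applies Flink SQL to a streaming scenario: it reads JSON strings containing id, site and proctime from Kafka and computes the page views (pv) per site in 5-second windows.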
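1. Data Preparation
The JSON structure is simple: three fields, id, site and proctime. You could write a script that keeps writing records to the Kafka topic; here I simply pasted the data into kafka-console-producer.sh.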
{"id": 1, "site": "www.baidu.com", "proctime": "2020-04-11 00:00:01"}
{"id": 2, "site": "www.bilibili.com/", "proctime": "2020-04-11 00:00:02"}
{"id": 3, "site": "www.baidu.com", "proctime": "2020-04-11 00:00:03"}
{"id": 4, "site": "www.baidu.com/", "proctime": "2020-04-11 00:00:05"}
{"id": 5, "site": "www.baidu.com", "proctime": "2020-04-11 00:00:06"}
{"id": 6, "site": "www.bilibili.com/", "proctime": "2020-04-11 00:00:07"}
{"id": 7, "site": "https://github.com/tygxy", "proctime": "2020-04-11 00:00:08"}
{"id": 8, "site": "www.bilibili.com/", "proctime": "2020-04-11 00:00:09"}
{"id": 9, "site": "www.baidu.com", "proctime": "2020-04-11 00:00:11"}
{"id": 10, "site": "www.bilibili.com/", "proctime": "2020-04-11 00:00:18"}
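Instead of pasting by hand, a small generator can keep feeding the topic. Below is a minimal sketch in plain Java, assuming the records should look like the samples above: it prints one JSON line per second to stdout, with the site picked at random from the sample sites. The class name JsonLineGenerator and the site list are illustrative and not part of the original project.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;
import java.util.Random;

/**
 * Hypothetical helper: prints JSON lines shaped like the sample data
 * (id, site, proctime) so they can be piped into kafka-console-producer.
 */
public class JsonLineGenerator {

    // Illustrative site list, taken from the sample records
    private static final String[] SITES = {
        "www.baidu.com", "www.bilibili.com/", "https://github.com/tygxy"
    };

    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Builds one record by hand; safe only because the sites contain no characters needing JSON escaping. */
    static String toJson(int id, String site, LocalDateTime time) {
        return String.format(Locale.ROOT, "{\"id\": %d, \"site\": \"%s\", \"proctime\": \"%s\"}",
                id, site, FORMAT.format(time));
    }

    public static void main(String[] args) throws InterruptedException {
        // number of records to emit, default 10
        int count = args.length > 0 ? Integer.parseInt(args[0]) : 10;
        Random random = new Random();
        for (int id = 1; id <= count; id++) {
            String site = SITES[random.nextInt(SITES.length)];
            System.out.println(toJson(id, site, LocalDateTime.now()));
            System.out.flush();
            Thread.sleep(1000); // roughly one record per second
        }
    }
}
```

Since kafka-console-producer sends each line of standard input as one message, the output can be piped straight into it, e.g. `java JsonLineGenerator 100 | sh kafka-console-producer --broker-list localhost:9092 --topic flink-streaming`.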
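2. Creating the Project
We reuse the project created in the previous post on Flink SQL in batch mode; for details, see "Flink in Practice: A Flink SQL Demo in Batch Scenarios".
The only addition is a dependency in pom.xml for handling JSON. (The Kafka connector, i.e. the flink-connector-kafka-0.10 artifact with the project's Scala suffix, matching the .version("0.10") used below, must also be on the classpath if the project does not already include it.)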
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>
3. Implementation
Create a Java class named SQLStreaming.
package com.cmbc.flink;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

import java.sql.Timestamp;

public class SQLStreaming {
    public static void main(String[] args) throws Exception {
        // set up execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // kafka source
        Kafka kafka = new Kafka()
                .version("0.10")
                .topic("flink-streaming")
                .property("bootstrap.servers", "localhost:9092")
                .property("zookeeper.connect", "localhost:2181");

        // register the topic as table "Data": JSON messages mapped onto the schema below
        tableEnv.connect(kafka)
                .withFormat(
                        // derive the JSON fields from the table schema
                        new Json().failOnMissingField(true).deriveSchema()
                )
                .withSchema(
                        new Schema()
                                .field("id", Types.INT)
                                .field("site", Types.STRING)
                                // processing-time attribute: filled in by Flink when the row is
                                // processed, so the "proctime" value inside the JSON is not used
                                .field("proctime", Types.SQL_TIMESTAMP).proctime()
                )
                .inAppendMode()
                .registerTableSource("Data");

        // do sql: page views per site in 5-second tumbling windows
        String sql = "SELECT TUMBLE_END(proctime, INTERVAL '5' SECOND) as processtime, " +
                "count(1) as pv, site " +
                "FROM Data " +
                "GROUP BY TUMBLE(proctime, INTERVAL '5' SECOND), site";
        Table table = tableEnv.sqlQuery(sql);

        // to sink: convert to an append-only stream of Info objects and print it
        tableEnv.toAppendStream(table, Info.class).print();
        tableEnv.execute("Flink SQL in Streaming");
    }

    /** Result POJO; the query's columns are mapped to these fields by name. */
    public static class Info {
        public Timestamp processtime;
        public String site;
        public Long pv;

        public Info() {
        }

        public Info(Timestamp processtime, String site, Long pv) {
            this.processtime = processtime;
            this.pv = pv;
            this.site = site;
        }

        @Override
        public String toString() {
            return "processtime=" + processtime +
                    ", site=" + site +
                    ", pv=" + pv;
        }
    }
}
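The logic is straightforward; in brief:
- Initialize the Flink environment
- Connect to Kafka: configure the connection properties, map the JSON fields to a schema, and register the result as a table
- Consume the data and run the SQL query
- Save or output the results (here they are printed with print())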
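To make the windowing in the query concrete, here is a plain-Java sketch (no Flink involved) of what `GROUP BY TUMBLE(proctime, INTERVAL '5' SECOND), site` computes: each record falls into exactly one fixed, non-overlapping 5-second window, and records are counted per (window end, site). One assumption to keep in mind: the sketch treats the proctime strings in the sample JSON as the timestamps, whereas the real job windows on processing time (when Flink handles each row), so actual results depend on when the lines reach Kafka. TumblingWindowSketch and its methods are hypothetical names used only for illustration.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical illustration of tumbling-window counting, not Flink code. */
public class TumblingWindowSketch {

    static final long WINDOW_SECONDS = 5;
    static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /**
     * End of the window containing ts (the analogue of TUMBLE_END).
     * Windows are [start, start + 5s) and aligned to the start of the day,
     * which matches epoch alignment because 5 s divides 24 h evenly.
     */
    static LocalDateTime windowEnd(LocalDateTime ts) {
        long secondOfDay = ts.toLocalTime().toSecondOfDay();
        long windowStart = secondOfDay - (secondOfDay % WINDOW_SECONDS);
        return ts.toLocalDate().atStartOfDay().plusSeconds(windowStart + WINDOW_SECONDS);
    }

    /** Counts records per window end and site; TreeMaps keep both keys sorted. */
    static Map<LocalDateTime, Map<String, Long>> countPv(List<String[]> records) {
        Map<LocalDateTime, Map<String, Long>> result = new TreeMap<>();
        for (String[] record : records) { // record[0] = site, record[1] = timestamp
            LocalDateTime end = windowEnd(LocalDateTime.parse(record[1], FORMAT));
            result.computeIfAbsent(end, k -> new TreeMap<>())
                  .merge(record[0], 1L, Long::sum);
        }
        return result;
    }

    /** (site, proctime) pairs from the ten sample JSON records. */
    static List<String[]> sampleRecords() {
        return Arrays.asList(
                new String[]{"www.baidu.com", "2020-04-11 00:00:01"},
                new String[]{"www.bilibili.com/", "2020-04-11 00:00:02"},
                new String[]{"www.baidu.com", "2020-04-11 00:00:03"},
                new String[]{"www.baidu.com/", "2020-04-11 00:00:05"},
                new String[]{"www.baidu.com", "2020-04-11 00:00:06"},
                new String[]{"www.bilibili.com/", "2020-04-11 00:00:07"},
                new String[]{"https://github.com/tygxy", "2020-04-11 00:00:08"},
                new String[]{"www.bilibili.com/", "2020-04-11 00:00:09"},
                new String[]{"www.baidu.com", "2020-04-11 00:00:11"},
                new String[]{"www.bilibili.com/", "2020-04-11 00:00:18"});
    }

    public static void main(String[] args) {
        // one line per (window, site), in a format modeled on Info.toString()
        for (Map.Entry<LocalDateTime, Map<String, Long>> window : countPv(sampleRecords()).entrySet()) {
            for (Map.Entry<String, Long> site : window.getValue().entrySet()) {
                System.out.println("processtime=" + FORMAT.format(window.getKey())
                        + ", site=" + site.getKey() + ", pv=" + site.getValue());
            }
        }
    }
}
```

On the ten sample records this yields four windows, ending at 00:00:05, 00:00:10, 00:00:15 and 00:00:20; in the window ending at 00:00:10, for example, www.bilibili.com/ has pv=2. Note that www.baidu.com and www.baidu.com/ count as different sites, just as GROUP BY site would treat them.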
4. Running and Results
- Start Flink in local mode by running the start-cluster.sh script found in the Flink installation directory
- Start ZooKeeper: sh zkServer start
- Start Kafka:
sh kafka-server-start ../config/server.properties
- Start kafka-console-producer.sh and paste in the data:
sh kafka-console-producer --broker-list localhost:9092 --topic flink-streaming
- Run the Flink program and check the results