通过 Flink 以 Stream Load 方式向 Doris 插入数据
2022-01-04 本文已影响0人
wudl
1. 场景:
模拟生成数据并发送到 Kafka ---> Flink 接收 Kafka 数据 ---> 然后通过 Flink 以 Stream Load 方式将数据插入到 Doris 中;
2. 建表语句
-- Target table for the Flink Stream Load demo.
-- Five nullable columns (id, name, address, city, phone); duplicate-key model keyed on `id`,
-- hash-distributed on `id` into a single bucket, with a replication factor of 1.
CREATE TABLE `wudl_doris01` (
`id` int NULL COMMENT "",
`name` varchar(200) NULL COMMENT "",
`address` string NULL COMMENT "",
`city` varchar(2000) NULL COMMENT "",
`phone` varchar(200) NULL COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`id`)
COMMENT "flink sink 测试表"
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_num" = "1",
"in_memory" = "false",
"storage_format" = "V2"
);
3.flink 插入数据代码:
项目结构
flink-doris目录.png
3.1 DorisBean:
package com.wudl.flink.doris.bean;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import java.io.Serializable;
/**
 * Serializable value object holding one demo row: id, name, address, city and phone.
 *
 * <p>All fields are mutable and may be {@code null}; the class is a plain JavaBean with a
 * no-argument constructor, an all-arguments constructor and one getter/setter pair per field.
 *
 * @author wudl
 * @version 1.0
 */
public class DorisBean implements Serializable {
    private static final long serialVersionUID = 1L;

    private Integer id;
    private String name;
    private String address;
    private String city;
    private String phone;

    /** Creates an empty bean; every field starts out as {@code null}. */
    public DorisBean() {
    }

    /**
     * Creates a fully populated bean.
     *
     * @param id      row identifier
     * @param name    name value
     * @param address address value
     * @param city    city value
     * @param phone   phone number value
     */
    public DorisBean(Integer id, String name, String address, String city, String phone) {
        this.phone = phone;
        this.city = city;
        this.address = address;
        this.name = name;
        this.id = id;
    }

    public Integer getId() {
        return this.id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return this.name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getAddress() {
        return this.address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    public String getCity() {
        return this.city;
    }

    public void setCity(String city) {
        this.city = city;
    }

    public String getPhone() {
        return this.phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    /**
     * Renders the bean as {@code DorisBean{id=..., name='...', ...}}; string fields are wrapped in
     * single quotes and {@code null} values print as the text {@code null}.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("DorisBean{");
        text.append("id=").append(id);
        text.append(", name='").append(name).append('\'');
        text.append(", address='").append(address).append('\'');
        text.append(", city='").append(city).append('\'');
        text.append(", phone='").append(phone).append('\'');
        text.append('}');
        return text.toString();
    }
}
3.2 RespContent:Stream Load 返回结果对应的 bean
package com.wudl.flink.doris.bean;
import java.io.Serializable;
/**
 * Serializable holder for the fields of a Stream Load response (transaction id, label, status,
 * row counters, timing counters and error URL).
 *
 * <p>Field names deliberately start with an upper-case letter and are left untouched; only the
 * accessors are exposed publicly.
 *
 * @author wudl
 */
public class RespContent implements Serializable {
    private static final long serialVersionUID = 1L;

    private int TxnId;
    private String Label;
    private String Status;
    private String ExistingJobStatus;
    private String Message;
    private long NumberTotalRows;
    private long NumberLoadedRows;
    private int NumberFilteredRows;
    private int NumberUnselectedRows;
    private long LoadBytes;
    private int LoadTimeMs;
    private int BeginTxnTimeMs;
    private int StreamLoadPutTimeMs;
    private int ReadDataTimeMs;
    private int WriteDataTimeMs;
    private int CommitAndPublishTimeMs;
    private String ErrorURL;

    public int getTxnId() {
        return this.TxnId;
    }

    public void setTxnId(int txnId) {
        this.TxnId = txnId;
    }

    public String getLabel() {
        return this.Label;
    }

    public void setLabel(String label) {
        this.Label = label;
    }

    public String getStatus() {
        return this.Status;
    }

    public void setStatus(String status) {
        this.Status = status;
    }

    public String getExistingJobStatus() {
        return this.ExistingJobStatus;
    }

    public void setExistingJobStatus(String existingJobStatus) {
        this.ExistingJobStatus = existingJobStatus;
    }

    public String getMessage() {
        return this.Message;
    }

    public void setMessage(String message) {
        this.Message = message;
    }

    public long getNumberTotalRows() {
        return this.NumberTotalRows;
    }

    public void setNumberTotalRows(long numberTotalRows) {
        this.NumberTotalRows = numberTotalRows;
    }

    public long getNumberLoadedRows() {
        return this.NumberLoadedRows;
    }

    public void setNumberLoadedRows(long numberLoadedRows) {
        this.NumberLoadedRows = numberLoadedRows;
    }

    public int getNumberFilteredRows() {
        return this.NumberFilteredRows;
    }

    public void setNumberFilteredRows(int numberFilteredRows) {
        this.NumberFilteredRows = numberFilteredRows;
    }

    public int getNumberUnselectedRows() {
        return this.NumberUnselectedRows;
    }

    public void setNumberUnselectedRows(int numberUnselectedRows) {
        this.NumberUnselectedRows = numberUnselectedRows;
    }

    public long getLoadBytes() {
        return this.LoadBytes;
    }

    public void setLoadBytes(long loadBytes) {
        this.LoadBytes = loadBytes;
    }

    public int getLoadTimeMs() {
        return this.LoadTimeMs;
    }

    public void setLoadTimeMs(int loadTimeMs) {
        this.LoadTimeMs = loadTimeMs;
    }

    public int getBeginTxnTimeMs() {
        return this.BeginTxnTimeMs;
    }

    public void setBeginTxnTimeMs(int beginTxnTimeMs) {
        this.BeginTxnTimeMs = beginTxnTimeMs;
    }

    public int getStreamLoadPutTimeMs() {
        return this.StreamLoadPutTimeMs;
    }

    public void setStreamLoadPutTimeMs(int streamLoadPutTimeMs) {
        this.StreamLoadPutTimeMs = streamLoadPutTimeMs;
    }

    public int getReadDataTimeMs() {
        return this.ReadDataTimeMs;
    }

    public void setReadDataTimeMs(int readDataTimeMs) {
        this.ReadDataTimeMs = readDataTimeMs;
    }

    public int getWriteDataTimeMs() {
        return this.WriteDataTimeMs;
    }

    public void setWriteDataTimeMs(int writeDataTimeMs) {
        this.WriteDataTimeMs = writeDataTimeMs;
    }

    public int getCommitAndPublishTimeMs() {
        return this.CommitAndPublishTimeMs;
    }

    public void setCommitAndPublishTimeMs(int commitAndPublishTimeMs) {
        this.CommitAndPublishTimeMs = commitAndPublishTimeMs;
    }

    public String getErrorURL() {
        return this.ErrorURL;
    }

    public void setErrorURL(String errorURL) {
        this.ErrorURL = errorURL;
    }

    /**
     * Renders every field as {@code RespContent{TxnId=..., Label='...', ...}}; string fields are
     * single-quoted and {@code null} prints as the text {@code null}.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("RespContent{");
        text.append("TxnId=").append(TxnId);
        text.append(", Label='").append(Label).append('\'');
        text.append(", Status='").append(Status).append('\'');
        text.append(", ExistingJobStatus='").append(ExistingJobStatus).append('\'');
        text.append(", Message='").append(Message).append('\'');
        text.append(", NumberTotalRows=").append(NumberTotalRows);
        text.append(", NumberLoadedRows=").append(NumberLoadedRows);
        text.append(", NumberFilteredRows=").append(NumberFilteredRows);
        text.append(", NumberUnselectedRows=").append(NumberUnselectedRows);
        text.append(", LoadBytes=").append(LoadBytes);
        text.append(", LoadTimeMs=").append(LoadTimeMs);
        text.append(", BeginTxnTimeMs=").append(BeginTxnTimeMs);
        text.append(", StreamLoadPutTimeMs=").append(StreamLoadPutTimeMs);
        text.append(", ReadDataTimeMs=").append(ReadDataTimeMs);
        text.append(", WriteDataTimeMs=").append(WriteDataTimeMs);
        text.append(", CommitAndPublishTimeMs=").append(CommitAndPublishTimeMs);
        text.append(", ErrorURL='").append(ErrorURL).append('\'');
        text.append('}');
        return text.toString();
    }
}
3.4 DorisSink
package com.wudl.flink.doris.sink;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.wudl.flink.doris.bean.RespContent;
import com.wudl.flink.doris.utils.DorisStreamLoad;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
 * Flink sink that forwards each incoming JSON record to Doris through {@link DorisStreamLoad}.
 *
 * <p>Every record is expected to be a JSON object with a {@code "data"} member; the value of that
 * member is re-serialized and sent as one Stream Load batch. Load failures are logged and do not
 * fail the job (best-effort delivery).
 *
 * @author wudl
 * @version 1.0
 */
public class DorisSink extends RichSinkFunction<String> {
    private static final Logger log = LoggerFactory.getLogger(DorisSink.class);

    /** Response statuses that are treated as a successful load. */
    private static final List<String> DORIS_SUCCESS_STATUS =
            new ArrayList<>(Arrays.asList("Success", "Publish Timeout"));

    private final DorisStreamLoad dorisStreamLoad;
    private final String columns;
    private final String jsonFormat;

    /**
     * @param dorisStreamLoad loader used to send each batch
     * @param columns         value passed through as the {@code columns} argument of the load
     * @param jsonFormat      value passed through as the JSON-path argument of the load
     */
    public DorisSink(DorisStreamLoad dorisStreamLoad, String columns, String jsonFormat) {
        this.dorisStreamLoad = dorisStreamLoad;
        this.columns = columns;
        this.jsonFormat = jsonFormat;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }

    /**
     * Decides whether a Stream Load response represents a successful load.
     *
     * @param respContent parsed response body; may be {@code null} when the body was empty
     * @return {@code true} only if the status is one of the accepted values and the number of
     *         loaded rows equals the total number of rows; {@code false} for a {@code null} response
     */
    public static Boolean checkStreamLoadStatus(RespContent respContent) {
        // An empty/unparseable body yields null from the JSON parser; treat it as a failure
        // instead of throwing a NullPointerException.
        if (respContent == null) {
            return false;
        }
        return DORIS_SUCCESS_STATUS.contains(respContent.getStatus())
                && respContent.getNumberTotalRows() == respContent.getNumberLoadedRows();
    }

    /**
     * Extracts the {@code "data"} member of the record and loads it into Doris.
     *
     * @param value   JSON text of the record
     * @param context sink context (unused)
     * @throws Exception if the record is not valid JSON
     */
    @Override
    public void invoke(String value, Context context) throws Exception {
        // Keep only the payload part of the record.
        JSONObject record = JSONObject.parseObject(value);
        Object rows = record == null ? null : record.get("data");
        if (rows == null) {
            // Without this guard the literal text "null" would be sent to Doris as the batch.
            log.warn("Skipping record without a \"data\" member: {}", value);
            return;
        }
        String payload = JSON.toJSONString(rows);
        log.debug("value----{}", payload);

        DorisStreamLoad.LoadResponse loadResponse = dorisStreamLoad.loadBatch(payload, columns, jsonFormat);
        if (loadResponse == null || loadResponse.status != 200) {
            log.error("Stream Load Request failed:{}", loadResponse);
            return;
        }

        RespContent respContent = JSON.parseObject(loadResponse.respContent, RespContent.class);
        if (checkStreamLoadStatus(respContent)) {
            log.info("Stream Load success: {}", loadResponse);
        } else {
            log.error("Stream Load fail: {}", loadResponse);
        }
    }
}
3.5 GenerateData 生成数据
package com.wudl.flink.doris.source;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.wudl.flink.doris.bean.DorisBean;
import com.wudl.flink.doris.utils.MyKafkaUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.text.SimpleDateFormat;
import java.util.*;
/**
 * Flink source that generates random {@link DorisBean} rows and emits them in JSON batches.
 *
 * <p>Rows are accumulated in a list; each time the list reaches 2000 entries it is wrapped as
 * {@code {"data": [...]}}, serialized to JSON, emitted downstream and replaced by a fresh list.
 *
 * @author wudl
 * @version 1.0
 */
public class GenerateData implements SourceFunction<String> {
    // volatile: cancel() is invoked from a different thread than run(), so the loop must be
    // guaranteed to observe the updated flag.
    private volatile boolean isRunning = true;
    String[] citys = {"北京","广东","山东","江苏","河南","上海","河北","浙江","香港","山西","陕西","湖南","重庆","福建","天津","云南","四川","广西","安徽","海南","江西","湖北","山西","辽宁","内蒙古"};
    Integer i = 0;
    List<DorisBean> list = new ArrayList<>();

    /**
     * Generates rows until {@link #cancel()} is called, emitting one JSON string per 2000 rows.
     *
     * @param ctx context used to emit the serialized batches
     */
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        Random random = new Random();
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        while (isRunning) {
            Integer id = i++;
            // The "name" column carries the generation timestamp.
            String name = df.format(new Date());
            String address = "1";
            String city = citys[random.nextInt(citys.length)];
            String phone = getTel();
            list.add(new DorisBean(id, name, address, city, phone));

            if (list.size() == 2000) {
                Map<String, List<DorisBean>> map = new HashMap<>();
                map.put("data", list);
                String s = JSON.toJSONString(map);
                System.out.println("map--->" + s);
                // Start a new batch; the emitted one has already been serialized.
                list = new ArrayList<>();
                ctx.collect(s);
            }
        }
    }

    /** Requests the generation loop in {@link #run} to stop. */
    @Override
    public void cancel() {
        isRunning = false;
    }

    /** Three-digit phone number prefixes used by {@link #getTel()}. */
    private static final String[] telFirst = "134,135,136,137,138,139,150,151,152,157,158,159,130,131,132,155,156,133,153".split(",");

    /**
     * Builds a random 11-digit phone number: a prefix from {@code telFirst} followed by two
     * zero-padded 4-digit groups.
     */
    private static String getTel() {
        int index = getNum(0, telFirst.length - 1);
        String first = telFirst[index];
        // Adding 10000 and dropping the leading "1" left-pads the number to four digits.
        String second = String.valueOf(getNum(1, 888) + 10000).substring(1);
        String third = String.valueOf(getNum(1, 9100) + 10000).substring(1);
        return first + second + third;
    }

    /**
     * Returns a pseudo-random integer in the inclusive range {@code [start, end]}.
     *
     * @param start lower bound (inclusive)
     * @param end   upper bound (inclusive)
     */
    public static int getNum(int start, int end) {
        return (int) (Math.random() * (end - start + 1) + start);
    }

    /** Runs the generator as a standalone job that prints the batches and writes them to Kafka. */
    public static void main(String[] args) throws Exception {
        String default_topic = "wudltopicdoris01";
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);
        DataStreamSource<String> generateSource = env.addSource(new GenerateData());
        generateSource.print("--------");
        generateSource.addSink(MyKafkaUtil.getKafkaProducer(default_topic));
        env.execute();
    }
}
3.6 doris 加载工具类
package com.wudl.flink.doris.utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Calendar;
import java.util.UUID;
/**
 * Helper that sends one batch of JSON rows to a Doris table over the HTTP Stream Load endpoint.
 *
 * <p>The request is first sent to the address given at construction time. That endpoint is
 * expected to answer with HTTP 307 and a {@code Location} header; the payload is then written to
 * the redirect location and its response is returned to the caller.
 *
 * @author wudl
 */
public class DorisStreamLoad implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final Logger log = LoggerFactory.getLogger(DorisStreamLoad.class);

    /** URL template; placeholders are host:port, database and table. The FE is addressed here. */
    private static final String loadUrlPattern = "http://%s/api/%s/%s/_stream_load?";

    /** FE address in {@code host:port} form. */
    private String hostPort;
    /** Target database. */
    private String db;
    /** Target table. */
    private String table;
    /** User name used for HTTP basic authentication. */
    private String username;
    /** Password used for HTTP basic authentication. */
    private String password;
    /** Fully formatted load URL. */
    private String loadUrlStr;
    /** Base64-encoded {@code username:password} pair. */
    private String authEncoding;

    /**
     * @param hostPort FE address in {@code host:port} form
     * @param db       target database
     * @param table    target table
     * @param username user name for basic authentication
     * @param password password for basic authentication
     */
    public DorisStreamLoad(String hostPort, String db, String table, String username, String password) {
        this.hostPort = hostPort;
        this.db = db;
        this.table = table;
        this.username = username;
        this.password = password;
        this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, table);
        this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", username, password).getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Opens a PUT connection carrying the authentication and Stream Load request headers.
     *
     * @param urlStr     URL to connect to
     * @param label      value of the {@code label} header
     * @param columns    value of the {@code columns} header
     * @param jsonformat value of the {@code jsonpaths} header
     * @return the configured, not yet connected, connection
     * @throws IOException if the URL is malformed or the connection cannot be opened
     */
    private HttpURLConnection getConnection(String urlStr, String label, String columns, String jsonformat) throws IOException {
        URL url = new URL(urlStr);
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        // Redirects are handled manually in loadBatch so the body is only sent to the final target.
        conn.setInstanceFollowRedirects(false);
        conn.setRequestMethod("PUT");
        conn.setRequestProperty("Authorization", "Basic " + authEncoding);
        conn.addRequestProperty("Expect", "100-continue");
        conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
        conn.addRequestProperty("label", label);
        conn.addRequestProperty("max_filter_ratio", "0");
        conn.addRequestProperty("strict_mode", "true");
        conn.addRequestProperty("columns", columns);
        conn.addRequestProperty("format", "json");
        conn.addRequestProperty("jsonpaths", jsonformat);
        conn.addRequestProperty("strip_outer_array", "true");
        conn.setDoOutput(true);
        conn.setDoInput(true);
        return conn;
    }

    /** Outcome of one load attempt: HTTP status, HTTP reason phrase and response body. */
    public static class LoadResponse {
        public int status;
        public String respMsg;
        public String respContent;

        public LoadResponse(int status, String respMsg, String respContent) {
            this.status = status;
            this.respMsg = respMsg;
            this.respContent = respContent;
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("status: ").append(status);
            sb.append(", resp msg: ").append(respMsg);
            sb.append(", resp content: ").append(respContent);
            return sb.toString();
        }
    }

    /**
     * Sends {@code data} as one Stream Load batch.
     *
     * @param data       request body (JSON text)
     * @param columns    value of the {@code columns} header
     * @param jsonformat value of the {@code jsonpaths} header
     * @return the HTTP status, reason phrase and body of the final response; on any failure a
     *         response with status {@code -1}, the exception message and a description containing
     *         the label
     */
    public LoadResponse loadBatch(String data, String columns, String jsonformat) {
        Calendar calendar = Calendar.getInstance();
        // Label identifying this load: timestamp plus a random UUID.
        String label = String.format("flink_import_%s%02d%02d_%02d%02d%02d_%s",
                calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH),
                calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),
                UUID.randomUUID().toString().replaceAll("-", ""));
        HttpURLConnection feConn = null;
        HttpURLConnection beConn = null;
        try {
            // First hop: the configured endpoint must answer with a temporary redirect.
            feConn = getConnection(loadUrlStr, label, columns, jsonformat);
            int status = feConn.getResponseCode();
            if (status != 307) {
                throw new IOException("status is not TEMPORARY_REDIRECT 307, status: " + status);
            }
            String location = feConn.getHeaderField("Location");
            if (location == null) {
                throw new IOException("redirect location is null");
            }

            // Second hop: send the payload to the redirect target.
            beConn = getConnection(location, label, columns, jsonformat);
            try (OutputStream out = new BufferedOutputStream(beConn.getOutputStream())) {
                // Encode explicitly as UTF-8 to match the declared Content-Type charset;
                // the platform default would corrupt non-ASCII data on some hosts.
                out.write(data.getBytes(StandardCharsets.UTF_8));
            }

            status = beConn.getResponseCode();
            String respMsg = beConn.getResponseMessage();
            String body = readBody(beConn.getInputStream());
            return new LoadResponse(status, respMsg, body);
        } catch (Exception e) {
            String err = "stream load failed with label: " + label;
            log.warn(err, e);
            return new LoadResponse(-1, e.getMessage(), err);
        } finally {
            if (feConn != null) {
                feConn.disconnect();
            }
            if (beConn != null) {
                beConn.disconnect();
            }
        }
    }

    /** Reads the whole stream as UTF-8 text, concatenating lines without separators. */
    private static String readBody(InputStream stream) throws IOException {
        StringBuilder response = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                response.append(line);
            }
        }
        return response.toString();
    }
}
3.7 kafka 工具类
package com.wudl.flink.doris.utils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;
/**
 * Factory methods for the Kafka producers/consumers and the Kafka DDL fragment used by the demo.
 * All methods connect to the broker list hard-coded in {@code brokers}.
 *
 * @author wudl
 */
public class MyKafkaUtil {
    private static final String brokers = "192.168.1.161:6667";
    private static final String default_topic = "wudltopic";

    /**
     * Producer transaction timeout used for the exactly-once producer: 15 minutes, so that it does
     * not exceed the Kafka broker's default {@code transaction.max.timeout.ms}.
     */
    private static final String TRANSACTION_TIMEOUT_MS = String.valueOf(15 * 60 * 1000);

    /**
     * Creates a string producer for the given topic.
     *
     * @param topic target topic
     */
    public static FlinkKafkaProducer<String> getKafkaProducer(String topic) {
        return new FlinkKafkaProducer<String>(brokers,
                topic,
                new SimpleStringSchema());
    }

    /**
     * Creates an exactly-once producer that writes to the default topic unless the schema decides
     * otherwise.
     *
     * @param kafkaSerializationSchema schema that turns elements into producer records
     * @param <T>                      element type
     */
    public static <T> FlinkKafkaProducer<T> getKafkaProducer(KafkaSerializationSchema<T> kafkaSerializationSchema) {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        // With EXACTLY_ONCE Flink otherwise defaults the producer transaction timeout to 1 hour,
        // which brokers with the default 15 minute maximum reject.
        properties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, TRANSACTION_TIMEOUT_MS);
        return new FlinkKafkaProducer<T>(default_topic,
                kafkaSerializationSchema,
                properties,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }

    /**
     * Creates a string consumer for the given topic and consumer group.
     *
     * @param topic   topic to read
     * @param groupId consumer group id
     */
    public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic, String groupId) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        return new FlinkKafkaConsumer<String>(topic,
                new SimpleStringSchema(),
                properties);
    }

    /**
     * Builds the Kafka connector options to be spliced into a table DDL {@code WITH} clause.
     *
     * @param topic   topic to read
     * @param groupId consumer group id
     * @return the option list as text (connector, topic, brokers, group id, JSON format,
     *         latest-offset startup mode)
     */
    public static String getKafkaDDL(String topic, String groupId) {
        return " 'connector' = 'kafka', " +
                " 'topic' = '" + topic + "'," +
                " 'properties.bootstrap.servers' = '" + brokers + "', " +
                " 'properties.group.id' = '" + groupId + "', " +
                " 'format' = 'json', " +
                " 'scan.startup.mode' = 'latest-offset' ";
    }
}
3.8 程序入口类
package com.wudl.flink.doris;
import com.wudl.flink.doris.sink.DorisSink;
import com.wudl.flink.doris.source.GenerateData;
import com.wudl.flink.doris.utils.DorisStreamLoad;
import com.wudl.flink.doris.utils.MyKafkaUtil;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.util.Properties;
/**
 * Entry point of the demo job: reads generated JSON batches and writes them to Doris through
 * {@link DorisSink}.
 *
 * <p>A Kafka consumer is constructed as well but is not attached to the job; the active source is
 * {@link GenerateData}.
 *
 * @author wudl
 * @version 1.0
 */
public class DorisApp {
    private static final String bootstrapServer = "192.168.1.161:6667";
    private static final String groupName = "flink_doris_group006";
    private static final String topicName = "wudltopicdoris01";
    private static final String hostPort = "192.168.1.161:8090";
    private static final String dbName = "wudldb";
    private static final String tbName = "wudl_doris01";
    private static final String userName = "root";
    private static final String password = "";
    // Column list and JSON paths are given in the same order: address, city, id, name, phone.
    private static final String columns = "address,city,id,name,phone";
    private static final String jsonFormat = "[\"$.address\",\"$.city\",\"$.id\",\"$.name\",\"$.phone\"]";

    /** Builds and runs the streaming job. */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 10 seconds and keep externalized checkpoints when the job is cancelled.
        env.enableCheckpointing(10000);
        env.getCheckpointConfig()
                .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Alternative source, currently not attached:
        //   env.addSource(kafkaSource)
        FlinkKafkaConsumer<String> kafkaSource =
                new FlinkKafkaConsumer<>(topicName, new SimpleStringSchema(), kafkaProperties());

        DataStreamSource<String> records = env.addSource(new GenerateData());

        // Side branch that only echoes every record to stdout.
        records.map(new MapFunction<String, String>() {
            @Override
            public String map(String record) throws Exception {
                System.out.println(record);
                return record;
            }
        });

        DorisStreamLoad loader = new DorisStreamLoad(hostPort, dbName, tbName, userName, password);
        records.addSink(new DorisSink(loader, columns, jsonFormat));

        env.execute("flink kafka to doris");
    }

    /** Assembles the consumer properties: brokers, group id and string deserializers. */
    private static Properties kafkaProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServer);
        props.put("group.id", groupName);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
4. 执行结果:执行半个小时,插入约 4 千万条数据
doris-count.png
5. pom 文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- <parent>-->
<!-- <artifactId>Flink-learning</artifactId>-->
<!-- <groupId>com.wudl.flink</groupId>-->
<!-- <version>1.0-SNAPSHOT</version>-->
<!-- </parent>-->
<groupId>org.wudlflink13</groupId>
<version>1.0-SNAPSHOT</version>
<modelVersion>4.0.0</modelVersion>
<artifactId>wudl-flink-13</artifactId>
<!-- Repository locations, in order: aliyun, apache, cloudera and spring-plugin.
     All URLs use https; Maven 3.8.1 and later block external plain-http repositories by default. -->
<repositories>
  <repository>
    <id>aliyun</id>
    <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
  </repository>
  <repository>
    <id>apache</id>
    <url>https://repository.apache.org/content/repositories/snapshots/</url>
  </repository>
  <repository>
    <id>cloudera</id>
    <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
  </repository>
  <repository>
    <id>spring-plugin</id>
    <url>https://repo.spring.io/plugins-release/</url>
  </repository>
</repositories>
<!-- Build-wide settings: UTF-8 sources, Java 1.8 source/target, and the Flink version shared by
     the dependencies that reference ${flink.version}. -->
<properties>
<encoding>UTF-8</encoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<java.version>1.8</java.version>
<!-- NOTE(review): scala.version is 2.11 and is not referenced anywhere in the visible POM, while
     most Flink artifacts hard-code the _2.12 suffix. Confirm the intended Scala binary version. -->
<scala.version>2.11</scala.version>
<flink.version>1.13.3</flink.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.78</version>
  </dependency>
  <!-- Connector aligned with the Scala 2.12 suffix and ${flink.version} used by the core Flink
       artifacts (was _2.11 / 1.12.3). -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_2.12</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <!-- Scala suffix aligned with the other Flink artifacts (was _2.11). -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hbase-2.2_2.12</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <!-- Legacy JDBC artifact; Scala suffix aligned (was _2.11).
       NOTE(review): flink-connector-jdbc is also declared further down; confirm this one is still needed. -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc_2.12</artifactId>
    <version>1.10.3</version>
  </dependency>
  <!-- Scala-dependent Flink artifacts -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>${flink.version}</version>
    <exclusions>
      <exclusion>
        <artifactId>slf4j-api</artifactId>
        <groupId>org.slf4j</groupId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.12</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.12</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <!-- Blink planner, the default since Flink 1.11 -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>${flink.version}</version>
    <exclusions>
      <exclusion>
        <artifactId>slf4j-api</artifactId>
        <groupId>org.slf4j</groupId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep_2.12</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <!-- Flink connectors -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-kafka_2.12</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.12</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <!-- Kept on _2.11 as in the original; its Scala 2.11 Flink transitives are excluded below. -->
  <dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
    <exclusions>
      <exclusion>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <groupId>org.apache.flink</groupId>
      </exclusion>
      <exclusion>
        <artifactId>flink-runtime_2.11</artifactId>
        <groupId>org.apache.flink</groupId>
      </exclusion>
      <!-- <exclusion>-->
      <!-- <artifactId>flink-core</artifactId>-->
      <!-- <groupId>org.apache.flink</groupId>-->
      <!-- </exclusion>-->
      <exclusion>
        <artifactId>flink-java</artifactId>
        <groupId>org.apache.flink</groupId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_2.12</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-metastore</artifactId>
    <version>2.1.0</version>
    <exclusions>
      <exclusion>
        <artifactId>hadoop-hdfs</artifactId>
        <groupId>org.apache.hadoop</groupId>
      </exclusion>
      <exclusion>
        <artifactId>slf4j-api</artifactId>
        <groupId>org.slf4j</groupId>
      </exclusion>
    </exclusions>
  </dependency>
  <!-- <dependency>-->
  <!-- <groupId>org.apache.hive</groupId>-->
  <!-- <artifactId>hive-exec</artifactId>-->
  <!-- <version>2.1.0</version>-->
  <!-- <exclusions>-->
  <!-- <exclusion>-->
  <!-- <artifactId>slf4j-api</artifactId>-->
  <!-- <groupId>org.slf4j</groupId>-->
  <!-- </exclusion>-->
  <!-- </exclusions>-->
  <!-- </dependency>-->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-hadoop-2-uber</artifactId>
    <version>2.7.5-10.0</version>
    <exclusions>
      <exclusion>
        <artifactId>slf4j-log4j12</artifactId>
        <groupId>org.slf4j</groupId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>2.1.0</version>
    <exclusions>
      <exclusion>
        <artifactId>slf4j-api</artifactId>
        <groupId>org.slf4j</groupId>
      </exclusion>
      <exclusion>
        <artifactId>slf4j-log4j12</artifactId>
        <groupId>org.slf4j</groupId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.38</version>
    <!--<version>8.0.20</version>-->
  </dependency>
  <!-- High-performance asynchronous toolkit: Vert.x -->
  <dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-core</artifactId>
    <version>3.9.0</version>
  </dependency>
  <dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-jdbc-client</artifactId>
    <version>3.9.0</version>
  </dependency>
  <dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-redis-client</artifactId>
    <version>3.9.0</version>
  </dependency>
  <!-- Logging -->
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.7</version>
    <scope>runtime</scope>
  </dependency>
  <dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
    <scope>runtime</scope>
  </dependency>
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.2</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>3.1.3</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.1.3</version>
    <exclusions>
      <exclusion>
        <artifactId>slf4j-api</artifactId>
        <groupId>org.slf4j</groupId>
      </exclusion>
      <exclusion>
        <artifactId>slf4j-log4j12</artifactId>
        <groupId>org.slf4j</groupId>
      </exclusion>
    </exclusions>
  </dependency>
  <!-- Legacy HBase artifact; Scala suffix aligned (was _2.11).
       NOTE(review): flink-connector-hbase-2.2 is also declared above; confirm this one is still needed. -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hbase_2.12</artifactId>
    <version>1.10.1</version>
  </dependency>
</dependencies>
<!-- Build: during the package phase the assembly plugin runs its "single" goal with the
     jar-with-dependencies descriptor. -->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>