Flink_Aysnc IO 异步IO
2022-08-25 本文已影响0人
Eqo
异步IO为了解决与外部系统交互时网络延迟成为了系统瓶颈的问题.
列如:
我们再Flink处理数据的时候,很可能需要关联数据库,从数据库中获取对应信息然后再处理,如果数据量较大,每条数据都去请求数据库,等待数据库回应之后,才能处理数据,及其影响性能.
举例 数据流处理数据时,加入我需要判断这条数据,是否存在于数据库中.
Flink 就采取异步IO,所有的数据先发送请求,响不响应先不管,先继续处理数据流中的数据,谁先响应 先处理谁.
image.png异步模式可以并发的处理多个请求和回复,
可以连续的向数据库发送用户a、b、c、d等的请求,与此同时,哪个请求的回复先返回了就处理哪个回复,从而连续的请求之间不需要阻塞等待,这也正是Async I/O的实现原理。
使用前提
- 数据库(或kv类型数据存储系统)必须支持异步的请求clinent(如java的vetx)
- 如果不支持异步客户端的话,也可以将同步客户端丢到线程池 中做为异步客户端
使用步骤
- 1.使用
AysncDataStream
工具类对数据流DataStream进行异步处理 - 2.自定义类,继承
RichAsyncFunction 富有的异步方法类
,转换异步处理数据,其中需要异步请求外部存储系统,处理结果
核心代码
使用AsyncDataStream工具类中的unorderedWait()无需等待 方法设置要处理的数据流, 传入 异步处理方法
// todo 3-2. 异步请求Mysql数据库,采用JDBC方式,由于Mysql不支持异步访问,以多线程方式实现
// 使用异步工具类创建 使用无需等待
/**
*
in – Input DataStream 需要进行异步请求的 数据流
func – AsyncFunction 自定义类,转换异步处理数据,其中需要异步请求外部存储系统,处理结果
timeout – for the asynchronous operation to complete 超时时间
timeUnit – of the given timeout 单位
capacity – The max number of async i/o operation that can be triggered 异步请求变化量
*/
//in
SingleOutputStreamOperator<String> resultDS = AsyncDataStream.unorderedWait(tupleStream, new AsyncMysqlRequestResult(),
10000, TimeUnit.MILLISECONDS, 10);
异步处理方法类(重要)
这个方法类 实现的是 出数据类中真实处理逻辑
image.png
package cn.itcast.async;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Collections;
import java.util.concurrent.*;
/**
* 义子类,实现函数接口:AsyncFunction,重写asyncInvoke方法,实现异步请求数据库,获取数据
*
* @author ado
*/
// 实现富有的 异步请求方法类
public class AsyncMysqlRequestResult extends RichAsyncFunction<Tuple2<String, String>, String> {
// 定义mysql 连接变量
private Connection connection = null ;
private PreparedStatement pstmt = null ;
private ResultSet result = null ;
// 定义线程池 因为mysql 不支持异步请求, 所以需要线程池实现
private ExecutorService executorService = null;
// 初始化线程池
@Override
public void open(Configuration parameters) throws Exception {
executorService= Executors.newFixedThreadPool(10);
// a. 加载驱动类
Class.forName("com.mysql.jdbc.Driver") ;
// b. 获取连接
connection = DriverManager.getConnection(
"jdbc:mysql://node1.itcast.cn:3306/?useSSL=false", "root", "123456"
);
// c. 实例化Statement对象
pstmt = connection.prepareStatement("SELECT user_name FROM db_flink.tbl_user_info WHERE user_id = ?") ;
}
@Override
public void asyncInvoke(Tuple2<String, String> input, ResultFuture<String> resultFuture) throws Exception {
/*
input -> 数据流中每条数据: (u_1009, u_1009,click,2022-08-06 19:30:55.347)
|
wujiu,u_1009,click,2022-08-06 19:30:55.347
*/
// 1.获取用户id
String userID = input.f0;
// 2.todo 异步请求mysql 到数据库中访问对应的用户 userName 一句ueseID 检索
Future<String> future = executorService.submit(new Callable<String>() {
// 相当于再线程池内启动一个线程 去访问数据库
@Override
public String call() throws Exception {
//todo 核心 编写jdbc代码 根据userid 获取username
// d. 设置占位符值,进行查询
pstmt.setString(1, userID);
result = pstmt.executeQuery();
String userName ="null";
// 为什么使用 while 因为我们查询的数据 有可能是多条数据
while (result.next()){
userName = result.getString("user_name");
}
return userName;
}
});
//3. 获取返回结果的 username
String userName = future.get();
String output = userName + "," + input.f1 ;
//4. 将查询数据结构异步返回
resultFuture.complete(Collections.singletonList(output));
}
// 超时时间 这个方法是 如果请求数据库 超时了 数据库没有响应 该怎么办
@Override
public void timeout(Tuple2<String, String> input, ResultFuture<String> resultFuture) throws Exception {
// 超时了 直接返回
// 此时的逻辑是 根据 字段获取 数据库中的userid
// 获取日志
String log = input.f1;
//输出数据
String output ="unknown"+log;
// 里面需要结束一个集合
// 通过集合工具类 获取一个集合
resultFuture.complete(Collections.singletonList(output));
}
@Override
public void close() throws Exception {
// 关闭线程池
if(null != executorService) {
executorService.shutdown();
}
// f. 关闭连接
if(null != result){
result.close();
}
if(null != pstmt){
pstmt.close();
}
if(null != connection) {
connection.close();
}
}
}