Flink_Aysnc IO 异步IO

2022-08-25  本文已影响0人  Eqo

异步IO为了解决与外部系统交互时网络延迟成为了系统瓶颈的问题.
列如:
我们再Flink处理数据的时候,很可能需要关联数据库,从数据库中获取对应信息然后再处理,如果数据量较大,每条数据都去请求数据库,等待数据库回应之后,才能处理数据,及其影响性能.
举例 数据流处理数据时,加入我需要判断这条数据,是否存在于数据库中.

Flink 就采取异步IO,所有的数据先发送请求,响不响应先不管,先继续处理数据流中的数据,谁先响应 先处理谁.

image.png
异步模式可以并发的处理多个请求和回复,可以连续的向数据库发送用户a、b、c、d等的请求,与此同时,哪个请求的回复先返回了就处理哪个回复,从而连续的请求之间不需要阻塞等待,这也正是Async I/O的实现原理。

使用前提

使用步骤

核心代码

使用AsyncDataStream工具类中的unorderedWait()无需等待 方法设置要处理的数据流, 传入 异步处理方法

        // todo 3-2. 异步请求Mysql数据库,采用JDBC方式,由于Mysql不支持异步访问,以多线程方式实现
        // 使用异步工具类创建 使用无需等待
        /**
         *
         in – Input DataStream 需要进行异步请求的 数据流
         func – AsyncFunction    自定义类,转换异步处理数据,其中需要异步请求外部存储系统,处理结果
         timeout – for the asynchronous operation to complete 超时时间
         timeUnit – of the given timeout  单位
         capacity – The max number of async i/o operation that can be triggered 异步请求变化量
         */
        //in
        SingleOutputStreamOperator<String> resultDS = AsyncDataStream.unorderedWait(tupleStream, new AsyncMysqlRequestResult(),
                10000, TimeUnit.MILLISECONDS, 10);

异步处理方法类(重要)

这个方法类 实现的是 出数据类中真实处理逻辑


image.png
package cn.itcast.async;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Collections;
import java.util.concurrent.*;

/**
 * 义子类,实现函数接口:AsyncFunction,重写asyncInvoke方法,实现异步请求数据库,获取数据
 *
 * @author ado
 */
// 实现富有的 异步请求方法类
public class AsyncMysqlRequestResult extends RichAsyncFunction<Tuple2<String, String>, String> {

    // 定义mysql 连接变量
    private Connection connection = null ;
    private PreparedStatement pstmt = null ;
    private ResultSet result = null ;
    // 定义线程池  因为mysql 不支持异步请求, 所以需要线程池实现
    private ExecutorService executorService = null;

    // 初始化线程池
    @Override
    public void open(Configuration parameters) throws Exception {

        executorService= Executors.newFixedThreadPool(10);
        // a. 加载驱动类
        Class.forName("com.mysql.jdbc.Driver") ;
        // b. 获取连接
        connection = DriverManager.getConnection(
                "jdbc:mysql://node1.itcast.cn:3306/?useSSL=false", "root", "123456"
        );
        // c. 实例化Statement对象
        pstmt = connection.prepareStatement("SELECT user_name FROM db_flink.tbl_user_info WHERE user_id = ?") ;

    }

    @Override
    public void asyncInvoke(Tuple2<String, String> input, ResultFuture<String> resultFuture) throws Exception {

           /*
            input -> 数据流中每条数据: (u_1009,     u_1009,click,2022-08-06 19:30:55.347)
                        |
                   wujiu,u_1009,click,2022-08-06 19:30:55.347
         */
        // 1.获取用户id
         String userID = input.f0;

        // 2.todo 异步请求mysql 到数据库中访问对应的用户 userName 一句ueseID 检索

        Future<String> future = executorService.submit(new Callable<String>() {
            // 相当于再线程池内启动一个线程 去访问数据库
            @Override
            public String call() throws Exception {

                //todo 核心 编写jdbc代码 根据userid 获取username
                // d. 设置占位符值,进行查询
                pstmt.setString(1, userID);
                result = pstmt.executeQuery();

                String userName ="null";

                // 为什么使用 while 因为我们查询的数据 有可能是多条数据
                while (result.next()){
                    userName = result.getString("user_name");

                }

                return userName;
            }
        });
        //3. 获取返回结果的 username
        String userName = future.get();
        String output = userName + "," + input.f1 ;

        //4. 将查询数据结构异步返回
        resultFuture.complete(Collections.singletonList(output));


    }
    // 超时时间 这个方法是 如果请求数据库 超时了 数据库没有响应 该怎么办
    @Override
    public void timeout(Tuple2<String, String> input, ResultFuture<String> resultFuture) throws Exception {
        // 超时了 直接返回

        // 此时的逻辑是 根据 字段获取 数据库中的userid
        // 获取日志
        String log = input.f1;
        //输出数据
        String output ="unknown"+log;
        // 里面需要结束一个集合
        // 通过集合工具类 获取一个集合
        resultFuture.complete(Collections.singletonList(output));



    }

    @Override
    public void close() throws Exception {
        // 关闭线程池
        if(null != executorService) {
            executorService.shutdown();
        }

        // f. 关闭连接
        if(null != result){
            result.close();
        }
        if(null != pstmt){
            pstmt.close();
        }
        if(null != connection) {
            connection.close();
        }
    }
}
上一篇下一篇

猜你喜欢

热点阅读