Flink维表关联系列1-Async IO
最近看了公众号[Flink 实战剖析]的部分文章,觉得其中维表关联系列的文章总结得挺全面,因此做一次搬运工,并进行一些总结。
1 维表服务与Flink异步IO
1.1 维表服务
维度或者是维表概念熟知应该从数据仓库维度建模开始了解的,区别于事实表业务真实发生的数据,通常用来表示业务属性,比如订单业务中,商品属性、商家属性都可以称之为维度表。
所以说,某张表可能整体并不是维度表,但是某些字段表示的是属性,这些属性可看做是维度表。
在flink流处理实时分析或者数仓中,同样会使用维表来完成一些数据过滤或者字段补齐的操作,但是我们所需要的维度数据通常存储在Mysql/Redis/Hbase/Es这样的外部数据库中,并且可能是会随时变动的,根据业务要求数据的时效性,需要不同程度的感知维表数据的变化,在实际使用中常常会有以下几种方案可供选择:
<1> 在维度数据量比较小并且业务要求的时效性不高,可以定时全量加载维度数据到内存中,直接从内存中查询维度数据;
比如专柜所在门店的门店信息表等等
<2> 在维度数据量比较大并且业务要求的时效性不高,这时候全量加载就会撑爆内存,可以使用LRU的缓存策略,当缓存的维度数据达到一定大小,采用淘汰最近最少使用的数据,同时还可以设置数据的过期时间;
<3> 业务要求数据时效性比较高,那么就需要flink实时查询,这个时候需要注意外部存储所能承受的QPS;
<4> 最后一种方案直接将维度数据发送到kafka中,flink任务消费kafka的维度数据,然后使用广播方式将维度数据广播到每一个处理task中,这种方式同样要求数据量比较小。
1.2 异步IO
一般来说,在流计算中,如果以事件流为主,关联一些维度信息,就需要根据每个事件中的关键信息去数据库执行一次查询。正常的思路就是通过mapFunction以阻塞的方式查询数据库,等待数据结果返回,然后执行下一个步骤。如果数据库查询时间很长,那么有可能会阻塞流计算的整体流程。
需要注意的是:使用Async I/O的前提是需要一个支持异步请求的客户端,没有异步请求客户端的话也可以将同步客户端丢到线程池中执行作为异步客户端。
1.2.1 基于线程池实现基于mysql的jdbc查询异步化
公司同事封装的异步查询JDBC数据库的抽象类:
public abstract class JDBCAsyncFunction<IN, OUT> extends RichAsyncFunction<IN, OUT> {
private static final int DEFAULT_CAPACITY = 20;
/**
* 数据源配置文件路径
* */
private String _path;
/**
* 队列大小
* */
private int _capacity;
/**
* 数据源
* */
private HikariDataSource _datasource;
/**
* 线程池
* */
private ExecutorService _executor;
public JDBCAsyncFunction(String path) {
this(path, DEFAULT_CAPACITY);
}
public JDBCAsyncFunction(String path, int capacity) {
this._path = path;
this._capacity = capacity;
}
protected <V> Future<V> asyncCall(Callable<V> call) {
return this._executor.submit(call);
}
protected QueryRunner createQueryRunner() {
return new QueryRunner(this._datasource);
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 初始化 JDBC 连接池
if (this._datasource == null) {
HikariConfig config = new HikariConfig(this._path);
config.setMaximumPoolSize(this._capacity);
this._datasource = new HikariDataSource(config);
}
// 初始化线程池
if (this._executor == null) {
this._executor = Executors.newFixedThreadPool(this._capacity);
}
}
@Override
public void close() throws Exception {
if (this._executor != null) {
this._executor.shutdown();
this._executor = null;
}
if (this._datasource != null) {
this._datasource.close();
this._datasource = null;
}
super.close();
}
}
可以基于此进行异步查询mysql数据库。另外,会发现封装的这个抽象类,用到了线程池和数据库连接池。因为mysql本身并不支持异步数据库操作。
1.2.2 基于异步JDBC组件Vertx实现的mysql查询异步化
<1> 引入Vertx相关依赖
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.13</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-jdbc-client</artifactId>
<version>3.8.3</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>3.8.3</version>
</dependency>
<2> 在open中通过Vertx创建SQLClient,其内部维护自己的异步请求服务;在asyncInvoke中调用获取connection,执行查询,并释放连接;然后在close方法中关闭client。
public class JDBCAsyncFunction extends RichAsyncFunction<Click, Store> {
private SQLClient client;
@Override
public void open(Configuration parameters) throws Exception {
Vertx vertx = Vertx.vertx(new VertxOptions()
.setWorkerPoolSize(10)
.setEventLoopPoolSize(10));
JsonObject config = new JsonObject()
.put("url", "jdbc:mysql://xx:3306/base")
.put("driver_class", "com.mysql.cj.jdbc.Driver")
.put("max_pool_size", 10)
.put("user", "x")
.put("password", "x");
client = JDBCClient.createShared(vertx, config);
}
@Override
public void close() throws Exception {
client.close();
}
@Override
public void asyncInvoke(Click input, ResultFuture<Store> resultFuture) throws Exception {
client.getConnection(conn -> {
if (conn.failed()) {
return;
}
final SQLConnection connection = conn.result();
connection.query("select id, name from t where id = " + input.getId(), res2 -> {
ResultSet rs = new ResultSet();
if (res2.succeeded()) {
rs = res2.result();
}
List<Store> stores = new ArrayList<>();
for (JsonObject json : rs.getRows()) {
Store s = new Store();
s.setId(json.getInteger("id"));
s.setName(json.getString("name"));
stores.add(s);
}
connection.close();
resultFuture.complete(stores);
});
});
}
}
拓展知识点:
<1> 线程池的原理:
其实线程池的原理很简单,类似于操作系统中的缓冲区的概念,它的流程如下:先启动若干数量的线程,并让这些线程都处于睡眠状态,
当客户端有一个新请求时,就会唤醒线程池中的某一个睡眠线程,让它来处理客户端的这个请求,当处理完这个请求后,线程又处于睡眠状态。
可能你也许会问:为什么要搞得这么麻烦,如果每当客户端有新的请求时,我就创建一个新的线程不就完了?
这也许是个不错的方法,因为它能使得你编写代码相对容易一些,但你却忽略了一个重要的问题?
那就是性能!
高峰期每秒的客户端请求并发数超过100,如果为每个客户端请求创建一个新线程的话,那耗费的CPU时间和内存将是惊人的,如果采用一个拥有200个线程的线程池,
那将会节约大量的系统资源,使得更多的CPU时间和内存用来处理实际的商业应用,而不是频繁的线程创建与销毁。
<2> 数据库连接池的原理:
数据库连接是一种关键的有限的昂贵的资源,这一点在多用户的网页应用程序中体现得尤为突出。
一个数据库连接对象均对应一个物理数据库连接,每次操作都打开一个物理连接,使用完都关闭连接,这样造成系统的性能低下。 数据库连接池的解决方案是在应用程序启动时建立足够的数据库连接,并讲这些连接组成一个连接池(简单说:在一个“池”里放了好多半成品的数据库联接对象),由应用程序动态地对池中的连接进行申请、使用和释放。对于多于连接池中连接数的并发请求,应该在请求队列中排队等待。并且应用程序可以根据池中连接的使用率,动态增加或减少池中的连接数。
连接池技术尽可能多地重用了消耗内存地资源,大大节省了内存,提高了服务器地服务效率,能够支持更多的客户服务。通过使用连接池,将大大提高程序运行效率,同时,我们可以通过其自身的管理机制来监视数据库连接的数量、使用情况等。
1)最小连接数是连接池一直保持的数据库连接,所以如果应用程序对数据库连接的使用量不大,将会有大量的数据库连接资源被浪费;
2)最大连接数是连接池能申请的最大连接数,如果数据库连接请求超过此数,后面的数据库连接请求将被加入到等待队列中,这会影响之后的数据库操作。