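Using LookupableTableSource in Flink 1.9
LookupableTableSource
LookupableTableSource is an interface added in Flink 1.9 that lets a table be queried by the values of one or more of its columns. It is very useful when stream records are enriched with extra fields through a dimension-table join. At present only the Blink planner supports the interface, and it may change in future versions.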
Once LookupableTableSource is implemented, the dimension-table join is written with the temporal table join syntax, that is, JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r. Calcite can already parse this syntax.
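To make the shape of the full statement concrete, here is a minimal sketch that assembles a processing-time temporal join as a Java string. The class name, the method name, and the Orders/currency names used in the usage note are illustrative assumptions; only the FOR SYSTEM_TIME AS OF clause comes from the text above.

```java
final class TemporalJoinSql {
    /**
     * Builds a temporal table join of a stream table against a lookup table.
     * Assumes the stream table exposes a processing-time attribute named proctime.
     */
    static String lookupJoin(String streamTable, String dimTable, String streamKey, String dimKey) {
        return "SELECT o.*, r.* FROM " + streamTable + " AS o"
                + " JOIN " + dimTable + " FOR SYSTEM_TIME AS OF o.proctime AS r"
                + " ON o." + streamKey + " = r." + dimKey;
    }
}
```

Calling TemporalJoinSql.lookupJoin("Orders", "LatestRates", "currency", "currency") yields a statement that could be passed to tableEnv.sqlQuery, provided both tables are registered.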
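LookupableTableSource is a sub-interface of TableSource. getLookupFunction returns a TableFunction that receives the values of the lookupKeys columns and loads the matching data synchronously. getAsyncLookupFunction returns a function that loads the data asynchronously. To use the asynchronous path, isAsyncEnabled must return true; Flink then runs the custom AsyncTableFunction through its asynchronous mechanism.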
public interface LookupableTableSource<T> extends TableSource<T> {
TableFunction<T> getLookupFunction(String[] lookupKeys);
AsyncTableFunction<T> getAsyncLookupFunction(String[] lookupKeys);
boolean isAsyncEnabled();
}
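The difference between the two lookup styles can be sketched in plain Java without any Flink classes. This is only an illustration of the calling convention: the class and method names are hypothetical, and a HashMap stands in for the dimension table. The synchronous method returns the rows directly, while the asynchronous one reports its result by completing a CompletableFuture, which mirrors the eval signature of the AsyncTableFunction used later in this post.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class LookupStyles {
    // Stand-in for a dimension table: lookup key -> row values.
    private final Map<String, String[]> dim = new HashMap<>();

    LookupStyles() {
        dim.put("xc", new String[]{"xc", "10", "1", "xc"});
    }

    /** Synchronous style: the caller blocks until the rows are returned. */
    Collection<String[]> lookupSync(String key) {
        String[] row = dim.get(key);
        return row == null ? Collections.emptyList() : Collections.singletonList(row);
    }

    /** Asynchronous style: the rows (or the error) are delivered through the future. */
    void lookupAsync(CompletableFuture<Collection<String[]>> resultFuture, String key) {
        CompletableFuture
                .supplyAsync(() -> lookupSync(key))
                .whenComplete((rows, err) -> {
                    if (err != null) {
                        resultFuture.completeExceptionally(err);
                    } else {
                        resultFuture.complete(rows);
                    }
                });
    }
}
```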
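Example: asynchronous lookup of a MySQL dimension table
Next we implement LookupableTableSource and extend AsyncTableFunction to read records from a server socket, look up the MySQL dimension table asynchronously, perform an asynchronous dimension-table join, and print the query result.
- Build MysqlAsyncLookupFunction, which uses Vert.x to load data from MySQL asynchronously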
import cn.todd.flink.jdbc.JDBCParse;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.jdbc.JDBCClient;
import io.vertx.ext.sql.SQLConnection;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.FunctionRequirement;
import org.apache.flink.types.Row;
import java.sql.Date;
import java.sql.Timestamp;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
public class MysqlAsyncLookupFunction extends AsyncTableFunction<Row> {
private final static String MYSQL_DRIVER = "com.mysql.jdbc.Driver";
private final static String URL = "jdbc:mysql://127.0.0.1:3306/mqtest?charset=utf8";
private JDBCClient jdbcClient = null;
private final String[] fieldNames;
private final String[] connectionField;
private final TypeInformation[] fieldTypes;
public MysqlAsyncLookupFunction(String[] fieldNames, String[] connectionField, TypeInformation[] fieldTypes) {
this.fieldNames = fieldNames;
this.fieldTypes = fieldTypes;
this.connectionField = connectionField;
}
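/**
* Asynchronously queries the dimension table with the given keys.
* @param resultFuture future that is completed with the matching rows
* @param keys values of the lookup fields from the source table, used as filter parameters of the query
*/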
public void eval(CompletableFuture<Collection<Row>> resultFuture, Object... keys) {
JsonArray inputParams = new JsonArray();
Arrays.asList(keys).forEach(inputParams::add);
jdbcClient.getConnection(conn -> {
if (conn.failed()) {
resultFuture.completeExceptionally(conn.cause());
return;
}
final SQLConnection connection = conn.result();
String sqlCondition = getSelectFromStatement("sidetest", fieldNames, connectionField);
// asynchronous query via Vert.x
connection.queryWithParams(sqlCondition, inputParams, rs -> {
if (rs.failed()) {
// release the connection before reporting the failure
connection.close();
resultFuture.completeExceptionally(rs.cause());
return;
}
int resultSize = rs.result().getResults().size();
if (resultSize > 0) {
List<Row> rowList = Lists.newArrayList();
for (JsonArray line : rs.result().getResults()) {
Row row = buildRow(line);
rowList.add(row);
}
resultFuture.complete(rowList);
} else {
resultFuture.complete(Collections.emptyList());
}
// and close the connection
connection.close(done -> {
if (done.failed()) {
throw new RuntimeException(done.cause());
}
});
});
});
}
private Row buildRow(JsonArray line) {
Row row = new Row(fieldNames.length);
for (int i = 0; i < fieldNames.length; i++) {
row.setField(i, line.getValue(i));
}
return row;
}
// type of the rows returned by the lookup
@Override
public TypeInformation<Row> getResultType() {
return new RowTypeInfo(fieldTypes, fieldNames);
}
@Override
public void open(FunctionContext context) throws Exception {
// use Vert.x to run JDBC queries asynchronously
JsonObject mysqlClientConfig = new JsonObject();
mysqlClientConfig.put("url", URL)
.put("driver_class", MYSQL_DRIVER)
.put("user", "xxx")
.put("password", "xxx");
System.setProperty("vertx.disableFileCPResolving", "true");
VertxOptions vo = new VertxOptions();
vo.setFileResolverCachingEnabled(false);
vo.setWarningExceptionTime(60000);
vo.setMaxEventLoopExecuteTime(60000);
Vertx vertx = Vertx.vertx(vo);
jdbcClient = JDBCClient.createNonShared(vertx, mysqlClientConfig);
}
public static String quoteIdentifier(String identifier) {
return "`" + identifier + "`";
}
// build the SQL statement used to query the dimension table
public static String getSelectFromStatement(String tableName, String[] selectFields, String[] conditionFields) {
String fromClause = Arrays.stream(selectFields).map(MysqlAsyncLookupFunction::quoteIdentifier).collect(Collectors.joining(", "));
// multiple lookup conditions must be combined with AND, not with commas
String whereClause = Arrays.stream(conditionFields).map(f -> quoteIdentifier(f) + "=?").collect(Collectors.joining(" AND "));
String sqlStatement = "SELECT " + fromClause + " FROM " + quoteIdentifier(tableName) + (conditionFields.length > 0 ? " WHERE " + whereClause : "");
return sqlStatement;
}
@Override
public void close() throws Exception {
jdbcClient.close();
}
@Override
public String toString() {
return super.toString();
}
@Override
public Set<FunctionRequirement> getRequirements() {
return null;
}
@Override
public boolean isDeterministic() {
return false;
}
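// builder for the function's properties
public static final class Builder {
// fields to select from the dimension table
private String[] fieldNames;
// lookup fields, used as conditions in the WHERE clause
private String[] connectionField;
// types of the fields returned from the dimension table
private TypeInformation[] fieldTypes;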
private Builder() {
}
public static Builder getBuilder() {
return new Builder();
}
public Builder withFieldNames(String[] fieldNames) {
this.fieldNames = fieldNames;
return this;
}
public Builder withConnectionField(String[] connectionField) {
this.connectionField = connectionField;
return this;
}
public Builder withFieldTypes(TypeInformation[] fieldTypes) {
this.fieldTypes = fieldTypes;
return this;
}
public MysqlAsyncLookupFunction build() {
return new MysqlAsyncLookupFunction(fieldNames, connectionField, fieldTypes);
}
}
}
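As a standalone illustration of the statement that the lookup function sends to MySQL, the following sketch builds the parameterised SELECT without any Flink or Vert.x dependency. The class and method names are hypothetical; the table and field names are the ones used in this example. Lookup conditions are joined with AND, which is what a WHERE clause requires when there is more than one lookup field.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

final class LookupSql {
    /** Wraps an identifier in backticks, MySQL style. */
    static String quote(String identifier) {
        return "`" + identifier + "`";
    }

    /** Builds SELECT <fields> FROM <table> WHERE <k1> = ? AND <k2> = ? ... */
    static String select(String table, String[] selectFields, String[] conditionFields) {
        String columns = Arrays.stream(selectFields)
                .map(LookupSql::quote)
                .collect(Collectors.joining(", "));
        String where = Arrays.stream(conditionFields)
                .map(f -> quote(f) + " = ?")
                .collect(Collectors.joining(" AND "));
        return "SELECT " + columns + " FROM " + quote(table)
                + (conditionFields.length > 0 ? " WHERE " + where : "");
    }
}
```

For the configuration used in this post (fields a, b, c, d and lookup field a on table sidetest) the result is SELECT `a`, `b`, `c`, `d` FROM `sidetest` WHERE `a` = ?, and each ? is bound to one of the keys passed to eval.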
- Create MysqlAsyncLookupTableSource, which supplies the AsyncTableFunction
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.sources.LookupableTableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
public class MysqlAsyncLookupTableSource implements LookupableTableSource<Row> {
private final String[] fieldNames;
private final String[] connectionField;
private final TypeInformation[] fieldTypes;
public MysqlAsyncLookupTableSource(String[] fieldNames, String[] connectionField, TypeInformation[] fieldTypes) {
this.fieldNames = fieldNames;
this.fieldTypes = fieldTypes;
this.connectionField = connectionField;
}
@Override
public TableFunction<Row> getLookupFunction(String[] lookupKeys) {
return null;
}
// load the dimension-table data with an AsyncTableFunction
@Override
public AsyncTableFunction<Row> getAsyncLookupFunction(String[] lookupKeys) {
return MysqlAsyncLookupFunction.Builder.getBuilder()
.withFieldNames(fieldNames)
.withFieldTypes(fieldTypes)
.withConnectionField(connectionField)
.build();
}
@Override
public boolean isAsyncEnabled() {
return true;
}
// data type of the rows that are read
@Override
public DataType getProducedDataType() {
// convert the legacy TypeInformation into the new DataType
return TypeConversions.fromLegacyInfoToDataType(new RowTypeInfo(fieldTypes, fieldNames));
}
@Override
public TableSchema getTableSchema() {
return TableSchema.builder()
.fields(fieldNames, TypeConversions.fromLegacyInfoToDataType(fieldTypes))
.build();
}
public static final class Builder {
private String[] fieldNames;
private String[] connectionField;
private TypeInformation[] fieldTypes;
private Builder() {
}
public static Builder newBuilder() {
return new Builder();
}
public Builder withFieldNames(String[] fieldNames) {
this.fieldNames = fieldNames;
return this;
}
public Builder withFieldTypes(TypeInformation[] fieldTypes) {
this.fieldTypes = fieldTypes;
return this;
}
public Builder withConnectionField(String[] connectionField) {
this.connectionField = connectionField;
return this;
}
public MysqlAsyncLookupTableSource build() {
return new MysqlAsyncLookupTableSource(fieldNames,connectionField, fieldTypes);
}
}
}
- Create CustomerSocketTextStreamFunction, which reads text from the server socket and parses it as JSON
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Iterator;
public class CustomerSocketTextStreamFunction implements SourceFunction<Row> {
private static final Logger LOG = LoggerFactory.getLogger(CustomerSocketTextStreamFunction.class);
/**
* Default delay between successive connection attempts.
*/
private static final int DEFAULT_CONNECTION_RETRY_SLEEP = 2000;
/**
* Default connection timeout when connecting to the server socket (infinite).
*/
private static final int CONNECTION_TIMEOUT_TIME = 0;
private final ObjectMapper objectMapper = new ObjectMapper();
/**
* Type information describing the result type.
*/
private final TypeInformation<Row> typeInfo;
/**
* Field names to parse. Indices match fieldTypes indices.
*/
private final String[] fieldNames;
/**
* Types to parse fields as. Indices match fieldNames indices.
*/
private final TypeInformation<?>[] fieldTypes;
private volatile boolean isRunning = true;
private transient Socket currentSocket;
ServersocketSourceTableInfo tableInfo;
public CustomerSocketTextStreamFunction(ServersocketSourceTableInfo tableInfo, TypeInformation<Row> typeInfo) {
this.typeInfo = typeInfo;
this.fieldNames = ((RowTypeInfo) typeInfo).getFieldNames();
this.fieldTypes = ((RowTypeInfo) typeInfo).getFieldTypes();
this.tableInfo = tableInfo;
}
@Override
public void run(SourceContext<Row> ctx) throws Exception {
final StringBuilder buffer = new StringBuilder();
long attempt = 0;
while (isRunning) {
try {
Socket socket = new Socket();
currentSocket = socket;
socket.connect(new InetSocketAddress(tableInfo.getHostname(), tableInfo.getPort()), CONNECTION_TIMEOUT_TIME);
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
char[] cbuf = new char[8192];
int bytesRead;
while (isRunning && (bytesRead = reader.read(cbuf)) != -1) {
buffer.append(cbuf, 0, bytesRead);
int delimPos;
String delimiter = tableInfo.getDelimiter();
while (buffer.length() >= delimiter.length() && (delimPos = buffer.indexOf(delimiter)) != -1) {
String record = buffer.substring(0, delimPos);
// truncate trailing carriage return
if (delimiter.equals("\n") && record.endsWith("\r")) {
record = record.substring(0, record.length() - 1);
}
ctx.collect(convertToRow(record));
buffer.delete(0, delimPos + delimiter.length());
}
}
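} catch (Exception e) {
// log the cause as well, otherwise connection and parsing errors are indistinguishable
LOG.error("Failed to connect to or read from the server socket; please check the configuration", e);
}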
// if we dropped out of this loop due to an EOF, sleep and retry
if (isRunning) {
attempt++;
if (tableInfo.getMaxNumRetries() == -1 || attempt < tableInfo.getMaxNumRetries()) {
Thread.sleep(DEFAULT_CONNECTION_RETRY_SLEEP);
} else {
// this should probably be here, but some examples expect simple exits of the stream source
// throw new EOFException("Reached end of stream and reconnects are not enabled.");
break;
}
}
}
// collect trailing data
if (buffer.length() > 0) {
ctx.collect(convertToRow(buffer.toString()));
}
}
public Row convertToRow(String record) throws IOException {
JsonNode root = objectMapper.readTree(record);
Row row = new Row(fieldNames.length);
for (int i = 0; i < fieldNames.length; i++) {
JsonNode node = getIgnoreCase(root, fieldNames[i]);
if (node == null) {
row.setField(i, null);
} else {
// Read the value as specified type
Object value = objectMapper.treeToValue(node, fieldTypes[i].getTypeClass());
row.setField(i, value);
}
}
return row;
}
@Override
public void cancel() {
isRunning = false;
// we need to close the socket as well, because the Thread.interrupt() function will
// not wake the thread in the socketStream.read() method when blocked.
Socket theSocket = this.currentSocket;
if (theSocket != null) {
IOUtils.closeSocket(theSocket);
}
}
public JsonNode getIgnoreCase(JsonNode jsonNode, String key) {
Iterator<String> iter = jsonNode.fieldNames();
while (iter.hasNext()) {
String key1 = iter.next();
if (key1.equalsIgnoreCase(key)) {
return jsonNode.get(key1);
}
}
return null;
}
}
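The buffering logic inside run can be looked at in isolation. The sketch below is a simplified, hypothetical extraction of that loop (the RecordSplitter class does not exist in the original code): chunks read from the socket are appended to a buffer, and every complete record in front of a delimiter is cut off and returned, while an incomplete tail stays in the buffer until the next chunk arrives.

```java
import java.util.ArrayList;
import java.util.List;

final class RecordSplitter {
    private final StringBuilder buffer = new StringBuilder();
    private final String delimiter;

    RecordSplitter(String delimiter) {
        this.delimiter = delimiter;
    }

    /** Appends a chunk and returns every complete record that is now available. */
    List<String> feed(String chunk) {
        buffer.append(chunk);
        List<String> records = new ArrayList<>();
        int pos;
        while ((pos = buffer.indexOf(delimiter)) != -1) {
            String record = buffer.substring(0, pos);
            // drop a trailing carriage return when splitting on "\n"
            if (delimiter.equals("\n") && record.endsWith("\r")) {
                record = record.substring(0, record.length() - 1);
            }
            records.add(record);
            buffer.delete(0, pos + delimiter.length());
        }
        return records;
    }
}
```

A JSON record that arrives split across two reads is therefore emitted only once its closing delimiter has been seen.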
- Server socket configuration class
import java.io.Serializable;
public class ServersocketSourceTableInfo implements Serializable {
public ServersocketSourceTableInfo() { }
public ServersocketSourceTableInfo(String hostname, int port, String delimiter, long maxNumRetries) {
this.hostname = hostname;
this.port = port;
this.delimiter = delimiter;
this.maxNumRetries = maxNumRetries;
}
private String hostname;
private int port;
private String delimiter;
private long maxNumRetries;
public String getHostname() {
return hostname;
}
public void setHostname(String hostname) {
this.hostname = hostname;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public String getDelimiter() {
return delimiter;
}
public void setDelimiter(String delimiter) {
this.delimiter = delimiter;
}
public long getMaxNumRetries() {
return maxNumRetries;
}
public void setMaxNumRetries(long maxNumRetries) {
this.maxNumRetries = maxNumRetries;
}
@Override
public String toString() {
return "ServersocketSourceTableInfo{" +
"hostname='" + hostname + '\'' +
", port=" + port +
", delimiter='" + delimiter + '\'' +
", maxNumRetries=" + maxNumRetries +
'}';
}
}
- Entry point of the dimension-table join job
import cn.todd.flink.join.side.MysqlAsyncLookupTableSource;
import cn.todd.flink.join.source.CustomerSocketTextStreamFunction;
import cn.todd.flink.join.source.ServersocketSourceTableInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
public class SideTabelJoin {
public static void main(String[] args) throws Exception {
// use the Blink planner in streaming mode
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
RowTypeInfo typeInformation = buildRowTypeInfo();
CustomerSocketTextStreamFunction sourceFunction = buildCustomerSocketTextStreamFunction(typeInformation);
String tableName = "user_visit";
DataStreamSource<Row> serversocketSource = streamEnv.addSource(sourceFunction, tableName, typeInformation);
// source table
//serversocketSource.print();
tableEnv.registerDataStream(tableName, serversocketSource, String.join(",", typeInformation.getFieldNames()) + ",proctime.proctime");
// dimension table
MysqlAsyncLookupTableSource tableSource = MysqlAsyncLookupTableSource.Builder.newBuilder()
.withFieldNames(new String[]{"a", "b", "c", "d"})
.withFieldTypes(new TypeInformation[]{Types.STRING, Types.LONG, Types.STRING, Types.STRING})
.withConnectionField(new String[]{"a"})
.build();
tableEnv.registerTableSource("sideTable", tableSource);
Table table = tableEnv.sqlQuery("select t1.visitCount,t1.name , t1.proctime,s1.b ,s1.c from user_visit as t1 join sideTable " +
" FOR SYSTEM_TIME AS OF t1.proctime s1 on t1.name =s1.a ");
// print the query result directly
DataStream<Row> rowDataStream = tableEnv.toAppendStream(table, Row.class);
rowDataStream.print();
streamEnv.execute();
}
private static RowTypeInfo buildRowTypeInfo() {
TypeInformation[] types = new TypeInformation[]{Types.STRING, Types.STRING, Types.LONG};
String[] fields = new String[]{"id", "name", "visitCount"};
return new RowTypeInfo(types, fields);
}
private static CustomerSocketTextStreamFunction buildCustomerSocketTextStreamFunction(RowTypeInfo typeInformation) {
ServersocketSourceTableInfo tableInfo = new ServersocketSourceTableInfo("127.0.0.1", 9900, "\n", 3);
return new CustomerSocketTextStreamFunction(tableInfo, typeInformation);
}
}
- Test
- Source table data
nc -l 9900
{"name": "xc","visitCount": 88,"id": "1001"}
- The dimension table contains only the four columns a, b, c, d
xc 10 1 xc
- Printed output
4> 88,xc,2019-11-24T06:57:57.455,10,1
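Dimension-table caching
An asynchronous lookup function raises throughput because the job does not have to wait for one record's lookup to return before it processes the next record. In production, a cache is usually added so that the dimension table is not read too often, and a keyBy before the join improves the cache hit rate.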
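A cache for lookup results can be as simple as a size-bounded LRU map whose entries expire after a fixed time. The following is a minimal sketch under stated assumptions: the LookupCache class is hypothetical and not part of the example code above, the current time is passed in explicitly to keep the behaviour easy to follow, and the class is not thread-safe, so a real asynchronous function would need synchronisation or a concurrent cache library.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class LookupCache<K, V> {
    /** A cached value together with the time at which it stops being valid. */
    private static final class Timed<V> {
        final V value;
        final long expiresAt;

        Timed(V value, long expiresAt) {
            this.value = value;
            this.expiresAt = expiresAt;
        }
    }

    private final long ttlMillis;
    private final LinkedHashMap<K, Timed<V>> map;

    LookupCache(final int maxSize, long ttlMillis) {
        this.ttlMillis = ttlMillis;
        // accessOrder = true turns the map into an LRU structure
        this.map = new LinkedHashMap<K, Timed<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Timed<V>> eldest) {
                return size() > maxSize;
            }
        };
    }

    /** Returns the cached value, or null if it is absent or expired. */
    V get(K key, long nowMillis) {
        Timed<V> entry = map.get(key);
        if (entry == null) {
            return null;
        }
        if (entry.expiresAt <= nowMillis) {
            map.remove(key);
            return null;
        }
        return entry.value;
    }

    void put(K key, V value, long nowMillis) {
        map.put(key, new Timed<>(value, nowMillis + ttlMillis));
    }
}
```

In the lookup function, eval would first call get with the lookup key and only query MySQL on a miss, storing the rows with put when the query completes. The expiry time bounds how stale a dimension row can become.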
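Two ways to implement dimension-table joins in StreamSQL
- UDTF
The LookupFunction mechanism is based on UDTFs: under the hood the query is a join against the UDTF, similar to the LATERAL JOIN syntax.
If the LookupFunction mechanism is not available but you still want to express the dimension-table join in standard SQL, you can add a layer on top that rewrites the query with a SQL parsing tool such as Calcite. For example:
// The connection and field information of Side is known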
SELECT
so.*, s1.a,s1.b
FROM
Source so
JOIN
Side s1
ON
so.id = s1.c
// Parse the JOIN condition, load and register a UDTF for Side, rebuild the SQL, and hand it to Flink SQL for execution
SELECT
so.*, s1.a,s1.b
FROM
Source so,
LATERAL TABLE (MysqlAsyncFun(id)) AS s1;
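- flatMap or asyncInvoke operator
This approach sends the source-table records to an asyncInvoke operator, which loads the dimension data asynchronously, merges the source table and the dimension table into one wide table, and emits it. The wide table is registered as a new table and Flink executes the SQL statement against it. For details, see the open-source project flinkStreamSQL. The following shows how its SQL goes from rewriting to execution:
// user SQL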
SELECT
so.id,so.name,s1.city,s1.address
FROM
Source so
JOIN
Side s1
ON
so.sid=s1.id
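// 1. Register the source table with Flink
// 2. Send the source-table records through asyncInvoke and emit the wide table
// 3. Register the wide table, replace the table names, and execute the final SQL statement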
SELECT
so_s1.id,so_s1.name,so_s1.city,so_s1.address
FROM
Source_Side so_s1
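This approach has three drawbacks: 1. Switching back and forth between DataStream and Table is inconvenient. 2. When two tables are merged into one wide table, columns that have the same name in both tables must be disambiguated. 3. The table names in the SELECT statement have to be replaced, and many cases need to be handled. On the other hand, the approach is easy to reason about and quite extensible.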
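The second drawback, name clashes in the wide table, can be illustrated with a small sketch. It is not how flinkStreamSQL does it; it is one possible convention, assumed here for illustration, in which a dimension-table column that collides with a source column is prefixed with the dimension table's alias. The WideRow class and the column names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class WideRow {
    /**
     * Merges a source row and a dimension row into one wide row.
     * A dimension column whose name already exists is renamed to <alias>_<name>.
     */
    static Map<String, Object> merge(Map<String, Object> source,
                                     Map<String, Object> side,
                                     String sideAlias) {
        Map<String, Object> wide = new LinkedHashMap<>(source);
        for (Map.Entry<String, Object> column : side.entrySet()) {
            String name = wide.containsKey(column.getKey())
                    ? sideAlias + "_" + column.getKey()
                    : column.getKey();
            wide.put(name, column.getValue());
        }
        return wide;
    }
}
```

With a source row (id, name) and a dimension row (id, city) under alias s1, the wide row has the columns id, name, s1_id, city. The rewritten SELECT then has to map s1.id to s1_id, which is part of the third drawback.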