深究canal发送kafka消息体过大异常

2020-03-23  本文已影响0人  FOUR_c14d

       上文提到在使用canal同步数据库数据时,发现kafka报消息体过大的错,当时只是调大了kafka的max.message.bytes参数以及canal的相应参数进行简单的处理。但是当时根据业务与监听的表觉得不会出现这么大的消息体,于是深入分析原因。    


查看kafka接收记录 

    通过  bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic test 命令模拟消费线上数据,发现出现我们的canal.instance.filter.regex白名单配置根本没生效,我们收到了未配置的表数据,继续分析数据

{"data":null,"database":"","es":1583405669000,"id":15601,"isDdl":false,"mysqlType":null,"old":null,"pkNames":null,"sql":"update TABLE_STAT   set   FILE_SIZE=FILE_SIZE+0,   FILE_COUNT=FILE_COUNT+0,   DELETE_COUNT=DELETE_COUNT+0,   UPDATE_COUNT=UPDATE_COUNT+1,   INSERT_COUNT=INSERT_COUNT+0,   GMT_MODIFIED=now()   WHERE PIPELINE_ID=7 and DATA_MEDIA_PAIR_ID=56","sqlType":null,"table":"TABLE_STAT","ts":1583405670037,"type":"QUERY"}

可以看到这条数据的type是QUERY,对应的是数据库的  binlog_rows_query_log_events 事件(我们由于其他业务开启了该事件),该事件可以获取sql,于是我们看canal源码该事件对应的parse处理

private Entry parseRowsQueryEvent(RowsQueryLogEvent event) {

// mysql5.6支持,需要设置binlog-rows-query-log-events=1,可详细打印原始DML语句

String queryString = null;

try { queryString = new String(event.getRowsQuery().getBytes(ISO_8859_1), charset.name());

String tableName = null; 

if (useDruidDdlFilter) {

List<DdlResult> results = DruidDdlParser.parse(queryString, null);

if (results.size() > 0) {

tableName = results.get(0).getTableName();

}

}

return buildQueryEntry(queryString, event.getHeader(), tableName);

} catch (UnsupportedEncodingException e) { t

hrow new CanalParseException(e);

}

}

可以看到该事件canal的处理没有做任何过滤,猜想原因是因为获取不到库名所以不能根据白名单过滤,所以白名单过滤了数据变化的时间,但是这个事件的数据依然能够通过canal发送,导致我们一些不想监听的一些业务表数据通过kafka发送,且这些信息带有sql信息,一些sql过大导致了一些大数据。

解决方案

    其实当我们的业务不需要sql信息的时候我们可以将源码对应的时间注释掉,然后重新打包发布。

注释代码

case LogEvent.ROWS_QUERY_LOG_EVENT:

return parseRowsQueryEvent((RowsQueryLogEvent) logEvent);

上一篇下一篇

猜你喜欢

热点阅读