
Druid slow SQL log storage and ELK display

2019-08-13  虫儿飞ZLEI

1. Overview

After introducing the Alibaba Druid data source into a project, you can enable its built-in monitoring page. The page looks roughly like the figure below and shows SQL information such as the SQL text and execution time.

[Figure: Druid's built-in SQL monitoring page]

This has some drawbacks, though. The collection is unrestricted, so every SQL statement is listed; when there are very many statements it becomes hard to find the ones you need. The statistics are also not persisted, so data is continuously lost. In addition, in a big-data environment where SQL is generated very quickly, the page may even freeze.

2. Scope

This article covers filtering the SQL data collected by Druid so that only slow query SQL is kept, persisting it as log records in a log file, and then using that log file to continuously display the slow SQL in an ELK environment, roughly as shown below.

[Figure: slow SQL records displayed in Kibana]

3. Persisting SQL to a log file: approach one

3.1 Enable the stat filter in the configuration file

spring.datasource.filters: stat,wall,log4j
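
If you build the DruidDataSource yourself (as in section 3.2 below) instead of relying purely on property binding, the same built-in filters can be enabled programmatically. A minimal sketch, assuming a hand-constructed data source (the variable name is illustrative); note that setFilters declares SQLException:

    DruidDataSource druidDataSource = new DruidDataSource();
    try {
        // Equivalent of the "filters" property above; the Slf4jLogFilter itself is added separately in 3.2.
        druidDataSource.setFilters("stat,wall");
    } catch (SQLException e) {
        throw new IllegalStateException("failed to configure Druid filters", e);
    }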

3.2 Create a Slf4jLogFilter bean and set it on the data source


@Bean
public Slf4jLogFilter logFilter(){
    Slf4jLogFilter filter = new Slf4jLogFilter();
    filter.setResultSetLogEnabled(false);
    filter.setConnectionLogEnabled(false);
    filter.setStatementParameterClearLogEnable(false);
    filter.setStatementCreateAfterLogEnabled(false);
    filter.setStatementCloseAfterLogEnabled(false);
    filter.setStatementParameterSetLogEnabled(false);
    filter.setStatementPrepareAfterLogEnabled(false);
    return  filter;
}
        // In the data source bean, register the filter as a proxy filter:
        List<Filter> list = new ArrayList<Filter>() {{ add(logFilter); }};
        druidDataSource.setProxyFilters(list);

If you have multiple data sources, setProxyFilters must be called on every data source you want to monitor.
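
For reference, a minimal sketch of a complete data source bean with the filter attached; the class name, method name, and property prefix are illustrative assumptions, not taken from the original post:

    import com.alibaba.druid.filter.Filter;
    import com.alibaba.druid.filter.logging.Slf4jLogFilter;
    import com.alibaba.druid.pool.DruidDataSource;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    import javax.sql.DataSource;
    import java.util.ArrayList;
    import java.util.List;

    @Configuration
    public class DruidConfig {

        @Bean
        @ConfigurationProperties(prefix = "spring.datasource")
        public DataSource dataSource(Slf4jLogFilter logFilter) {
            DruidDataSource druidDataSource = new DruidDataSource();
            // Attach the SLF4J log filter; repeat this for every data source you want to monitor.
            List<Filter> filters = new ArrayList<>();
            filters.add(logFilter);
            druidDataSource.setProxyFilters(filters);
            return druidDataSource;
        }
    }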


3.3 Configure logback-spring.xml

    <appender name="druidlog" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <FileNamePattern>
                druidlog/%d{yyyy-MM-dd}.log
            </FileNamePattern>
            <MaxHistory>30</MaxHistory>
        </rollingPolicy>
        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
    </appender>
    <logger name="druid" level="DEBUG">
        <appender-ref ref="druidlog"/>
    </logger>

Run the project and exercise it with some data; the resulting log file looks like this:


{conn-10001} connected
{conn-10001} pool-connect
{conn-10001, pstmt-20000} created. select * from ACT_GE_PROPERTY where NAME_ = ?
{conn-10001, pstmt-20000} Parameters : [schema.version]
{conn-10001, pstmt-20000} Types : [VARCHAR]
{conn-10001, pstmt-20000} executed. 43.613649 millis. select * from ACT_GE_PROPERTY where NAME_ = ?
{conn-10001, pstmt-20000} executed. 43.613649 millis. select * from ACT_GE_PROPERTY where NAME_ = ?
{conn-10001, pstmt-20000, rs-50000} open
{conn-10001, pstmt-20000, rs-50000} Header: [NAME_, VALUE_, REV_]
{conn-10001, pstmt-20000, rs-50000} Result: [schema.version, 5.22.0.0, 1]
{conn-10001, pstmt-20000, rs-50000} closed
{conn-10001, pstmt-20000} clearParameters. 
{conn-10001} pool-recycle
{conn-10001} pool-connect
{conn-10001} pool-recycle
{conn-10001} pool-connect
{conn-10001, pstmt-20001} created. select `id`,`cron_expression`,`method_name`,`is_concurrent`,`description`,`update_by`,`bean_class`,`create_date`,`job_status`,`job_group`,`update_date`,`create_by`,`spring_bean`,`job_name` from sys_task
          
         
                order by id desc
{conn-10001, pstmt-20001} Parameters : []
{conn-10001, pstmt-20001} Types : []
{conn-10001, pstmt-20001} executed. 6.238774 millis. select `id`,`cron_expression`,`method_name`,`is_concurrent`,`description`,`update_by`,`bean_class`,`create_date`,`job_status`,`job_group`,`update_date`,`create_by`,`spring_bean`,`job_name` from sys_task
          
         
                order by id desc
{conn-10001, pstmt-20001} executed. 6.238774 millis. select `id`,`cron_expression`,`method_name`,`is_concurrent`,`description`,`update_by`,`bean_class`,`create_date`,`job_status`,`job_group`,`update_date`,`create_by`,`spring_bean`,`job_name` from sys_task
          
         
                order by id desc
{conn-10001, pstmt-20001, rs-50001} open
{conn-10001, pstmt-20001, rs-50001} Header: [id, cron_expression, method_name, is_concurrent, description, update_by, bean_class, create_date, job_status, job_group, update_date, create_by, spring_bean, job_name]
{conn-10001, pstmt-20001, rs-50001} Result: [2, 0/10 * * * * ?, run1, 1, , 4028ea815a3d2a8c015a3d2f8d2a0002, com.bootdo.common.task.WelcomeJob, 2017-05-19 18:30:56.0, 0, group1, 2017-05-19 18:31:07.0, null, , welcomJob]
{conn-10001, pstmt-20001, rs-50001} closed
{conn-10001, pstmt-20001} clearParameters. 
{conn-10001} pool-recycle
{conn-10001} pool-connect
{conn-10001, pstmt-20002} created. select `cid`,`title`,`slug`,`created`,`modified`,`type`,`tags`,`categories`,`hits`,`comments_num`,`allow_comment`,`allow_ping`,`allow_feed`,`status`,`author`,`gtm_create`,`gtm_modified` from blog_content
         WHERE  type = ? 
         
                order by cid desc
             
         
            limit 0, 10
{conn-10001, pstmt-20002} Parameters : [article]

The log file above has some problems, however: the records are messy, and we would prefer one SQL statement per line. To get that, we need to capture the SQL ourselves and write it to the log in our own format.

3.4 Capture the SQL and format the output

// Have Druid push SQL statistics to a DruidDataSourceStatLogger every 3 seconds:
datasource.setTimeBetweenLogStatsMillis(3000);
import com.alibaba.druid.pool.DruidDataSourceStatLogger;
import com.alibaba.druid.pool.DruidDataSourceStatLoggerAdapter;
import com.alibaba.druid.pool.DruidDataSourceStatValue;
import com.alibaba.druid.stat.JdbcSqlStatValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.util.LinkedHashMap;
import java.util.Map;

@Component
public class MyStatLogger extends DruidDataSourceStatLoggerAdapter implements DruidDataSourceStatLogger {

    // Log under the "druid" logger name so the logback configuration below picks it up.
    private static Logger logger = LoggerFactory.getLogger("druid");

    @Override
    public void log(DruidDataSourceStatValue statValue) {
        if (statValue.getSqlList().size() > 0) {
            for (JdbcSqlStatValue sqlStat : statValue.getSqlList()) {
                Map<String, Object> sqlStatMap = new LinkedHashMap<String, Object>();
                // Collapse the SQL onto a single line.
                String sql = sqlStat.getSql().replace("\t", "").replace("\n", "");
                sqlStatMap.put("sql", sql);
                if (sqlStat.getExecuteCount() > 0) {
                    sqlStatMap.put("executeCount", sqlStat.getExecuteCount());
                    sqlStatMap.put("executeMillisMax", sqlStat.getExecuteMillisMax());
                    sqlStatMap.put("executeMillisTotal", sqlStat.getExecuteMillisTotal());
                    sqlStatMap.put("createtime", LocalDateTime.now());
                    sqlStatMap.put("systemName", "JCPT");
                }
                // ERROR level matches the "druid" logger level configured in logback-spring.xml below.
                logger.error(sqlStatMap.toString());
            }
        }
    }
}
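
The post does not show how MyStatLogger gets attached to the data source. A minimal sketch of the wiring, under the assumption that the custom logger bean (here called myStatLogger) is set on the Druid data source via setStatLogger; this wiring is an assumption, not something shown in the original:

    // Assumed wiring: hand statistics to our MyStatLogger instead of Druid's default logger.
    druidDataSource.setStatLogger(myStatLogger);
    // Flush SQL statistics to the logger every 3 seconds (same setting as above).
    druidDataSource.setTimeBetweenLogStatsMillis(3000);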
Then configure logback-spring.xml as before, but raise the "druid" logger to ERROR so that Druid's own DEBUG output is suppressed and only our records are written:

    <appender name="druidlog" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <FileNamePattern>
                druidlog/%d{yyyy-MM-dd}.log
            </FileNamePattern>
            <MaxHistory>30</MaxHistory>
        </rollingPolicy>
        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
    </appender>
    <logger name="druid" level="ERROR">
        <appender-ref ref="druidlog"/>
    </logger>

Run the code again and the log output now looks like this:

{sql=select * from ACT_GE_PROPERTY where NAME_ = ?, executeCount=1, executeMillisMax=45, executeMillisTotal=45, createtime=2019-08-02T11:42:44.106, systemName=SMRZ}
{sql=select `id`,`cron_expression`,`method_name`,`is_concurrent`,`description`,`update_by`,`bean_class`,`create_date`,`job_status`,`job_group`,`update_date`,`create_by`,`spring_bean`,`job_name` from sys_task                                   order by id desc, executeCount=1, executeMillisMax=6, executeMillisTotal=6, createtime=2019-08-02T11:42:50.107, systemName=SMRZ}
{sql=select `cid`,`title`,`slug`,`created`,`modified`,`type`,`tags`,`categories`,`hits`,`comments_num`,`allow_comment`,`allow_ping`,`allow_feed`,`status`,`author`,`gtm_create`,`gtm_modified` from blog_content         WHERE  type = ?                          order by cid desc  limit 0, 10, executeCount=1, executeMillisMax=40, executeMillisTotal=40, createtime=2019-08-02T11:43:05.110, systemName=SMRZ}
{sql=select count(*) from blog_content  WHERE  type = ?, executeCount=1, executeMillisMax=5, executeMillisTotal=5, createtime=2019-08-02T11:43:05.114, systemName=SMRZ}
{sql=select        `user_id`,`username`,`name`,`password`,`dept_id`,`email`,`mobile`,`status`,`user_id_create`,`gmt_create`,`gmt_modified`,`sex`,`birth`,`pic_id`,`live_address`,`hobby`,`province`,`city`,`district`        from sys_user         WHERE  username = ?                          order by user_id desc, executeCount=1, executeMillisMax=69, executeMillisTotal=69, createtime=2019-08-02T11:43:08.114, systemName=SMRZ}
{sql=insert into sys_log(`user_id`, `username`, `operation`, `time`, `method`, `params`, `ip`, `gmt_create`)values(?, ?, ?, ?, ?, ?, ?, ?), executeCount=1, executeMillisMax=300, executeMillisTotal=300, createtime=2019-08-02T11:43:08.114, systemName=SMRZ}
{sql=select distinctm.menu_id , parent_id, name, url,perms,`type`,icon,order_num,gmt_create, gmt_modifiedfrom sys_menu mleftjoin sys_role_menu rm on m.menu_id = rm.menu_id left joinsys_user_role uron rm.role_id =ur.role_id where ur.user_id = ?andm.type in(0,1)order bym.order_num}
{sql=insert into sys_log(`user_id`, `username`, `operation`, `time`, `method`, `params`, `ip`, `gmt_create`)values(?, ?, ?, ?, ?, ?, ?, ?), executeCount=1, executeMillisMax=42, executeMillisTotal=42, createtime=2019-08-02T11:43:11.115, systemName=SMRZ}
{sql=select distinctm.menu_id , parent_id, name, url,perms,`type`,icon,order_num,gmt_create, gmt_modifiedfrom sys_menu mleftjoin sys_role_menu rm on m.menu_id = rm.menu_id left joinsys_user_role uron rm.role_id =ur.role_id where ur.user_id = ?andm.type in(0,1)order bym.order_num, executeCount=1, executeMillisMax=278, executeMillisTotal=278, createtime=2019-08-02T11:43:11.115, systemName=SMRZ}
{sql=select `id`,`type`,`url`,`create_date` from sys_file where id = ?, executeCount=1, executeMillisMax=28, executeMillisTotal=28, createtime=2019-08-02T11:43:11.115, systemName=SMRZ}
{sql=select DISTINCTn.id ,`type`,`title`,`content`,`files`,r.is_read,`status`,`create_by`,`create_date`,`update_by`,`update_date`,`remarks`,`del_flag`from oa_notify_record r right JOIN oa_notify n on r.notify_id = n.id WHERE  r.is_read = ?  and r.user_id = ? order by is_read ASC, update_date DESC limit ?, ?, executeCount=1, executeMillisMax=41, executeMillisTotal=41, createtime=2019-08-02T11:43:11.115, systemName=SMRZ}
{sql=select count(*)fromoa_notify_record r right JOIN oa_notify n on r.notify_id= n.id wherer.user_id =? andr.is_read = ?, executeCount=1, executeMillisMax=7, executeMillisTotal=7, createtime=2019-08-02T11:43:11.115, systemName=SMRZ}
{sql=select table_name tableName, engine, table_comment tableComment, create_time createTime from information_schema.tables where table_schema = (select database()), executeCount=1, executeMillisMax=7, executeMillisTotal=7, createtime=2019-08-02T11:43:11.115, systemName=SMRZ}
{sql=select `id`,`cron_expression`,`method_name`,`is_concurrent`,`description`,`update_by`,`bean_class`,`create_date`,`job_status`,`job_group`,`update_date`,`create_by`,`spring_bean`,`job_name` from sys_task                                   order by id desc  limit ?, ?, executeCount=1, executeMillisMax=5, executeMillisTotal=5, createtime=2019-08-02T11:43:11.115, systemName=SMRZ}

The idea here is to replace Druid's own log output with our own: by raising the log level we suppress Druid's built-in messages, so only the records we emit ourselves remain.

4. Persisting SQL to a log file: approach two

The logs produced by approach one have a problem: the SQL contains no parameter values; every parameter is replaced by ?. We would like the logged SQL to include its parameters. There is another problem as well: with that approach, Druid's own SQL statistics page no longer collects any SQL. We want to write the log file without affecting the built-in statistics page.

4.1 Idea

In Druid, Slf4jLogFilter provides a setting that makes the printed SQL include its parameter values:

filter.setStatementExecutableSqlLogEnable(true);

However, this setting only affects the SQL printed by the filter itself; it does nothing for the StatLogger we wrote above. So we keep using Druid's own SQL logging, but override part of its filter code to change the output format and to log only slow SQL.

4.2 Steps

    @Bean(name="mysqlDataSource")
    @Qualifier("mysqlDataSource")
    @ConfigurationProperties(prefix ="spring.datasource-mysql")
    @Primary
    public DataSource mysqlDataSource(){
        DruidDataSource druidDataSource =  new DruidDataSource();
        druidDataSource.setConnectionProperties(connectionProperties.replace("${publicKey}", publicKey));

        // Register the custom log filter (defined below) on this data source:
        List<Filter> list = new ArrayList<Filter>() {{ add(logFilter); }};
        druidDataSource.setProxyFilters(list);
        return druidDataSource;
    }
datasource.setTimeBetweenLogStatsMillis(3000);
package com.base.web.common.adapter;

import cn.hutool.core.date.DateUtil;
import com.alibaba.druid.filter.logging.Slf4jLogFilter;
import com.alibaba.druid.proxy.jdbc.CallableStatementProxy;
import com.alibaba.druid.proxy.jdbc.JdbcParameter;
import com.alibaba.druid.proxy.jdbc.PreparedStatementProxy;
import com.alibaba.druid.proxy.jdbc.StatementProxy;
import com.alibaba.druid.sql.SQLUtils;
import com.base.web.common.domain.DruidSqlLogBean;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
/**
 * @author zhanglei
 * @email ah.zhanglei4@aisino.com
 * @date 2019/8/8
 */
public class DruidSlf4jLoggerFilterEx extends Slf4jLogFilter {

    private static final Logger LOG = LoggerFactory
            .getLogger(DruidSlf4jLoggerFilterEx.class);

    // statementLoggerName is inherited from Druid's LogFilter ("druid.sql.Statement" by default).
    private Logger statementLogger = LoggerFactory.getLogger(statementLoggerName);

    private SQLUtils.FormatOption statementSqlFormatOption = new SQLUtils.FormatOption(false, true);

    private static final String JCPT_NAME = "JCPT";
    // Slow SQL threshold in milliseconds; only statements at least this slow are logged.
    private static final int TIME_LINE = 3000;
    private static final String SELECT_LOWER = "select";
    private static final String SELECT_CAPITAL = "SELECT";

    private void recordSql(String formattedSql, double millis, StatementProxy statement, String message) {
        DruidSqlLogBean druidSqlLogBean = new DruidSqlLogBean();
        druidSqlLogBean.setDate(DateUtil.now());
        druidSqlLogBean.setSql(formattedSql);
        druidSqlLogBean.setMillisecond(millis);
        druidSqlLogBean.setSystem(JCPT_NAME);
        druidSqlLogBean.setMessage(message);
        druidSqlLogBean.setConneid(String.valueOf(statement.getConnectionProxy().getId()));
        druidSqlLogBean.setStmtid(stmtId(statement));
        Gson gson = new GsonBuilder().disableHtmlEscaping().create();
        statementLog(gson.toJson(druidSqlLogBean));
    }

    private String stmtId(StatementProxy statement) {
        StringBuffer buf = new StringBuffer();
        if (statement instanceof CallableStatementProxy) {
            buf.append("cstmt-");
        } else if (statement instanceof PreparedStatementProxy) {
            buf.append("pstmt-");
        } else {
            buf.append("stmt-");
        }
        buf.append(statement.getId());

        return buf.toString();
    }

    @Override
    protected void statementLog(String message) {
        // Only write the messages produced by recordSql (they always contain the system name);
        // everything else Druid tries to log through this method is dropped.
        if (message.contains(JCPT_NAME)) {
            statementLogger.error(message);
        }
    }

    @Override
    protected void statementExecuteAfter(StatementProxy statement, String sql, boolean firstResult) {

        // Only query statements are of interest here.
        if (!(sql.contains(SELECT_LOWER) || sql.contains(SELECT_CAPITAL))) {
            return;
        }
        // Work out how long the statement took, in milliseconds.
        statement.setLastExecuteTimeNano();
        double nanos = statement.getLastExecuteTimeNano();
        double millis = nanos / (1000 * 1000);
        // Only slow SQL (at least TIME_LINE milliseconds) is recorded.
        if (millis < TIME_LINE) {
            return;
        }
        // Collect the bound parameter values so they can be merged into the SQL text.
        int parametersSize = statement.getParametersSize();
        List<Object> parameters = new ArrayList<Object>(parametersSize);
        for (int i = 0; i < parametersSize; ++i) {
            JdbcParameter jdbcParam = statement.getParameter(i);
            parameters.add(jdbcParam != null ? jdbcParam.getValue() : null);
        }
        // The select and time checks above have already passed, so record the statement directly.
        String formattedSql = formatparamSql(sql, parametersSize, statement, parameters);
        formattedSql = formattedSql.replace("\t", " ").replace("\n", " ");
        recordSql(formattedSql, millis, statement, "");
    }

    private String formatparamSql(String sql, int parametersSize, StatementProxy statement, List<Object> parameters) {
        String formattedSql = sql;
        if (parametersSize>0){
            String dbType = statement.getConnectionProxy().getDirectDataSource().getDbType();
            formattedSql = SQLUtils.format(sql, dbType, parameters, this.statementSqlFormatOption);
        }
        return formattedSql;
    }
}

The code above overrides two methods: statementLog, which writes the log output, and statementExecuteAfter, which fills in the SQL parameters, filters the SQL, measures execution time, formats the SQL, and so on.
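
The DruidSqlLogBean used by recordSql is not shown in the original post. A minimal sketch of what such a bean might look like, matching the setters called above; the field names here are assumptions, so the JSON keys produced by Gson may differ slightly from the sample output below (which shows a time key rather than millisecond):

    public class DruidSqlLogBean {
        private String date;        // when the record was written
        private String sql;         // formatted SQL with parameter values inlined
        private double millisecond; // execution time in milliseconds
        private String system;      // system identifier, e.g. JCPT
        private String message;     // extra information, e.g. an error message
        private String conneid;     // connection id
        private String stmtid;      // statement id

        public void setDate(String date) { this.date = date; }
        public void setSql(String sql) { this.sql = sql; }
        public void setMillisecond(double millisecond) { this.millisecond = millisecond; }
        public void setSystem(String system) { this.system = system; }
        public void setMessage(String message) { this.message = message; }
        public void setConneid(String conneid) { this.conneid = conneid; }
        public void setStmtid(String stmtid) { this.stmtid = stmtid; }
    }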

    @Bean
    public DruidSlf4jLoggerFilterEx logFilter() {
        DruidSlf4jLoggerFilterEx filter = new DruidSlf4jLoggerFilterEx();
        filter.setStatementExecutableSqlLogEnable(true);
        filter.setStatementLogEnabled(true);
        filter.setResultSetLogEnabled(false);
        filter.setConnectionLogEnabled(false);
        filter.setDataSourceLogEnabled(false);
        filter.setStatementCreateAfterLogEnabled(false);
        filter.setStatementPrepareAfterLogEnabled(false);
        filter.setStatementPrepareCallAfterLogEnabled(false);
        filter.setStatementExecuteAfterLogEnabled(false);
        filter.setStatementExecuteQueryAfterLogEnabled(false);
        filter.setStatementExecuteUpdateAfterLogEnabled(false);
        filter.setStatementExecuteBatchAfterLogEnabled(false);
        filter.setStatementCloseAfterLogEnabled(false);
        filter.setStatementParameterSetLogEnabled(false);
        filter.setStatementParameterClearLogEnable(false);
        return filter;
    }

Run the code and you should now see formatted log entries that include the parameter values:

{"sql":"select xxx from xxx where xxx = 'xxx'  and xxx = xxx order by xxx desc limit 10","date":"2019-08-12 15:41:12","time":"3698.787206","system":"xxx","conneid":"110004","stmtid":"pstmt-120031"}

Looking at Druid's built-in statistics page again, it now displays SQL normally as well.

If you have followed all of the steps above and still see no output, check for the following pitfall.

Even though the following were set:

        filter.setStatementExecutableSqlLogEnable(true);
        filter.setStatementLogEnabled(true);

in my testing these settings did not take effect.
In that case, override the corresponding getters directly in DruidSlf4jLoggerFilterEx:

    @Override
    public boolean isStatementLogEnabled() {
        return true;
    }
    @Override
    public boolean isStatementExecutableSqlLogEnable() {
        return true;
    }

4.3 Recording failed SQL

If you also want to record SQL statements that fail, override one more Druid method in DruidSlf4jLoggerFilterEx:

    @Override
    protected void statement_executeErrorAfter(StatementProxy statement,
                                               String sql, Throwable error) {
        if (!this.isStatementLogErrorEnabled()) {
            return;
        }
        if (!(sql.contains(SELECT_LOWER)||sql.contains(SELECT_CAPITAL))){
            return;
        }
        int parametersSize = statement.getParametersSize();

        List<Object> parameters = new ArrayList<Object>(parametersSize);
        for (int i = 0; i < parametersSize; ++i) {
            JdbcParameter jdbcParam = statement.getParameter(i);
            parameters.add(jdbcParam != null
                    ? jdbcParam.getValue()
                    : null);
        }
        String formattedSql = formatparamSql(sql,parametersSize,statement,parameters);
        statement.setLastExecuteTimeNano();
        double nanos = statement.getLastExecuteTimeNano();
        double millis = nanos / (1000 * 1000);
        formattedSql = formattedSql.replace("\t", " ").replace("\n", " ");
        // Flatten the error message onto one line as well.
        String message = error.getLocalizedMessage();
        message = message.replace("\t", " ").replace("\n", " ");
        recordSql(formattedSql, millis, statement, "ERROR_MESSAGE--" + message);
    }

Then add the corresponding configuration.
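
The post does not spell out which configuration is meant here. One plausible candidate, assuming it is the filter's error-logging switch so that the isStatementLogErrorEnabled() check above passes (an assumption on my part):

        // Assumed: enable statement error logging so statement_executeErrorAfter is invoked.
        filter.setStatementLogErrorEnabled(true);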


That completes the SQL logging.

5. ELK setup

The overall flow is now: the running application produces log files; Filebeat reads the log files and sends them to Logstash; Logstash writes the records to Kafka; another Logstash instance reads from Kafka and stores the records in Elasticsearch; finally, Kibana visualizes the data in ES.

Only the Filebeat-related setup is covered here.
