Spring Boot

一种数据库同步简易方法

2020-03-29  本文已影响0人  EasyNetCN

基于微服务架构设计时,各服务的数据库一般是相互独立的,但在做数据统计分析时,这往往很不方便。下面的代码实现了按照业务需求,把多个库中的表同步到一个库中。当然,任何解决方案都不是空中楼阁,要面向业务。这种方法适合以下场景:

1.数据量不是特别大

2.实时性要求不是特别高

sync.png
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.jdbc.core.namedparam.SqlParameterSourceUtils;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;

import cn.ydyun360.common.controller.QuerySqlParam;
import cn.ydyun360.common.utility.SpringContextHolder;
import cn.ydyun360.stats.service.TableSyncService;
import cn.ydyun360.stats.service.model.TableSyncResult;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Service
public class TableSyncServiceImpl implements TableSyncService {
    @Autowired
    private NamedParameterJdbcTemplate namedParameterJdbcTemplate;

    @Override
    public Mono<TableSyncResult> sync(String database, String table, Integer limit) {
        var result = new TableSyncResult();

        result.setStartTime(LocalDateTime.now());
        result.setDatabase(database);
        result.setTable(table);
        result.setLimit(limit);

        var webClient = getWebClient(database);
        var destUpdateTime = getDestUpdateTime(database, table);

        return getSrcUpdateTime(webClient, table).flatMap(srcUpdateTime -> {
            return getTotal(webClient, table, srcUpdateTime, destUpdateTime).flatMap(total -> {
                result.setTotal(total);

                if (total > 0) {
                    return getTableSchema(webClient, table).flatMap(schema -> {
                        var sb = new StringBuilder("INSERT INTO ").append(database).append("_").append(table)
                                .append("(");

                        for (var i = 0; i < schema.size(); i++) {
                            var map = schema.get(i);
                            var field = map.get("COLUMN_NAME").toString();

                            sb.append(field);

                            if (i < schema.size() - 1) {
                                sb.append(",");
                            }
                        }

                        sb.append(") VALUES(");

                        for (var i = 0; i < schema.size(); i++) {
                            var map = schema.get(i);
                            var field = map.get("COLUMN_NAME").toString();

                            sb.append(":").append(field);

                            if (i < schema.size() - 1) {
                                sb.append(",");
                            }
                        }

                        sb.append(") ON DUPLICATE KEY UPDATE ");

                        var f = schema.stream().filter(item -> !item.get("COLUMN_NAME").equals("id"))
                                .collect(Collectors.toList());

                        for (var i = 0; i < f.size(); i++) {
                            var map = f.get(i);
                            var field = map.get("COLUMN_NAME").toString();

                            sb.append(field).append("=:").append(field);

                            if (i < f.size() - 1) {
                                sb.append(",");
                            }
                        }

                        var list = new ArrayList<Mono<Boolean>>();
                        var start = 0;

                        while (start < total) {
                            final Integer s = start;

                            list.add(getData(webClient, table, srcUpdateTime, destUpdateTime, s, limit, sb.toString()));

                            start += limit;
                        }

                        return Flux.concat(list).collectList().map(m -> {
                            result.setEndTime(LocalDateTime.now());

                            return result;
                        });
                    });

                }

                result.setEndTime(LocalDateTime.now());

                return Mono.just(result);
            });
        });
    }

    private WebClient getWebClient(String database) {
        return SpringContextHolder.getBean(database + "ReportServiceWebClient");
    }

    private String getDestUpdateTime(String database, String table) {
        var sb = new StringBuilder("SELECT update_time FROM ").append(database).append("_").append(table)
                .append(" ORDER BY update_time DESC LIMIT 1");

        return namedParameterJdbcTemplate.getJdbcTemplate().queryForObject(sb.toString(), String.class);
    }

    private Mono<String> getSrcUpdateTime(WebClient webClient, String table) {
        var sb = new StringBuilder("SELECT update_time FROM ").append(table)
                .append(" ORDER BY update_time DESC LIMIT 1");

        var querySqlParam = new QuerySqlParam();

        querySqlParam.setSql(sb.toString());

        return webClient.post().uri("/databases/sql/query").bodyValue(querySqlParam).retrieve()
                .bodyToMono(new ParameterizedTypeReference<List<Map<String, Object>>>() {
                }).map(result -> result.get(0).get("update_time").toString());
    }

    private Mono<Long> getTotal(WebClient webClient, String table, String srcUpdateTime, String destUpdateTime) {
        var sb = new StringBuilder("SELECT COUNT(id) AS count FROM ").append(table);

        if (StringUtils.isNotBlank(destUpdateTime)) {
            sb.append(" WHERE update_time >= '").append(destUpdateTime).append("' AND update_time <= '")
                    .append(srcUpdateTime).append("'");
        }

        var querySqlParam = new QuerySqlParam();

        querySqlParam.setSql(sb.toString());

        return webClient.post().uri("/databases/sql/query").bodyValue(querySqlParam).retrieve()
                .bodyToMono(new ParameterizedTypeReference<List<Map<String, Object>>>() {
                }).map(result -> null == result || result.isEmpty() ? 0L
                        : Long.valueOf(result.get(0).get("count").toString()));
    }

    private Mono<Boolean> getData(WebClient webClient, String table, String srcUpdateTime, String destUpdateTime,
            Integer start, Integer limit, String updateSql) {
        var sb = new StringBuilder("SELECT * FROM ").append(table).append(" WHERE update_time >= '")
                .append(destUpdateTime).append("' AND update_time <= '").append(srcUpdateTime)
                .append("' ORDER BY update_time").append(" LIMIT ").append(start).append(",").append(limit);

        var querySqlParam = new QuerySqlParam();

        querySqlParam.setSql(sb.toString());

        return webClient.post().uri("/databases/sql/query").bodyValue(querySqlParam).retrieve()
                .bodyToMono(new ParameterizedTypeReference<List<Map<String, Object>>>() {
                }).map(data -> {
                    if (null != data && !data.isEmpty()) {
                        namedParameterJdbcTemplate.batchUpdate(updateSql, SqlParameterSourceUtils.createBatch(data));
                    }

                    return true;
                });
    }

    private Mono<List<Map<String, Object>>> getTableSchema(WebClient webClient, String table) {
        return webClient.get().uri("/tables/{table}/columns", table).retrieve()
                .bodyToMono(new ParameterizedTypeReference<List<Map<String, Object>>>() {
                });
    }
}
上一篇 下一篇

猜你喜欢

热点阅读