java全栈sharding-JDBCSpringBoot

基于Sharding-jdbc + 编排治理,实现实时改变分表规则

2020-05-11  本文已影响0人  zhy0324

写在前面

上一篇文章(https://www.jianshu.com/p/ed7bc1e49b61)出现的问题,就是sharding-jdbc无法根据一个不固定的字段(停车场id)进行动态分表,因为actual-data-nodes是在项目启动的时候就加载好的,不支持动态修改。
还好这一切都是可以解决的。
那么这一篇文章,就是解决了actual-data-nodes动态修改问题。解决方案大致说明一下就是基于sharding-jdbc + sharding的服务编排治理+redis,实现了订单表根据停车场id动态分表,每增删停车场,在不重启项目的情况下动态的改变actual-data-nodes

思路

首先参考了这篇老哥的博文:https://blog.csdn.net/qq_32588349/article/details/99440985 给了我很大的启发
根据官方文档描述,shardingsphere提供了配置中心、注册中心的服务治理功能。并且有这句描述:

image.png
这应该就是我想要的东西,但是我确实不知道该怎么入手,就让公司的架构师给我处理了一下。然后最终得到了下面这个方案。在这里要感谢两位老哥!

大概的描述一下本demo的业务:

  1. sharding-jdbc : 引入基于java的配置包(不用starter包)
  2. 需要分的表为:订单表(t_order)
  3. 分表的依据字段:停车场id(car_park_id)
  4. 分库字段:不需要分库
  5. 初始化数据库要有一个默认表:t_order_default,这个表只在第一次启动项目、还没有停车场信息的时候作为默认的actual-data-nodes使用,里面不会存任何数据
  6. redis hash存放<停车场id,停车场名称>
  7. redis zset存放 订单actual-data-nodes,score为当前时间的时间戳,方便获取最新的用来替换

配置步骤

sql脚本

/*
Navicat MySQL Data Transfer

Source Server         : development database 4.71
Source Server Version : 50730
Source Host           : 192.168.4.71:3307
Source Database       : sharding_carpark

Target Server Type    : MYSQL
Target Server Version : 50730
File Encoding         : 65001

Date: 2020-05-11 18:25:00
*/

-- Schema bootstrap for the sharding demo: creates the car-park table and the
-- default order table. Both tables are dropped and recreated, so running this
-- script discards any existing rows in them.

-- Foreign-key checks are switched off for the session; note that this script
-- never switches them back on.
SET FOREIGN_KEY_CHECKS=0;

-- ----------------------------
-- Table structure for t_car_park
-- ----------------------------
-- One row per car park. The article that ships with this script shards the
-- order table by car-park id.
DROP TABLE IF EXISTS `t_car_park`;
CREATE TABLE `t_car_park` (
  `id` varchar(64) NOT NULL,
  `name` varchar(100) DEFAULT NULL COMMENT '名称',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='停车场表';

-- ----------------------------
-- Table structure for t_order_default
-- ----------------------------
-- Placeholder order table. According to the accompanying article it exists only
-- so the application can start before any car park is registered, and it is not
-- meant to hold data. `car_park_id` is the column used as the sharding key.
DROP TABLE IF EXISTS `t_order_default`;
CREATE TABLE `t_order_default` (
  `id` varchar(64) NOT NULL,
  `name` varchar(100) DEFAULT NULL COMMENT '名称',
  `car_park_id` varchar(64) DEFAULT NULL COMMENT '停车场id',
  `no` varchar(100) DEFAULT NULL COMMENT '订单号',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='测试分表';

pom文件

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <!--mybatisplus-->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.3.1.tmp</version>
        </dependency>

        <!--lombok-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
        </dependency>

        <!--fastjson-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.60</version>
        </dependency>

        <!--mysql-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.44</version>
        </dependency>

        <!--druid-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.20</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j2</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.shardingsphere</groupId>
            <artifactId>sharding-jdbc-core</artifactId>
            <version>4.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.shardingsphere</groupId>
            <artifactId>sharding-jdbc-orchestration</artifactId>
            <version>4.0.0</version>
        </dependency>

重点关注:sharding-jdbc-core、sharding-jdbc-orchestration这两个包,这两个包是必备的。

application.yml

# Spring Boot configuration for the sharding demo: embedded server, the Druid
# connection-pool settings, the Redis connection, and log levels.
server:
  port: 8086
  tomcat:
    max-threads: 100
spring:
  # Custom prefix (spring.druid.datasource); presumably bound by the project's own
  # DataSourceProperties class, which is not shown here — TODO confirm.
  druid:
    datasource:
      type: com.alibaba.druid.pool.DruidDataSource
      # NOTE(review): the schema is spelled sharding_carPark here but sharding_carpark in the
      # SQL dump header; on a case-sensitive MySQL installation these are different names — confirm.
      url: jdbc:mysql://192.168.4.71:3307/sharding_carPark?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai
      driver-class-name: com.mysql.jdbc.Driver
      username: root
      password: 123456
      maxActive: 20
      initialSize: 5
      maxWait: 60000
      minIdle: 5
      timeBetweenEvictionRunsMillis: 60000
      minEvictableIdleTimeMillis: 300000
      validationQuery: SELECT 1 FROM DUAL
      testWhileIdle: true
      testOnBorrow: false
      testOnReturn: false
      # Whether to cache PreparedStatements (PSCache). Recommended off for MySQL; PSCache gives a large performance gain on databases that support cursors, such as Oracle.
      poolPreparedStatements: false
      # -1 disables PSCache. To enable it the value must be greater than 0, in which case poolPreparedStatements is switched to true automatically; a larger value such as 100 is reasonable.
      maxOpenPreparedStatements: -1
      # Filters used for monitoring/statistics interception; without them the monitoring page cannot collect SQL statistics. 'wall' is the SQL firewall.
      filters: stat,wall,log4j2
      # Turn on mergeSql and slow-SQL recording through the connectionProperties attribute
      connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
      # Merge the monitoring data of multiple DruidDataSource instances
      useGlobalDataSourceStat: true
      loginUsername: druid
      loginPassword: druid
  redis:
    database: 1
    host: 192.168.4.71
    port: 6379
    password: 123456
    jedis:
      pool:
        max-active: 8
        max-wait: -1
        max-idle: 8
        min-idle: 0
logging:
  level:
    com.example.demo: debug

这里的配置文件比较常规

LocalRegistryCenter 本地注册中心

package com.example.demo.config.shardingconfig;

import org.apache.shardingsphere.orchestration.reg.api.RegistryCenter;
import org.apache.shardingsphere.orchestration.reg.api.RegistryCenterConfiguration;
import org.apache.shardingsphere.orchestration.reg.listener.DataChangedEventListener;

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

/**
 * In-memory {@link RegistryCenter} implementation.
 *
 * <p>Every value written through this registry is kept in the static {@link #values} map and
 * every listener registered through {@link #watch} is kept in the static {@link #listeners} map.
 * Both maps are public so that other classes can read the stored configuration and fire change
 * events by hand without going through the registry API.
 *
 * <p>Locking is not implemented: {@link #initLock} and {@link #tryRelease} do nothing and
 * {@link #tryLock} always returns {@code false}.
 */
public class LocalRegistryCenter implements RegistryCenter {
    /**
     * Listeners registered through {@link #watch}, keyed by the watched key.
     * A later registration for the same key replaces the earlier one.
     */
    public static Map<String, DataChangedEventListener> listeners = new ConcurrentHashMap<>();

    private RegistryCenterConfiguration config;

    private Properties properties;

    /**
     * Stored registry values, keyed by registry path.
     * Public so that resetting the data nodes does not require re-reading the configuration.
     */
    public static Map<String, String> values = new ConcurrentHashMap<>();

    /**
     * Remembers the configuration handed over by the orchestration layer.
     *
     * @param config registry center configuration
     */
    @Override
    public void init(RegistryCenterConfiguration config) {
        this.config = config;
    }

    /**
     * @param key registry path
     * @return the stored value, or {@code null} when the key is unknown
     */
    @Override
    public String get(String key) {
        return values.get(key);
    }

    /**
     * Same as {@link #get}; there is no cache to bypass in this implementation.
     *
     * @param key registry path
     * @return the stored value, or {@code null} when the key is unknown
     */
    @Override
    public String getDirectly(String key) {
        return values.get(key);
    }

    /**
     * @param key registry path
     * @return {@code true} when a value is stored under exactly this key
     */
    @Override
    public boolean isExisted(String key) {
        return values.containsKey(key);
    }

    /**
     * Child keys are not tracked by this flat key/value store.
     *
     * @param key registry path (ignored)
     * @return always an empty, immutable list, never {@code null}, so callers can iterate safely
     */
    @Override
    public List<String> getChildrenKeys(String key) {
        return java.util.Collections.emptyList();
    }

    /**
     * Stores {@code value} under {@code key}, replacing any previous value.
     *
     * @param key   registry path
     * @param value value to store
     */
    @Override
    public void persist(String key, String value) {
        values.put(key, value);
    }

    /**
     * Stores {@code value} under {@code key}; identical to {@link #persist}.
     * Registered listeners are NOT notified by this method.
     *
     * @param key   registry path
     * @param value value to store
     */
    @Override
    public void update(String key, String value) {
        values.put(key, value);
    }

    /**
     * Stores {@code value} under {@code key}; identical to {@link #persist}.
     * The entry is not removed automatically, i.e. it is not actually ephemeral.
     *
     * @param key   registry path
     * @param value value to store
     */
    @Override
    public void persistEphemeral(String key, String value) {
        values.put(key, value);
    }

    /**
     * Caches the listener so that change events can later be fired manually.
     *
     * @param key                      registry path being watched
     * @param dataChangedEventListener listener to cache; {@code null} is ignored
     */
    @Override
    public void watch(String key, DataChangedEventListener dataChangedEventListener) {
        if (null != dataChangedEventListener) {
            // Keep the data-changed listener so it can be looked up by key later.
            listeners.put(key, dataChangedEventListener);
        }
    }

    /**
     * Drops the configuration reference. The static {@link #values} and {@link #listeners}
     * maps are left untouched.
     */
    @Override
    public void close() {
        config = null;
    }

    /** Not supported; does nothing. */
    @Override
    public void initLock(String key) {
        // intentionally empty: this registry has no locking
    }

    /**
     * Not supported.
     *
     * @return always {@code false}
     */
    @Override
    public boolean tryLock() {
        return false;
    }

    /** Not supported; does nothing. */
    @Override
    public void tryRelease() {
        // intentionally empty: this registry has no locking
    }

    /**
     * @return the type name of this registry center. The same string has to be passed to
     *         {@code RegistryCenterConfiguration} by whoever wants to select this implementation.
     */
    @Override
    public String getType() {
        // [Key point 1] referenced again by the data-source configuration.
        return "shardingLocalRegisterCenter";
    }

    @Override
    public Properties getProperties() {
        return properties;
    }

    @Override
    public void setProperties(Properties properties) {
        this.properties = properties;
    }
}

下面这步很重要
在本地文件中添加注册中心

  1. 在resources文件夹下面新建Directory,名称为:META-INF
  2. 在META-INF下继续创建名为services的Directory(注意是services,Java SPI约定的目录是META-INF/services,写成service将无法加载)
  3. 添加file,名为org.apache.shardingsphere.orchestration.reg.api.RegistryCenter(注意这个是固定的),里面的内容是:com.example.demo.config.shardingconfig.LocalRegistryCenter(你本地注册中心类存放的全路径)
    像这样


    image.png

基于java的配置类

package com.example.demo.config.shardingconfig;

import com.alibaba.druid.filter.Filter;
import com.alibaba.druid.filter.logging.Slf4jLogFilter;
import com.alibaba.druid.filter.stat.StatFilter;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.util.StringUtils;
import com.alibaba.druid.wall.WallConfig;
import com.alibaba.druid.wall.WallFilter;
import com.example.demo.config.datasource.DataSourceProperties;
import com.example.demo.config.redis.RedisConfig;
import com.example.demo.config.redis.RedisPrefixEnum;
import org.apache.shardingsphere.api.config.sharding.ShardingRuleConfiguration;
import org.apache.shardingsphere.api.config.sharding.TableRuleConfiguration;
import org.apache.shardingsphere.api.config.sharding.strategy.StandardShardingStrategyConfiguration;
import org.apache.shardingsphere.orchestration.config.OrchestrationConfiguration;
import org.apache.shardingsphere.orchestration.reg.api.RegistryCenterConfiguration;
import org.apache.shardingsphere.shardingjdbc.orchestration.api.OrchestrationShardingDataSourceFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.RedisTemplate;

import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.*;

/**
 * Java-based sharding-jdbc configuration for the demo.
 *
 * <p>Builds a single Druid data source, wraps it in an orchestration-enabled sharding data source,
 * and registers one table rule for the order table that is sharded on {@code car_park_id}. The
 * list of actual data nodes is derived from the car-park ids stored in a Redis hash; the chosen
 * node list is also written to a Redis sorted set scored by the current time.
 *
 * <p>NOTE(review): {@code @AutoConfigureAfter} is normally only honoured on auto-configuration
 * classes — confirm that the intended ordering is really enforced for this class.
 *
 * @author zhy
 * @date 2020/5/9 10:23
 */
@Configuration
@AutoConfigureAfter({DataSourceProperties.class, RedisConfig.class})
public class ShardingRuleConfig {

    /** Physical table used as the only data node while no car park is known in Redis. */
    private static final String DEFAULT_ORDER_TABLE = "t_order_default";

    /** Orchestration instance name; it is also the first segment of the registry paths. */
    private static final String ORCHESTRATION_NAME = "orchestration-sharding-data-source";

    /** Registry center type; must equal the value returned by {@code LocalRegistryCenter.getType()}. */
    private static final String REGISTRY_CENTER_TYPE = "shardingLocalRegisterCenter";

    /** Name under which the single data source is registered with sharding-jdbc. */
    private String defaultDataSource = DatasourceEnum.DEFAULT.getValue();

    @Autowired
    private RedisTemplate<String,Object> redisTemplate;

    @Autowired
    private DataSourceProperties properties;

    /**
     * Creates the orchestration-enabled sharding data source.
     *
     * @return the sharding-jdbc data source exposed as the application's {@link DataSource}
     * @throws SQLException declared by {@code OrchestrationShardingDataSourceFactory.createDataSource}
     * @author zhy
     * @date 2020/5/9 10:33
     */
    @Bean
    public DataSource dataSource() throws SQLException {
        // Real data sources, keyed by the name used inside the actual-data-nodes expressions.
        // Further data sources can be registered here with additional put(...) calls.
        Map<String, DataSource> dataSourceMap = new HashMap<>();
        dataSourceMap.put(defaultDataSource, druidDataSource());

        // Sharding rules; further table rules can be appended with getTableRuleConfigs().add(...).
        // With more than one data source a default data source name must be set as well.
        ShardingRuleConfiguration shardingRuleConfig = new ShardingRuleConfiguration();
        shardingRuleConfig.getTableRuleConfigs().add(orderRuleConfig());

        Properties props = new Properties();
        // Print SQL statements; switch off in production.
        props.setProperty("sql.show", Boolean.TRUE.toString());

        OrchestrationConfiguration orchestrationConfig = new OrchestrationConfiguration(
                ORCHESTRATION_NAME, new RegistryCenterConfiguration(REGISTRY_CENTER_TYPE),
                false);
        return OrchestrationShardingDataSourceFactory.createDataSource(dataSourceMap, shardingRuleConfig, props,
                orchestrationConfig);
    }

    /**
     * Builds the table rule of the order table.
     *
     * <p>When Redis holds no car park yet, the rule falls back to the pre-created default table so
     * that the first start-up does not fail. The resulting node list is stored in the Redis sorted
     * set {@code SHARDING_RULE_ORDER}, scored with the current time in milliseconds, so that the
     * latest rule can be fetched later when it has to be replaced.
     *
     * @return the table rule configuration of the order table
     * @author zhy
     * @date 2020/5/7 10:28
     */
    private TableRuleConfiguration orderRuleConfig(){
        String logicTable = ShardingTableEnum.ORDER.getValue();
        String nodesFromRedis = getActualDataNodesByCatalog(ShardingTableEnum.ORDER);
        // Use the configured data source name instead of a hard-coded "ds0" so that the fallback
        // node always refers to a data source that is actually registered in dataSource().
        String actualDataNodes = StringUtils.isEmpty(nodesFromRedis)
                ? defaultDataSource + "." + DEFAULT_ORDER_TABLE
                : nodesFromRedis;
        TableRuleConfiguration tableRuleConfig = new TableRuleConfiguration(logicTable, actualDataNodes);
        // Table sharding strategy: shard on car_park_id.
        tableRuleConfig.setTableShardingStrategyConfig(
                new StandardShardingStrategyConfiguration("car_park_id", new CarParkShardingTableAlgorithm()));
        // Remember the rule in Redis together with its creation time.
        redisTemplate.opsForZSet().add(RedisPrefixEnum.SHARDING_RULE_ORDER.getValue(), actualDataNodes,
                System.currentTimeMillis());
        return tableRuleConfig;
    }


    /**
     * Builds the actual-data-nodes expression of a logic table from the car-park ids in Redis.
     *
     * <p>Each hash key of {@code CAR_PARK_ID_CATALOG} yields one node of the form
     * {@code <dataSource>.<logicTable>_<carParkId>}; the nodes are joined with commas.
     *
     * @param logicTable the logic table whose nodes are requested
     * @return the comma-separated node list, or {@code null} when Redis holds no car park
     * @author zhy
     * @date 2020/5/11 14:52
     */
    public String getActualDataNodesByCatalog(ShardingTableEnum logicTable){
        String redisKey = RedisPrefixEnum.CAR_PARK_ID_CATALOG.getValue();
        // All known car parks.
        Set<Object> carParkIds = redisTemplate.opsForHash().keys(redisKey);
        if (carParkIds == null || carParkIds.isEmpty()){
            return null;
        }
        String nodePrefix = defaultDataSource + "." + logicTable.getValue() + "_";
        StringJoiner nodes = new StringJoiner(",");
        for (Object carParkId : carParkIds) {
            nodes.add(nodePrefix + carParkId.toString());
        }
        return nodes.toString();
    }

    /**
     * Creates and configures the Druid connection pool from {@link DataSourceProperties}.
     *
     * @return the configured Druid data source
     * @author zhy
     * @date 2020/5/7 10:29
     */
    private DruidDataSource druidDataSource() {
        DruidDataSource dataSource = new DruidDataSource();
        dataSource.setDriverClassName(properties.getDriverClassName());
        dataSource.setUrl(properties.getUrl());
        dataSource.setUsername(properties.getUsername());
        dataSource.setPassword(properties.getPassword());
        dataSource.setInitialSize(properties.getInitialSize());
        dataSource.setMinIdle(properties.getMinIdle());
        dataSource.setMaxActive(properties.getMaxActive());
        dataSource.setMaxWait(properties.getMaxWait());
        dataSource.setTimeBetweenEvictionRunsMillis(properties.getTimeBetweenEvictionRunsMillis());
        dataSource.setMinEvictableIdleTimeMillis(properties.getMinEvictableIdleTimeMillis());
        String validationQuery = properties.getValidationQuery();
        if (validationQuery != null && !"".equals(validationQuery)) {
            dataSource.setValidationQuery(validationQuery);
        }
        dataSource.setTestWhileIdle(properties.isTestWhileIdle());
        dataSource.setTestOnBorrow(properties.isTestOnBorrow());
        dataSource.setTestOnReturn(properties.isTestOnReturn());
        if (properties.isPoolPreparedStatements()) {
            dataSource.setMaxPoolPreparedStatementPerConnectionSize(properties.getMaxPoolPreparedStatementPerConnectionSize());
        }
        String connectionPropertiesStr = properties.getConnectionProperties();
        if (connectionPropertiesStr != null && !"".equals(connectionPropertiesStr)) {
            dataSource.setConnectProperties(parseConnectionProperties(connectionPropertiesStr));
        }
        dataSource.setUseGlobalDataSourceStat(properties.isUseGlobalDataSourceStat());

        WallConfig wallConfig = new WallConfig();
        wallConfig.setMultiStatementAllow(true);
        WallFilter wallFilter = new WallFilter();
        wallFilter.setConfig(wallConfig);
        // Log filter so that SQL can be recorded through log4j2
        // (application.yml: logging.config: classpath:logConfig/log4j2.xml).
        Slf4jLogFilter slf4jLogFilter = new Slf4jLogFilter();
        slf4jLogFilter.setStatementCreateAfterLogEnabled(false);
        slf4jLogFilter.setStatementCloseAfterLogEnabled(false);
        slf4jLogFilter.setResultSetOpenAfterLogEnabled(false);
        slf4jLogFilter.setResultSetCloseAfterLogEnabled(false);
        List<Filter> filters = new ArrayList<>();
        filters.add(wallFilter);
        filters.add(new StatFilter());
        filters.add(slf4jLogFilter);

        dataSource.setProxyFilters(filters);
        return dataSource;
    }

    /**
     * Parses a {@code key1=value1;key2=value2} string into {@link Properties}.
     *
     * <p>Only the first {@code =} of an entry separates key and value, so values may themselves
     * contain {@code =}. Entries without {@code =} or with an empty key are skipped instead of
     * failing with an {@link ArrayIndexOutOfBoundsException}.
     *
     * @param raw the semicolon-separated property string, never {@code null}
     * @return the parsed properties, possibly empty
     */
    private static Properties parseConnectionProperties(String raw) {
        Properties parsed = new Properties();
        for (String entry : raw.split(";")) {
            String[] keyValue = entry.split("=", 2);
            if (keyValue.length < 2) {
                continue;
            }
            String key = keyValue[0].trim();
            if (key.isEmpty()) {
                continue;
            }
            parsed.put(key, keyValue[1].trim());
        }
        return parsed;
    }
}

动态替换的shardingService

package com.example.demo.config.shardingconfig;

import com.example.demo.config.datasource.DataSourceProperties;
import com.example.demo.config.redis.RedisConfig;
import com.example.demo.config.redis.RedisPrefixEnum;
import org.apache.shardingsphere.orchestration.reg.listener.DataChangedEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.Set;

/**
 * Replaces the sharding rule of the running data source without a restart.
 *
 * <p>The current rule text is read from {@link LocalRegistryCenter#values}, edited, and pushed to
 * the listener cached in {@link LocalRegistryCenter#listeners} as an {@code UPDATED} event.
 *
 * @author zhy
 * @date 2020/5/11 15:08
 */
@Component
@AutoConfigureAfter({RedisConfig.class})
public class ShardingService {

    /** Registry key under which the schema listener is cached. */
    private static final String SCHEMA_PATH = "/orchestration-sharding-data-source/config/schema";

    /** Registry key that holds the sharding rule text of the logic schema. */
    private static final String RULE_PATH = SCHEMA_PATH + "/logic_db/rule";

    @Autowired
    private RedisTemplate<String,Object> redisTemplate;

    @Autowired
    private ShardingRuleConfig shardingRuleConfig;

    /**
     * Replaces every occurrence of {@code oldRule} in the stored rule text with {@code newRule},
     * notifies the schema listener, and then stores the new text in the local registry.
     *
     * <p>If {@code oldRule} does not occur in the stored rule text, the text is left unchanged but
     * the listener is still notified.
     *
     * @param oldRule the actual-data-nodes expression currently in effect
     * @param newRule the actual-data-nodes expression to switch to
     * @throws IllegalArgumentException if {@code oldRule} or {@code newRule} is {@code null}
     * @throws IllegalStateException    if no rule or no schema listener has been registered yet
     * @author zhy
     * @date 2020/5/11 15:12
     */
    public void replaceActualDataNodes(String oldRule,String newRule){
        if (oldRule == null || newRule == null) {
            throw new IllegalArgumentException("oldRule and newRule must not be null");
        }
        // Current configuration as persisted by the orchestration layer.
        String currentRules = LocalRegistryCenter.values.get(RULE_PATH);
        if (currentRules == null) {
            throw new IllegalStateException("No sharding rule stored under " + RULE_PATH);
        }
        if (!LocalRegistryCenter.listeners.containsKey(SCHEMA_PATH)) {
            throw new IllegalStateException("No listener registered for " + SCHEMA_PATH);
        }
        String updatedRules = currentRules.replace(oldRule, newRule);
        // Notify first, store afterwards: if the listener rejects the rule, the registry keeps the old text.
        LocalRegistryCenter.listeners.get(SCHEMA_PATH)
                .onChange(new DataChangedEvent(RULE_PATH, updatedRules, DataChangedEvent.ChangedType.UPDATED));
        LocalRegistryCenter.values.put(RULE_PATH, updatedRules);
    }

    /**
     * Returns the most recently stored actual-data-nodes expression.
     *
     * <p>The value is the highest-scored member of the Redis sorted set
     * {@code SHARDING_RULE_ORDER}. The {@code shardingTableEnum} argument is currently not used to
     * select the key; the order-table key is always read.
     *
     * @param shardingTableEnum the logic table (currently ignored)
     * @return the latest actual-data-nodes expression
     * @throws IllegalStateException if the sorted set is empty or Redis returns no result
     * @author zhy
     * @date 2020/5/11 15:56
     */
    public String getActualDataNodesInRedis(ShardingTableEnum shardingTableEnum){
        String redisKey = RedisPrefixEnum.SHARDING_RULE_ORDER.getValue();
        // Both range bounds are inclusive, so 0..0 fetches exactly the newest entry.
        Set<Object> latest = redisTemplate.opsForZSet().reverseRange(redisKey, 0, 0);
        if (latest == null || latest.isEmpty()) {
            throw new IllegalStateException("No sharding rule stored in Redis under key " + redisKey);
        }
        return latest.iterator().next().toString();
    }

    /**
     * Builds the actual-data-nodes expression from the car-park ids stored in Redis by delegating
     * to {@link ShardingRuleConfig#getActualDataNodesByCatalog}.
     *
     * @param shardingTableEnum the logic table whose nodes are requested
     * @return whatever the delegate returns for this table
     * @author zhy
     * @date 2020/5/11 16:09
     */
    public String getActualDataNodesByCatalog(ShardingTableEnum shardingTableEnum){
        return shardingRuleConfig.getActualDataNodesByCatalog(shardingTableEnum);
    }
}

最后贴上源码

https://github.com/zhyhuayong/shardingJavaDemo
大家可以参考源码,要测试的同学记得修改自己的数据库配置哦

本人为(weixin)恭(gong)祝(zhong)号:一吱小确幸。欢迎大家关注

上一篇下一篇

猜你喜欢

热点阅读