HiveServer2多数据源对象池实战

2020-05-17  本文已影响0人  _Kantin

需求背景

多数据源源对象池实战

  1. 在maven/grade中引入commons-pool2
#maven
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.6.2</version>
</dependency>

#grade
dependencies{
    compile group: 'org.apache.commons', name: 'commons-pool2', version: '2.6.2'
}
  1. 创建对象池配置文件
    public class PoolProperties {
         //最大空空闲
          private int maxIdle = 5;
          //最大连接数
          private int maxTotal = 50;
           // 最小空闲
          private int minIdle = 2;
          // 初始化连接数
          private int initialSize = 5;
          //最大等待时间,设置为30秒
          private int maxWaitMillis = 30000;
   
          //省略get/set方法
    }
  1. 创建不用数据源的工厂类
#A集群的连接池对象
public class AClusterObjectPool extends GenericObjectPool<Connection> {
    public AClusterObjectPool (PooledObjectFactory<Connection> factory, GenericObjectPoolConfig<Connection> config) {
        super(factory, config);
    }
}

#B集群的连接池对象
public class BClusterObjectPool extends GenericObjectPool<Connection> {
    public BClusterObjectPool (PooledObjectFactory<Connection> factory, GenericObjectPoolConfig<Connection> config) {
        super(factory, config);
    }
}
  1. 创建connection对象类
public class PoolableObjectFactory extends BasePooledObjectFactory<Connection> {

    private final String driverName = "org.apache.hive.jdbc.HiveDriver";
    private  String url;
    private  String username;
    private  String password;


    public PoolableObjectFactory(String url, String username, String password) {
        this.url = url;
        this.username = username;
        this.password = password;
    }
    /**
     * 创建一个对象实例
     */
    @Override
    public Connection create() throws Exception {
        Class.forName(driverName);
        java.util.Properties info = new java.util.Properties();
        info.put("user", username);
        info.put("password", password);
        info.put("hive.metastore.client.socket.timeout", 3000);
        Connection con = DriverManager.getConnection(url, info);
        return con;
    }
    /**
     * 包裹创建的对象实例,返回一个pooledobject
     */
    @Override
    public PooledObject<Connection> wrap(Connection obj) {
        return new DefaultPooledObject<Connection>(obj);
    }
}
  1. 创建池对象的工具类
@Configuration
@EnableConfigurationProperties(PoolProperties.class)
public class PooledObjectUtil {
    private static final Logger LOGGER = LoggerFactory.getLogger(PooledObjectUtil.class);

    @Autowired
    private ClusterService clusterService;

    @Value("${a.cluster.name}")
    private String aClusterName;

    @Value("${b.cluster.name}")
    private String bClusterName;

    private final PoolProperties poolProperties;

    AClusterObjectPool  aPool;

    BClusterObjectPool  bPool;

    @Autowired
    public PooledObjectUtil(PoolProperties poolProperties) {
        this.poolProperties = poolProperties;
    }

    private GenericObjectPoolConfig generatePoolConfig() {
        GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
        // 最大空闲数
        poolConfig.setMaxIdle(poolProperties.getMaxIdle());
        // 最小空闲数, 池中只有一个空闲对象的时候,池会在创建一个对象,并借出一个对象,从而保证池中最小空闲数为1
        poolConfig.setMinIdle(poolProperties.getMinIdle());
        // 最大池对象总数
        poolConfig.setMaxTotal(poolProperties.getMaxTotal());
        // 在获取对象的时候检查有效性, 默认false
        poolConfig.setTestOnBorrow(true);
        // 在归还对象的时候检查有效性, 默认false
        poolConfig.setTestOnReturn(false);
        // 在空闲时检查有效性, 默认false
        poolConfig.setTestWhileIdle(false);
        // 最大等待时间, 默认的值为-1,表示无限等待。
        poolConfig.setMaxWaitMillis(poolProperties.getMaxWaitMillis());
        // 是否启用后进先出, 默认true
        poolConfig.setLifo(true);
        // MXBean already registered的错误
        poolConfig.setJmxEnabled(false);
        return poolConfig;
    }

    @Bean
    public AClusterObjectPool  aPooledObjectFactory() {
        Cluster cluster = clusterService.findByClusterName(aClusterName);
        GenericObjectPoolConfig poolConfig = generatePoolConfig();
        PooledObjectFactory<Connection> factory = new PoolableObjectFactory(cluster.getHiveServer2Url(), cluster.getAdminUser(), cluster.getAdminUmPassword());
        aPool = new AClusterObjectPool(factory, poolConfig);
        initPool(aPool , poolProperties.getInitialSize(), poolConfig.getMaxIdle());
        LOGGER.info("Successful build aCluster connection,size is {}",aPool .getNumActive());
        return aPool ;
    }

    @Bean
    public BClusterObjectPool  bPooledObjectFactory() {
        Cluster cluster = clusterService.findByClusterName(bClusterName);
        GenericObjectPoolConfig poolConfig = generatePoolConfig();
        PooledObjectFactory<Connection> factory = new PoolableObjectFactory(cluster.getHiveServer2Url(), cluster.getAdminUser(), cluster.getAdminUmPassword());
        bPool = new BClusterObjectPool(factory, poolConfig);
        initPool(bPool , poolProperties.getInitialSize(), poolConfig.getMaxIdle());
        LOGGER.info("Successful build b cluster connection,size is {}",bPool .getNumActive());
        return bPool ;
    }
    /**
     * 预先加载connection对象到对象池中
     * @param initialSize 初始化连接数
     * @param maxIdle     最大空闲连接数
     */
    private void initPool(AClusterObjectPool pool, int initialSize, int maxIdle) {
        if (initialSize <= 0) {
            return;
        }
        int size = Math.min(initialSize, maxIdle);
        for (int i = 0; i < size; i++) {
            try {
                pool.addObject();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * 预先加载connection对象到对象池中
     * @param initialSize 初始化连接数
     * @param maxIdle     最大空闲连接数
     */
    private void initPool(BClusterObjectPool pool, int initialSize, int maxIdle) {
        if (initialSize <= 0) {
            return;
        }
        int size = Math.min(initialSize, maxIdle);
        for (int i = 0; i < size; i++) {
            try {
                pool.addObject();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
}

在项目中如何使用

    @Value(cluster.name.a)
    private String aClusterName;
    @Autowired
    private AClusterObjectPool aClusterObjectPool;
    @Autowired
    private BClusterObjectPool bClusterObjectPool;

    #从对象池中借用对象调用borrow方法
    private Connection borrowObject(String clusterName) throws Exception {
        Connection connection;
        if (aClusterName.equals(clusterName)) {
            connection = aClusterObjectPool.borrowObject();
        } else {
            connection = bClusterObjectPool.borrowObject();
        }
        LOGGER.info("Borrow hive client object success,clusterName is {}",clusterName);
        return connection;
    }
    #向对象池归还对象调用return方法
    private void returnObject(Connection connection,String clusterName){
        if (aClusterObjectPool.equals(clusterName)) {
            aClusterObjectPool.returnObject(connection);
        } else {
            bClusterObjectPool.returnObject(connection);
        }
        LOGGER.info("Return hive client object success,clusterName is {}",clusterName);
    }

实际效果

遇到的一些问题

hive.server2.session.check.interval = 1 hour
hive.server2.idle.operation.timeout = 1 day
hive.server2.idle.session.timeout = 3 days

参考资料

上一篇 下一篇

猜你喜欢

热点阅读