创建Spring项目使用DataX进行定时数据库备份

2019-05-08  本文已影响0人  me0w

​ DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。

在DataX的官网介绍文档中,其使用十分简单。下载安装包之后,使用python datax.py [demo.json]命令即可进行数据同步。虽然其启动命令使用的是python脚本,但是看其安装包之后发现,只有启动的部分配置环境变量使用的是python,其余具体源码都是使用的java。既然底层是用java写的,所以萌发了使用Spring来时备份数据的想法。

一、DataX3.0基本结构

在之前的一篇博文中,在Intell Idea中启动了DataX,证明了使用Java项目引用DataX是可行的。下面简单分析一下DataX的源码。

DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

datax
总的来说DataX项目由FrameWork(core包、common包和transformer包)以及ReadPlugin和WritePlugin组成,对于DataX所支持的数据库,都有一对XXXreader和XXXwriter包。下面就是从DataX的github项目clone下来的源码包的结构目录。
--DataX
----common
----core
----transformer
----XXXreader
----XXXwriter
----...

二、基本框架模块代码解析

入口方法是core包下面的 com.alibaba.datax.core.Engine.main(String[] args)方法,直接调用了Engine类的entry()方法

2.1 Engine启动类代码解析
public static void entry(final String[] args) throws Throwable {
        Options options = new Options();
        options.addOption("job", true, "Job config.");
        options.addOption("jobid", true, "Job unique id.");
        options.addOption("mode", true, "Job runtime mode.");
        String jobPath = cl.getOptionValue("job");
        Configuration configuration = ConfigParser.parse(jobPath);
        ...
        configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, jobId);
        ...
        ConfigurationValidate.doValidate(configuration);
        Engine engine = new Engine();
        engine.start(configuration);
}

entry()方法主要作用:
1、 获取项目启动参数:job、jobid和mode;
2、 使用ConfigParser工具类从jobpath即传输任务的json文件获取configuration,并对jobId以及mode进行了验证,随后将configuration作为入参,调用Engine类的start()方法。

public void start(Configuration allConf) {
        ColumnCast.bind(allConf);
        LoadUtil.bind(allConf);

        boolean isJob = !("taskGroup".equalsIgnoreCase(allConf
                .getString(CoreConstant.DATAX_CORE_CONTAINER_MODEL)));
         ...
        if (isJob) {
            allConf.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, RUNTIME_MODE);
            container = new JobContainer(allConf);
            instanceId = allConf.getLong(
                    CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, 0);

        }
        ...
        container.start();
    }

start()方法主要作用:
1、初始化一些配置,检查任务的model;
2、根据Configuration的内容创建JobContainer,真正启动任务是调用JobContainer的start()方法。

2.2 JobContainer类解析
   public void start() {
            this.userConf = this.configuration.clone();
            this.preHandle();
            this.init();
            this.prepare();
            this.totalStage = this.split();
            this.schedule();
            this.post();
            this.postHandle();
            this.invokeHooks();
        }

start()方法可以看出datax进行数据备份的一系列流程,从预处理,初始化,到实际调用对应的reader和writer的插件,有兴趣的读者可以自行查看源代码。

三、打包DataX项目

解析了DataX的源代码之后,我们已经知道了从哪里可以调用DataX的备份功能。我们可以将从github上clone源码到本地,使用maven将项目打包放在本地仓库,有条件的话可以上传到私服。

在core包的pom.xml文件里面加上插件如下:

            <!-- 打包源码 -->
            <plugin>
                <artifactId>maven-source-plugin</artifactId>
                <version>2.1</version>
                <configuration>
                    <attach>true</attach>
                </configuration>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>jar</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

使用mvn clean install命令将包安装到本地仓库,同理可以打包自己需要的reader和writer包,比如我想将Oracle的表导入到MySql数据库,那么我就需要打包oraclereader和mysqlwriter这两个包。

四、创建boot项目,使用maven包

依赖都已经准备好,下一步就可以来创建boot工程了,除了Springboot项目所需要的相关依赖以外,pom文件里面还要加上DataX的相关依赖如下:

        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>datax-core</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>mysqlreader</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>mysqlwriter</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>

从第二节我们知道,入口是Engine类的entry()方法调用自身的start()方法,但是entry()方法的入参比较复杂,再加上在实际应用中,Datax的job.json任务文件里面配置的数据库的密码是加密的,所以我自定义了一个DataxUtil类,然后去调用Engine类的start()方法。

五、多线程时的错误

在使用定时任务的过程中,我发现当两个job在同一时刻开始,并且reader和writer不同的时候,会出现找不到对应的reader或者writer的异常。比如说 job1和job2都在凌晨01点整执行,job1是从Oracle备份数据到MySQL,而job2是从MySQL备份数据到MySQL,就会报找不到oraclereader plugin或者mysqlreader plugin的错误。如果两个job的reader和writer分别相同,比如说都是从Oracle备份到MySQL,或者都是是从MySQL备份到Oracle则可以正常运行。
经过一番查找,发现问题就出现在LoadUtil这个类。在2.2节介绍的JobContainer类的start()方法里面调用了preHandle()方法,preHandle()方法里面使用了LoadUtil来加载对应的reader和writer,而LoadUtil的loadJobPlugin()方法线程不安全,从而导致了前一个job加载到一半的reader或者writer会被其他线程篡改,导致前一个job的reader或者writer不可用。

AbstractJobPlugin handler = LoadUtil.loadJobPlugin(handlerPluginType, handlerPluginName);

解决方法有两个:
1、强行将定时任务改成串行的,前一个job结束之后才调用下一个job;
2、修改LoadUtil类,保证loadJobPlugin()方法的线程安全。

以下是LoadUtil类需要修改的部分:

private static Configuration pluginRegisterCenter;
private static Map<String, JarLoader> jarLoaderCenter = new HashMap<String, JarLoader>();

替换成:

/**
* 所有插件配置放置在pluginRegisterCenter中,为区别reader、transformer和writer,还能区别
* 具体pluginName,故使用pluginType.pluginName作为key放置在该map中
*/
private static ThreadLocal<Configuration> pluginRegisterCenter = new InheritableThreadLocal<Configuration>();

/** jarLoader的缓冲 */
private final static Map<String, JarLoader> jarLoaderCenter = new ConcurrentHashMap<String, JarLoader>();
    public static void bind(final Configuration pluginConfigs) {
        pluginRegisterCenter.set(pluginConfigs);
    }
private static Configuration getPluginConf(PluginType pluginType,
                                               String pluginName) {
        Configuration pluginConf = pluginRegisterCenter.get()
                .getConfiguration(generatePluginKey(pluginType, pluginName));

        if (null == pluginConf) {
            throw DataXException.asDataXException(
                    FrameworkErrorCode.PLUGIN_INSTALL_ERROR,
                    String.format("DataX不能找到插件[%s]的配置.",
                            pluginName));
        }

        return pluginConf;
    }

至此就将LoadUtil改成了线程安全的类。

不过估计在DataX设计之初,就是面向运维人员,所以才会使用python命令以使其运行对应job。故其可能在多线程支持方面可能会有欠缺,所以在自行使用的时候,最好是不要使用多线程。一是由于源码本身就不支持多线程,即使对目前暴露出的问题修改了部分源码,由于没有深入阅读源码,可能会在将来遇到其他方面的问题;二是复制效率问题,多个线程一起传输数据时,如果都是大量数据的传输,可能会对内存、IO、还有网络造成争用,造成其他问题。

六、spring项目代码

boot项目代码待上传。

上一篇 下一篇

猜你喜欢

热点阅读