JAVA藏兵谷

阿里开源数据同步工具——DataX源码揭秘

2019-09-29  本文已影响0人  来杯熊酱不加糖

1.前言

datax是阿里出品,最初是为了解决淘宝数据交换的问题,据说淘宝有30%的数据交换是通过datax完成的。

2.介绍

DataX 是一个开源异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

Data目前已经支持常用的插件体系,主流的RDBMS,NOSQL,大数据计算系统都已接入。

3.源码解析

从github上clone源码到本地,源码地址:https://github.com/alibaba/DataX

DataX源码由Framework(core包,common包和transformer包)及 plugin(ReadPlugin和WritePlugin)组成。

Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。

3.1入口类Engine

  \bullet entry()方法:

  主要用于获取项目启动参数:job,jobid,mode;

  注意:mode分为单机模式和分布式模式,这里指定为standalone 单机模式。

            jobid默认值为-1,只有在standalone模式下使用,非 standalone 模式必须提供有效的jobid值。

public static void entry(String jobPath)throws Throwable {

        String jobIdString ="-1";

// 指定单机还是分布式模式运行

        RUNTIME_MODE ="standalone";

Configuration configuration = ConfigParser.parse(jobPath);

......

configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, jobId);

......

ConfigurationValidate.doValidate(configuration);

Engine engine =new Engine();

engine.start(configuration);

}

\bullet start()方法:

主要用于初始化配置,检查job的model信息。

public void start(Configuration allConf) {

// 绑定column转换信息

    ColumnCast.bind(allConf);

/**

* 初始化PluginLoader,可以获取各种插件配置

*/

    LoadUtil.bind(allConf);

......

Configuration jobInfoConfig = allConf.getConfiguration(CoreConstant.DATAX_JOB_JOBINFO);

//初始化PerfTrace

    PerfTrace perfTrace = PerfTrace.getInstance(isJob, instanceId, taskGroupId, priority, traceEnable);

perfTrace.setJobInfo(jobInfoConfig, perfReportEnable, channelNumber);

container.start();

}

3.2 jobContainer容器

job实例运行在jobContainer容器中,它是所有任务的master,负责初始化、拆分、调度、运行、回收、监控和汇报。

\bullet start()方法:

jobContainer主要负责的工作全部在start()里面,包括init、prepare、split、scheduler、post以及destroy和statistics。

public void start() {

LOG.info("DataX jobContainer starts job.");

this.preHandle();

this.init();

this.prepare();

this.totalStage =this.split();

this.schedule();

this.post();

this.postHandle();

this.invokeHooks();

}

\bullet init()方法:reader和writer的初始化

private void init() {

......

JobPluginCollector jobPluginCollector =new DefaultJobPluginCollector(

this.getContainerCommunicator());

//必须先Reader ,后Writer

this.jobReader =this.initJobReader(jobPluginCollector);

this.jobWriter =this.initJobWriter(jobPluginCollector);

}

\bullet schedule()方法:

任务调度器schedule首先完成的工作是把上一步reader和writer split的结果整合到具体taskGroupContainer中。

private void schedule() {

/**

* 通过获取配置信息得到每个taskGroup需要运行哪些tasks任务

*/

    List taskGroupConfigs = JobAssignUtil.assignFairly(this.configuration,

this.needChannelNumber, channelsPerTaskGroup);

......

AbstractScheduler scheduler;

scheduler = initStandaloneScheduler(this.configuration);

scheduler.schedule(taskGroupConfigs);

......

/** * 检查任务执行情况 */this.checkLimit();

this.checkLimit();

}

\bullet post()方法: 启动各类数据库插件的读写任务。

private void post() {

this.postJobWriter();

this.postJobReader();

}

4. Spring Boot集成DataX

在springboot项目上,通过POM文件引入datax相关jar包

<dependency>

<groupId>com.alibaba.datax</groupId>

<artifactId>datax-core</artifactId>

<version>0.0.1-SNAPSHOT</version>

</dependency>

同时需要引入数据源读取和写入相关的Reader/Writer插件

<dependency>

<groupId>com.oracle</groupId>

<artifactId>ojdbc6</artifactId>

<version>11.2.0.3</version>

<scope>system</scope>

<systemPath>${basedir}/src/main/lib/ojdbc6-11.2.0.3.jar</systemPath>

</dependency>

<dependency>

<groupId>com.microsoft.sqlserver</groupId>

<artifactId>sqljdbc4</artifactId>

<version>4.0</version>

<scope>system</scope>

<systemPath>${basedir}/src/main/lib/sqljdbc4-4.0.jar</systemPath>

</dependency>

这里引入mysql 及oracle数据源对应的插件

上一篇下一篇

猜你喜欢

热点阅读