DataX二次开发

2020-10-22  本文已影响0人  诺之林

代码

cd /Users/kevin/Workspace

git clone git@git.nuozhilin.site:yuanlin/datax.git

git clone git@git.nuozhilin.site:yuanlin/datax-source.git

配置

配置基于IntelliJ IDEA

启动调试Debug "datax"

框架

(*) Main Thread
全局 Engine main()
入口 Engine entry()
启动 JobContainer start()
切分 JobContainer split() => channel个数 = TaskGroup个数
调度 AbstractScheduler schedule()
执行 ProcessInnerScheduler startAllTaskGroup()
    (*) TaskGroup Thread => 线程个数 = TaskGroup个数
    执行 TaskGroupContainer start()
    读取 ReaderRunner readerRunner
        (*) Reader Thread
    通道 channel => TaskGroup线程内存堆空间
        com.alibaba.datax.core.transport.channel.memory.MemoryChannel
        流控 ArrayBlockingQueue
    写入 WriterRunner writerRunner
        (*) Writer Thread
    报告 TaskGroupContainer reportTaskGroupCommunication()

高性能的两个方面

插件

/**
 * Condensed MongoDB reader plugin.
 *
 * <p>The nested {@link Job} prepares the configuration and splits the work;
 * each nested {@link Task} opens its own client and streams documents to the
 * writer side. Field declarations that the methods rely on
 * ({@code originalConfig}, {@code readerSliceConfig}, {@code database},
 * {@code collection}, {@code filter}, {@code dbCursor}) are not shown in this
 * excerpt.
 */
public class MongoDBReader extends Reader {
    /** Job-level half of the reader: loads the plugin configuration and splits it. */
    public static class Job extends Reader.Job {
        /** Stores the plugin job configuration and builds a Mongo client from it. */
        @Override
        public void init() {
            this.originalConfig = super.getPluginJobConf();
            this.mongoClient = MongoUtil.initMongoClient(originalConfig);
        }

        /**
         * Delegates the split to {@code CollectionSplitUtil.doSplit}.
         *
         * @param adviceNumber number of slices suggested by the framework
         * @return the configurations produced by {@code CollectionSplitUtil.doSplit}
         */
        @Override
        public List<Configuration> split(int adviceNumber) {
            return CollectionSplitUtil.doSplit(originalConfig, adviceNumber, mongoClient);
        }
    }

    /** Task-level half of the reader: reads one slice and forwards its records. */
    public static class Task extends Reader.Task {
        private MongoClient mongoClient;

        /** Builds the Mongo client for this task from the slice configuration. */
        @Override
        public void init() {
            mongoClient = MongoUtil.initMongoClient(readerSliceConfig);
        }

        /**
         * Iterates over every document matching {@code filter} and sends one
         * record per document to the writer.
         *
         * @param recordSender used to create records and hand them to the writer
         */
        @Override
        public void startRead(RecordSender recordSender) {
            MongoDatabase db = mongoClient.getDatabase(database);
            MongoCollection col = db.getCollection(this.collection);
            dbCursor = col.find(filter).iterator();
            while (dbCursor.hasNext()) {
                Document item = dbCursor.next();
                Record record = recordSender.createRecord();
                // The conversion of item's fields into columns of record is
                // omitted from this excerpt; it belongs here, before sending.

                // Send inside the loop so every document yields a record;
                // sending once after the loop would drop all documents read.
                recordSender.sendToWriter(record);
            }
        }
    }
}
/**
 * Condensed MongoDB writer plugin.
 *
 * <p>The nested {@link Job} hands each task a copy of the job configuration;
 * each nested {@link Task} buffers incoming records and inserts them in
 * batches. Field declarations used by the methods are not shown in this
 * excerpt.
 */
public class MongoDBWriter extends Writer {
    /** Job-level half of the writer. */
    public static class Job extends Writer.Job {
        /** Stores the plugin job configuration for later splitting. */
        @Override
        public void init() {
            this.originalConfig = super.getPluginJobConf();
        }

        /**
         * Produces one clone of the job configuration per requested slice.
         *
         * @param mandatoryNumber number of configurations to produce
         * @return a list holding {@code mandatoryNumber} clones (empty when the
         *         number is not positive)
         */
        @Override
        public List<Configuration> split(int mandatoryNumber) {
            List<Configuration> slices = new ArrayList<Configuration>();
            for (int remaining = mandatoryNumber; remaining > 0; remaining--) {
                slices.add(this.originalConfig.clone());
            }
            return slices;
        }
    }

    /** Task-level half of the writer. */
    public static class Task extends Writer.Task {
        /** Loads the slice configuration, builds the client and sets the batch size. */
        @Override
        public void init() {
            this.writerSliceConfig = this.getPluginJobConf();
            this.mongoClient = MongoUtil.initMongoClient(this.writerSliceConfig);
            this.batchSize = BATCH_SIZE;
        }

        /**
         * Drains the receiver until it returns {@code null}, inserting records
         * in batches of {@code batchSize} and flushing any remainder at the end.
         *
         * @param lineReceiver source of records coming from the reader side
         */
        @Override
        public void startWrite(RecordReceiver lineReceiver) {
            MongoCollection<BasicDBObject> target =
                    mongoClient.getDatabase(database).getCollection(this.collection, BasicDBObject.class);
            List<Record> pending = new ArrayList<Record>(this.batchSize);

            for (Record incoming = lineReceiver.getFromReader();
                    incoming != null;
                    incoming = lineReceiver.getFromReader()) {
                pending.add(incoming);
                if (pending.size() < this.batchSize) {
                    continue;
                }
                // Buffer is full: write it out and start a new batch.
                doBatchInsert(target, pending, mongodbColumnMeta);
                pending.clear();
            }

            if (pending.isEmpty()) {
                return;
            }
            // Flush the final, partially filled batch.
            doBatchInsert(target, pending, mongodbColumnMeta);
            pending.clear();
        }
    }
}

参考

上一篇 下一篇

猜你喜欢

热点阅读