GIS后端

How it works(10) NodeODM源码阅读(A)

2019-03-15  本文已影响2人  默而识之者

引入

OpenDroneMap(ODM)是一款非常强大的无人机成果处理软件,可以直接将无人机拍摄的照片处理成正摄影像甚至进行三维建模.ODM本身是基于python的OpenSFM编写的命令行工具,为了方便实际使用,NodeODM出现了.
NodeODM是Nodejs编写的一套带有可视化界面的API,实现了通过接口上传图片,修改配置,获取进度等常用功能.因此我们一般用的都是NodeODM,很少会直接调用ODM.
NodeODM对外是将命令行封装为接口,内部主要实现了以下的功能:

代码架构

惯例是madge生成的结构图

文件架构

因为NodeODM是一个前后端一体项目,文件组织比较复杂,也需要说一下:


从架构图可知,整个NodeODM围绕一大一下两个功能展开:

鉴权

NodeODM没有用户体系,因此采用的是静态token鉴权.因为整个NodeODM是基于express框架搭建的接口,所以所谓的鉴权,就是在请求中最先处理的中间件:

const auth = require('./libs/auth/factory').fromConfig(config);
const authCheck = auth.getMiddleware();
app.get('/task/:uuid/info', authCheck, getTaskFromUuid, (req, res) => {
    res.json(req.task.getInfo());
});

鉴权中间件采用工厂模式:

const NoTokenRequiredAuth = require('./NoTokenRequiredAuth');
const SimpleTokenAuth = require('./SimpleTokenAuth');

module.exports = {
    fromConfig: function(config){
        if (config.token){
            return new SimpleTokenAuth(config.token);
        }else{
            return new NoTokenRequiredAuth();
        }
    }
}

鉴权模型有两种:

这两者都基于一个抽象的鉴权基类:

module.exports = class TokenAuthBase{
    initialize(cb){
        cb();
    }
    cleanup(cb){
        cb();
    }
    //虚函数,需要重写
    validateToken(token, cb){ cb(new Error("Not implemented"), false); }
    //生成中间件
    getMiddleware(){
        return (req, res, next) => {
            this.validateToken(req.query.token, (err, valid) => {
                if (valid) next();
                else{
                    res.json({ error: "Invalid authentication token: " + err.message });
                }
            });
        };
    }
};

两种鉴权模式继承了该基类

//简单鉴权
module.exports = class SimpleTokenAuth extends TokenAuthBase{
    constructor(token){
        super(token);
        this.token = token;
    }

    validateToken(token, cb){ 
        if (this.token === token){
            return cb(null, true);
        }else{
            cb(new Error("token does not match."), false);
        }
    }
};

//无鉴权
module.exports = class NoTokenRequiredAuth extends TokenAuthBase{
    // 永远返回true
    validateToken(token, cb){ cb(null, true); }
};

虽然这是一种聊胜于无的鉴权方式,但它定义了一个很好的模式:如果有第三种复杂/动态的鉴权方式,采用工厂模式可以保证对上层的影响最小.继承基类的方式能保证对整个鉴权模块的影响最小.

任务

相比可有可无的鉴权,任务是整个NodeODM最重要的部分.任务相关由三大部分组成:


任务的运行流程也与这三部分都有关系:


任务运行前

由上面的流程图可知,任务的创建分4个小步骤:

构建环境

初始化环境发生在NodeODM启动之时,只会初始化一次,主要目的是准备相应的文件夹,清理无用的旧任务文件夹等:

constructor(done){
    this.tasks = {};
    //正在运行的任务
    this.runningQueue = [];
    //顺序执行下列清理
    async.series([
        cb => this.restoreTaskListFromDump(cb),
        cb => this.removeOldTasks(cb),
        cb => this.removeOrphanedDirectories(cb),
        cb => this.removeStaleUploads(cb),// 移除没有对应任务的文件夹
        cb => {
            this.processNextTask();//自动开始状态为排队的任务
            cb();
        },
        cb => {
            // 建立定时任务,每小时都进行清除,
            schedule.scheduleJob('0 * * * *', () => {
                this.removeOldTasks();
                this.dumpTaskList();//将配置文件存储在硬盘上
                this.removeStaleUploads();
            });
            cb();
        }
    ], done);
}

//加载全部已存在的任务
restoreTaskListFromDump(done){
    fs.readFile(TASKS_DUMP_FILE, (err, data) => {
        if (!err){
            let tasks;
            try{
                tasks = JSON.parse(data.toString());
            }catch(e){
                done(new Error(`It looks like the ${TASKS_DUMP_FILE} is corrupted (${e.message}).`));
                return;
            }
      //重新序列化每一个任务
            async.each(tasks, (taskJson, done) => {
                Task.CreateFromSerialized(taskJson, (err, task) => {
                    if (err) done(err);
                    else{
                      //将任务添加回任务列表
                        this.tasks[task.uuid] = task;
                        done();
                    }
                });
            }, err => {
                logger.info(`Initialized ${tasks.length} tasks`);
                if (done !== undefined) done();
            });
        }else{
            logger.info("No tasks dump found");
            if (done !== undefined) done();
        }
    });
}

//移除过早的任务
removeOldTasks(done){
    let list = [];
    let now = new Date().getTime();
    logger.debug("Checking for old tasks to be removed...");

    for (let uuid in this.tasks){
        let task = this.tasks[uuid];
    //任务创建时间
        let dateFinished = task.dateCreated;
        if (task.processingTime > 0) dateFinished += task.processingTime;
    //收集所有过早的已失败已完成的已或取消的任务(允许存在正在运行或正在排队的过早任务)
        if ([statusCodes.FAILED,
            statusCodes.COMPLETED,
            statusCodes.CANCELED].indexOf(task.status.code) !== -1 &&
            now - dateFinished > CLEANUP_TASKS_IF_OLDER_THAN){
            list.push(task.uuid);
        }
    }
  //为什么不在上步直接remove?因为remove是异步的,上面的是同步方法,调用的话无法保证已完成
    async.eachSeries(list, (uuid, cb) => {
        logger.info(`Cleaning up old task ${uuid}`);
        this.remove(uuid, cb);
    }, done);
}

初始化配置

从接口定义可以看出,初始化配置总共经历了简单鉴权,配置UUID到初始化任务:

app.post('/task/new/init', authCheck, taskNew.assignUUID, taskNew.handleInit);

UUID是追踪每一个任务的唯一标识,生成了UUID,就可以安心的构建该任务的不重名工作区文件夹:

assignUUID: (req, res, next) => {
    //用户可以自行指定uuid
    if (req.get('set-uuid')){
        const userUuid = req.get('set-uuid');
        // 验证用户给定的UUID是否合法并且没有重名
        if (/^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i.test(userUuid) && !TaskManager.singleton().find(userUuid)){
            req.id = userUuid;
            next();
        }else{
            res.json({error: `Invalid set-uuid: ${userUuid}`})
        }
    }else{
        //生成一个UUID
        req.id = uuidv4();
        next();
    }
}
handleInit: (req, res) => {
        req.body = req.body || {};
        //在临时文件夹下生成工作区
        const srcPath = path.join("tmp", req.id);
        const bodyFile = path.join(srcPath, "body.json");
        // 出错的时候删掉工作区
        const die = (error) => {
            res.json({error});
            removeDirectory(srcPath);
        };
    //顺序执行下列任务
        async.series([
            cb => {
                // 检测指定的配置信息是否合法
                if (req.body && req.body.options){
                    odmInfo.filterOptions(req.body.options, err => {
                        if (err) cb(err);
                        else cb();
                    });
                }else cb();
            },
            cb => {
              //检测要生成的文件夹是否不存在
                fs.stat(srcPath, (err, stat) => {
                    if (err && err.code === 'ENOENT') cb();
                    else cb(new Error(`Directory exists (should not have happened: ${err.code})`));
                });
            },
            //创建该文件夹
            cb => fs.mkdir(srcPath, undefined, cb),
            //将配置序列化,也存入该文件
            cb => {
                fs.writeFile(bodyFile, JSON.stringify(req.body), {encoding: 'utf8'}, cb);
            },
            //返回生成的UUID供客户端后面使用
            cb => {
                res.json({uuid: req.id});
                cb();
            }
        ], err => {
            if (err) die(err.message);
        });
   },

存储图片

从接口定义可以看出,存储图片总共经历了简单鉴权,验证UUID,图片上传,返回结果几步:

app.post('/task/new/upload/:uuid', authCheck, taskNew.getUUID, taskNew.uploadImages, taskNew.handleUpload);

上传图片前,首先验证的是UUID是否合法,确定要向哪个任务上传图片:

getUUID: (req, res, next) => {
    req.id = req.params.uuid;
    if (!req.id) res.json({error: `Invalid uuid (not set)`});
    //验证是否存在uuid对应的文件夹下有配置文件存在
    const srcPath = path.join("tmp", req.id);
    const bodyFile = path.join(srcPath, "body.json");
    fs.access(bodyFile, fs.F_OK, err => {
        if (err) res.json({error: `Invalid uuid (not found)`});
        else next();
    });
},

上传图片的处理采用了express官方的multer模块:

//接收表单中的"images"字段上传的图片
uploadImages: upload.array("images"),
const upload = multer({
    storage: multer.diskStorage({
        destination: (req, file, cb) => {
            let dstPath = path.join("tmp", req.id);
            fs.exists(dstPath, exists => {
                if (!exists) {
                    fs.mkdir(dstPath, undefined, () => {
                        cb(null, dstPath);
                    });
                } else {
                    cb(null, dstPath);
                }
            });
        },
        filename: (req, file, cb) => {
            let filename = utils.sanitize(file.originalname);
            //因为body.json是内置的配置存储文件,避免在此被替换掉
            if (filename === "body.json") filename = "_body.json";
            cb(null, filename);
        }
    })
});

任务开始运行

进入任务最重要的环节:执行.从接口定义可以看出,任务执行总共经历了简单鉴权,验证UUID,执行前处理,任务执行几步:

app.post('/task/new/commit/:uuid', authCheck, taskNew.getUUID, taskNew.handleCommit, taskNew.createTask);

在执行任务前,需要为执行准备参数,即待处理的任务文件和任务参数.从代码可以看出,NodeODM的参数都会固化在本地文件内,用的时候序列化,修改后再存入本地,而不是一直维持在内存里:

handleCommit: (req, res, next) => {
        const srcPath = path.join("tmp", req.id);
        const bodyFile = path.join(srcPath, "body.json");
        //顺序执行下列动作
        async.series([
            //先读取配置文件(文件夹下有全部图片和配置文件)
            cb => {
                fs.readFile(bodyFile, 'utf8', (err, data) => {
                    if (err) cb(err);
                    else {
                        try {
                            //序列化配置文件后删除它
                            const body = JSON.parse(data);
                            fs.unlink(bodyFile, err => {
                                if (err) cb(err);
                                else cb(null, body);
                            });
                        } catch (e) {
                            cb("Malformed body.json");
                        }
                    }
                });
            },
            //读取所有图片(现在只剩图片了)
            cb => fs.readdir(srcPath, cb),
        ], (err, [body, files]) => {
            if (err) res.json({
                error: err.message
            });
            else {
                //将配置和文件列表挂载到request对象上,供后面的中间件使用
                req.body = body;
                req.files = files;
                if (req.files.length === 0) {
                    req.error = "Need at least 1 file.";
                }
                next();
            }
        });
    },

一切准备妥当,就可以开启任务了:

createTask: (req, res) => {
    req.setTimeout(1000 * 60 * 20);
    const srcPath = path.join("tmp", req.id);
    // 遇到致命错误时,就返回错误信息,并尝试删除文件
    const die = (error) => {
        res.json({error});
        removeDirectory(srcPath);
    };
    if (req.error !== undefined){
        die(req.error);
    }else{
      //正式存储成果的文件夹
        let destPath = path.join(Directories.data, req.id);
        let destImagesPath = path.join(destPath, "images");
        let destGcpPath = path.join(destPath, "gcp");
        async.series([
          //验证配置参数是否合法
            cb => {
                odmInfo.filterOptions(req.body.options, (err, options) => {
                    if (err) cb(err);
                    else {
                        req.body.options = options;
                        cb(null);
                    }
                });
            },
            //保证最终成果文件夹不存在
            cb => {
                if (req.files && req.files.length > 0) {
                    fs.stat(destPath, (err, stat) => {
                        if (err && err.code === 'ENOENT') cb();
                        else cb(new Error(`Directory exists (should not have happened: ${err.code})`));
                    });
                } else {
                    cb();
                }
            },
            //如果提供的是url,则直接下载该zip文件
            cb => {
                if (req.body.zipurl) {
                    let archive = "zipurl.zip";
                    upload.storage.getDestination(req, archive, (err, dstPath) => {
                        if (err) cb(err);
                        else{
                         //下载
             const download = function(uri, filename, callback) {
                request.head(uri, function(err, res, body) {
                    if (err) callback(err);
                    else request(uri).pipe(fs.createWriteStream(filename)).on('close', callback);
                });
              };
             //getDestination函数返回的是当前任务的临时文件夹
             //zip包最终会被下载到该文件夹
                          let archiveDestPath = path.join(dstPath, archive);
                          download(req.body.zipurl, archiveDestPath, cb);
                        }
                    });
                } else {
                    cb();
                }
            },
            //将所有文件从临时目录移到最终目录去
            cb => fs.mkdir(destPath, undefined, cb),
            cb => fs.mkdir(destGcpPath, undefined, cb),
            cb => mv(srcPath, destImagesPath, cb),
            cb => {
                // 解压存在的zip文件
                fs.readdir(destImagesPath, (err, entries) => {
                    if (err) cb(err);
                    else {
                        async.eachSeries(entries, (entry, cb) => {
                            if (/\.zip$/gi.test(entry)) {
                                let filesCount = 0;
                                fs.createReadStream(path.join(destImagesPath, entry)).pipe(unzip.Parse())
                                        .on('entry', function(entry) {
                                            if (entry.type === 'File') {
                                                filesCount++;
                                                entry.pipe(fs.createWriteStream(path.join(destImagesPath, path.basename(entry.path))));
                                            } else {
                                                entry.autodrain();
                                            }
                                        })
                                        .on('close', () => {
                                            // 解压完成后检测是不是会文件过多
                                            if (config.maxImages && filesCount > config.maxImages) cb(`${filesCount} images uploaded, but this node can only process up to ${config.maxImages}.`);
                                            else cb();
                                        })
                                        .on('error', cb);
                            } else cb();
                        }, cb);
                    }
                });
            },
            cb => {
                // 寻找控制点描述文件,移动到正确的位置,并删除所有zip文件
                // also remove any lingering zipurl.zip
                fs.readdir(destImagesPath, (err, entries) => {
                    if (err) cb(err);
                    else {
                        async.eachSeries(entries, (entry, cb) => {
                            if (/\.txt$/gi.test(entry)) {
                                mv(path.join(destImagesPath, entry), path.join(destGcpPath, entry), cb);
                            }else if (/\.zip$/gi.test(entry)){
                                fs.unlink(path.join(destImagesPath, entry), cb);
                            } else cb();
                        }, cb);
                    }
                });
            },
            // 创建任务并运行
            cb => {
                new Task(req.id, req.body.name, (err, task) => {
                    if (err) cb(err);
                    else {
                        TaskManager.singleton().addNew(task);
                        res.json({ uuid: req.id });
                        cb();
                    }
                }, req.body.options,
                req.body.webhook,
                req.body.skipPostProcessing === 'true');
            }
        ], err => {
            if (err) die(err.message);
        });
    }
}

至此,建立一个任务并运行需要调用4个接口.
当然NodeODM也提供了合而为一的接口,简化调用:

app.post('/task/new', authCheck, taskNew.assignUUID, taskNew.uploadImages, (req, res, next) => {
    req.body = req.body || {};
    if ((!req.files || req.files.length === 0) && !req.body.zipurl) req.error = "Need at least 1 file or a zip file url.";
    else if (config.maxImages && req.files && req.files.length > config.maxImages) req.error = `${req.files.length} images uploaded, but this node can only process up to ${config.maxImages}.`;
    next();
}, taskNew.createTask);
上一篇下一篇

猜你喜欢

热点阅读