Oozie-command

2016-06-29  本文已影响129人  PunyGod

从前面几篇组件介绍,希望用户对oozie的组件有个粗略的概念,用户需要完成的业务逻辑将会被封装成为wf、coord、bundle,作为一个调度系统,oozie需要为用户做到什么呢,oozie需要按照用户的设地在合适的时候对用户的wf、coord、bundle进行操作,可能的操作包括 启动、停止、杀死、重跑、挂起、恢复、结束等等,本篇主要介绍oozie的命令系统;

/** * Extends Callable adding the concept of priority. <p/> The priority is useful when queuing callables for later 
* execution via the {@link org.apache.oozie.service.CallableQueueService}. <p/> A higher number means a higher 
* priority. <p/>
*/
public interface XCallable<T> extends Callable<T> {
/** * Base class for synchronous and asynchronous commands. 
* <p/> 
* It enables by API the following pattern: 
* <p/>
* <ul> 
* <li>single execution: a command instance can be executed only once</li> 
* <li>eager data loading: loads data for eager precondition check</li>
* <li>eager precondition check: verify precondition before obtaining lock</li> 
* <li>data loading: loads data for precondition check and execution</li>
* <li>precondition check: verifies precondition for execution is still met</li> 
* <li>locking: obtains exclusive lock on key before executing the command</li> 
* <li>execution: command logic</li> 
* </ul> 
* <p/> 
* It has built in instrumentation and logging. 
*/
public abstract class XCommand<T> implements XCallable<T> {

oozie将所有的命令抽象出一层 XCommand ,命令根据不同的场景需要同步执行或者异步执行,当进行异步执行的时候,还引入了优先级的概念来排列命令的执行计划;

/** * Implements the XCommand life-cycle. 
*
* @return the {link #execute} return value. 
* @throws Exception thrown if the command could not be executed. */
@Override
public final T call() throws CommandException {
    setLogInfo();
    if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType()) && used.get()) {
        LOG.debug("Command [{0}] key [{1}]  already used for [{2}]", getName(), getEntityKey(), this.toString());
        return null;
    }
    commandQueue = null; 
    instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".executions", 1);
    Instrumentation.Cron callCron = new Instrumentation.Cron();
    try {
        callCron.start();
        if (!isSynchronous) {
            eagerLoadState();
            eagerVerifyPrecondition();
        }
        try {
            T ret = null;
            if (!isSynchronous && isLockRequired() && !this.inInterruptMode()) {
                Instrumentation.Cron acquireLockCron = new Instrumentation.Cron();
                acquireLockCron.start();
                acquireLock();
                acquireLockCron.stop();
                instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".acquireLock", acquireLockCron);
            } 
           // executing interrupts only in case of the lock required commands
            if (lock != null) {
                this.executeInterrupts();
            } 
           if (isSynchronous || !isLockRequired() || (lock != null) || this.inInterruptMode()) { 
               if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType())
                        && !used.compareAndSet(false, true)) {  
                  LOG.debug("Command [{0}] key [{1}]  already executed for [{2}]", getName(), getEntityKey(), this.toString());
                    return null;
                }
                LOG.trace("Load state for [{0}]", getEntityKey()); 
               loadState();
               LOG.trace("Precondition check for command [{0}] key [{1}]", getName(), getEntityKey());
                verifyPrecondition();
                LOG.debug("Execute command [{0}] key [{1}]", getName(), getEntityKey());
                Instrumentation.Cron executeCron = new Instrumentation.Cron();
                executeCron.start();
                ret = execute();
                executeCron.stop();
                instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".execute", executeCron);
            }
            if (commandQueue != null) {
                CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
                for (Map.Entry<Long, List<XCommand<?>>> entry : commandQueue.entrySet()) {
                    LOG.debug("Queuing [{0}] commands with delay [{1}]ms", entry.getValue().size(), entry.getKey());
                    if (!callableQueueService.queueSerial(entry.getValue(), entry.getKey())) {
                        LOG.warn("Could not queue [{0}] commands with delay [{1}]ms, queue full", entry.getValue()
                                .size(), entry.getKey());
                   }
                }
            }
            return ret;
        }
        finally {
            if (!isSynchronous && isLockRequired() && !this.inInterruptMode()) { 
               releaseLock();
           }
        }
    }
    catch(PreconditionException pex){ 
       LOG.warn(pex.getMessage().toString() + ", Error Code: " + pex.getErrorCode().toString());
        instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".preconditionfailed", 1);
        return null;
    }
    catch (XException ex) {
        LOG.error("XException, ", ex);
        instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".xexceptions", 1);
        if (ex instanceof CommandException) { 
           throw (CommandException) ex;
        }
        else {
            throw new CommandException(ex);
        }
    }
    catch (Exception ex) {
        LOG.error("Exception, ", ex); 
       instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".exceptions", 1); 
       throw new CommandException(ErrorCode.E0607, getName(), ex.getMessage(), ex);
    } 
   catch (Error er) {
        LOG.error("Error, ", er);  
      instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".errors", 1); 
       throw er;
    }
    finally {
        FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection");
        callCron.stop(); 
       instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".call", callCron);
    }}

如上是 所有命令的call方法:如果是异步执行命令时候,再执行命令的时候 需要
先来测试一下此刻的状态是否还有执行命令的必要;所有的call方法在执行 具体的 execute() 时候,都需要装载需要操作的信息,用于更改数据库信息。如果对某个实例的操作不能同时进行,在执行命令之前还需要去获取锁,来保证某一时刻的操作是唯一的。很多情况下,一个命令会衍生出子命令,比如说杀死一个bundle的时候,我需要将bundle的状态置为杀死之外,还要去发送杀死bundle中的coord杀死的命令,这个就是一个命令产生子命令的场景,这个时候,我们也需要将这些子命令加入到异步命令执行池中去直接。

不同的命令逻辑的执行区别主要体现在不同的子类中的 execute() 中。

diagram.png

图中是oozie的整个包含不同业务色彩的命令系统。

上一篇下一篇

猜你喜欢

热点阅读