工作流,可用于复杂的交易系统

2019-07-29  本文已影响0人  要不再等等

架构图


工作流

1.工作流引擎

package demo.flow;

import com.alibaba.fastjson.JSON;

import java.util.HashMap;
import java.util.Map;

/**
 * Custom workflow engine.
 *
 * <p>Keeps a registry of workflows keyed by channel type. Each workflow maps a step name to the
 * {@link BusinessProcessorProxy} that runs that step and knows which step follows for a given
 * process result. {@link #processWork()} walks the workflow for the {@link BusinessContext}
 * bound to the calling thread via {@link BusinessContextHelper}.
 */
public class WorkFlowEngine {

    public static final WorkFlowEngine getInstance = new WorkFlowEngine();

    /**
     * Workflow registry: channel type --> (step name --> processor proxy).
     */
    private static final Map<String, Map<String, BusinessProcessorProxy>> workFlows = new HashMap<>();

    /*
     * Demo data: registers a single workflow under the channel type "test":
     *   START --CONTINUE--> p2 (BusinessProcessorStart) --CONTINUE--> p3 (BusinessProcessorContinue)
     * START only carries transitions; it has no business processor of its own.
     */
    static {
        Map<String, BusinessProcessorProxy> bMap = new HashMap<>();
        workFlows.put("test", bMap);

        // START: entry node, routes CONTINUE to p2.
        BusinessProcessorProxy b1 = new BusinessProcessorProxy();
        Map<String, String> nextWorks1 = new HashMap<>();
        nextWorks1.put("CONTINUE", "p2");
        b1.setNextWorks(nextWorks1);

        // p2: runs BusinessProcessorStart, routes CONTINUE to p3.
        BusinessProcessorProxy b2 = new BusinessProcessorProxy();
        BusinessProcessorStart bs = new BusinessProcessorStart();
        Map<String, String> nextWorks2 = new HashMap<>();
        nextWorks2.put("CONTINUE", "p3");
        b2.setBusinessProcessor(bs);
        b2.setNextWorks(nextWorks2);

        // p3: runs BusinessProcessorContinue; no outgoing transitions, so the flow ends here.
        BusinessProcessorProxy b3 = new BusinessProcessorProxy();
        BusinessProcessorContinue bs1 = new BusinessProcessorContinue();
        b3.setBusinessProcessor(bs1);

        bMap.put("START", b1);
        bMap.put("p2", b2);
        bMap.put("p3", b3);
    }

    /**
     * Runs the business workflow for the context bound to the current thread.
     *
     * <p>Starting from the context's current step, the engine repeatedly looks up the next step
     * for the context's process result, clears the result, advances the current step and runs
     * the next step's processor. The walk stops when there is no context, no process result, no
     * proxy for the current step, or no step registered for the result.
     *
     * <p>Steps are walked in a loop rather than by recursion, so the call stack does not grow
     * with the number of steps. If a step throws, the workflow is aborted and the failure is
     * reported on {@code System.err}; the exception is not propagated to the caller.
     */
    public void processWork() {
        BusinessContext txnContext = BusinessContextHelper.getInstance.getTxnContext();
        if (txnContext == null) {
            // No context was bound to this thread, so there is nothing to process.
            return;
        }

        try {
            // Look up the workflow for the channel type.
            Map<String, BusinessProcessorProxy> workFlow = getWorkFlow();
            if (workFlow == null) {
                return;
            }

            while (true) {
                String processResult = txnContext.getProcessResult();
                if (processResult == null) {
                    // The last step did not request a transition: finished.
                    return;
                }

                BusinessProcessorProxy currentProcessor = workFlow.get(txnContext.getCurrentWork());
                if (currentProcessor == null || currentProcessor.getNextWorks() == null) {
                    // Unknown current step or no transition table: finished.
                    return;
                }

                String nextWorkValue = currentProcessor.getNextWorks().get(processResult);
                BusinessProcessorProxy nextProcessor = workFlow.get(nextWorkValue);
                if (nextProcessor == null) {
                    // No step registered for this result: finished.
                    return;
                }

                // Reset the result before running the step so a step that sets nothing ends the flow.
                txnContext.setProcessResult(null);
                txnContext.setCurrentWork(nextWorkValue);
                nextProcessor.process(txnContext);
            }
        } catch (Exception e) {
            // System error: abort the workflow, but leave a trace instead of failing silently.
            System.err.println("Workflow aborted at step '" + txnContext.getCurrentWork() + "': " + e);
        }
    }

    /**
     * Returns the workflow for the channel type.
     *
     * @return the workflow registered under the channel type "test", or {@code null} if none
     */
    private Map<String, BusinessProcessorProxy> getWorkFlow() {
        return workFlows.get("test");
    }

}

2.工作流处理器接口和实现

package demo.flow;

/**
 * Contract for a component that drives a workflow.
 */
public interface WorkFlowProcessor {

    /** Runs the workflow. */
    void process();
}
package demo.flow;

/**
 * Custom workflow processor that hands the work over to the shared {@link WorkFlowEngine}.
 */
public class MyWorkFlowProcessor implements WorkFlowProcessor {

    public static final MyWorkFlowProcessor getInstance = new MyWorkFlowProcessor();

    /** Engine that performs the actual workflow traversal. */
    private final WorkFlowEngine engine = WorkFlowEngine.getInstance;

    /** Delegates to {@link WorkFlowEngine#processWork()}. */
    @Override
    public void process() {
        engine.processWork();
    }
}

3.业务处理接口和实现

package demo.flow;

/**
 * A single unit of business logic that can be plugged into a workflow step.
 */
public interface BusinessProcessor {

    /**
     * Performs the business logic of this step.
     *
     * @param businessContext the context the step reads from and writes to
     */
    void process(BusinessContext businessContext);
}
package demo.flow;

/**
 * First demo task: marks the context as started and asks the flow to continue.
 */
public class BusinessProcessorStart implements BusinessProcessor {

    /** Value stored in the context by this task. */
    private static final String CONTEXT_VALUE = "start";

    /** Process result that requests the transition to the following step. */
    private static final String RESULT_CONTINUE = "CONTINUE";

    /**
     * Writes the context value and the process result, then prints a progress line.
     *
     * @param businessContext the context to update
     */
    @Override
    public void process(BusinessContext businessContext) {
        businessContext.setContext(CONTEXT_VALUE);
        businessContext.setProcessResult(RESULT_CONTINUE);
        System.out.println("执行任务1");
    }
}
package demo.flow;

/**
 * Second demo task: marks the context as finished and fills in a successful response.
 */
public class BusinessProcessorContinue implements BusinessProcessor {

    /**
     * Sets the context value to {@code "end"} and writes code {@code "00"} / message
     * {@code "success"} into the {@link CommonResponse} stored under the key {@code "response"}.
     *
     * <p>The response entry is validated before anything is modified, so a failure does not
     * leave the context half-updated.
     *
     * @param businessContext the context holding the response to fill in
     * @throws IllegalStateException if no {@link CommonResponse} is stored under {@code "response"}
     */
    @Override
    public void process(BusinessContext businessContext) {
        Object candidate = businessContext.getResponse() == null
                ? null
                : businessContext.getResponse().get("response");
        if (!(candidate instanceof CommonResponse)) {
            // Fail with a message that names the problem instead of a bare NPE/ClassCastException.
            throw new IllegalStateException(
                    "Expected a CommonResponse under key \"response\" but found: "
                            + (candidate == null ? "nothing" : candidate.getClass().getName()));
        }
        CommonResponse response = (CommonResponse) candidate;

        businessContext.setContext("end");
        response.setCode("00");
        response.setMsg("success");
        System.out.println("执行任务2");
    }
}

4.业务处理代理类

package demo.flow;

import java.util.HashMap;
import java.util.Map;

/**
 * Proxy around a {@link BusinessProcessor} that also carries the transitions to later steps.
 */
public class BusinessProcessorProxy {

    /** The wrapped business processor; may be absent. */
    private BusinessProcessor businessProcessor;

    /** Next-step table: the engine looks up the next step name by process result. */
    private Map<String, String> nextWorks = new HashMap<>();

    public BusinessProcessor getBusinessProcessor() {
        return this.businessProcessor;
    }

    public void setBusinessProcessor(BusinessProcessor businessProcessor) {
        this.businessProcessor = businessProcessor;
    }

    public Map<String, String> getNextWorks() {
        return this.nextWorks;
    }

    public void setNextWorks(Map<String, String> nextWorks) {
        this.nextWorks = nextWorks;
    }

    /**
     * Runs the wrapped business logic, if any.
     *
     * @param businessContext the business context passed on to the processor
     */
    public void process(BusinessContext businessContext) {
        if (this.businessProcessor == null) {
            // Nothing wrapped: this step does no work of its own.
            return;
        }
        this.businessProcessor.process(businessContext);
    }
}

5.业务上下文

package demo.flow;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

/**
 * Business context: the mutable state that is handed from one workflow step to the next.
 */
public class BusinessContext implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Step a freshly created context starts at. */
    private static final String INITIAL_WORK = "START";

    /** Process result a freshly created context starts with. */
    private static final String INITIAL_RESULT = "CONTINUE";

    /** Name of the current workflow step. */
    private String currentWork;

    /** Result of the last step; used to pick the next step. */
    private String processResult;

    /** Free-form context value written by the steps. */
    private String context;

    /** Response objects keyed by name. */
    private Map<String, Object> response;

    /**
     * Creates a context positioned at {@code "START"} with result {@code "CONTINUE"} and an
     * empty response map.
     */
    public BusinessContext() {
        this.currentWork = INITIAL_WORK;
        this.processResult = INITIAL_RESULT;
        this.response = new HashMap<>();
    }

    public String getCurrentWork() {
        return this.currentWork;
    }

    public void setCurrentWork(String currentWork) {
        this.currentWork = currentWork;
    }

    public String getProcessResult() {
        return this.processResult;
    }

    public void setProcessResult(String processResult) {
        this.processResult = processResult;
    }

    public String getContext() {
        return this.context;
    }

    public void setContext(String context) {
        this.context = context;
    }

    public Map<String, Object> getResponse() {
        return this.response;
    }

    public void setResponse(Map<String, Object> response) {
        this.response = response;
    }
}

6.上下文帮助类

package demo.flow;

/**
 * Helper that stores and retrieves the {@link BusinessContext} of the current thread.
 */
public class BusinessContextHelper {

    public static final BusinessContextHelper getInstance = new BusinessContextHelper();

    /** Per-thread slot holding the context. */
    private static final ThreadLocal<BusinessContext> txnContextThreadLocal = new ThreadLocal<>();

    /**
     * Binds a context to the current thread.
     *
     * @param txnContext the context to bind
     */
    public void addTxnContext(BusinessContext txnContext) {
        txnContextThreadLocal.set(txnContext);
    }

    /**
     * Returns the context bound to the current thread.
     *
     * @return the current thread's context, or {@code null} if none has been bound
     */
    public BusinessContext getTxnContext() {
        return txnContextThreadLocal.get();
    }

    /** Unbinds the context from the current thread. */
    public void removeTxnContext() {
        txnContextThreadLocal.remove();
    }

}

7.通用返回类

package demo.flow;

/**
 * Generic response holder made of a code and a message.
 */
public class CommonResponse {

    /** Response code. */
    private String code;

    /** Response message. */
    private String msg;

    public String getCode() {
        return this.code;
    }

    public String getMsg() {
        return this.msg;
    }

    public void setCode(String code) {
        this.code = code;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }
}

8.测试类

package demo.flow;

/**
 * Demo entry point: runs the sample workflow on a separate thread.
 */
public class FlowTest {

    /**
     * Builds a context holding an empty {@link CommonResponse}, binds it to a worker thread and
     * runs the workflow there.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        CommonResponse commonResponse = new CommonResponse();
        BusinessContext businessContext = new BusinessContext();
        businessContext.getResponse().put("response", commonResponse);
        new Thread(() -> {
            BusinessContextHelper.getInstance.addTxnContext(businessContext);
            try {
                MyWorkFlowProcessor.getInstance.process();
            } finally {
                // Always unbind: a thread-local left set would leak into later work on a reused thread.
                BusinessContextHelper.getInstance.removeTxnContext();
            }
        }).start();

    }
}

如果需要异步返回处理结果,可以使用 Spring MVC 的 DeferredResult。

上一篇下一篇

猜你喜欢

热点阅读