使用管道操作来重构责任链模式

2022-03-10  本文已影响0人  东南枝下

前言

责任链的实现可以看这个:https://www.jianshu.com/p/904340fb6f8f

实现出来的责任链对象如下:

Player player = new PlayerA(new PlayerB(new PlayerC(new PlayerD(null))));

在springboot项目中我把责任链注册成一个bean,但责任链的定义实在丑陋

在学习了netty后觉得ChannelPipeline的使用方式比较优雅一些

 ChannelPipeline pipeline = socketChannel.pipeline();
 pipeline.addLast(serverHandler);

所以参考netty的管道来改造一下责任链

源码阅读

ChannelPipeline的实现DefaultChannelPipeline中的addLastaddLast0方法
可以看出这是一个链表结构,新的上下文对象插入在pre与tail之间

图片.png 图片.png 图片.png

从构造函数中看出,初始的管道中就包含了一个head和tail节点,这里就不详细看下去了

图片.png

看一下netty是如何把hander包装进Context的,这里使用了一个代理,在hander外包了一层

图片.png

看一下他的父类AbstractChannelHandlerContext,可以看出,在fireChannelRead中递归去调用Context链表中的handler

图片.png

AbstractChannelHandlerContext的接口ChannelHandlerContext

图片.png

综合一下,有三个角色,管道(Pipeline)、上下文(Context)、和执行器(handler)。
管道(DefaultChannelPipeline)中有一对上下文链表的头对象(head)和尾对象(tail),上下文链表中包裹着具体的执行器(handler),在执行某个上下文对象的fireChannelRead时会递归的执行链表中下一个节点fireChannelRead

以上,便可以参照此设计一个Pipeline

代码编写

先不考虑增加/减少handler的并发风险

/**
 * @author Jenson
 */
public interface Handler {

    /**
     * 执行操作
     *
     * @param param 入参
     * @return 参数,以map形式
     */
    Map<String, Object> doing(final Object param);
}
/**
 * @author Jenson
 */
public abstract class AbstractHandlerContext {

    volatile AbstractHandlerContext next;
    volatile AbstractHandlerContext prev;

    /**
     * 执行上下文
     *
     * @param param  输入参数
     * @param result 过程中产生结果
     * @return 此上下文对象
     */
    final AbstractHandlerContext doing(Object param, Map<String, Object> result) {
        Map<String, Object> rst = this.doing0(param, result);
        if (rst != null && rst.size() > 0) {
            result.putAll(rst);
        }
        // 执行下一节点
        if (this.next != null) {
            AbstractHandlerContext nextCtx = next.doing(param, result);
        }
        return this;
    }

    /**
     * 具体的执行上下文动作
     *
     * @param param  输入参数
     * @param result 过程中产生结果
     */
    abstract Map<String, Object> doing0(Object param, Map<String, Object> result);
}

/**
 * @author Jenson
 */
public class DefaultHandlerContext extends AbstractHandlerContext {

    private final Handler handler;

    public DefaultHandlerContext(Handler handler) {
        this.handler = handler;
    }

    @Override
    Map<String, Object> doing0(Object param, Map<String, Object> result) {
        return handler.doing(param);
    }
}
/**
 * @author Jenson
 */
public class HeadHandlerContext extends AbstractHandlerContext{
    @Override
    Map<String, Object> doing0(Object param, Map<String, Object> result) {
        System.out.println("------------------默认起始节点---------------");
        return null;
    }
}
/**
 * @author Jenson
 */
public class TailHandlerContext extends AbstractHandlerContext{
    @Override
    Map<String, Object> doing0(Object param, Map<String, Object> result) {
        System.out.println("------------------默认结束节点---------------");
        return null;
    }
}
/**
 * 管道
 *
 * @author Jenson
 */
public interface Pipeline {

    /**
     * 增加一个处理者节点
     *
     * @param handler 处理者
     * @return 管道
     */
    Pipeline addLast(Handler handler);

    /**
     * 执行管道
     *
     * @param param  输入参数
     * @return 过程中产生结果
     */
    Map<String, Object> doing(Object param);
}
/**
 * @author Jenson
 */
public class DefaultPipeline implements Pipeline {

    final AbstractHandlerContext head;
    final AbstractHandlerContext tail;

    /**
     * 构造函数
     */
    public DefaultPipeline() {
        head = new HeadHandlerContext();
        tail = new TailHandlerContext();
        head.next = tail;
        tail.prev = head;
    }

    /**
     * 增加一个处理者节点
     *
     * @param handler 处理者
     * @return 管道
     */
    @Override
    public Pipeline addLast(Handler handler) {
        AbstractHandlerContext newCtx = new DefaultHandlerContext(handler);
        synchronized (this) {
            AbstractHandlerContext prev = tail.prev;
            newCtx.prev = prev;
            newCtx.next = tail;
            prev.next = newCtx;
            tail.prev = newCtx;
        }
        return this;
    }

    @Override
    public Map<String, Object> doing(Object param) {
        Map<String, Object> result = new HashMap<>();
        // 从头执行
        head.doing(param, result);
        return result;
    }
}

至此,简单的Pipeline已经完成

测试

/**
 * @author Jenson
 */
@Data
public class Person {
    private String name;
    private Integer age;
}
/**
 * @author Jenson
 */
public class PlayerHandler1 implements Handler {
    @Override
    public Map<String, Object> doing(Object param) {
        System.out.println("第 1 个节点------参数:" + param.toString());
        Map<String,Object> result = new HashMap<>();
        result.put("aaa","111");
        result.put("bbb","222");
        return result;
    }
}
/**
 * @author Jenson
 */
public class PlayerHandler2 implements Handler {
    @Override
    public Map<String, Object> doing(Object param) {
        System.out.println("第 2 个节点------参数:" + param.toString());
        Map<String, Object> result = new HashMap<>();
        result.put("bbb", "3safafaf33");
        result.put("ccc", "333");
        ((Person) param).setName("我已经不是我了");
        return result;
    }
}
/**
 * @author Jenson
 */
public class PlayerHandler3 implements Handler {
    @Override
    public Map<String, Object> doing(Object param) {
        System.out.println("第 3 个节点------参数:" + param.toString());
        Map<String,Object> result = new HashMap<>();
        result.put("ddd","555");
        result.put("eee","666");
        return result;
    }
}
/**
 * @author Jenson
 */
public class PipelineTestMain {
    public static void main(String[] args) {

        Pipeline pipeline = new DefaultPipeline();

        pipeline.addLast(new PlayerHandler1())
                .addLast(new PlayerHandler2())
                .addLast(new PlayerHandler3());

        Person person = new Person();
        person.setName("jenson");
        person.setAge(24);
        Map<String, Object> result = pipeline.doing(person);
        // 遍历返回参数
        result.keySet().forEach(key -> {
            System.out.println(key + ":" + result.get(key));
        });
    }
}
------------------默认起始节点---------------
第 1 个节点------参数:Person(name=jenson, age=24)
第 2 个节点------参数:Person(name=jenson, age=24)
第 3 个节点------参数:Person(name=我已经不是我了, age=24)
------------------默认结束节点---------------
aaa:111
ccc:333
bbb:3safafaf33
eee:666
ddd:555

入参对象内的值在handler执行过程中会被改变,如果有些值不愿意被修改,可以用final定义

代码地址:https://gitee.com/jenson343/hotchpotch/tree/master/pipeline-to-chain

改造:在springboot中使用

上述实现的管道,为了方便在springboot中使用,需要做一些改造,首先handler们应该做成一个个Spring Bean,这样在实际使用中才方便其他bean依赖注入。
既然handler做成了Spring Bean,单向链表就可以用一个Map来记录,Map中存的是BeanName,执行时根据BeanName动态从IOC容器中获取bean来执行
使用Map的话可以用两个Map实现双向链表(Map<当前节点,下一节点>、Map<当前节点,上一节点>),这样就能实现在原有链表的任意位置添加,移除指定handler节点(二次开发,对标准业务功能进行调整)

/**
 * @author Jenson
 */
public interface Handler {

    /**
     * 执行操作
     *
     * @param param 入参
     * @return 返回参数
     */
    Object doing(final Object param);
}
/**
 * 管道
 *
 * @author Jenson
 */
public interface Pipeline {

    /**
     * 执行管道
     *
     * @param param 输入参数
     * @return 过程中产生结果(Map < handlerName, result >)
     */
    Map<String, Object> doing(Object param);

    /**
     * 末位增加一个处理者节点
     *
     * @param handlerName 处理者BeanName
     * @return 管道
     */
    Pipeline addLast(String handlerName);

    /**
     * 在指定处理者前增加一个处理者节点
     *
     * @param pointHandler 指定处理者BeanName
     * @param handlerName  处理者BeanName
     * @return 管道
     */
    Pipeline addBefore(String pointHandler, String handlerName);

    /**
     * 在指定处理者后增加一个处理者节点
     *
     * @param pointHandler 指定处理者BeanName
     * @param handlerName  处理者BeanName
     * @return 管道
     */
    Pipeline addAfter(String pointHandler, String handlerName);

    /**
     * 移除一个节点,这个节点不能是默认开始节点和默认结束节点
     *
     * @param handlerName 处理者BeanName
     * @return 管道
     */
    Pipeline remove(String handlerName);
}

/**
 * 手动获取Bean
 *
 * @author Jenson
 */
@Component
public class SpringContextUtil implements ApplicationContextAware {

    /**
     * 应用上下文,用于获取bean
     */
    private static ApplicationContext applicationContext;

    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        SpringContextUtil.applicationContext = applicationContext;
    }

    /**
     * 根据bean的名称获取bean
     *
     * @param name bean的名称
     * @return bean
     */
    public static Object getBean(String name) {
        return getApplicationContext().getBean(name);
    }
}


/**
 * 管道默认实现类
 *
 * @author Jenson
 */
@Slf4j
public class DefaultPipeline implements Pipeline {
    /**
     * 链表默认头节点,始终保持在最头部
     */
    private final String head = "defaultPipelineHeadHandler";
    /**
     * 链表默认尾节点,始终保持在最尾部
     */
    private final String tail = "defaultPipelineTailHandler";

    /**
     * 正向链表
     */
    private final Map<String, String> positiveLinkMap;
    /**
     * 反向链表
     */
    private final Map<String, String> reverseLinkMap;

    public DefaultPipeline() {
        this.positiveLinkMap = new ConcurrentHashMap<>();
        this.reverseLinkMap = new ConcurrentHashMap<>();
        positiveLinkMap.put(head, tail);
        reverseLinkMap.put(tail, head);
    }


    @Override
    public Map<String, Object> doing(Object param) {
        Map<String, Object> result = new HashMap<>();
        // 从头部开始执行
        this.executeHandler(head,
                param,
                result,
                0,
                positiveLinkMap.size());

        return result;
    }

    @Override
    public Pipeline addLast(String handlerName) {
        this.addBefore(tail, handlerName);
        return this;
    }

    @Override
    public Pipeline addBefore(String pointHandler, String handlerName) {
        synchronized (this) {
            this.remove0(handlerName);
            if (reverseLinkMap.get(pointHandler) != null) {
                String prev = reverseLinkMap.get(pointHandler);
                positiveLinkMap.put(prev, handlerName);
                positiveLinkMap.put(handlerName, pointHandler);
                reverseLinkMap.put(pointHandler, handlerName);
                reverseLinkMap.put(handlerName, prev);
            } else {
                log.error("pointHandler (" + pointHandler + ") not in link map !");
            }
        }
        return this;
    }

    @Override
    public Pipeline addAfter(String pointHandler, String handlerName) {
        synchronized (this) {
            this.remove0(handlerName);
            if (positiveLinkMap.get(pointHandler) != null) {
                String after = positiveLinkMap.get(pointHandler);
                positiveLinkMap.put(pointHandler, handlerName);
                positiveLinkMap.put(handlerName, after);
                reverseLinkMap.put(handlerName, pointHandler);
                reverseLinkMap.put(after, handlerName);
            } else {
                log.error("pointHandler (" + pointHandler + ") not in link map !");
            }
        }
        return this;
    }

    @Override
    public Pipeline remove(String handlerName) {
        synchronized (this) {
            this.remove0(handlerName);
        }
        return this;
    }

    /**
     * 从某一个handler节点开始,顺序执行每一个节点
     *
     * @param beanName 需要执行的handler节点
     * @param param    入参
     * @param result   handler执行返回参数
     * @param count    当前执行节点计数器,从0开始
     * @param total    最大递归层级,为链表长度,作为遇到环形链表时的安全保障,避免无限递归
     */
    private void executeHandler(String beanName,
                                Object param,
                                Map<String, Object> result,
                                int count,
                                int total
    ) {
        if (result == null) {
            result = new HashMap<>();
        }
        if (count > total) {
            // 递归安全保障,避免遇到环形链表导致内存溢出
            return;
        }
        Object bean = SpringContextUtil.getBean(beanName);
        if (bean instanceof Handler) {
            Object rst = ((Handler) bean).doing(param);
            // 以beanName区分每个handler的返回值
            result.put(beanName, rst);
        } else {
            // 错误的bean,不是Handler接口的实现
            log.error("error.wrong handler bean !");
        }
        // 执行下一个节点
        String next = positiveLinkMap.get(beanName);
        if (next != null) {
            count = count + 1;
            this.executeHandler(next, param, result, count, total);
        }
    }

    /**
     * 删除节点,非原子化操作
     *
     * @param handlerName 处理者BeanName
     */
    private void remove0(String handlerName) {
        if (!head.equals(handlerName) && !tail.equals(handlerName)) {
            String prev = reverseLinkMap.get(handlerName);
            String after = positiveLinkMap.get(handlerName);
            if (prev != null) {
                positiveLinkMap.put(prev, after);
                positiveLinkMap.remove(handlerName);
            }
            if (after != null) {
                reverseLinkMap.put(after, prev);
                reverseLinkMap.remove(handlerName);
            }
        }
    }
}

/**
 * 默认管道起始handler节点
 * @author Jenson
 */
@Component
public class DefaultPipelineHeadHandler implements Handler {
    @Override
    public Object doing(Object param) {
        System.out.println("------------------默认起始节点---------------");
        return null;
    }
}
/**
 * 默认管道结束handler节点
 *
 * @author Jenson
 */
@Component
public class DefaultPipelineTailHandler implements Handler {
    @Override
    public Object doing(Object param) {
        System.out.println("------------------默认结束节点---------------");
        return null;
    }
}
        Person person = new Person();
        person.setName("jenson");
        person.setAge(24);

        Pipeline pipeline = new DefaultPipeline();
        pipeline.addLast("playerHandlerA")
                .addLast("playerHandlerB")
                .addLast("playerHandlerC")
                .remove("playerHandlerB")
                .addBefore("playerHandlerA", "playerHandlerB")
                .addLast("playerHandlerC")
                .addAfter("playerHandlerB", "playerHandlerC")
                .addAfter("playerHandlerB", "playerHandlerA");

        Map<String, Object> result = pipeline.doing(person);
        // 遍历返回参数
        result.keySet().forEach(key -> {
            System.out.println(key + ":" + result.get(key));
        });
------------------默认起始节点---------------
第 2 个节点------参数:Person(name=jenson, age=24)
第 1 个节点------参数:Person(name=jenson, age=24)
第 3 个节点------参数:Person(name=jenson, age=24)
------------------默认结束节点---------------
playerHandlerA:aaaaa
playerHandlerB:bbbbbbbbbbbbbbbbbb
playerHandlerC:cccccccc
defaultPipelineHeadHandler:null
defaultPipelineTailHandler:null

/**
 * @author Jenson
 */
@Configuration
public class DefaultPipelineConfig {

    @Bean("defaultPipeline")
    public Pipeline createDefaultPipeline() {
        Pipeline pipeline = new DefaultPipeline();
        pipeline.addLast("playerHandlerA")
                .addLast("playerHandlerB")
                .addLast("playerHandlerC");
        return pipeline;
    }
}

这样就可以在不改动业务源码的情况下去调整管道中的执行节点

/**
 * @author Jenson
 */
@Configuration
@DependsOn("defaultPipeline")
public class CustomPipelineConfig {

    @Bean("customPipeline")
    public Pipeline changeDefaultPipeline(@Qualifier("defaultPipeline") Pipeline pipeline) {
        System.out.println("---修改bean---");
        pipeline.remove("playerHandlerB");
        return pipeline;
    }
}
上一篇下一篇

猜你喜欢

热点阅读