RPC

自己动手实现RPC框架(2)-服务调用

2017-09-12  本文已影响0人  景樗子刘

远程服务调用

在上一步,我们实现了一个服务发布程序,这一步我们要进行客户端调用。

实现细节

设计

类图概况

整体的类图如下:


客户端类图

时序图

客户端时序图

重要类描述

代码实现

RemoteProcedureInvoker

RemoteServiceAccessor

package io.destinyshine.storks.consumer;

import ...

/**
 * a base class for RemoteProcedureInvoker
 *
 * @author liujianyu.ljy
 * @date 2017/09/07
 */
@Slf4j
public abstract class RemoteServiceAccessor {

    protected ServiceInstanceSelector serviceInstanceSelector;
    protected ConnectionSupplier connectionSupplier;

    protected ConsumingConnection obtainConnection(ConsumerDescriptor<?> desc) throws Exception {
        ServiceKey serviceKey = ServiceKey.of(desc);

        Optional<ServiceInstance> instOpt = serviceInstanceSelector.select(desc);

        if (instOpt.isPresent()) {
            ServiceInstance inst = instOpt.get();
            return (connectionSupplier.getConnection(inst));
        } else {
            throw new ServiceNotFoundException("cannot found service of " + serviceKey + " in registry.");
        }
    }

    public void setServiceInstanceSelector(ServiceInstanceSelector serviceInstanceSelector) {
        this.serviceInstanceSelector = serviceInstanceSelector;
    }

    public void setConnectionSupplier(ConnectionSupplier connectionSupplier) {
        this.connectionSupplier = connectionSupplier;
    }
}

DefaultRemoteProcedureInvoker

package io.destinyshine.storks.consumer;

import ...

/**
 * @author destinyliu
 */
@Slf4j
public class DefaultRemoteProcedureInvoker extends RemoteServiceAccessor implements RemoteProcedureInvoker {

    @Override
    public ResponseMessage invoke(ConsumerDescriptor desc, RequestMessage requestMessage) throws Exception {
        ConsumingConnection connection = obtainConnection(desc);
        Future<ResponseMessage> responsePromise = connection.sendRequest(requestMessage);
        return responsePromise.get();
    }

}

ServiceInstanceSelector

DirectServiceInstanceSelector

package io.destinyshine.storks.consumer.connect;

import ...

/**
 * 只支持直连的服务选择器
 *
 * @author liujianyu
 * @date 2017/09/03
 */
public class DirectServiceInstanceSelector implements ServiceInstanceSelector {

    @Override
    public Optional<ServiceInstance> select(ConsumerDescriptor desc) {
        if (desc.isDirect()) {
            ServiceInstance inst = ServiceInstance.builder()
                .host(desc.getRemoteHost())
                .port(desc.getRemotePort())
                .serviceInterface(desc.getServiceInterface().getName())
                .serviceVersion(desc.getServiceVersion())
                .build();
            return Optional.of(inst);
        }
        return Optional.empty();
    }

}

ConnectionSupplier

AbstractConnectionSupplier

package io.destinyshine.storks.consumer.connect;

import ...

/**
 * @author destinyliu
 */
@Slf4j
public abstract class AbstractConnectionSupplier implements ConnectionSupplier {

    private Map<ServiceInstance, ConsumingConnection> connectionCache = new ConcurrentHashMap<>();

    @Override
    public synchronized ConsumingConnection getConnection(ServiceInstance instance) throws Exception {
        ConsumingConnection con = connectionCache.computeIfAbsent(instance,
            instance1 -> {
                try {
                    return createConnectionInternal(instance1);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return null;
            });

        return con;
    }

    /**
     * create connection
     *
     * @param instance
     * @return
     * @throws Exception
     */
    protected abstract ConsumingConnection createConnectionInternal(ServiceInstance instance) throws Exception;

    public void shutdown() {
        logger.warn("shutting down connectionManager...");
        connectionCache.forEach((serviceKey, con) -> {
            logger.warn("closing all connections of serviceKey={}", serviceKey);
            try {
                con.close();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
            logger.warn(
                "closed connection of serviceKey={}, {}",
                serviceKey,
                con
            );
            logger.warn("closed all connections of serviceKey={}", serviceKey);
        });
        logger.warn("shutdown connectionManager finished.");
    }
}

SocketChannelConsumingConnectionSupplier

package io.destinyshine.storks.consumer.connect;

import ...

/**
 * NIO consuming connection supplier.
 *
 * @author liujianyu
 * @date 2017/09/03
 */
@Slf4j
public class SocketChannelConsumingConnectionSupplier extends AbstractConnectionSupplier implements ConnectionSupplier {

    @Override
    public ConsumingConnection createConnectionInternal(ServiceInstance instance) throws Exception {
        logger.info("will create connection of instance {}", instance);
        String remoteHost = instance.getHost();
        int remotePort = instance.getPort();
        SocketChannelConsumingConnection con = new SocketChannelConsumingConnection(remoteHost, remotePort);
        con.connect();
        return con;
    }
}

ConsumingConnection

在ConsumingConnection的实现过程中,我们使用基于Netty 的NIO方式实现。由于NIO的response返回是异步的,并且发送Request后不会阻塞线程知道远程服务端返回详细;所以需要维护一个ConcurrentLinkedQueue<Promise<ResponseMessage>>队列,当发送一个请求的时候,增加一个Promise到队列中,并返回这个Promise。收到响应的时候取队头的Promise设置结果,在Promise上等待的线程可收到结果。可参考下面的代码。

Netty中的Promise是Future的子接口,类似于java8自带的CompletableFuture,能够增加监听器监听其是否完成,也能够手动设置结果。可参考java8的CompletableFuture.

实现代码。

package io.destinyshine.storks.consumer.support;

import ...

/**
 * @author liujianyu
 */
public class SocketChannelConsumingConnection implements AutoCloseable, ConsumingConnection {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    private ConcurrentLinkedQueue<Promise<ResponseMessage>> responsePromises = new ConcurrentLinkedQueue<>();

    private final String remoteHost;
    private final int remotePort;

    private Channel channel;

    private long connectedTime;

    public SocketChannelConsumingConnection(String remoteHost, int remotePort) {
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
    }

    @Override
    public void connect() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.remoteAddress(new InetSocketAddress(remoteHost, remotePort));
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {

                @Override
                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                    super.channelActive(ctx);
                }

                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                        .addLast(new ProtostuffEncoder<>(RequestMessage.class))
                        .addLast(Protocol.newFrameDecoder())
                        .addLast(new ProtostuffDecoder<>(ResponseMessage.class, ResponseMessage::new))
                        .addLast(new SimpleChannelInboundHandler<ResponseMessage>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext ctx, ResponseMessage msg)
                                throws Exception {
                                Promise<ResponseMessage> promise;
                                if ((promise = responsePromises.poll()) != null) {
                                    promise.setSuccess(msg);
                                } else {
                                    logger.error("remote server closed!");
                                }
                            }
                        });
                }
            });
            ChannelFuture channelFuture = bootstrap.connect().sync();
            this.channel = channelFuture.channel();
            this.connectedTime = System.currentTimeMillis();
            channelFuture.addListener(future -> {
                if (future.isSuccess()) {
                    logger.debug(future.toString() + "client connected");
                } else {
                    logger.debug(future.toString() + "server attemp failed", future.cause());
                }

            });
        } finally {

        }
    }

    @Override
    public Promise<ResponseMessage> sendRequest(RequestMessage requestMessage) {
        Promise<ResponseMessage> promise = this.channel.eventLoop().newPromise();
        this.responsePromises.add(promise);
        this.channel.writeAndFlush(requestMessage);
        return promise;
    }

    @Override
    public String toString() {
        return "SocketChannelConsumingConnection{" +
            "remoteHost='" + remoteHost + '\'' +
            ", remotePort=" + remotePort +
            '}';
    }

    @Override
    public void close() {
        channel.close().awaitUninterruptibly();
    }
}

运行客户端

同样使用原始的方式执行客户端代码。

在这个实例中,用多个线程发起并行RPC调用,试验是否能够正确响应,多个线程之间是否会错乱。经过试验多线程正常调用和响应。

package io.destinyshine.storks.sample.service;

import ...

/**
 * @author liujianyu
 */
public class DirectClientMain {

    private static final Logger logger = LoggerFactory.getLogger(DirectClientMain.class);

    public static void main(String[] args) throws Exception {

        DefaultRemoteProcedureInvoker invoker = new DefaultRemoteProcedureInvoker();
        invoker.setConnectionSupplier(new SocketChannelConsumingConnectionSupplier());
        invoker.setServiceInstanceSelector(new DirectServiceInstanceSelector());

        ConsumerDescriptor<HelloService> desc = ConsumerBuilder
            .ofServiceInterface(HelloService.class)
            .remoteServer("127.0.0.1")
            .remotePort(39874)
            .serviceVersion("1.0.0")
            .direct(true)
            .build();

        ConsumerFactory consumerFactory = new DefaultConsumerFactory(invoker);

        HelloService helloServiceConsumer = consumerFactory.getConsumer(desc);

        ExecutorService executorService = Executors.newFixedThreadPool(10);

        for (int i = 0; i < 100; i++) {
            int finalI = i;
            executorService.submit(() -> {
                String input = null;
                String result = null;
                try {
                    input = "tom,direct," + Thread.currentThread().getName() + "," + finalI;
                    result = helloServiceConsumer.hello(input);
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
                logger.info("input={}, get result: {}", input, result);
            });
        }

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                invoker.close();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }));

    }
}

上一篇 下一篇

猜你喜欢

热点阅读