2.异步调用—服务端异步处理

2020-09-03  本文已影响0人  山海树

服务端异步是指在调用端调用的时候,服务端采用异步的方式来进行操作
1.CompleteFuture
服务端代码

 private final ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(8, 16, 1, TimeUnit.MINUTES,
            new SynchronousQueue(), new NamedThreadFactory("biz-thread-pool"), new ThreadPoolExecutor.CallerRunsPolicy());

    @Override
    public CompletableFuture<String> sayHello(String name) {
        RpcContext context = RpcContext.getContext();

        //通过supplyAsunc开启异步执行
        return CompletableFuture.supplyAsync(()-> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("async return ");
            return "Hello " + name + " " + context.getAttachment("company");
        }, bizThreadPool);

    }

消费端代码
此时消费端不需要开启异步调用,

ReferenceConfig<AsyncGreetingService> referenceConfig = new ReferenceConfig<>();

        referenceConfig.setApplication(new ApplicationConfig("first-dubbo-consumer"));

        RegistryConfig registryConfig = new RegistryConfig("zookeeper://127.0.0.1:2181");
        referenceConfig.setRegistry(registryConfig);

        referenceConfig.setInterface(AsyncGreetingService.class);
        referenceConfig.setTimeout(5000);

       /* referenceConfig.setLoadbalance("myLoadBalance");
        referenceConfig.setCluster("myBoradcast");*/

       referenceConfig.setVersion("1.0.0");
       referenceConfig.setGroup("dubbo");

        AsyncGreetingService greetingService = referenceConfig.get();

       //设置隐式参数
        RpcContext.getContext().setAttachment("company","alibaba");

        CompletableFuture<String> future = greetingService.sayHello("world");
        future.whenComplete((v,t)->{
            if(t != null) ((Throwable)t).printStackTrace();
            else System.out.println(v);
        });
       System.out.println("over");
      //over将先打印出来,程序继续向下进行,当服务端运行完成后,回调函数触发

2.AsyncContext
该类是dubbo包下的一个类,有兴趣可以学下servlet包下的同名类
服务端

 private final ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(8, 16, 1, TimeUnit.MINUTES,
            new SynchronousQueue(), new NamedThreadFactory("biz-thread-pool"), new ThreadPoolExecutor.CallerRunsPolicy());

    @Override
    public String sayHello(String name) {
        final AsyncContext asyncContext = RpcContext.startAsync();
        bizThreadPool.execute(()->{
            asyncContext.signalContextSwitch();
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            asyncContext.write("Hello " + name +" "+ RpcContext.getContext().getAttachment("company"));
        });
        return null;
    }

消费端

     ReferenceConfig<GreetingServiceRpcContext> referenceConfig = new ReferenceConfig<>();
        referenceConfig.setApplication(new ApplicationConfig("first-dubbo-async-consumer"));
        RegistryConfig registryConfig = new RegistryConfig("zookeeper://127.0.0.1:2181");
        referenceConfig.setRegistry(registryConfig);
        referenceConfig.setInterface(GreetingServiceRpcContext.class);
        referenceConfig.setVersion("1.0.0");
        referenceConfig.setGroup("dubbo");
        //异步调用需要设置超时是时间,(默认超时时间1s)不然服务处理时间过长,消费者将断去链接,导致future.get()报错
        referenceConfig.setTimeout(10000);
        referenceConfig.setAsync(true);
        GreetingServiceRpcContext greetingService = referenceConfig.get();
        System.out.println(greetingService.sayHello(" world"));
        Future<String> future = RpcContext.getContext().getFuture();
        System.out.println(future.get());
        System.out.println("over");

这种方法实现的服务端异步调用中,
消费端如果不设置消费端异步调用,采用直接调用的方式,则会一直阻塞,直到服务端真的处理完成
消费端如果设置了消费端异步调用,则可以采用future模式拿到返回值。

不管是消费端还是服务端的异步,目的都是为了让自己的程序继续下行,提升处理效率

上一篇 下一篇

猜你喜欢

热点阅读