异步调用与重试

2023-03-04  本文已影响0人  zxbyh

在业务场景经常会有类似这样的情况,比如说用户交易成功后,需要调用第三方接口给用户发放其他权益. 这个调用不能做成同步的,否则如果第三方系统出问题就会堵塞我们正常的业务. 其实可以用如下思路来解决.

异步调用, 如果失败了再重试n次, 代码如下:

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;

public class AsyncScheduleCompletableFutureTest {
    private static volatile Integer askCount = 0;

    private static Boolean doTask(){
        return (int)(Math.random() * 100)>80;
    }

    private static String asyncDo(){
        CompletableFuture.supplyAsync(() -> {
            return doTask();
        }).thenAccept(r-> {
            askCount++;
            System.out.println("异步任务首次调用执行返回,"+r+",调用次数"+askCount);
            if(r){
                System.out.println("异步任务调用成功!");
                return;
            }
            System.out.println("异步任务调用失败,进入定时调用");
            scheduleDo();

        });
        return "已经将任务放到异步,主函数返回!";
    }

    public static void scheduleDo(){
        (new Timer()).schedule(
            new TimerTask(){
                public void run(){
                    System.out.println("异步任务进入延迟调用执行!");
                    CompletableFuture.supplyAsync(() -> {
                        return doTask();
                    }).thenAccept(r-> {
                        askCount++;
                        System.out.println("异步任务延迟调用执行返回,"+r+",调用次数"+askCount);
                        if(r){
                            System.out.println("异步任务延迟调用执行成功,退出!");
                            return;
                        }
                        if(askCount>=3){
                            System.out.println("超过最大调用次数,退出!");
                            return;
                        }

                        scheduleDo();

                    });
                }
            },
            2000//延时2000毫秒
        );
    }

    public static void main(String[] args) {
        System.out.println(asyncDo());
    }
}

然后就可以吧这个写成一个工具类, 这儿用的timer

import lombok.RequiredArgsConstructor;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

@RequiredArgsConstructor
public class AsyncScheduleTimer {
    private final Supplier<Boolean> supplierTask;
    private final Integer maxTryCount;
    private final Integer delayMilliSeconds;

    private Timer _timer ;
    private volatile Integer _tryCount = 0;

    public void asyncDo(){
        CompletableFuture.supplyAsync(() -> {
            return this.supplierTask.get();
        }).whenComplete((r,e)-> {
            if (e != null) {
                System.out.println("异步任务首次调用执行异常,"+e.getMessage());
            }
            else {
                _tryCount++;
                System.out.println("异步任务首次调用执行返回," + r + ",调用次数" + _tryCount);
                if (r) {
                    System.out.println("异步任务调用成功!");
                    return;
                }
                else {
                    System.out.println("异步任务调用失败,进入定时调用");
                    this.initTimer();
                    scheduleDo();
                }
            }
        });
        System.out.println("已经将任务放到异步,主函数返回!");
    }

    private void scheduleDo(){
        this._timer.schedule(
            new TimerTask() {
                @Override
                public void run() {
                    System.out.println("异步任务进入延迟调用执行!");
                    CompletableFuture.supplyAsync(() -> {
                        return supplierTask.get();
                    }).whenComplete((r, e) -> {
                        if (e != null) {
                            System.out.println("异步任务首次调用执行异常," + e.getMessage());
                        } else {
                            _tryCount++;
                            System.out.println("异步任务延迟调用执行返回," + r + ",调用次数" + _tryCount);
                            if (r) {
                                System.out.println("异步任务延迟调用执行成功,退出!");
                                return;
                            }
                            if (_tryCount >= maxTryCount) {
                                System.out.println("超过最大调用次数,退出!");
                                return;
                            }

                            scheduleDo();
                        }

                    });
                }
            },
            this.delayMilliSeconds
        );
    }

    private void initTimer() {
        this._timer = new Timer();
    }

    public static void main(String[] args) {
        new AsyncScheduleTimer(
            ()-> (int)(Math.random() * 100)>80,
            3,
            2000
        ).asyncDo();
    }
}

可以换成 HashedWheelTimer ,关于HashedWheelTimer可以看这个文章http://events.jianshu.io/p/311121f63d2f

import io.netty.util.HashedWheelTimer;
import lombok.RequiredArgsConstructor;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

@RequiredArgsConstructor
public class AsyncScheduleHashedWheelTimer {
    private final Supplier<Boolean> supplierTask;
    private final volatile Integer maxTryCount;
    private final Integer delayMilliSeconds;

    private HashedWheelTimer _timer ;
    private volatile Integer _tryCount = 0;

    public void asyncDo(){
        CompletableFuture.supplyAsync(() -> {
            return this.supplierTask.get();
        }).whenComplete((r,e)-> {
            if (e != null) {
                System.out.println("异步任务首次调用执行异常,"+e.getMessage());
            }
            else {
                _tryCount++;
                System.out.println("异步任务首次调用执行返回," + r + ",调用次数" + _tryCount);
                if (r) {
                    System.out.println("异步任务调用成功!");
                    return;
                }
                else {
                    System.out.println("异步任务调用失败,进入定时调用");
                    this.initTimer();
                    scheduleDo();
                }
            }
        });
        System.out.println("已经将任务放到异步,主函数返回!");
    }

    private void scheduleDo(){
        this._timer.newTimeout(
            timeout -> {
                System.out.println("异步任务进入延迟调用执行!");
                CompletableFuture.supplyAsync(() -> {
                    return supplierTask.get();
                }).whenComplete((r,e)-> {
                    if (e != null) {
                        System.out.println("异步任务首次调用执行异常,"+e.getMessage());
                    }
                    else {
                        _tryCount++;
                        System.out.println("异步任务延迟调用执行返回,"+r+",调用次数"+_tryCount);
                        if(r){
                            System.out.println("异步任务延迟调用执行成功,退出!");
                            return;
                        }
                        if(_tryCount>=maxTryCount){
                            System.out.println("超过最大调用次数,退出!");
                            return;
                        }

                        scheduleDo();
                    }

                });
            },
            this.delayMilliSeconds,//延时2000毫秒
            TimeUnit.MILLISECONDS
        );
    }

    private void initTimer() {

//        int[] timeouts = new int[]{this.retryInterval, this.timeout};
//        Arrays.sort(timeouts);
//        int minTimeout = timeouts[0];
//        if (minTimeout % 100 != 0) {
//            minTimeout = minTimeout % 100 / 2;
//        } else if (minTimeout == 100) {
//            minTimeout = 50;
//        } else {
//            minTimeout = 100;
//        }
//
//        this._timer = new HashedWheelTimer(new DefaultThreadFactory(this.poolName), (long)minTimeout, TimeUnit.MILLISECONDS, 1024, false);

        this._timer = new HashedWheelTimer(Executors.defaultThreadFactory(), 100, TimeUnit.MILLISECONDS, 128);
    }

    public static void main(String[] args) {

        new AsyncScheduleHashedWheelTimer(
            ()-> (int)(Math.random() * 100)>80,
            3,
            2000
        ).asyncDo();
    }
}
上一篇下一篇

猜你喜欢

热点阅读