实现一个简单的限流器

2023-05-11  本文已影响0人  猫螺丝股记
/**
 * 限流器初始化
 * 
 **/
@Component
public class RateLimiterInitializer implements InitializingBean {
    @Resource
    private ApplicationContext applicationContext;

    @Override
    public void afterPropertiesSet() {
        Map<String, Object> map = applicationContext.getBeansWithAnnotation(UseLimiter.class);
        for (Map.Entry<String, Object> entry : map.entrySet()) {
            Object val = entry.getValue();
            Class<?> clazz = AopUtils.getTargetClass(val);
            Method[] methods = clazz.getMethods();
            for (Method m : methods) {
                if (!m.isAnnotationPresent(Limiter.class)) {
                    continue;
                }

                StringBuilder params = new StringBuilder();
                params.append(m.getName()).append(";");
                for (Type type : m.getGenericParameterTypes()) {
                    params.append(type.getTypeName()).append(";");
                }
                params.append(m.getAnnotatedReturnType().getType());
                Limiter limiter = m.getAnnotation(Limiter.class);
                RateLimitContextHolder.put(params.toString(), limiter.permits());
            }

        }

    }

}

```java
//  aop处理

/**
 * <h2>限流处理</h2>
 * <pre>
 *     当相应的参数大于0则进入到相应的处理流程中;
 *   
 * </pre>
 **/
@Component
@Aspect
public class RateLimitAspect {

   // 拦截方法上的@Limiter
    @Pointcut("@annotation(.Limiter)")
    public void point() {
        //
    }

    @Around("point()")
    public Object limit(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
        MethodSignature methodSignature = (MethodSignature) proceedingJoinPoint.getSignature();
        Limiter limit = methodSignature.getMethod().getDeclaredAnnotation(Limiter.class);
        if (limit.timeout() > 0) {
            return circuitBreaker(proceedingJoinPoint, limit.timeout());
        } else if (limit.permits() > 0) {
            StringBuilder key = new StringBuilder();
            key.append(methodSignature.getMethod().getName()).append(";");
            for (Type type : methodSignature.getMethod().getGenericParameterTypes()) {
                key.append(type.getTypeName()).append(";");
            }
            key.append(methodSignature.getMethod().getAnnotatedReturnType().getType());

            Semaphore limiter = RateLimitContextHolder.get(key.toString());
            try {
                log.info("限流执行中,剩余可用令牌数:{}, 参数:{}, 方法:{}",
                        limiter == null ? "empty" : limiter.availablePermits(), key, methodSignature.getMethod());

                if (limiter != null && !limiter.tryAcquire()) {
                    throw new BizException("请求频率超过限制,请稍后再试!");
                }
                return proceedingJoinPoint.proceed();
            } finally {
                // 释放
                if(limiter != null){
                    limiter.release();
                }
            }

        }
        return null;
    }


    /**
     * 熔断器
     *
     * @param proceedingJoinPoint point
     * @param timeout             timeout
     * @return 处理结果
     */
    private Object circuitBreaker(ProceedingJoinPoint proceedingJoinPoint, int timeout) {
        ExecutorService es = Executors.newFixedThreadPool(2);
        Future<Object> future = es.submit(() -> {
            try {
                return proceedingJoinPoint.proceed();
            } catch (Throwable throwable) {
                throw new BizException("处理异常,请稍后再试!");
            }
        });

        final Object obj;
        try {
            obj = future.get(timeout, TimeUnit.MILLISECONDS);
            return obj;
        } catch (Throwable e) {
            future.cancel(true);
            throw new BizException("处理异常,请稍后再试!");
        }
    }

}





上一篇 下一篇

猜你喜欢

热点阅读