接口幂等性(重复提交)
2021-03-26 本文已影响0人
jiahzhon
- 幂等接口就是多次调用不会影响到系统。
数据库唯一主键
- 数据库唯一主键的实现主要是利用数据库中主键唯一约束的特性,一般来说唯一主键比较适用于“插入”时的幂等性,其能保证一张表中只能存在一条带该唯一主键的记录。
- 使用数据库唯一主键完成幂等性时需要注意的是,该主键一般来说并不是使用数据库中自增主键,而是使用分布式 ID 充当主键,这样才能能保证在分布式环境下 ID 的全局唯一性。
适用操作
- 插入操作
- 删除操作
使用限制
-
需要生成全局唯一主键 ID
image.png
主要流程如下:
- 客户端执行创建请求,调用服务端接口。
- 服务端执行业务逻辑,生成一个分布式 ID,将该 ID 充当待插入数据的主键,然 后执数据插入操作,运行对应的 SQL 语句。
- 服务端将该条数据插入数据库中,如果插入成功则表示没有重复调用接口。如果抛出主键重复异常,则表示数据库中已经存在该条记录,返回错误信息到客户端。
数据库乐观锁
- 数据库乐观锁方案一般只能适用于执行更新操作的过程,我们可以提前在对应的数据表中多添加一个字段,充当当前数据的版本标识。
- 这样每次对该数据库该表的这条数据执行更新时,都会将该版本标识作为一个条件,值为上次待更新数据中的版本标识的值。
适用操作
- 更新操作
使用限制
- 需要数据库对应业务表中添加额外字段
为了每次执行更新时防止重复更新,确定更新的一定是要更新的内容,我们通常都会添加一个 version 字段记录当前的记录版本,这样在更新时候将该值带上,那么只要执行更新操作就能确定一定更新的是某个对应版本下的信息。
UPDATE my_table SET price=price+50,version=version+1 WHERE id=1 AND version=5
防重 Token 令牌
- 针对客户端连续点击或者调用方的超时重试等情况,例如提交订单,此种操作就可以用 Token 的机制实现防止重复提交。
- 简单的说就是调用方在调用接口的时候先向后端请求一个全局 ID(Token),请求的时候携带这个全局 ID 一起请求(Token 最好将其放到 Headers 中),后端需要对这个 Token 作为 Key,用户信息作为 Value 到 Redis 中进行键值内容校验,如果 Key 存在且 Value 匹配就执行删除命令,然后正常执行后面的业务逻辑。如果不存在对应的 Key 或 Value 不匹配就返回重复执行的错误信息,这样来保证幂等操作。
适用操作
- 插入操作
- 更新操作
- 删除操作
使用限制
- 需要生成全局唯一 Token 串
- 需要使用第三方组件 Redis 进行数据效验
- 服务端提供获取 Token 的接口,该 Token 可以是一个序列号,也可以是一个分布式 ID 或者 UUID 串。
- 客户端调用接口获取 Token,这时候服务端会生成一个 Token 串。
- 然后将该串存入 Redis 数据库中,以该 Token 作为 Redis 的键(注意设置过期时间)。
- 将 Token 返回到客户端,客户端拿到后应存到表单隐藏域中。
- 客户端在执行提交表单时,把 Token 存入到 Headers 中,执行业务请求带上该 Headers。
- 服务端接收到请求后从 Headers 中拿到 Token,然后根据 Token 到 Redis 中查找该 key 是否存在。
- 服务端根据 Redis 中是否存该 key 进行判断,如果存在就将该 key 删除,然后正常执行业务逻辑。如果不存在就抛异常,返回重复提交的错误信息。
注意,在并发情况下,执行 Redis 查找数据与删除需要保证原子性,否则很可能在并发下无法保证幂等性。其实现方法可以使用分布式锁或者使用 Lua 表达式来注销查询与删除操作。
下游传递唯一序列号
-
所谓请求序列号,其实就是每次向服务端请求时候附带一个短时间内唯一不重复的序列号,该序列号可以是一个有序 ID,也可以是一个订单号,一般由下游生成,在调用上游服务端接口时附加该序列号和用于认证的 ID。
-
当上游服务器收到请求信息后拿取该 序列号 和下游 认证ID 进行组合,形成用于操作 Redis 的 Key,然后
- 到 Redis 中查询是否存在对应的 Key 的键值对,根据其结果:
如果存在,就说明已经对该下游的该序列号的请求进行了业务处理,这时可以直接响应重复请求的错误信息。 - 如果不存在,就以该 Key 作为 Redis 的键,以下游关键信息作为存储的值(例如下游商传递的一些业务逻辑信息),将该键值对存储到 Redis 中 ,然后再正常执行对应的业务逻辑即可。
- 到 Redis 中查询是否存在对应的 Key 的键值对,根据其结果:
适用操作
- 插入操作
- 更新操作
- 删除操作
使用限制
- 要求第三方传递唯一序列号;
- 需要使用第三方组件 Redis 进行数据效验
主要流程
- 下游服务生成分布式 ID 作为序列号,然后执行请求调用上游接口,并附带唯一序列号与请求的认证凭据ID。
- 上游服务进行安全效验,检测下游传递的参数中是否存在序列号和凭据ID。
- 上游服务到 Redis 中检测是否存在对应的序列号与认证ID组成的 Key,如果存在就抛出重复执行的异常信息,然后响应下游对应的错误信息。如果不存在就以该序列号和认证ID组合作为 Key,以下游关键信息作为 Value,进而存储到 Redis 中,然后正常执行接来来的业务逻辑
上面步骤中插入数据到 Redis 一定要设置过期时间。这样能保证在这个时间范围内,如果重复调用接口,则能够进行判断识别。如果不设置过期时间,很可能导致数据无限量的存入 Redis,致使 Redis 不能正常工作。
怎么选
- 对于下单等存在唯一主键的,可以使用“唯一主键方案”的方式实现。
- 对于更新订单状态等相关的更新场景操作,使用“乐观锁方案”实现更为简单。
- 对于上下游这种,下游请求上游,上游服务可以使用“下游传递唯一序列号方案”更为合理。
- 类似于前端重复提交、重复下单、没有唯一ID号的场景,可以通过 Token 与 Redis 配合的“防重 Token 方案”实现更为快捷。
SpringBoot利用AOP防止请求重复提交
思路
- 自定义注解@NoRepeatSubmit 标记所有Controller中提交的请求。
- 通过AOP对所有标记了@NoRepeatSubmit 的方法进行拦截。
- 在业务方法执行前,获取当前用户的token或者JSessionId+当前请求地址,作为一个唯一的key,去获取redis分布式锁,如果此时并发获取,只有一个线程能获取到。
- 业务执行后,释放锁。
关于Redis分布式锁
- 使用Redis是为了在负载均衡部署,如果是单机的项目可以使用一个本地线程安全的Cache替代Redis
代码
- 自定义注解
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface NoRepeatSubmit {
/**
* 设置请求锁定时间
*
* @return
*/
int lockTime() default 10;
}
- AOP
@Aspect
@Component
public class RepeatSubmitAspect {
private static final Logger LOGGER = LoggerFactory.getLogger(RepeatSubmitAspect.class);
@Autowired
private RedisLock redisLock;
@Pointcut("@annotation(noRepeatSubmit)")
public void pointCut(NoRepeatSubmit noRepeatSubmit) {
}
@Around("pointCut(noRepeatSubmit)")
public Object around(ProceedingJoinPoint pjp, NoRepeatSubmit noRepeatSubmit) throws Throwable {
int lockSeconds = noRepeatSubmit.lockTime();
HttpServletRequest request = RequestUtils.getRequest();
Assert.notNull(request, "request can not null");
// 此处可以用token或者JSessionId
String token = request.getHeader("Authorization");
String path = request.getServletPath();
String key = getKey(token, path);
String clientId = getClientId();
boolean isSuccess = redisLock.tryLock(key, clientId, lockSeconds);
LOGGER.info("tryLock key = [{}], clientId = [{}]", key, clientId);
if (isSuccess) {
LOGGER.info("tryLock success, key = [{}], clientId = [{}]", key, clientId);
// 获取锁成功
Object result;
try {
// 执行进程
result = pjp.proceed();
} finally {
// 解锁
redisLock.releaseLock(key, clientId);
LOGGER.info("releaseLock success, key = [{}], clientId = [{}]", key, clientId);
}
return result;
} else {
// 获取锁失败,认为是重复提交的请求
LOGGER.info("tryLock fail, key = [{}]", key);
return new ApiResult(200, "重复请求,请稍后再试", null);
}
}
private String getKey(String token, String path) {
return token + path;
}
private String getClientId() {
return UUID.randomUUID().toString();
}
}
- 测试接口
@RestController
public class SubmitController {
@PostMapping("submit")
@NoRepeatSubmit(lockTime = 30)
public Object submit(@RequestBody UserBean userBean) {
try {
// 模拟业务场景
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new ApiResult(200, "成功", userBean.userId);
}
public static class UserBean {
private String userId;
public String getUserId() {
return userId;
}
public void setUserId(String userId) {
this.userId = userId == null ? null : userId.trim();
}
}
}
- 配置文件
server.port=8000
# Redis数据库索引(默认为0)
spring.redis.database=0
# Redis服务器地址
spring.redis.host=localhost
# Redis服务器连接端口
spring.redis.port=6379
# Redis服务器连接密码(默认为空)
#spring.redis.password=yourpwd
# 连接池最大连接数(使用负值表示没有限制)
spring.redis.jedis.pool.max-active=8
# 连接池最大阻塞等待时间
spring.redis.jedis.pool.max-wait=-1ms
# 连接池中的最大空闲连接
spring.redis.jedis.pool.max-idle=8
# 连接池中的最小空闲连接
spring.redis.jedis.pool.min-idle=0
# 连接超时时间(毫秒)
spring.redis.timeout=5000ms
- 测试类(模拟测试)
@Component
public class RunTest implements ApplicationRunner {
private static final Logger LOGGER = LoggerFactory.getLogger(RunTest.class);
@Autowired
private RestTemplate restTemplate;
@Override
public void run(ApplicationArguments args) throws Exception {
System.out.println("执行多线程测试");
String url="http://localhost:8000/submit";
CountDownLatch countDownLatch = new CountDownLatch(1);
ExecutorService executorService = Executors.newFixedThreadPool(10);
for(int i=0; i<10; i++){
String userId = "userId" + i;
HttpEntity request = buildRequest(userId);
executorService.submit(() -> {
try {
countDownLatch.await();
System.out.println("Thread:"+Thread.currentThread().getName()+", time:"+System.currentTimeMillis());
ResponseEntity<String> response = restTemplate.postForEntity(url, request, String.class);
System.out.println("Thread:"+Thread.currentThread().getName() + "," + response.getBody());
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
countDownLatch.countDown();
}
private HttpEntity buildRequest(String userId) {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
headers.set("Authorization", "yourToken");
Map<String, Object> body = new HashMap<>();
body.put("userId", userId);
return new HttpEntity<>(body, headers);
}