禅与计算机程序设计艺术JVM · Java虚拟机原理 · JVM上语言·框架· 生态系统Java 8 · Java 9 · Java X · Java 实践指北

分布式系统中的BASE 和 ACID、幂等性、分布式锁、分布式事

2020-04-23  本文已影响0人  光剑书架上的书

一、BASE 和 ACID

ACID

ACID 四项特性分别为:

原子性(A)。所有的系统都受惠于原子性操作。当我们考虑可用性的时候,没有理由去改变分区两侧操作的原子性。而且满足 ACID 定义的、高抽象层次的原子操作,实际上会简化分区恢复。

一致性(C)。ACID 的 C 指的是事务不能破坏任何数据库规则,如键的唯一性。与之相比,CAP 的 C 仅指单一副本这个意义上的一致性,因此只是 ACID 一致性约束的一个严格的子集。ACID 一致性不可能在分区过程中保持,因此分区恢复时需要重建 ACID 一致性。推而广之,分区期间也许不可能维持某些不变性约束,所以有必要仔细考虑哪些操作应该禁止,分区后又如何恢复这些不变性约束。

隔离性(I)。隔离是 CAP 理论的核心:如果系统要求 ACID 隔离性,那么它在分区期间最多可以在分区一侧维持操作。事务的可串行性(serializability)要求全局的通信,因此在分区的情况下不能成立。只要在分区恢复时进行补偿,在分区前后保持一个较弱的正确性定义是可行的。

持久性(D)。牺牲持久性没有意义,理由和原子性一样,虽然开发者有理由(持久性成本太高)选择 BASE 风格的软状态来避免实现持久性。这里有一个细节,分区恢复可能因为回退持久性操作,而无意中破坏某项不变性约束。但只要恢复时给定分区两侧的持久性操作历史记录,破坏不变性约束的操作还是可以被检测出来并修正的。通常来讲,让分区两侧的事务都满足 ACID 特性会使得后续的分区恢复变得更容易,并且为分区恢复时事务的补偿工作奠定了基本的条件。

BASE, “Basically Available, Soft state, Eventually consistent(基本可用、软状态、最终一致性)”.

CAP理论

2000年7月,Eric Brewer教授提出CAP猜想;2年后,Seth Gilbert和Nancy Lynch从理论上证明了CAP;之后,CAP理论正式成为分布式计算领域的公认定理。 CAP定律说的是在一个分布式计算机系统中,一致性,可用性和分区容错性这三种保证无法同时得到满足,最多满足两个。 CAP:C :Consistency(一致性)A:(Availability)可用性P:(Partition Tolerance)分区容错性

让我们构造一个非常简单的分布式系统。

Consistency(一致性)

Gilbert and Lynch 这样描述的一致性.

any read operation that begins after a write operation completes must return that value, or the result of a later write operation

在写操作完成之后的任何读操作都必须返回该值。

客户端向G1服务器发起一个写操作,把变量初始值v0 改为v1,接下来客户端可能向节点G1读取也可能向节点G2读取;

  1. 如果同步完成 ,那么读到的结果是v1,这样也满足了一致性
  1. 还未同步完成,这是G2还是v0,这就不满足一致性。
(Partition Tolerance)分区容错

Gilbert and Lynch 这样描述的分区容错.

The network will be allowed to lose arbitrarily many messages sent from one node to another. 从一个节点发送到另外一个节点过程中,允许丢失任意多的消息.

在分布式环境中,节点之间的通信可能出现问题,整个系统就产生所谓的分区。所以我们在设计的时候需要考虑这种情况;剩下来的 A和C满足好,我们就可以说我们的系统有很好的分区容错性。

(Availability)可用性

Gilbert and Lynch 对 availability的描述原文. every request received by a non-failing node in the system must result in a response 系统中非失败节点收到的每个请求都必须导致响应 在可用性系统中,只要服务器没有奔溃,客户端发送请求,服务器必须返回一个相应给客户端。

为什么要CAP不能同时满足?

通过上述的定义和描述知道分区无法避免,p总是要考虑的。为什么c和a无法同时做到呢?其实都是分区惹的祸。

如果我们保证一致性;那么G1写入操作之后,必须保证数据同步给G2之后,G2才能对外提供响应,这显然就没有可用性了。

反之我们保证可用性,那就没法保证一致性。在分布式系统中,我们一般会选择AP而牺牲一致性。牺牲并不意味着不关心一致性,而是首先满足A和P,稍后再解决C的问题。

CAP 理论主张任何基于网络的数据共享系统,都最多只能拥有以下三条中的两条:

数据一致性(C),等同于所有节点访问同一份最新的数据副本;
对数据更新具备高可用性(A);
能容忍网络分区(P)。

于是有了BASE理论。

BASE 理论

eBay的架构师Dan Pritchett源于对大规模分布式系统的实践总结,在ACM上发表文章提出BASE理论,BASE理论是对CAP理论的延伸,核心思想是即使无法做到强一致性(StrongConsistency,CAP的一致性就是强一致性),但应用可以采用适合的方式达到最终一致性(Eventual Consitency)。

(Basically Available)基本可用

在分布式系统出现故障的时候,允许损失部分可用性,即保证核心可用。

(Soft State)软状态

接受一段时间的状态不同步,及中间状态,而改中间状态不影响系统整体可用性。这里的中间状态就是CAP理论中的数据不一致性。

(Eventually Consistent)最终一致性

上面说软状态,然后不可能一直是软状态,必须有个时间期限。在期限过后系统能够保证在没有其他新的更新操作的情况下,数据最终一定能够达到一致的状态,因此所有客户端对系统的数据访问最终都能够获取到最新的值。

总结

CAP是分布式系统设计理论,BASE是CAP理论中AP方案的延伸,对于C我们采用的方式和策略就是保证最终一致性;

二、什么是分布式系统幂等性?

幂等(idempotent、idempotence)是一个数学与计算机学概念,常见于抽象代数中。

在编程中.一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。幂等函数,或幂等方法,是指可以使用相同参数重复执行,并能获得相同结果的函数。这些函数不会影响系统状态,也不用担心重复执行会对系统造成改变。例如,“getUsername()和setTrue()”函数就是一个幂等函数.更复杂的操作幂等保证是利用唯一交易号(流水号)实现.

什么是幂等性(Idempotence)?

Methods can also have the property of “idempotence” in that (aside from error or expiration issues) the side-effects of N > 0 identical requests is the same as for a single request.

——HTTP/1.1规范中幂等性的定义

从定义上看,HTTP方法的幂等性是指一次和多次请求某一个资源应该具有同样的副作用。说白了就是,同一个请求,发送一次和发送N次效果是一样的!幂等性是分布式系统设计中十分重要的概念,而HTTP的分布式本质也决定了它在HTTP中具有重要地位。下面将以HTTP中的幂等性做例子加以介绍。

简单示例

假设有一个从账户取钱的远程API(可以是HTTP的,也可以不是),我们暂时用类函数的方式记为:

bool withdraw(account_id, amount)

withdraw的语义是从account_id对应的账户中扣除amount数额的钱;如果扣除成功则返回true,账户余额减少amount;如果扣除失败则返回false,账户余额不变。

值得注意的是:和本地环境相比,我们不能轻易假设分布式环境的可靠性

所以问题来了,一种典型的情况是withdraw请求已经被服务器端正确处理,但服务器端的返回结果由于网络等原因被掉丢了,导致客户端无法得知处理结果。如果是在网页上,一些不恰当的设计可能会使用户认为上一次操作失败了,然后刷新页面,这就导致了withdraw被调用两次,账户也被多扣了一次钱。如图所示:

解决方案一:采用分布式事务,通过引入支持分布式事务的中间件来保证withdraw功能的事务性。分布式事务的优点是对于调用者很简单,复杂性都交给了中间件来管理。缺点则是一方面架构太重量级,容易被绑在特定的中间件上,不利于异构系统的集成;另一方面分布式事务虽然能保证事务的ACID性质,而但却无法提供性能和可用性的保证。

解决方案二:幂等设计。我们可以通过一些技巧把withdraw变成幂等的,比如:

int create_ticket() 
bool idempotent_withdraw(ticket_id, account_id, amount)

create_ticket的语义是获取一个服务器端生成的唯一的处理号ticket_id,它将用于标识后续的操作

idempotent_withdrawwithdraw的区别在于关联了一个ticket_id,一个ticket_id表示的操作至多只会被处理一次,每次调用都将返回第一次调用时的处理结果。这样,idempotent_withdraw就符合幂等性了,客户端就可以放心地多次调用。

基于幂等性的解决方案中一个完整的取钱流程被分解成了两个步骤:

1.调用create_ticket()获取ticket_id
2.调用idempotent_withdraw(ticket_id, account_id, amount)

虽然create_ticket不是幂等的,但在这种设计下,它对系统状态的影响可以忽略,加上idempotent_withdraw是幂等的,所以任何一步由于网络等原因失败或超时,客户端都可以重试,直到获得结果。如图所示:

和分布式事务相比,幂等设计的优势在于它的轻量级,容易适应异构环境,以及性能和可用性方面。在某些性能要求比较高的应用,幂等设计往往是唯一的选择。

HTTP的幂等性

下面以HTTP GET、DELETE、PUT、POST四种方法为主进行语义和幂等性的介绍。

HTTP GET方法用于获取资源,不应有副作用,所以是幂等的。比如:GET http://www.bank.com/account/123456,不会改变资源的状态,不论调用一次还是N次都没有副作用。请注意,这里强调的是一次和N次具有相同的副作用,而不是每次GET的结果相同。GET http://www.news.com/latest-news这个HTTP请求可能会每次得到不同的结果,但它本身并没有产生任何副作用,因而是满足幂等性的。

HTTP DELETE方法用于删除资源,有副作用,但它应该满足幂等性。比如:DELETE http://www.forum.com/article/4231,调用一次和N次对系统产生的副作用是相同的,即删掉id为4231的帖子;因此,调用者可以多次调用或刷新页面而不必担心引起错误。

HTTP POST方法用于创建资源,所对应的URI并非创建的资源本身,而是去执行创建动作的操作者,有副作用,不满足幂等性。比如:POST http://www.forum.com/articles的语义是在http://www.forum.com/articles下创建一篇帖子,HTTP响应中应包含帖子的创建状态以及帖子的URI。两次相同的POST请求会在服务器端创建两份资源,它们具有不同的URI;所以,POST方法不具备幂等性。

HTTP PUT方法用于创建或更新操作,所对应的URI是要创建或更新的资源本身,有副作用,它应该满足幂等性。比如:PUT http://www.forum/articles/4231的语义是创建或更新ID为4231的帖子。对同一URI进行多次PUT的副作用和一次PUT是相同的;因此,PUT方法具有幂等性。

对前文示例进行改进

利用Web API的形式实现前面所提到的取款功能。

1、用POST /tickets来实现create_ticket;

2、用PUT /accounts/account_id/ticket_id&amount=xxx来实现idempotent_withdraw。

值得注意的是严格来讲amount参数不应该作为URI的一部分,真正的URI应该是/accounts/account_id/ticket_id,而amount应该放在请求的body中。这种模式可以应用于很多场合,比如:论坛网站中防止意外的重复发帖。

如何防范 POST 重复提交?

HTTP POST 操作既不是安全的,也不是幂等的(至少在HTTP规范里没有保证)。平时开发的项目中可能会出现下面这些情况:

由于用户误操作,多次点击表单提交按钮。
由于网速等原因造成页面卡顿,用户重复刷新提交页面。
黑客或恶意用户使用postman等工具重复恶意提交表单(攻击网站)。
这些情况都会导致表单重复提交,造成数据重复,增加服务器负载,严重甚至会造成服务器宕机。因此有效防止表单重复提交有一定的必要性。

解决方案

通过JavaScript屏蔽提交按钮(不推荐)

通过js代码,当用户点击提交按钮后,屏蔽提交按钮使用户无法点击提交按钮或点击无效,从而实现防止表单重复提交。

ps:js代码很容易被绕过。比如用户通过刷新页面方式,或使用postman等工具绕过前段页面仍能重复提交表单。因此不推荐此方法。

    <%@ page language="java" import="java.util.*" pageEncoding="UTF-8"%>
        <!DOCTYPE HTML>
        <html>
        <head>
         <title>表单</title>
            <script type="text/javascript">
            //默认提交状态为false
            var commitStatus = false;
            function dosubmit(){
                  if(commitStatus==false){
                //提交表单后,讲提交状态改为true
                  commitStatus = true;
                  return true;
                 }else{
                  return false;
              }
             }
      </script>
     </head>
   
        <body>
            <form action="/path/post" onsubmit="return dosubmit()" method="post">
             用户名:<input type="text" name="username">
            <input type="submit" value="提交" id="submit">
            </form>
        </body>
    </html>

给数据库增加唯一键约束(简单粗暴)

在数据库建表的时候在ID字段添加主键约束,用户名、邮箱、电话等字段加唯一性约束。确保数据库只可以添加一条数据。

数据库加唯一性约束sql:

alter table tableName_xxx add unique key uniq_xxx(field1, field2)

服务器及时捕捉插入数据异常:

        try {
                xxxMapper.insert(user);
            } catch (DuplicateKeyException e) {
                logger.error("user already exist");
            }

通过数据库加唯一键约束能有效避免数据库重复插入相同数据。但无法阻止恶意用户重复提交表单(攻击网站),服务器大量执行sql插入语句,增加服务器和数据库负荷。

利用Session防止表单重复提交(推荐)

实现原理:

服务器返回表单页面时,会先生成一个subToken保存于session,并把该subToen传给表单页面。当表单提交时会带上subToken,服务器拦截器Interceptor会拦截该请求,拦截器判断session保存的subToken和表单提交subToken是否一致。若不一致或session的subToken为空或表单未携带subToken则不通过。

首次提交表单时session的subToken与表单携带的subToken一致走正常流程,然后拦截器内会删除session保存的subToken。当再次提交表单时由于session的subToken为空则不通过。从而实现了防止表单重复提交。

使用:

mvc配置文件加入拦截器配置

<mvc:interceptors>
<mvc:interceptor>
<mvc:mapping path="/**"/>
<bean class="xxx.xxx.interceptor.AvoidDuplicateSubmissionInterceptor"/>
</mvc:interceptor>
</mvc:interceptors>

拦截器

package xxx.xxxx.interceptor;

import xxx.xxx.SubToken;
import org.apache.struts.util.TokenProcessor;
import org.springframework.web.method.HandlerMethod;
import org.springframework.web.servlet.handler.HandlerInterceptorAdapter;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.lang.reflect.Method;

public class AvoidDuplicateSubmissionInterceptor extends
        HandlerInterceptorAdapter {

    public AvoidDuplicateSubmissionInterceptor() {
    }

    @Override
    public boolean preHandle(HttpServletRequest request,
                             HttpServletResponse response, Object handler) throws Exception {
        if (handler instanceof HandlerMethod) {
            HandlerMethod handlerMethod = (HandlerMethod) handler;
            Method method = handlerMethod.getMethod();
            SubToken annotation = method
                    .getAnnotation(SubToken.class);
            if (annotation != null) {
                boolean needSaveSession = annotation.saveToken();
                if (needSaveSession) {
                    request.getSession(false)
                            .setAttribute(
                                    "subToken",
                                    TokenProcessor.getInstance().generateToken(
                                            request));
                }

                boolean needRemoveSession = annotation.removeToken();
                if (needRemoveSession) {
                    if (isRepeatSubmit(request)) {
                        return false;
                    }
                    request.getSession(false).removeAttribute("subToken");
                }
            }
        }
        return true;
    }

    private boolean isRepeatSubmit(HttpServletRequest request) {
        String serverToken = (String) request.getSession(false).getAttribute(
                "subToken");
        if (serverToken == null) {
            return true;
        }
        String clinetToken = request.getParameter("subToken");
        if (clinetToken == null) {
            return true;
        }
        if (!serverToken.equals(clinetToken)) {
            return true;
        }
        return false;
    }
}  

控制层 controller

@RequestMapping("/form")
//开启一个Token
@SubToken(saveToken = true)
public String form() {
  return "/test/form";
}


@RequestMapping(value = "/postForm", method = RequestMethod.POST)
@ResponseBody
//开启Token验证,并且成功之后移除当前Token
@SubToken(removeToken = true)
public String postForm(String userName) {
  System.out.println(System.currentTimeMillis());
  try{
    System.out.println(userName);
    Thread.sleep(1500);//暂停1.5秒后程序继续执行
  }catch (InterruptedException e) {
    e.printStackTrace();
 }
 System.out.println(System.currentTimeMillis());
 return "1";
}

表单页面

<%@ page contentType="text/html;charset=UTF-8" language="java" %>
<html>
<head>
<title>Title</title>
</head>
<body>
<form method="post" action="/postForm">
<input type="text" name="userName">
<input type="hidden" name="subToken" value="${subToken}">
<input type="submit" value="提交">
</form>
</body>
</html>

使用AOP自定义切入实现(分布式锁)

实现原理:

自定义防止重复提交标记(@AvoidRepeatableCommit)。
对需要防止重复提交的Congtroller里的mapping方法加上该注解。
新增Aspect切入点,为@AvoidRepeatableCommit加入切入点。
每次提交表单时,Aspect都会保存当前 key 到 redis(须设置过期时间)。

重复提交时Aspect会判断当前redis是否有该key,若有则拦截。

自定义注解:

        import java.lang.annotation.*;
        
        /**
         * 避免重复提交
         * @author hhz
         * @version
         * @since
         */
        @Target(ElementType.METHOD)
        @Retention(RetentionPolicy.RUNTIME)
        public @interface AvoidRepeatableCommit {
        
            /**
             * 指定时间内不可重复提交,单位毫秒
             * @return
             */
            long timeout()  default 30000 ;
        
        }
自定义切入点Aspect

        /**
         * 重复提交aop
         * @author hhz
         * @version 
         * @since 
         */
        @Aspect
        @Component
        public class AvoidRepeatableCommitAspect {
        
            @Autowired
            private RedisTemplate redisTemplate;
        
            /**
             * @param point
             */
            @Around("@annotation(com.xwolf.boot.annotation.AvoidRepeatableCommit)")
            public Object around(ProceedingJoinPoint point) throws Throwable {
        
                HttpServletRequest request  = ((ServletRequestAttributes)RequestContextHolder.currentRequestAttributes()).getRequest();
                String ip = IPUtil.getIP(request);
                //获取注解
                MethodSignature signature = (MethodSignature) point.getSignature();
                Method method = signature.getMethod();
                //目标类、方法
                String className = method.getDeclaringClass().getName();
                String name = method.getName();
                String ipKey = String.format("%s#%s",className,name);
                int hashCode = Math.abs(ipKey.hashCode());
                String key = String.format("%s_%d",ip,hashCode);
                log.info("ipKey={},hashCode={},key={}",ipKey,hashCode,key);
                AvoidRepeatableCommit avoidRepeatableCommit =  method.getAnnotation(AvoidRepeatableCommit.class);
                long timeout = avoidRepeatableCommit.timeout();
                if (timeout < 0){
                                //过期时间5分钟
                    timeout = 60*5;
                }
                String value = (String) redisTemplate.opsForValue().get(key);
                if (StringUtils.isNotBlank(value)){
                    return "请勿重复提交";
                }
                redisTemplate.opsForValue().set(key, UUIDUtil.uuid(),timeout,TimeUnit.MILLISECONDS);
                //执行方法
                Object object = point.proceed();
                return object;
            }
        
        }

三、关于分布式锁

什么是分布式锁?

什么是锁?

在单进程的系统中,当存在多个线程可以同时改变某个变量(可变共享变量)时,就需要对变量或代码块做同步,使其在修改这种变量时能够线性执行消除并发修改变量。
而同步的本质是通过锁来实现的。为了实现多个线程在一个时刻同一个代码块只能有一个线程可执行,那么需要在某个地方做个标记,这个标记必须每个线程都能看到,当标记不存在时可以设置该标记,其余后续线程发现已经有标记了则等待拥有标记的线程结束同步代码块取消标记后再去尝试设置标记。这个标记可以理解为锁。
不同地方实现锁的方式也不一样,只要能满足所有线程都能看得到标记即可。如 Java 中 synchronize 是在对象头设置标记,Lock 接口的实现类基本上都只是某一个 volitile 修饰的 int 型变量其保证每个线程都能拥有对该 int 的可见性和原子修改,linux 内核中也是利用互斥量或信号量等内存数据做标记。
除了利用内存数据做锁其实任何互斥的都能做锁(只考虑互斥情况),如流水表中流水号与时间结合做幂等校验可以看作是一个不会释放的锁,或者使用某个文件是否存在作为锁等。只需要满足在对标记进行修改能保证原子性和内存可见性即可。

什么是分布式?

分布式的 CAP 理论告诉我们:

任何一个分布式系统都无法同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance),最多只能同时满足两项。

目前很多大型网站及应用都是分布式部署的,分布式场景中的数据一致性问题一直是一个比较重要的话题。基于 CAP理论,很多系统在设计之初就要对这三者做出取舍。在互联网领域的绝大多数的场景中,都需要牺牲强一致性来换取系统的高可用性,系统往往只需要保证最终一致性。

分布式场景

此处主要指集群模式下,多个相同服务同时开启.

在许多的场景中,我们为了保证数据的最终一致性,需要很多的技术方案来支持,比如分布式事务、分布式锁等。很多时候我们需要保证一个方法在同一时间内只能被同一个线程执行。在单机环境中,通过 Java 提供的并发 API 我们可以解决,但是在分布式环境下,就没有那么简单啦。

分布式与单机情况下最大的不同在于其不是多线程而是多进程。
多线程由于可以共享堆内存,因此可以简单的采取内存作为标记存储位置。而进程之间甚至可能都不在同一台物理机上,因此需要将标记存储在一个所有进程都能看到的地方。

什么是分布式锁?

当在分布式模型下,数据只有一份(或有限制),此时需要利用锁的技术控制某一时刻修改数据的进程数。
与单机模式下的锁不仅需要保证进程可见,还需要考虑进程与锁之间的网络问题。(我觉得分布式情况下之所以问题变得复杂,主要就是需要考虑到网络的延时和不可靠。。。一个大坑)
分布式锁还是可以将标记存在内存,只是该内存不是某个进程分配的内存而是公共内存如 Redis、Memcache。至于利用数据库、文件等做锁与单机的实现是一样的,只要保证标记能互斥就行。

分布式锁应该具备的条件:

1、在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行;
2、高可用的获取锁与释放锁;
3、高性能的获取锁与释放锁;
4、具备可重入特性;
5、具备锁失效机制,防止死锁;
6、具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败。

分布式锁的实现方式:

基于数据库实现分布式锁;
基于缓存(Redis等)实现分布式锁;
基于Zookeeper实现分布式锁;

目前几乎很多大型网站及应用都是分布式部署的,分布式场景中的数据一致性问题一直是一个比较重要的话题。分布式的CAP理论告诉我们“任何一个分布式系统都无法同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance),最多只能同时满足两项。”所以,很多系统在设计之初就要对这三者做出取舍。在互联网领域的绝大多数的场景中,都需要牺牲强一致性来换取系统的高可用性,系统往往只需要保证“最终一致性”,只要这个最终时间是在用户可以接受的范围内即可。

在很多场景中,我们为了保证数据的最终一致性,需要很多的技术方案来支持,比如分布式事务、分布式锁等。有的时候,我们需要保证一个方法在同一时间内只能被同一个线程执行。

Memcached:利用 Memcached 的 add 命令。此命令是原子性操作,只有在 key 不存在的情况下,才能 add 成功,也就意味着线程得到了锁。
Redis:和 Memcached 的方式类似,利用 Redis 的 setnx 命令。此命令同样是原子性操作,只有在 key 不存在的情况下,才能 set 成功。
Zookeeper:利用 Zookeeper 的顺序临时节点,来实现分布式锁和等待队列。Zookeeper 设计的初衷,就是为了实现分布式锁服务的。
Chubby:Google 公司实现的粗粒度分布式锁服务,底层利用了 Paxos 一致性算法。

基于数据库实现分布式锁

基于数据库的实现方式的核心思想是:在数据库中创建一个表,表中包含方法名等字段,并在方法名字段上创建唯一索引,想要执行某个方法,就使用这个方法名向表中插入数据,成功插入则获取锁,执行完成后删除对应的行数据释放锁。

(1)创建一个表:

DROP TABLE IF EXISTS `method_lock`;
CREATE TABLE `method_lock` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
  `method_name` varchar(64) NOT NULL COMMENT '锁定的方法名',
  `desc` varchar(255) NOT NULL COMMENT '备注信息',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `uidx_method_name` (`method_name`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COMMENT='锁定中的方法';

(2)想要执行某个方法,就使用这个方法名向表中插入数据:

INSERT INTO method_lock (method_name, desc) VALUES ('methodName', '测试的methodName');

因为我们对method_name做了唯一性约束,这里如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功,那么我们就可以认为操作成功的那个线程获得了该方法的锁,可以执行方法体内容。

(3)成功插入则获取锁,执行完成后删除对应的行数据释放锁:

delete from method_lock where method_name ='methodName';

注意:这只是使用基于数据库的一种方法,使用数据库实现分布式锁还有很多其他的玩法!

使用基于数据库的这种实现方式很简单,但是对于分布式锁应该具备的条件来说,它有一些问题需要解决及优化:

1、因为是基于数据库实现的,数据库的可用性和性能将直接影响分布式锁的可用性及性能,所以,数据库需要双机部署、数据同步、主备切换;

2、不具备可重入的特性,因为同一个线程在释放锁之前,行数据一直存在,无法再次成功插入数据,所以,需要在表中新增一列,用于记录当前获取到锁的机器和线程信息,在再次获取锁的时候,先查询表中机器和线程信息是否和当前机器和线程相同,若相同则直接获取锁;

3、没有锁失效机制,因为有可能出现成功插入数据后,服务器宕机了,对应的数据没有被删除,当服务恢复后一直获取不到锁,所以,需要在表中新增一列,用于记录失效时间,并且需要有定时任务清除这些失效的数据;

4、不具备阻塞锁特性,获取不到锁直接返回失败,所以需要优化获取逻辑,循环多次去获取。

5、在实施的过程中会遇到各种不同的问题,为了解决这些问题,实现方式将会越来越复杂;依赖数据库需要一定的资源开销,性能问题需要考虑。

基于Redis实现分布式锁

选用Redis实现分布式锁原因
Redis有很高的性能
Redis命令对此支持较好,实现起来比较方便
在此就不介绍Redis的安装了,具体在Linux和Windows中的安装可以查看我前面的博客。
http://www.cnblogs.com/liuyang0/p/6504826.html

使用命令介绍
SETNX
SETNX key val
当且仅当key不存在时,set一个key为val的字符串,返回1;若key存在,则什么都不做,返回0。

expire
expire key timeout
为key设置一个超时时间,单位为second,超过这个时间锁会自动释放,避免死锁。

delete
delete key
删除key

在使用Redis实现分布式锁的时候,主要就会使用到这三个命令。

实现
使用的是jedis来连接Redis。

实现思想
获取锁的时候,使用setnx加锁,并使用expire命令为锁添加一个超时时间,超过该时间则自动释放锁,锁的value值为一个随机生成的UUID,通过此在释放锁的时候进行判断。
获取锁的时候还设置一个获取的超时时间,若超过这个时间则放弃获取锁。
释放锁的时候,通过UUID判断是不是该锁,若是该锁,则执行delete进行锁释放。
分布式锁的核心代码如下:

package redis;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Transaction;
import redis.clients.jedis.exceptions.JedisException;

import java.util.List;
import java.util.UUID;


public class DistributedLock {
    private final JedisPool jedisPool;

    public DistributedLock(JedisPool jedisPool) {
        this.jedisPool = jedisPool;
    }

    /**
     * 加锁
     * @param locaName  锁的key
     * @param acquireTimeout  获取超时时间
     * @param timeout   锁的超时时间
     * @return 锁标识
     */
    public String lockWithTimeout(String locaName,
                                  long acquireTimeout, long timeout) {
        Jedis conn = null;
        String retIdentifier = null;
        try {
            // 获取连接
            conn = jedisPool.getResource();
            // 随机生成一个value
            String identifier = UUID.randomUUID().toString();
            // 锁名,即key值
            String lockKey = "lock:" + locaName;
            // 超时时间,上锁后超过此时间则自动释放锁
            int lockExpire = (int)(timeout / 1000);

            // 获取锁的超时时间,超过这个时间则放弃获取锁
            long end = System.currentTimeMillis() + acquireTimeout;
            while (System.currentTimeMillis() < end) {
                if (conn.setnx(lockKey, identifier) == 1) {
                    conn.expire(lockKey, lockExpire);
                    // 返回value值,用于释放锁时间确认
                    retIdentifier = identifier;
                    return retIdentifier;
                }
                // 返回-1代表key没有设置超时时间,为key设置一个超时时间
                if (conn.ttl(lockKey) == -1) {
                    conn.expire(lockKey, lockExpire);
                }

                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        } catch (JedisException e) {
            e.printStackTrace();
        } finally {
            if (conn != null) {
                conn.close();
            }
        }
        return retIdentifier;
    }

    /**
     * 释放锁
     * @param lockName 锁的key
     * @param identifier    释放锁的标识
     * @return
     */
    public boolean releaseLock(String lockName, String identifier) {
        Jedis conn = null;
        String lockKey = "lock:" + lockName;
        boolean retFlag = false;
        try {
            conn = jedisPool.getResource();
            while (true) {
                // 监视lock,准备开始事务
                conn.watch(lockKey);
                // 通过前面返回的value值判断是不是该锁,若是该锁,则删除,释放锁
                if (identifier.equals(conn.get(lockKey))) {
                    Transaction transaction = conn.multi();
                    transaction.del(lockKey);
                    List<Object> results = transaction.exec();
                    if (results == null) {
                        continue;
                    }
                    retFlag = true;
                }
                conn.unwatch();
                break;
            }
        } catch (JedisException e) {
            e.printStackTrace();
        } finally {
            if (conn != null) {
                conn.close();
            }
        }
        return retFlag;
    }
}

基于ZooKeeper实现分布式锁

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。

ZooKeeper的架构通过冗余服务实现高可用性。因此,如果第一次无应答,客户端就可以询问另一台ZooKeeper主机。ZooKeeper节点将它们的数据存储于一个分层的命名空间,非常类似于一个文件系统或一个前缀树结构。客户端可以在节点读写,从而以这种方式拥有一个共享的配置服务。更新是全序的。

基于ZooKeeper分布式锁的流程
在zookeeper指定节点(locks)下创建临时顺序节点node_n
获取locks下所有子节点children
对子节点按节点自增序号从小到大排序
判断本节点是不是第一个子节点,若是,则获取锁;若不是,则监听比该节点小的那个节点的删除事件
若监听事件生效,则回到第二步重新进行判断,直到获取到锁
具体实现
下面就具体使用java和zookeeper实现分布式锁,操作zookeeper使用的是apache提供的zookeeper的包。

通过实现Watch接口,实现process(WatchedEvent event)方法来实施监控,使CountDownLatch来完成监控,在等待锁的时候使用CountDownLatch来计数,等到后进行countDown,停止等待,继续运行。
以下整体流程基本与上述描述流程一致,只是在监听的时候使用的是CountDownLatch来监听前一个节点。

zookeeper 分布式锁源代码:

package zookeeper;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;


public class DistributedLock implements Lock, Watcher {
    private ZooKeeper zk = null;
    // 根节点
    private String ROOT_LOCK = "/locks";
    // 竞争的资源
    private String lockName;
    // 等待的前一个锁
    private String WAIT_LOCK;
    // 当前锁
    private String CURRENT_LOCK;
    // 计数器
    private CountDownLatch countDownLatch;
    private int sessionTimeout = 30000;
    private List<Exception> exceptionList = new ArrayList<Exception>();

    /**
     * 配置分布式锁
     * @param config 连接的url
     * @param lockName 竞争资源
     */
    public DistributedLock(String config, String lockName) {
        this.lockName = lockName;
        try {
            // 连接zookeeper
            zk = new ZooKeeper(config, sessionTimeout, this);
            Stat stat = zk.exists(ROOT_LOCK, false);
            if (stat == null) {
                // 如果根节点不存在,则创建根节点
                zk.create(ROOT_LOCK, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    // 节点监视器
    public void process(WatchedEvent event) {
        if (this.countDownLatch != null) {
            this.countDownLatch.countDown();
        }
    }

    public void lock() {
        if (exceptionList.size() > 0) {
            throw new LockException(exceptionList.get(0));
        }
        try {
            if (this.tryLock()) {
                System.out.println(Thread.currentThread().getName() + " " + lockName + "获得了锁");
                return;
            } else {
                // 等待锁
                waitForLock(WAIT_LOCK, sessionTimeout);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    public boolean tryLock() {
        try {
            String splitStr = "_lock_";
            if (lockName.contains(splitStr)) {
                throw new LockException("锁名有误");
            }
            // 创建临时有序节点
            CURRENT_LOCK = zk.create(ROOT_LOCK + "/" + lockName + splitStr, new byte[0],
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println(CURRENT_LOCK + " 已经创建");
            // 取所有子节点
            List<String> subNodes = zk.getChildren(ROOT_LOCK, false);
            // 取出所有lockName的锁
            List<String> lockObjects = new ArrayList<String>();
            for (String node : subNodes) {
                String _node = node.split(splitStr)[0];
                if (_node.equals(lockName)) {
                    lockObjects.add(node);
                }
            }
            Collections.sort(lockObjects);
            System.out.println(Thread.currentThread().getName() + " 的锁是 " + CURRENT_LOCK);
            // 若当前节点为最小节点,则获取锁成功
            if (CURRENT_LOCK.equals(ROOT_LOCK + "/" + lockObjects.get(0))) {
                return true;
            }

            // 若不是最小节点,则找到自己的前一个节点
            String prevNode = CURRENT_LOCK.substring(CURRENT_LOCK.lastIndexOf("/") + 1);
            WAIT_LOCK = lockObjects.get(Collections.binarySearch(lockObjects, prevNode) - 1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
        return false;
    }

    public boolean tryLock(long timeout, TimeUnit unit) {
        try {
            if (this.tryLock()) {
                return true;
            }
            return waitForLock(WAIT_LOCK, timeout);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    // 等待锁
    private boolean waitForLock(String prev, long waitTime) throws KeeperException, InterruptedException {
        Stat stat = zk.exists(ROOT_LOCK + "/" + prev, true);

        if (stat != null) {
            System.out.println(Thread.currentThread().getName() + "等待锁 " + ROOT_LOCK + "/" + prev);
            this.countDownLatch = new CountDownLatch(1);
            // 计数等待,若等到前一个节点消失,则precess中进行countDown,停止等待,获取锁
            this.countDownLatch.await(waitTime, TimeUnit.MILLISECONDS);
            this.countDownLatch = null;
            System.out.println(Thread.currentThread().getName() + " 等到了锁");
        }
        return true;
    }

    public void unlock() {
        try {
            System.out.println("释放锁 " + CURRENT_LOCK);
            zk.delete(CURRENT_LOCK, -1);
            CURRENT_LOCK = null;
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    public Condition newCondition() {
        return null;
    }

    public void lockInterruptibly() throws InterruptedException {
        this.lock();
    }


    public class LockException extends RuntimeException {
        private static final long serialVersionUID = 1L;
        public LockException(String e){
            super(e);
        }
        public LockException(Exception e){
            super(e);
        }
    }
}

四、分布式事务与异步消息处理

一、分布式事务前奏

二、柔性事务解决方案架构

在电商领域等互联网场景下,传统的事务在数据库性能和处理能力上都暴露出了瓶颈。柔性事务有两个特性:基本可用和柔性状态。所谓基本可用是指分布式系统出现故障的时候允许损失一部分的可用性。柔性状态是指允许系统存在中间状态,这个中间状态不会影响系统整体的可用性,比如数据库读写分离的主从同步延迟等。柔性事务的一致性指的是最终一致性。

(一)、基于可靠消息的最终一致性方案概述

(二)、TCC事务补偿型方案

(三)、最大努力通知型

三、基于可靠消息的最终一致性方案详解

(一)、消息发送一致性

消息中间件在分布式系统中的核心作用就是异步通讯、应用解耦和并发缓冲(也叫作流量削峰)。在分布式环境下,需要通过网络进行通讯,就引入了数据传输的不确定性,也就是CAP理论中的分区容错性。

消息发送一致性是指产生消息的业务动作与消息发送一致,也就是说如果业务操作成功,那么由这个业务操作所产生的消息一定要发送出去,否则就丢失。

处理方式一

public void completeOrderService() {
    // 处理订单
    order.process();

    // 发送会计原始凭证消息
    pipe.sendAccountingVouchetMessage();
}

在上面的情况中,如果业务操作成功,执行的消息发送之前应用发生故障,消息发送不出去,导致消息丢失,将会产生订单系统与会计系统的数据不一致。如果消息系统或者网络异常,也会导致消息发送不出去,也会造成数据不一致。

处理方式二

public void completeOrderService() {
    // 发送会计原始凭证消息
    pipe.sendAccountingVouchetMessage();

    // 处理订单
    order.process();
}

如果将上面的两个操作调换一下顺序,这种情况就会更加不可控了,消息发出去了业务订单可能会失败,会造成订单系统与业务系统的数据不一致。那么JMS标准中的XA协议是否可以保障发送的一致性?

(二)、保证消息一致的变通做法

  1. 发送消息:主动方现将应用把消息发给消息中间件,消息状态标记为“待确认”状态。
  2. 消息中间件收到消息后,把消息持久化到消息存储中,但是并不影响被动方投递消息。
  3. 消息中间件返回消息持久化结果,主动方根据返回的结果进行判断如何进行业务操作处理:
    1. 失败:放弃执行业务操作处理,结束,必要时向上层返回处理结果。
    2. 成功:执行业务操作处理。
  4. 业务操作完成后,把业务操作结果返回给消息中间件。
  5. 消息中间件收到业务操作结构后,根据业务结果进行处理:
    1. 失败:删除消息存储中的消息,结束。
    2. 成功:更新消息存储中的消息状态为“待发送”,然后执行消息投递。
  6. 前面的正向流程都成功之后,向被动方应用投递消息。

但是在上面的处理流程中,任何一个环节都有可能出现问题。

(三)、常规MQ消息处理流程和特点

(四)、消息重复发送问题和业务接口幂等性设计

image

对于未确认的消息,采用按规则重新投递的方式进行处理。对于以上流程,消息重复发送会导致业务处理接口出现重复调用的问题。消息消费过程中消息重复发送的主要原因就是消费者成功接收处理完消息后,消息中间件没有及时更新投递状态导致的。如果允许消息重复发送,那么消费方应该实现业务接口的幂等性设计。

(五)、本地消息服务方案

(六)、独立消息服务方案

(七)、消息服务子系统的设计实现

示例消息数据表:

名称 数据类型 允许空 默认值 属性 释义
uuid varchar(50) No unique UUID
version int(11) No 0 版本号
editer varchar(100) Yes NULL 修改者
creater varchar(100) Yes NULL 创建者
edit_time datetime Yes 0000-00-00 00:00:00 最后修改时间
create_time datetime No 0000-00-00 00:00:00 创建时间
msg_id varchar(50) No 消息ID
msg_body longtext No 消息内容
msg_date_type varchar(50) Yes 消息数据类型
consumer_queue varchar(100) No 消费队列
send_times int(6) No 0 消息重发次数
is_dead varchar(20) No 是否死亡
status varchar(20) No 状态
remark varchar(200) Yes 备注
field0 varchar(200) Yes 扩展字段0
field1 varchar(200) Yes 扩展字段1
field2 varchar(200) Yes 扩展字段2

参考资料

https://blog.csdn.net/wuzhiwei549/article/details/80692278
https://www.i3geek.com/archives/841
https://www.cnblogs.com/seesun2012/p/9214653.html
https://github.com/yangliu0/DistributedLock
https://www.cnblogs.com/liuyang0/p/6744076.html
https://www.cnblogs.com/liuyang0/p/6800538.html
https://mwhittaker.github.io/blog/an_illustrated_proof_of_the_cap_theorem/
https://www.infoq.cn/article/cap-twelve-years-later-how-the-rules-have-changed
https://www.cnblogs.com/bluemiaomiao/p/11216380.html


Kotlin开发者社区

专注分享 Java、 Kotlin、Spring/Spring Boot、MySQL、redis、neo4j、NoSQL、Android、JavaScript、React、Node、函数式编程、编程思想、"高可用,高性能,高实时"大型分布式系统架构设计主题。

High availability, high performance, high real-time large-scale distributed system architecture design

分布式框架:Zookeeper、分布式中间件框架等
分布式存储:GridFS、FastDFS、TFS、MemCache、redis等
分布式数据库:Cobar、tddl、Amoeba、Mycat
云计算、大数据、AI算法
虚拟化、云原生技术
分布式计算框架:MapReduce、Hadoop、Storm、Flink等
分布式通信机制:Dubbo、RPC调用、共享远程数据、消息队列等
消息队列MQ:Kafka、MetaQ,RocketMQ
怎样打造高可用系统:基于硬件、软件中间件、系统架构等一些典型方案的实现:HAProxy、基于Corosync+Pacemaker的高可用集群套件中间件系统
Mycat架构分布式演进
大数据Join背后的难题:数据、网络、内存和计算能力的矛盾和调和
Java分布式系统中的高性能难题:AIO,NIO,Netty还是自己开发框架?
高性能事件派发机制:线程池模型、Disruptor模型等等。。。

合抱之木,生于毫末;九层之台,起于垒土;千里之行,始于足下。不积跬步,无以至千里;不积小流,无以成江河。

上一篇下一篇

猜你喜欢

热点阅读