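Distributed Transaction Design

2020-11-09  dylan丶QAQ

1. An overall understanding of distributed transactions

In a traditional application the database runs on a single machine, and the database itself provides ACID guarantees. As traffic grows, the data is split across several databases through database and table sharding, so the rows of one logical table end up in more than one physical database. At that point the ACID guarantees of any single database no longer cover a transaction that touches two of them.

This is where the concepts of distributed transactions come in.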
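
To make the problem concrete, here is a minimal in-memory sketch of two databases that each commit on their own. `LocalDb` and `PartialCommitDemo` are hypothetical stand-ins invented for illustration, not a real driver API. When the second commit fails, nothing undoes the first one.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for one database with its own local transaction. */
final class LocalDb {
    private final Map<Integer, Integer> committed = new HashMap<>();
    private final Map<Integer, Integer> pending = new HashMap<>();
    private final boolean failOnCommit;

    LocalDb(boolean failOnCommit) { this.failOnCommit = failOnCommit; }

    void seed(int id, int balance) { committed.put(id, balance); }

    /** Stages a balance change inside the local transaction. */
    void add(int id, int delta) {
        int current = pending.getOrDefault(id, committed.getOrDefault(id, 0));
        pending.put(id, current + delta);
    }

    /** Commits atomically, but only for THIS database. */
    void commit() {
        if (failOnCommit) {
            pending.clear();   // local rollback
            throw new IllegalStateException("commit failed");
        }
        committed.putAll(pending);
        pending.clear();
    }

    int balance(int id) { return committed.getOrDefault(id, 0); }
}

final class PartialCommitDemo {
    /** Moves money between two databases using two independent local commits. */
    static void naiveTransfer(LocalDb from, LocalDb to, int id, int amount) {
        from.add(id, -amount);
        to.add(id, amount);
        from.commit();   // succeeds and is now durable
        to.commit();     // may fail, and nothing undoes the first commit
    }

    public static void main(String[] args) {
        LocalDb a = new LocalDb(false);
        LocalDb b = new LocalDb(true);   // this database fails at commit time
        a.seed(1, 1000);
        b.seed(1, 1000);
        try {
            naiveTransfer(a, b, 1, 200);
        } catch (IllegalStateException e) {
            System.out.println("a=" + a.balance(1) + ", b=" + b.balance(1));
        }
    }
}
```

The debit on the first database survives while the credit on the second is lost, which is exactly the gap a distributed transaction has to close.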

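2. The CAP theorem

CAP stands for Consistency, Availability, and Partition tolerance. The CAP theorem states that a distributed system cannot satisfy all three at the same time; it can satisfy at most two.

We design a system as a distributed system precisely so that the service as a whole stays available to the outside when one node becomes unavailable. That satisfies P. A system that does not satisfy P is not a distributed system at all.

image-20200318224059218.png

A and B are two data nodes. A replicates data to B, and the two serve requests as one unit, so P is satisfied first.

If the replication from A to B is not guaranteed to have completed before requests are served, the structure is an AP structure.

If we insist on C, a client must get the same result whether it reads from node A or node B. When replication between the nodes is delayed, the system can only serve requests after the synchronization has finished, so availability (the A in CAP, not node A) cannot be guaranteed during that window. This is a CP structure.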
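
The difference between the two structures can be sketched with a toy two-node store. Everything here (`CapSketch`, `Mode`, the method names) is hypothetical and only illustrates the trade-off: in AP mode a read from B always answers but may be stale, in CP mode it refuses to answer until replication has finished.

```java
import java.util.Optional;

/** Hypothetical two-node store: node A accepts writes and replicates them to node B. */
final class CapSketch {
    enum Mode { AP, CP }

    private final Mode mode;
    private int valueOnA;
    private int valueOnB;
    private boolean replicationPending;

    CapSketch(Mode mode) { this.mode = mode; }

    /** A write lands on A first; B lags until replicate() runs. */
    void write(int value) {
        valueOnA = value;
        replicationPending = true;
    }

    /** Simulates the A -> B synchronization completing. */
    void replicate() {
        valueOnB = valueOnA;
        replicationPending = false;
    }

    /**
     * Reads from B. AP always answers, possibly with stale data.
     * CP returns an empty result while A and B differ.
     */
    Optional<Integer> readFromB() {
        if (mode == Mode.CP && replicationPending) {
            return Optional.empty();   // unavailable, but never inconsistent
        }
        return Optional.of(valueOnB);
    }

    public static void main(String[] args) {
        for (Mode mode : Mode.values()) {
            CapSketch store = new CapSketch(mode);
            store.write(42);
            System.out.println(mode + " before sync: " + store.readFromB());
            store.replicate();
            System.out.println(mode + " after sync:  " + store.readFromB());
        }
    }
}
```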

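3. ACID and BASE

A: Atomicity

C: Consistency

I: Isolation

D: Durability

ACID means strong consistency. In CAP terms ACID is usually described as guaranteeing CA, which is why distributed designs are moving from ACID towards BASE.

What is BASE?

Basically Available: after a failure, a distributed system is allowed to lose part of its availability as long as the core functions stay available. Service degradation during a large e-commerce promotion is an example.

Soft state: the system is allowed to have intermediate states, and these intermediate states do not affect overall availability. For example, the copies of data on different shards may be out of sync for a while.

Eventually consistent: all replicas of the data reach a consistent state after some period of time.

The BASE model is the opposite of ACID. Unlike ACID, BASE gives up strong consistency in exchange for availability: data may be inconsistent for a while as long as it becomes consistent in the end.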
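
Soft state and eventual consistency can be sketched as a primary that acknowledges writes immediately and a replica that catches up later. `EventualStore` is a hypothetical class made up for this sketch; the replication log is simply an in-memory queue.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Hypothetical primary/replica pair with asynchronous replication. */
final class EventualStore {
    /** One replicated write waiting to be applied on the replica. */
    private static final class Change {
        final String key;
        final int value;
        Change(String key, int value) { this.key = key; this.value = value; }
    }

    private final Map<String, Integer> primary = new HashMap<>();
    private final Map<String, Integer> replica = new HashMap<>();
    private final Queue<Change> replicationLog = new ArrayDeque<>();

    /** Writes are acknowledged as soon as the primary has them (availability first). */
    void put(String key, int value) {
        primary.put(key, value);
        replicationLog.add(new Change(key, value));
    }

    /** May return null or an old value while changes are still in flight (soft state). */
    Integer readReplica(String key) { return replica.get(key); }

    /** Applies every pending change in order; afterwards the replica has caught up. */
    void drain() {
        Change c;
        while ((c = replicationLog.poll()) != null) {
            replica.put(c.key, c.value);
        }
    }

    boolean converged() { return replicationLog.isEmpty() && primary.equals(replica); }

    public static void main(String[] args) {
        EventualStore store = new EventualStore();
        store.put("stock", 10);
        store.put("stock", 9);
        System.out.println("replica before drain: " + store.readReplica("stock"));
        store.drain();
        System.out.println("replica after drain:  " + store.readReplica("stock"));
    }
}
```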

Every distributed transaction solution is built on either the ACID model or the BASE model.

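4. Problems that distributed transactions have to deal with

In a single-database architecture

In a distributed architecture

Solutions for distributed transactions: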

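5. Two-phase commit with the XA protocol

Phase one: prepare

image-20200321202031686.png

The TM (transaction manager) asks every RM (resource manager) to prepare. Only if all of them return OK does it go on to commit. If anything goes wrong during the prepare phase, the error is returned to the TM, which rolls back and ends the transaction.

Phase two: commit

image-20200321202234509.png

If the second RM fails during the commit phase and does not commit, it returns a status to the TM. The commit already completed by the first RM cannot be rolled back, so at this point manual intervention is needed.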
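
The two phases can be sketched in a few lines of plain Java. The `Participant` interface below is a hypothetical simplification of an RM and is not the `javax.transaction.xa` API; `TwoPhaseCoordinator` plays the TM. The sketch covers only the happy path and a "no" vote during prepare, not a failure in the middle of phase two.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical resource manager (RM). This is not the javax.transaction.xa API. */
interface Participant {
    boolean prepare();   // phase 1: vote yes (true) or no (false)
    void commit();       // phase 2: make the prepared work permanent
    void rollback();     // discard the prepared work
}

/** Test double that records which calls it received. */
final class RecordingParticipant implements Participant {
    final List<String> calls = new ArrayList<>();
    private final boolean voteYes;

    RecordingParticipant(boolean voteYes) { this.voteYes = voteYes; }

    public boolean prepare() { calls.add("prepare"); return voteYes; }
    public void commit() { calls.add("commit"); }
    public void rollback() { calls.add("rollback"); }
}

/** Hypothetical transaction manager (TM) driving the two phases. */
final class TwoPhaseCoordinator {
    /** Returns true if the global transaction committed, false if it was rolled back. */
    static boolean run(List<? extends Participant> participants) {
        List<Participant> prepared = new ArrayList<>();
        for (Participant p : participants) {
            if (p.prepare()) {
                prepared.add(p);
            } else {
                // One "no" vote aborts everything. This sketch assumes the RM
                // that voted no has already discarded its own work.
                for (Participant q : prepared) {
                    q.rollback();
                }
                return false;
            }
        }
        for (Participant p : prepared) {
            p.commit();
        }
        return true;
    }

    public static void main(String[] args) {
        RecordingParticipant rm1 = new RecordingParticipant(true);
        RecordingParticipant rm2 = new RecordingParticipant(false);
        List<RecordingParticipant> rms = new ArrayList<>();
        rms.add(rm1);
        rms.add(rm2);
        System.out.println("committed=" + run(rms) + " rm1=" + rm1.calls + " rm2=" + rm2.calls);
    }
}
```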

Summary of XA two-phase commit:

6. Distributed transactions with Atomikos

First prepare the environment: two MySQL 5.7 databases

Add the dependency

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jta-atomikos</artifactId>
        </dependency>

The data source configuration, which also registers the TM (transaction manager)

package com.icodingedu.config;

import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import com.mysql.cj.jdbc.MysqlXADataSource;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.jta.JtaTransactionManager;

import javax.sql.DataSource;
import javax.transaction.UserTransaction;

@Configuration
public class DB195Config {

    @Bean("db195")
    public DataSource db195(){
        MysqlXADataSource mysqlXADataSource = new MysqlXADataSource();
        mysqlXADataSource.setUser("gavin");
        mysqlXADataSource.setPassword("123456");
        mysqlXADataSource.setUrl("jdbc:mysql://39.100.17.31:3306/user_195");

        AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
        atomikosDataSourceBean.setXaDataSource(mysqlXADataSource);
        return atomikosDataSourceBean;
    }
    // This bean acts as the TM; it only needs to be declared in one config class
    @Bean("xaTransaction")
    public JtaTransactionManager jtaTransactionManager(){
        UserTransaction userTransaction = new UserTransactionImp();
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        return new JtaTransactionManager(userTransaction,userTransactionManager);
    }
}

The other data source

package com.icodingedu.config;

import com.mysql.cj.jdbc.MysqlXADataSource;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;

@Configuration
public class DB197Config {

    @Bean("db197")
    public DataSource db197(){
        MysqlXADataSource mysqlXADataSource = new MysqlXADataSource();
        mysqlXADataSource.setUser("gavin");
        mysqlXADataSource.setPassword("123456");
        mysqlXADataSource.setUrl("jdbc:mysql://39.100.19.243:3306/user_197");

        AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
        atomikosDataSourceBean.setXaDataSource(mysqlXADataSource);
        return atomikosDataSourceBean;
    }
}

Calling code

package com.icodingedu.service;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.sql.DataSource;

@Service
public class DBService {

    @Transactional(transactionManager = "xaTransaction")
    public void insertTest(@Qualifier("db195")DataSource dataSource195,
                           @Qualifier("db197")DataSource dataSource197){
        JdbcTemplate jdbc195 = new JdbcTemplate(dataSource195);
        String sql1 = "insert into user_info(id,username) values(1,'gavin1')";
        int i = jdbc195.update(sql1);
        System.out.println("************** Rows affected: "+i);

        JdbcTemplate jdbc197 = new JdbcTemplate(dataSource197);
        String sql2 = "insert into user_info(id,username) values(1,'gavin1')";
        int i1 = jdbc197.update(sql2);
        System.out.println("************** Rows affected: "+i1);
    }
}

Test code

import com.icodingedu.service.DBService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;

import javax.sql.DataSource;

@SpringBootTest
class DistributeTransactionsApplicationTests {

    @Autowired
    DBService dbService;

    @Test
    void contextLoads(@Qualifier("db195") DataSource dataSource195,
                      @Qualifier("db197")DataSource dataSource197) {
        dbService.insertTest(dataSource195,dataSource197);
    }
}

7. Distributed transactions with MyCat

By default a client session does not use distributed transactions; they have to be switched on manually

set autocommit=0;
set xa=on;
insert into user_info(id,username) values(1,'gavin1'),(5000001,'gavin2');
commit;
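
The two ids in the insert above are presumably chosen so that the rows land on different data nodes. As a rough illustration, assume a range-based sharding rule with 5,000,000 ids per node. That number is an assumption; the actual rule is whatever the MyCat configuration defines. `RangeShardRouter` is a hypothetical class and not part of MyCat.

```java
/** Hypothetical range router: a fixed number of consecutive ids per shard. */
final class RangeShardRouter {
    private final long idsPerShard;

    RangeShardRouter(long idsPerShard) {
        if (idsPerShard <= 0) {
            throw new IllegalArgumentException("idsPerShard must be positive");
        }
        this.idsPerShard = idsPerShard;
    }

    /** ids 0..idsPerShard go to shard 0, the next idsPerShard ids to shard 1, and so on. */
    int shardFor(long id) {
        if (id < 0) {
            throw new IllegalArgumentException("id must not be negative");
        }
        return id == 0 ? 0 : (int) ((id - 1) / idsPerShard);
    }

    public static void main(String[] args) {
        RangeShardRouter router = new RangeShardRouter(5_000_000L);   // assumed range size
        System.out.println("id 1       -> shard " + router.shardFor(1));
        System.out.println("id 5000001 -> shard " + router.shardFor(5_000_001L));
    }
}
```

Under that assumption the single insert statement writes to two physical databases, so committing it atomically requires the XA mode switched on by `set xa=on`.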

Calling it from a client through MyBatis

        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.1.1</version>
        </dependency>

POJO

package com.icodingedu.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class UserInfo {
    private int id;
    private String username;
}

mapper

package com.icodingedu.mapper;

import com.icodingedu.pojo.UserInfo;
import org.apache.ibatis.annotations.Mapper;
import org.springframework.stereotype.Repository;

@Mapper
@Repository
public interface UserMapper {
    int addUser(UserInfo userInfo);
}

xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.icodingedu.mapper.UserMapper">
    <insert id="addUser" parameterType="com.icodingedu.pojo.UserInfo">
        insert into user_info(id,username) values(#{id},#{username})
    </insert>
</mapper>

service

package com.icodingedu.service;

import com.icodingedu.mapper.UserMapper;
import com.icodingedu.pojo.UserInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class MyCatService {

    @Autowired
    UserMapper userMapper;

    @Transactional(rollbackFor = Exception.class)
    public void addUser(){
        UserInfo userInfo1 = new UserInfo();
        userInfo1.setId(1);
        userInfo1.setUsername("gavin1");
        userMapper.addUser(userInfo1);

        UserInfo userInfo2 = new UserInfo();
        userInfo2.setId(5000001);
        userInfo2.setUsername("gavin2");
        userMapper.addUser(userInfo2);
    }
}

Test

package com.icodingedu;

import com.icodingedu.service.MyCatService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class MycatMybatisApplicationTests {

    @Autowired
    MyCatService myCatService;

    @Test
    void contextLoads() {
        myCatService.addUser();
    }

}

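8. Distributed transactions with Sharding-JDBC

Sharding-JDBC supports distributed transactions automatically. Put the statements that run against the different databases in one method and annotate it with @Transactional(rollbackFor = Exception.class).

9. The TCC transaction compensation mechanism

TCC is built from three operations: Try, Confirm, and Cancel.

image-20200321215604676.png

Points to note:

Characteristics of TCC

TCC is a compensation mechanism at the application layer. It requires a lot of code and places high demands on developers.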
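
A minimal in-memory sketch of the three operations, assuming a money transfer as the business case. `TccAccount` and `TccTransfer` are hypothetical names, and a real implementation would also have to persist the reservations and retry Confirm and Cancel until they succeed.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical account supporting Try / Confirm / Cancel, keyed by a transaction id. */
final class TccAccount {
    private int balance;
    private final Map<String, Integer> reservedDebits = new HashMap<>();
    private final Map<String, Integer> pendingCredits = new HashMap<>();

    TccAccount(int balance) { this.balance = balance; }

    /** Try (debit side): check the balance and freeze the amount. */
    boolean tryDebit(String txId, int amount) {
        if (reservedDebits.containsKey(txId)) return true;   // repeated Try is harmless
        if (amount <= 0 || balance < amount) return false;
        balance -= amount;
        reservedDebits.put(txId, amount);
        return true;
    }

    /** Try (credit side): remember the incoming amount without making it spendable. */
    boolean tryCredit(String txId, int amount) {
        if (amount <= 0) return false;
        pendingCredits.putIfAbsent(txId, amount);
        return true;
    }

    /** Confirm: consume the frozen debit and apply the pending credit. Safe to repeat. */
    void confirm(String txId) {
        reservedDebits.remove(txId);
        Integer credit = pendingCredits.remove(txId);
        if (credit != null) balance += credit;
    }

    /** Cancel: release the frozen debit and drop the pending credit. Safe to repeat. */
    void cancel(String txId) {
        Integer debit = reservedDebits.remove(txId);
        if (debit != null) balance += debit;
        pendingCredits.remove(txId);
    }

    int balance() { return balance; }
}

final class TccTransfer {
    /** Runs Try on both sides, then Confirm on success or Cancel on any failure. */
    static boolean transfer(String txId, TccAccount from, TccAccount to, int amount) {
        boolean ok = from.tryDebit(txId, amount) && to.tryCredit(txId, amount);
        if (ok) {
            from.confirm(txId);
            to.confirm(txId);
        } else {
            from.cancel(txId);
            to.cancel(txId);
        }
        return ok;
    }

    public static void main(String[] args) {
        TccAccount from = new TccAccount(1000);
        TccAccount to = new TccAccount(500);
        System.out.println(transfer("tx-1", from, to, 200) + " " + from.balance() + " " + to.balance());
        System.out.println(transfer("tx-2", from, to, 5000) + " " + from.balance() + " " + to.balance());
    }
}
```

Confirm and Cancel are written so that calling them twice, or calling Cancel without a prior Try, changes nothing. That property is what allows a coordinator to retry them blindly.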

10. Implementing the compensation mechanism in code

Data source configuration

DB195Config

package com.icodingedu.config;

import com.mysql.cj.jdbc.MysqlDataSource;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;

@Configuration
@MapperScan(value = "com.icodingedu.mapper.db195",sqlSessionFactoryRef = "factoryBean195")
public class DB195Config {

    @Bean("db195")
    public DataSource db195(){
        MysqlDataSource mysqlDataSource = new MysqlDataSource();
        mysqlDataSource.setUser("gavin");
        mysqlDataSource.setPassword("123456");
        mysqlDataSource.setUrl("jdbc:mysql://39.100.17.31:3306/user_195");
        return mysqlDataSource;
    }

    @Bean("factoryBean195")
    public SqlSessionFactoryBean factoryBean(@Qualifier("db195") DataSource dataSource) throws Exception{
        SqlSessionFactoryBean sessionFactoryBean = new SqlSessionFactoryBean();
        sessionFactoryBean.setDataSource(dataSource);

        ResourcePatternResolver patternResolver = new PathMatchingResourcePatternResolver();

        sessionFactoryBean.setMapperLocations(patternResolver.getResources("mybatis/db195/*.xml"));
        return sessionFactoryBean;
    }

    @Bean("tm195")
    public PlatformTransactionManager platformTransactionManager(@Qualifier("db195") DataSource dataSource){
        return new DataSourceTransactionManager(dataSource);
    }
}

DB197Config

package com.icodingedu.config;

import com.mysql.cj.jdbc.MysqlDataSource;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;

@Configuration
@MapperScan(value = "com.icodingedu.mapper.db197",sqlSessionFactoryRef = "factoryBean197")
public class DB197Config {

    @Bean("db197")
    public DataSource db197(){
        MysqlDataSource mysqlDataSource = new MysqlDataSource();
        mysqlDataSource.setUser("gavin");
        mysqlDataSource.setPassword("123456");
        mysqlDataSource.setUrl("jdbc:mysql://39.100.19.243:3306/user_197");
        return mysqlDataSource;
    }

    @Bean("factoryBean197")
    public SqlSessionFactoryBean factoryBean(@Qualifier("db197") DataSource dataSource) throws Exception{
        SqlSessionFactoryBean sessionFactoryBean = new SqlSessionFactoryBean();
        sessionFactoryBean.setDataSource(dataSource);

        ResourcePatternResolver patternResolver = new PathMatchingResourcePatternResolver();

        sessionFactoryBean.setMapperLocations(patternResolver.getResources("mybatis/db197/*.xml"));
        return sessionFactoryBean;
    }

    @Bean("tm197")
    public PlatformTransactionManager platformTransactionManager(@Qualifier("db197") DataSource dataSource){
        return new DataSourceTransactionManager(dataSource);
    }
}

195Mapper

package com.icodingedu.mapper.db195;

import com.icodingedu.pojo.UserInfo195;
import org.springframework.stereotype.Repository;

@Repository
public interface UserMapper195 {
    int update(UserInfo195 userInfo195);
    UserInfo195 queryById(int id);
}

197Mapper

package com.icodingedu.mapper.db197;

import com.icodingedu.pojo.UserInfo197;
import org.springframework.stereotype.Repository;

@Repository
public interface UserMapper197 {
    int update(UserInfo197 userInfo197);
    UserInfo197 queryById(int id);
}

195 POJO

package com.icodingedu.pojo;

import lombok.Data;
import org.springframework.stereotype.Component;

@Data
public class UserInfo195 {
    private int id;
    private String username;
    private int account;
}

197 POJO

package com.icodingedu.pojo;

import lombok.Data;
import org.springframework.stereotype.Component;

@Data
public class UserInfo197 {
    private int id;
    private String username;
    private int account;
}

195 xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.icodingedu.mapper.db195.UserMapper195">
    <update id="update" parameterType="com.icodingedu.pojo.UserInfo195">
        update user_info set account=#{account} where id=#{id}
    </update>
    <select id="queryById" resultType="com.icodingedu.pojo.UserInfo195">
        select * from user_info where id=#{id}
    </select>
</mapper>

197 xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.icodingedu.mapper.db197.UserMapper197">
    <update id="update" parameterType="com.icodingedu.pojo.UserInfo197">
        update user_info set account=#{account} where id=#{id}
    </update>
    <select id="queryById" resultType="com.icodingedu.pojo.UserInfo197">
        select * from user_info where id=#{id}
    </select>
</mapper>

service

package com.icodingedu.service;

import com.icodingedu.mapper.db195.UserMapper195;
import com.icodingedu.mapper.db197.UserMapper197;
import com.icodingedu.pojo.UserInfo195;
import com.icodingedu.pojo.UserInfo197;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class UserInfoService {

    @Autowired
    UserMapper195 userMapper195;

    @Autowired
    UserMapper197 userMapper197;

    // Only db195 is covered by this local transaction; updates on db197 are not rolled back by it
    @Transactional(transactionManager = "tm195")
    public void transfer(){
        // Debit 200 on db195; tm195 rolls this back if the method throws
        UserInfo195 userInfo195 = userMapper195.queryById(1);
        userInfo195.setAccount(userInfo195.getAccount()-200);
        userMapper195.update(userInfo195);

        // Tracks whether the credit on db197 was applied and therefore needs compensating
        boolean credited = false;
        try {
            UserInfo197 userInfo197 = userMapper197.queryById(1);
            userInfo197.setAccount(userInfo197.getAccount() + 200);
            userMapper197.update(userInfo197);
            credited = true;

            // Simulate a failure after both updates
            int i = 1 / 0;
        }catch (Exception ex){
            ex.printStackTrace();
            // Compensate: undo the credit on db197, but only if it actually happened
            if (credited) {
                UserInfo197 userInfo197 = userMapper197.queryById(1);
                userInfo197.setAccount(userInfo197.getAccount() - 200);
                userMapper197.update(userInfo197);
            }
            // Rethrow so that tm195 rolls back the debit on db195
            throw ex;
        }
        System.out.println("Done ********************");
    }
}
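
The service above undoes the credit by hand in its catch block. One way to generalise the idea, sketched here under the assumption that every step has a known inverse, is to record an undo action after each step that succeeds and to replay the recorded actions in reverse order on failure. `CompensationLog` and `CompensationDemo` are hypothetical names; the int array stands in for the two account tables.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical helper that remembers how to undo each completed step. */
final class CompensationLog {
    private final Deque<Runnable> undoActions = new ArrayDeque<>();

    /** Runs the step first; the undo action is recorded only if the step did not throw. */
    void run(Runnable step, Runnable undo) {
        step.run();
        undoActions.push(undo);
    }

    /** Undoes the completed steps, most recent first. */
    void compensate() {
        while (!undoActions.isEmpty()) {
            undoActions.pop().run();
        }
    }
}

final class CompensationDemo {
    /** balances[0] plays the account on db195, balances[1] the one on db197. */
    static void transfer(int[] balances, int amount, boolean failAtEnd) {
        CompensationLog log = new CompensationLog();
        try {
            log.run(() -> balances[0] -= amount, () -> balances[0] += amount);
            log.run(() -> balances[1] += amount, () -> balances[1] -= amount);
            if (failAtEnd) {
                throw new IllegalStateException("simulated failure");
            }
        } catch (RuntimeException ex) {
            log.compensate();   // only steps that actually ran are undone
            throw ex;
        }
    }

    public static void main(String[] args) {
        int[] balances = {1000, 1000};
        try {
            transfer(balances, 200, true);
        } catch (IllegalStateException ex) {
            System.out.println(balances[0] + " / " + balances[1]);
        }
    }
}
```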

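11. Designing eventual consistency with a local message table

image-20200322201614603.png

How to read the diagram:

Characteristics of the local message approach

12. Implementing eventual consistency with a local message table

Payment message table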

CREATE TABLE `pay_msg` (
 `id` int(11) NOT NULL,
 `order_id` int(11) NOT NULL,
 `status` int(11) NOT NULL DEFAULT '0',
 `fail_count` int(11) NOT NULL DEFAULT '0',
 PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

Payment account table

CREATE TABLE `user_account` (
 `id` int(11) NOT NULL,
 `username` varchar(255) NOT NULL,
 `account` int(11) NOT NULL,
 PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

Order table

CREATE TABLE `order_info` (
 `id` int(11) NOT NULL,
 `order_status` int(11) NOT NULL,
 `order_amount` int(11) NOT NULL,
 PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

POJO code

package com.icodingedu.pojo;

import lombok.Data;

@Data
public class PayMsg {
    private int id;
    private int order_id;
    private int status;
    private int fail_count;
}

package com.icodingedu.pojo;

import lombok.Data;

@Data
public class UserAccount {
    private int id;
    private String username;
    private int account;
}

package com.icodingedu.pojo;

import lombok.Data;

@Data
public class OrderInfo {
    private int id;
    private int order_status;
    private int order_amount;
}

Mapper code

package com.icodingedu.mapper.db195;

import com.icodingedu.pojo.PayMsg;
import org.springframework.stereotype.Repository;

import java.util.List;

@Repository
public interface PayMsgMapper {
    int insertPayMs(PayMsg payMsg);
    int updatePayMsg(PayMsg payMsg);
    List<PayMsg> queryNoSend();
    PayMsg queryForId(int order_id);
}

package com.icodingedu.mapper.db195;

import com.icodingedu.pojo.UserAccount;
import org.springframework.stereotype.Repository;

@Repository
public interface UserAccountMapper {
    int updateUserAccount(UserAccount userAccount);
    UserAccount queryForId(int id);
}

package com.icodingedu.mapper.db197;

import com.icodingedu.pojo.OrderInfo;
import org.springframework.stereotype.Repository;

@Repository
public interface OrderInfoMapper {
    int updateOrderInfo(OrderInfo orderInfo);
    OrderInfo queryForId(int id);
}

xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.icodingedu.mapper.db195.PayMsgMapper">
    <insert id="insertPayMs" parameterType="com.icodingedu.pojo.PayMsg">
        insert into pay_msg(id,order_id,status,fail_count) values(#{id},#{order_id},#{status},#{fail_count})
    </insert>
    <update id="updatePayMsg" parameterType="com.icodingedu.pojo.PayMsg">
        update pay_msg set status=#{status},fail_count=#{fail_count} where id=#{id}
    </update>
    <select id="queryNoSend" resultType="com.icodingedu.pojo.PayMsg">
        select * from pay_msg where status=0
    </select>
    <select id="queryForId" resultType="com.icodingedu.pojo.PayMsg">
        select * from pay_msg where order_id=#{order_id}
    </select>
</mapper>

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.icodingedu.mapper.db195.UserAccountMapper">
    <update id="updateUserAccount" parameterType="com.icodingedu.pojo.UserAccount">
        update user_account set account=#{account} where id=#{id}
    </update>
    <select id="queryForId" resultType="com.icodingedu.pojo.UserAccount">
        select * from user_account where id=#{id}
    </select>
</mapper>

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.icodingedu.mapper.db197.OrderInfoMapper">
    <update id="updateOrderInfo" parameterType="com.icodingedu.pojo.OrderInfo">
        update order_info set order_status=#{order_status},order_amount=#{order_amount} where id=#{id}
    </update>
    <select id="queryForId" resultType="com.icodingedu.pojo.OrderInfo">
        select * from order_info where id=#{id}
    </select>
</mapper>

service

package com.icodingedu.service;

import com.icodingedu.mapper.db197.OrderInfoMapper;
import com.icodingedu.pojo.OrderInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class OrderService {
    @Autowired
    OrderInfoMapper orderInfoMapper;

    /**
     * Marks the order as paid.
     *
     * @param order_id id of the order to update
     * @return 0 if the update succeeded, 1 if the order does not exist
     */
    public int handleOrder(int order_id){
        OrderInfo orderInfo = orderInfoMapper.queryForId(order_id);
        if(orderInfo==null){
            return 1;
        }
        orderInfo.setOrder_status(1);
        orderInfoMapper.updateOrderInfo(orderInfo);
        return 0;
    }
}

package com.icodingedu.service;

import com.icodingedu.mapper.db195.PayMsgMapper;
import com.icodingedu.pojo.PayMsg;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
public class OrderNotifyService {

    @Autowired
    PayMsgMapper payMsgMapper;

    // Every 5 seconds, push all unsent payment messages to the order service
    @Scheduled(cron = "0/5 * * * * ?")
    public void orderNotify() throws Exception{
        System.out.println("Entering cron ************");
        List<PayMsg> payMsgList = payMsgMapper.queryNoSend();
        if(payMsgList==null||payMsgList.size()==0){
            return;
        }
        // One client for the whole batch, closed when the method returns
        try (CloseableHttpClient httpClient = HttpClientBuilder.create().build()) {
            for (PayMsg payMsg: payMsgList) {
                int order_id = payMsg.getOrder_id();
                //http://localhost:8080/handleorder?id=2001
                HttpGet httpGet = new HttpGet("http://localhost:8080/handleorder?id="+order_id);
                String response;
                try (CloseableHttpResponse httpResponse = httpClient.execute(httpGet)) {
                    response = EntityUtils.toString(httpResponse.getEntity());
                }
                System.out.println("************ Call result: "+response);
                if("success".equals(response)){
                    payMsg.setStatus(1);
                }else{
                    // Count the failure and give up once it has failed more than 5 times
                    int count = payMsg.getFail_count();
                    payMsg.setFail_count(count+1);
                    if(count+1>5){
                        payMsg.setStatus(2);
                    }
                }
                payMsgMapper.updatePayMsg(payMsg);
            }
        }
    }
}

package com.icodingedu.service;

import com.icodingedu.mapper.db195.PayMsgMapper;
import com.icodingedu.mapper.db195.UserAccountMapper;
import com.icodingedu.pojo.PayMsg;
import com.icodingedu.pojo.UserAccount;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class PaymentService {

    @Autowired
    UserAccountMapper userAccountMapper;

    @Autowired
    PayMsgMapper payMsgMapper;

    /**
     * Deducts the amount and records a pay_msg row in the same local transaction.
     *
     * @param uid user id
     * @param order_id order id
     * @param amount amount to pay
     * @return 0 on success, 1 if the user does not exist, 2 if the balance is insufficient
     */
    @Transactional(transactionManager = "tm195")
    public int payment(int uid,int order_id,int amount){
        UserAccount userAccount = userAccountMapper.queryForId(uid);
        if(userAccount==null){
            return 1;
        }
        int account = userAccount.getAccount();
        if(account<amount){
            return 2;
        }
        userAccount.setAccount(account-amount);
        userAccountMapper.updateUserAccount(userAccount);

        PayMsg payMsg = new PayMsg();
        payMsg.setId(1001);
        payMsg.setOrder_id(order_id);
        payMsg.setStatus(0);// 0 = not sent, 1 = sent successfully, 2 = retry limit exceeded
        payMsg.setFail_count(0);
        payMsgMapper.insertPayMs(payMsg);
        return 0;
    }
}

controller

package com.icodingedu.controller;

import com.icodingedu.service.PaymentService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
public class PaymentController {
    @Autowired
    PaymentService paymentService;

    @GetMapping("/payment")
    @ResponseBody
    public String payment(int uid,int orderid,int amount){
        // status: 0 = success, 1 = user does not exist, 2 = insufficient balance
        int status = paymentService.payment(uid,orderid,amount);
        return "Payment result: "+status;
    }
}

package com.icodingedu.controller;

import com.icodingedu.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
public class OrderController {

    @Autowired
    OrderService orderService;

    // Called by OrderNotifyService; answers "success" or "fail"
    @GetMapping("/handleorder")
    @ResponseBody
    public String orderSet(int id){
        try {
            int flag = orderService.handleOrder(id);
            if (flag == 0) {
                return "success";
            } else {
                return "fail";
            }
        }catch (Exception ex){
            ex.printStackTrace();
            return "fail";
        }
    }
}

Application

package com.icodingedu;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class TccTransactionApplication {

    public static void main(String[] args) {
        SpringApplication.run(TccTransactionApplication.class, args);
    }

}
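
The retry rule used by the notifier above can be isolated from Spring and HTTP. The sketch below keeps the same status codes as `pay_msg.status` and the same limit of 5 failures; `OutboxSketch` and its `deliver` callback are hypothetical stand-ins for the message table and the HTTP call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

/** Hypothetical in-memory version of the pay_msg table and its polling loop. */
final class OutboxSketch {
    static final int PENDING = 0;    // not sent yet
    static final int SENT = 1;       // delivered successfully
    static final int GAVE_UP = 2;    // failed more than MAX_FAILURES times
    static final int MAX_FAILURES = 5;

    static final class Message {
        final int orderId;
        int status = PENDING;
        int failCount = 0;
        Message(int orderId) { this.orderId = orderId; }
    }

    private final List<Message> table = new ArrayList<>();

    Message enqueue(int orderId) {
        Message m = new Message(orderId);
        table.add(m);
        return m;
    }

    /** One polling round: try every pending message once. deliver returns true on success. */
    void poll(IntPredicate deliver) {
        for (Message m : table) {
            if (m.status != PENDING) {
                continue;
            }
            if (deliver.test(m.orderId)) {
                m.status = SENT;
            } else if (++m.failCount > MAX_FAILURES) {
                m.status = GAVE_UP;
            }
        }
    }

    public static void main(String[] args) {
        OutboxSketch outbox = new OutboxSketch();
        Message m = outbox.enqueue(2001);
        for (int round = 1; round <= 6; round++) {
            outbox.poll(orderId -> false);   // the receiver is down in every round
            System.out.println("round " + round + ": status=" + m.status + " failCount=" + m.failCount);
        }
    }
}
```

Messages that end up in the give-up state are no longer retried, so in practice they need an alert or a manual reconciliation step.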

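13. Designing eventual consistency with a message queue (MQ)

image-20200322213147932.png

Problems to analyse when using MQ for eventual consistency

Implementation logic

image-20200323001802383.png

14. Implementing eventual consistency with a message queue (MQ)

POM dependency

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>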

Configuration for sending and receiving

spring:
  rabbitmq:
    host: 39.100.17.31
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 10000
    listener:
      simple:
        concurrency: 1
        max-concurrency: 1
        auto-startup: true
        prefetch: 1
        acknowledge-mode: manual

POJO

package com.icodingedu.pojo;

import lombok.Data;

@Data
public class ReceiveMsg {
    private int id;
    private int order_id;
    private int fail_count;
}

Mapper

package com.icodingedu.mapper.db197;

import com.icodingedu.pojo.ReceiveMsg;
import org.springframework.stereotype.Repository;

@Repository
public interface ReceiveMsgMapper {
    int insert(ReceiveMsg receiveMsg);
    int update(ReceiveMsg receiveMsg);
    ReceiveMsg queryForOrderId(int order_id);
}

xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.icodingedu.mapper.db197.ReceiveMsgMapper">
    <insert id="insert" parameterType="com.icodingedu.pojo.ReceiveMsg">
        insert into receive_msg(id,order_id,fail_count) values(#{id},#{order_id},#{fail_count})
    </insert>
    <update id="update" parameterType="com.icodingedu.pojo.ReceiveMsg">
        update receive_msg set fail_count=#{fail_count} where id=#{id}
    </update>
    <select id="queryForOrderId" resultType="com.icodingedu.pojo.ReceiveMsg">
        select * from receive_msg where order_id=#{order_id}
    </select>
</mapper>

controller

package com.icodingedu.controller;

import com.icodingedu.service.MsgSendService;
import com.icodingedu.service.PaymentService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
public class PaymentController {
    @Autowired
    PaymentService paymentService;

    @Autowired
    MsgSendService msgSendService;

    @GetMapping("/payment")
    @ResponseBody
    public String payment(int uid,int orderid,int amount){
        // status: 0 = success, 1 = user does not exist, 2 = insufficient balance
        int status = paymentService.payment(uid,orderid,amount);
        // Notify the order side only when the payment actually succeeded
        if(status==0){
            msgSendService.sendMessage("msg1001",String.valueOf(orderid));
        }
        return "Payment result: "+status;
    }
}

service

package com.icodingedu.service;

import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MsgSendService {
    @Autowired
    RabbitTemplate rabbitTemplate;

    // Payment side: publish the order id to the order side
    public void sendMessage(String mid,String msg){
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(mid);
        rabbitTemplate.convertAndSend("order-exchange","receive.info",msg,correlationData);
    }
}

package com.icodingedu.service;

import com.icodingedu.mapper.db197.ReceiveMsgMapper;
import com.icodingedu.pojo.ReceiveMsg;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import java.util.Map;

@Service
public class OrderReceiveService {

    @Autowired
    OrderService orderService;

    @Autowired
    ReceiveSendService receiveSendService;

    @Autowired
    ReceiveMsgMapper receiveMsgMapper;

    @RabbitListener(queues = "receive-queue")
    @RabbitHandler
    public void onOrderMessage(@Payload String orderid, @Headers Map<String,Object> headers, Channel channel) throws Exception{
        System.out.println("********* Message received, start consuming *********");
        System.out.println("OrderID :"+orderid);
        Long deliverTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);

        try{
            // Update the order status
            int flag = orderService.handleOrder(Integer.valueOf(orderid));
            if(flag==0){
                channel.basicAck(deliverTag,false);
                // Tell the sender that the message has been received and processed
                receiveSendService.sendMessage("send1001",orderid);
            }else{
                recordFailure(orderid,deliverTag,channel);
            }
        }catch (Exception ex){
            ex.printStackTrace();
            recordFailure(orderid,deliverTag,channel);
        }
    }

    // Count the failure; requeue the message until it has failed more than 5 times, then ack to drop it
    private void recordFailure(String orderid, Long deliverTag, Channel channel) throws Exception{
        ReceiveMsg receiveMsg = receiveMsgMapper.queryForOrderId(Integer.valueOf(orderid));
        if(receiveMsg==null){
            receiveMsg = new ReceiveMsg();
            receiveMsg.setId(3001);
            receiveMsg.setFail_count(1);
            receiveMsg.setOrder_id(Integer.valueOf(orderid));
            receiveMsgMapper.insert(receiveMsg);
            channel.basicNack(deliverTag,false,true);
        }else{
            int fail_count = receiveMsg.getFail_count();
            if(fail_count>5){
                channel.basicAck(deliverTag,false);
            }else{
                receiveMsg.setFail_count(fail_count+1);
                receiveMsgMapper.update(receiveMsg);
                channel.basicNack(deliverTag,false,true);
            }
        }
    }
}

package com.icodingedu.service;

import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class ReceiveSendService {

    @Autowired
    RabbitTemplate rabbitTemplate;

    // Order side: publish the confirmation back to the payment side
    public void sendMessage(String mid,String msg){
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(mid);
        rabbitTemplate.convertAndSend("order-exchange","send.confirm",msg,correlationData);
    }
}

package com.icodingedu.service;

import com.icodingedu.mapper.db195.PayMsgMapper;
import com.icodingedu.pojo.PayMsg;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

import java.util.Map;

@Service
public class SendReceiveService {

    @Autowired
    PayMsgMapper payMsgMapper;

    @RabbitListener(queues = "sendconfirm-queue")
    @RabbitHandler
    public void onOrderMessage(@Payload String orderid, @Headers Map<String,Object> headers, Channel channel) throws Exception{
        System.out.println("=========== Received the send confirmation ============");
        System.out.println("=====orderid "+orderid);
        Long deliverTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
        PayMsg payMsg = payMsgMapper.queryForId(Integer.valueOf(orderid));
        if(payMsg!=null){
            // Mark the payment message as delivered
            payMsg.setStatus(1);
            payMsgMapper.updatePayMsg(payMsg);
        }
        // Ack even when no pay_msg row matches; an unacked message would block this consumer (prefetch is 1)
        channel.basicAck(deliverTag,false);
    }
}
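
Because failed messages are requeued, the order side can receive the same order id more than once, so the handler has to tolerate duplicates. A minimal sketch of an idempotent consumer follows; `IdempotentOrderConsumer` is a hypothetical class, and the in-memory set stands in for a deduplication record that a real system would write in the same local transaction as the order update.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical consumer that applies each order message at most once. */
final class IdempotentOrderConsumer {
    private final Set<Integer> processedOrderIds = new HashSet<>();
    private final Map<Integer, Integer> orderStatus = new HashMap<>();
    private int updatesApplied = 0;

    /** Returns true if the message changed state, false if it was a duplicate. */
    boolean onMessage(int orderId) {
        if (!processedOrderIds.add(orderId)) {
            return false;   // already handled: acknowledge again, change nothing
        }
        orderStatus.put(orderId, 1);   // 1 = paid, the value set by handleOrder
        updatesApplied++;
        return true;
    }

    int statusOf(int orderId) { return orderStatus.getOrDefault(orderId, 0); }

    int updatesApplied() { return updatesApplied; }

    public static void main(String[] args) {
        IdempotentOrderConsumer consumer = new IdempotentOrderConsumer();
        System.out.println(consumer.onMessage(2001));   // first delivery
        System.out.println(consumer.onMessage(2001));   // redelivery of the same message
        System.out.println("updates applied: " + consumer.updatesApplied());
    }
}
```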