think php

tp5框架使用XA事务遇到的坑/think php

2020-10-22  本文已影响0人  马京拓普

我tp框架版本是5.1.39 LTS,因为公司项目的原因,一个操作需要修改到两个数据库的数据,所以就用了XA分布式事务,接下来就说一下针对一些报错的解决方法。

错误1:

SQLSTATE[HY000]: General error: 2014 Cannot execute queries while other unbuffered queries are active. Consider using PDOStatement::fetchAll(). Alternatively, if your code is only ever going to run against mysql, you may enable query buffering by setting the PDO::MYSQL_ATTR_USE_BUFFERED_QUERY attribute.


image.png
解决方法:

1.打开thinkphp/library/think/db/connector/Mysql.php
2.把startTransXa、prepareXa、commitXa、rollbackXa这4个方法里的$this->execute()改成$this->linkID->exec()

例如startTransXa方法:

    /**
     * 启动XA事务
     * @access public
     * @param  string $xid XA事务id
     * @return void
     */
    public function startTransXa($xid)
    {
        $this->initConnect(true);
        if (!$this->linkID) {
            return false;
        }

        $this->execute("XA START '$xid'");
    }

改成

     /**
     * 启动XA事务
     * @access public
     * @param  string $xid XA事务id
     * @return void
     */
    public function startTransXa($xid)
    {
        $this->initConnect(true);
        if (!$this->linkID) {
            return false;
        }

        $this->linkID->exec("XA START '$xid'");
    }

错误2:

SQLSTATE[XAE08]: <<Unknown error>>: 1440 XAER_DUPID: The XID already exists


image.png
解决方法:

1.打开thinkphp/library/think/db/Query.php,第404行
2.一次操作开启多个XA事务时,xid是不能重复的,所以我们要每开启一个XA事务就用一个新的xid
3.看下面的修改前后变化

原来的方法代码:

    /**
     * 执行数据库Xa事务
     * @access public
     * @param  callable $callback 数据操作方法回调
     * @param  array    $dbs      多个查询对象或者连接对象
     * @return mixed
     * @throws PDOException
     * @throws \Exception
     * @throws \Throwable
     */
    public function transactionXa($callback, array $dbs = [])
    {
        $xid = uniqid('xa');

        if (empty($dbs)) {
            $dbs[] = $this->getConnection();
        }

        foreach ($dbs as $key => $db) {
            if ($db instanceof Query) {
                $db = $db->getConnection();

                $dbs[$key] = $db;
            }

            $db->startTransXa($xid);
        }

        try {
            $result = null;
            if (is_callable($callback)) {
                $result = call_user_func_array($callback, [$this]);
            }

            foreach ($dbs as $db) {
                $db->prepareXa($xid);
            }

            foreach ($dbs as $db) {
                $db->commitXa($xid);
            }

            return $result;
        } catch (\Exception $e) {
            foreach ($dbs as $db) {
                $db->rollbackXa($xid);
            }
            throw $e;
        } catch (\Throwable $e) {
            foreach ($dbs as $db) {
                $db->rollbackXa($xid);
            }
            throw $e;
        }
    }

改成:

    /**
     * 执行数据库Xa事务
     * @access public
     * @param  callable $callback 数据操作方法回调
     * @param  array    $dbs      多个查询对象或者连接对象
     * @return mixed
     * @throws PDOException
     * @throws \Exception
     * @throws \Throwable
     */
    public function transactionXa($callback, array $dbs = [])
    {
      
        if (empty($dbs)) {
            $dbs[] = $this->getConnection();
        }
        //定义一个装xid的空数组
        $xid_data=[];

        //根据事务中数据库个数生成xid
        foreach ($dbs as $key => $db) {
            $xid_data[$key] = uniqid('xa');
        }

        foreach ($dbs as $key => $db) {
            if ($db instanceof Query) {
                $db = $db->getConnection();

                $dbs[$key] = $db;
            }
            //每个事务操作使用自己的xid
            $db->startTransXa($xid_data[$key]);
        }

        try {
            $result = null;
            if (is_callable($callback)) {
                $result = call_user_func_array($callback,[$this]);
            }

            foreach ($dbs as $key => $db) {
                //每个事务操作使用自己的xid
                $db->prepareXa($xid_data[$key]);
            }

            foreach ($dbs as $key => $db) {
                //每个事务操作使用自己的xid
                $db->commitXa($xid_data[$key]);
            }

            return $result;
        } catch (\Exception $e) {
            foreach ($dbs as $key => $db) {
                 //每个事务操作使用自己的xid
                $db->rollbackXa($xid_data[$key]);
            }
            throw $e;
        } catch (\Throwable $e) {
            foreach ($dbs as $key => $db) {
                 //每个事务操作使用自己的xid
                $db->rollbackXa($xid_data[$key]);
            }
            throw $e;
        }
    }

错误3:

SQLSTATE[XAE07]: <<Unknown error>>: 1399 XAER_RMFAIL: The command cannot be executed when global transaction is in the ACTIVE state


image.png
解决方法:

1.会出现这个原因主要是在xa事务中,有一个操作执行错误了,需要执行回滚操作,但是根据XA分布式事务的相关知识知道,提交或回滚一个事务时,这个事务的状态不能是active状态
2.所以我们只要在捕捉异常里通过执行XA END [xid]和XA PREPARE[xid]来改变事务状态就可以回滚了,那prepareXa方法里包含了这两个语句的执行,所以我们只需要在捕捉异常里执行一下这个方法就可以了,代码沿用了错误2修改后的代码

     /**
     * 执行数据库Xa事务
     * @access public
     * @param  callable $callback 数据操作方法回调
     * @param  array    $dbs      多个查询对象或者连接对象
     * @return mixed
     * @throws PDOException
     * @throws \Exception
     * @throws \Throwable
     */
    public function transactionXa($callback, array $dbs = [])
    {
      
        if (empty($dbs)) {
            $dbs[] = $this->getConnection();
        }
        //定义一个装xid的空数组
        $xid_data=[];

        //根据事务中数据库个数生成xid
        foreach ($dbs as $key => $db) {
            $xid_data[$key] = uniqid('xa');
        }

        foreach ($dbs as $key => $db) {
            if ($db instanceof Query) {
                $db = $db->getConnection();

                $dbs[$key] = $db;
            }
            //每个事务操作使用自己的xid
            $db->startTransXa($xid_data[$key]);
        }

        try {
            $result = null;
            if (is_callable($callback)) {
                $result = call_user_func_array($callback,[$this]);
            }

            foreach ($dbs as $key => $db) {
                //每个事务操作使用自己的xid
                $db->prepareXa($xid_data[$key]);
            }

            foreach ($dbs as $key => $db) {
                //每个事务操作使用自己的xid
                $db->commitXa($xid_data[$key]);
            }

            return $result;
        } catch (\Exception $e) {
            //执行prepareXa方法改变事务状态
            foreach ($dbs as $key => $db) {
                $db->prepareXa($xid_data[$key]);
            }

            foreach ($dbs as $key => $db) {
                 //每个事务操作使用自己的xid
                $db->rollbackXa($xid_data[$key]);
            }
            throw $e;
        } catch (\Throwable $e) {
            //执行prepareXa方法改变事务状态
            foreach ($dbs as $key => $db) {
                $db->prepareXa($xid_data[$key]);
            }

            foreach ($dbs as $key => $db) {
                 //每个事务操作使用自己的xid
                $db->rollbackXa($xid_data[$key]);
            }
            throw $e;
        }
    }

完整测试例子代码:

      $test1='test1';
      $test2='test2';

        try {

            Db::transactionXa(function ()use($test1,$test2) {

                $ar=Db::connect('db_config_test1')->table('pre_test')->insert(['name'=>$test1]);
                if(!$ar){
                    throw new \Exception('新增失败1');
                }
                
                $ar=Db::connect('db_config_test2')->table('pre_test')->insert(['name'=>$test2]);
                if(!$ar){
                    throw new \Exception('新增失败2');
                }
                
            }, [Db::connect('db_config_test1'),Db::connect('db_config_test2')]);

        } catch (\Exception $e) {

            echo $e->getMessage();
            exit;

        }

        echo 'success';

结语:
第一次写文章分享,希望能帮助到大家。有什么错误的地方欢迎提出来,一起学习。感谢大家支持

上一篇 下一篇

猜你喜欢

热点阅读