
JTA-Atomikos source code walkthrough

2019-10-09  7d972d5e05e8

1. Core JTA implementation classes

  1. The implementation of javax.transaction.UserTransaction: com.atomikos.icatch.jta.UserTransactionImp
  2. The implementation of javax.transaction.TransactionManager: com.atomikos.icatch.jta.UserTransactionManager
  3. The most central class, the implementation of javax.transaction.Transaction: com.atomikos.icatch.jta.TransactionImp

Calls made through UserTransaction and TransactionManager both end up in the TransactionImp class.

Reference: http://www.tianshouzhi.com/api/tutorials/distributed_transaction/386
Reference for XA/2PC and XA/3PC: https://www.cnblogs.com/duanxz/p/4672708.html

2. How Atomikos implements the 2PC phases of a distributed transaction

2.1 2PC: prepare phase

com.atomikos.icatch.jta.TransactionImp#commit is implemented as follows:

@Override
    public void commit() throws javax.transaction.RollbackException,
            javax.transaction.HeuristicMixedException,
            javax.transaction.HeuristicRollbackException,
            javax.transaction.SystemException, java.lang.SecurityException {
        try {
            this.compositeTransaction.commit();
        } catch (HeurHazardException hh) {
            rethrowAsJtaHeuristicMixedException(hh.getMessage(), hh);
        } catch (HeurRollbackException hr) {
            rethrowAsJtaHeuristicRollbackException(hr.getMessage(), hr);
        } catch (HeurMixedException hm) {
            rethrowAsJtaHeuristicMixedException(hm.getMessage(), hm);
        } catch (SysException se) {
            LOGGER.logError(se.getMessage(), se);
            throw new ExtendedSystemException(se.getMessage(), se);
        } catch (com.atomikos.icatch.RollbackException rb) {
            // see case 29708: all statements have been closed
            String msg = rb.getMessage();
            Throwable cause = rb.getCause();
            if (cause == null)
                cause = rb;
            rethrowAsJtaRollbackException(msg, cause);
        }
    }

com.atomikos.icatch.imp.CompositeTransactionImp#commit commits the composite transaction (one that can span several transactions). It is implemented as follows:

public void commit () throws HeurRollbackException, HeurMixedException,
            HeurHazardException, SysException, SecurityException,
            RollbackException
    {
        doCommit ();
        setSiblingInfoForIncoming1pcRequestFromRemoteClient();
        
        if ( isRoot () ) {
            try {
                coordinator.terminate ( true );
            }

            catch ( RollbackException rb ) {
                throw rb;
            } catch ( HeurHazardException hh ) {
                throw hh;
            } catch ( HeurRollbackException hr ) {
                throw hr;
            } catch ( HeurMixedException hm ) {
                throw hm;
            } catch ( SysException se ) {
                throw se;
            } catch ( Exception e ) {
                throw new SysException (
                        "Unexpected error: " + e.getMessage (), e );
            }
        }
    }

Note the call to coordinator.terminate(true), which is implemented as follows:

protected void terminate ( boolean commit ) throws HeurRollbackException,
            HeurMixedException, SysException, java.lang.SecurityException,
            HeurCommitException, HeurHazardException, RollbackException,
            IllegalStateException

    {    
        synchronized ( fsm_ ) {
            if ( commit ) {
                if ( participants_.size () <= 1 ) {
                    commit ( true );
                } else {
                    int prepareResult = prepare ();
                    // make sure to only do commit if NOT read only
                    if ( prepareResult != Participant.READ_ONLY )
                        commit ( false );
                }
            } else {
                rollback ();
            }
        }
    }

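Note that the method body is synchronized on fsm_, so for a given coordinator only one thread at a time can execute it. When participants_.size() <= 1 there is at most one participant, so 2PC is unnecessary and commit(true) performs a one-phase commit directly. With two or more participants the full 2PC has to run. The key method to look at is prepare().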
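The branch in terminate() can be summarised with a small sketch. The class, enum and method names below are hypothetical and only mirror the decision described above; a NO vote (which makes prepare throw and roll back in the real code) is left out.

```java
import java.util.List;

final class TerminateSketch {
    /** Hypothetical vote values returned by a participant's prepare. */
    enum Vote { YES, READ_ONLY }

    /** Describes which path a commit request takes, given one prepare vote per participant. */
    static String commitPath(List<Vote> votes) {
        if (votes.size() <= 1) {
            // Zero or one participant: no voting round is needed.
            return "one-phase commit";
        }
        boolean allReadOnly = votes.stream().allMatch(v -> v == Vote.READ_ONLY);
        // A read-only prepare result means there is nothing left to commit.
        return allReadOnly ? "prepare only" : "prepare then commit";
    }

    public static void main(String[] args) {
        System.out.println(commitPath(List.of(Vote.YES)));
        System.out.println(commitPath(List.of(Vote.YES, Vote.READ_ONLY)));
        System.out.println(commitPath(List.of(Vote.READ_ONLY, Vote.READ_ONLY)));
    }
}
```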

The prepare() method is as follows:

public int prepare () throws RollbackException,
            java.lang.IllegalStateException, HeurHazardException,
            HeurMixedException, SysException
    {
        // FIRST, TAKE CARE OF DUPLICATE PREPARES

        // Recursive prepare-calls should be avoided for not deadlocking rollback/commit methods
        // If a recursive prepare re-enters, then it will see a voting state -> reject.
        // Note that this may also avoid some legal prepares, but only rarely
        if ( getState ().equals ( TxState.PREPARING ) )
            throw new RollbackException ( "Recursion detected" );

        int ret = Participant.READ_ONLY + 1;
        synchronized ( fsm_ ) {
            ret = stateHandler_.prepare ();
            if ( ret == Participant.READ_ONLY ) {

                 if ( LOGGER.isTraceEnabled() ) LOGGER.logTrace (  "prepare() of Coordinator  " + getCoordinatorId ()
                        + " returning READONLY" );
            } else {

                 if ( LOGGER.isTraceEnabled() ) LOGGER.logTrace ( "prepare() of Coordinator  " + getCoordinatorId ()
                        + " returning YES vote");
            }
        }
        return ret;

    }

prepare() delegates the actual work to a state handler, the stateHandler_ object. During the first phase of 2PC (prepare), the state handler in charge is ActiveStateHandler.
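This delegation is an instance of the State pattern. Below is a minimal sketch of the idea, not the Atomikos classes themselves: Handler, Active, Indoubt, Terminated and Coordinator are hypothetical names. The transition out of the active state follows the ActiveStateHandler code; how the later states react to a second prepare() is an assumption of the sketch.

```java
final class StateHandlerSketch {
    /** Hypothetical stand-in for a coordinator state handler. */
    interface Handler {
        String name();
        /** Handles prepare in this state and returns the handler for the next state. */
        Handler prepare(boolean allReadOnly);
    }

    static final class Active implements Handler {
        public String name() { return "ACTIVE"; }
        public Handler prepare(boolean allReadOnly) {
            // All participants read-only: nothing to commit, so the work is finished.
            return allReadOnly ? new Terminated() : new Indoubt();
        }
    }

    static final class Indoubt implements Handler {
        public String name() { return "IN_DOUBT"; }
        public Handler prepare(boolean allReadOnly) {
            // Assumption of this sketch: a second prepare is rejected.
            throw new IllegalStateException("prepare not allowed in " + name());
        }
    }

    static final class Terminated implements Handler {
        public String name() { return "TERMINATED"; }
        public Handler prepare(boolean allReadOnly) {
            // Assumption of this sketch: a second prepare is rejected.
            throw new IllegalStateException("prepare not allowed in " + name());
        }
    }

    /** The coordinator only forwards the call and swaps in the next handler. */
    static final class Coordinator {
        private Handler handler = new Active();
        String state() { return handler.name(); }
        void prepare(boolean allReadOnly) { handler = handler.prepare(allReadOnly); }
    }

    public static void main(String[] args) {
        Coordinator c = new Coordinator();
        c.prepare(false);
        System.out.println(c.state());
    }
}
```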

com.atomikos.icatch.imp.ActiveStateHandler#prepare is implemented as follows:

 protected int prepare () throws RollbackException,
            java.lang.IllegalStateException, HeurHazardException,
            HeurMixedException, SysException
    {

        int count = 0; // number of participants
        PrepareResult result = null; // synchronization
        boolean allReadOnly = true; // if still true at end-> readonly vote
        int ret = 0; // return value
        Vector<Participant> participants = getCoordinator ().getParticipants ();
        CoordinatorStateHandler nextStateHandler = null;

        if ( orphansExist() ) {
            try {
                if ( LOGGER.isTraceEnabled() ) LOGGER.logTrace ( "Orphans detected: "
                        + getCoordinator ().getLocalSiblingCount () + " vs "
                        + globalSiblingCount_ + " - forcing rollback." );
                rollbackWithAfterCompletionNotification(new RollbackCallback() {
                    public void doRollback()
                            throws HeurCommitException,
                            HeurMixedException, SysException,
                            HeurHazardException, IllegalStateException {
                        rollbackFromWithinCallback(false,false);
                    }});

            } catch ( HeurCommitException hc ) {
                throw new HeurMixedException();
            }

            throw new RollbackException ( "Orphans detected." );
        }

        try {
            try {
                getCoordinator().setState ( TxState.PREPARING );
            } catch ( RuntimeException error ) {
                //See case 23334
                String msg = "Error in preparing: " + error.getMessage() + " - rolling back instead";
                LOGGER.logWarning ( msg , error );
                try {
                    rollbackWithAfterCompletionNotification(new RollbackCallback() {
                        public void doRollback()
                                throws HeurCommitException,
                                HeurMixedException, SysException,
                                HeurHazardException, IllegalStateException {
                            rollbackFromWithinCallback(false,false);
                        }});
                    throw new RollbackException ( msg , error);
                } catch ( HeurCommitException e ) {
                    LOGGER.logError ( "Illegal heuristic commit during rollback before prepare:" + e );
                    throw new HeurMixedException();
                }
            }
            count = participants.size ();
            result = new PrepareResult ( count );
            Enumeration<Participant> enumm = participants.elements ();
            while ( enumm.hasMoreElements () ) {
                Participant p = (Participant) enumm.nextElement ();
                PrepareMessage pm = new PrepareMessage ( p, result );
                if ( getCascadeList () != null && p.getURI () != null ) { //null for OTS
                    Integer sibnum = (Integer) getCascadeList ().get ( p.getURI () );
                    if ( sibnum != null ) { // null for local participant!
                        p.setGlobalSiblingCount ( sibnum.intValue () );
                    }
                    p.setCascadeList ( getCascadeList () );
                }

                getPropagator ().submitPropagationMessage ( pm );
            } // while

            result.waitForReplies ();

            boolean voteOK = result.allYes ();
            setReadOnlyTable ( result.getReadOnlyTable () );
            allReadOnly = result.allReadOnly ();

            if ( !voteOK ) {
             int res = result.getResult ();
               
                try {
                    rollbackWithAfterCompletionNotification(new RollbackCallback() {
                        public void doRollback()
                                throws HeurCommitException,
                                HeurMixedException, SysException,
                                HeurHazardException, IllegalStateException {
                            rollbackFromWithinCallback(true,false);
                        }});
                } catch ( HeurCommitException hc ) {
                    // should not happen:
                    // means that ALL subordinate work committed heuristically.
                    // this is impossible since it assumes that ALL
                    // participants voted YES in the first place,
                    // which contradicts the fact that we are dealing with
                    // !voteOK
                    throw new SysException ( "Unexpected heuristic: "
                            + hc.getMessage (), hc );
                }
                // let recovery clean up in the background
                throw new RollbackException ( "One or more resources refused to commit (possibly because of a timeout in the resource - see the log for details). This transaction has been rolled back instead." );
            }
        } catch ( RuntimeException runerr ) {
            throw new SysException ( "Error in prepare: " + runerr.getMessage (), runerr );
        } catch ( InterruptedException err ) {
            // cf bug 67457
            InterruptedExceptionHelper.handleInterruptedException ( err );
            throw new SysException ( "Error in prepare: " + err.getMessage (), err );
        }
        // here we are if all yes.
        if ( allReadOnly ) {
            nextStateHandler = new TerminatedStateHandler ( this );
            getCoordinator ().setStateHandler ( nextStateHandler );
            ret = Participant.READ_ONLY;
            notifySynchronizationsAfterCompletion(TxState.COMMITTING,TxState.TERMINATED); //cf bug 127485
        } else {
            nextStateHandler = new IndoubtStateHandler ( this );
            getCoordinator ().setStateHandler ( nextStateHandler );
            ret = Participant.READ_ONLY + 1;
        }

        return ret;
    }

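This method implements the prepare phase. All participants are registered in a Vector; I have not yet found the code that registers them. The method first checks for orphans and forces a rollback if any exist, then sets the state to TxState.PREPARING, rolling back if that fails. It then iterates over the participants and runs the prepare phase of each resource manager by submitting a PrepareMessage through getPropagator().submitPropagationMessage(pm). Finally it waits for all replies with result.waitForReplies() and rolls back unless every participant voted yes.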
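The fan-out and vote collection can be sketched with plain java.util.concurrent. This is a simplified illustration under stated assumptions: Vote, Outcome and prepareAll are hypothetical names, a thread pool stands in for the propagator and PrepareResult, and a participant that throws is simply counted as a NO vote, whereas the real code distinguishes heuristic hazards.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class PrepareFanOutSketch {
    enum Vote { YES, NO, READ_ONLY }
    enum Outcome { READ_ONLY, PREPARED, ROLLED_BACK }

    /** Sends prepare to every participant in parallel and combines the votes. */
    static Outcome prepareAll(List<Callable<Vote>> participants) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, participants.size()));
        try {
            // invokeAll blocks until every participant has replied.
            List<Future<Vote>> replies = pool.invokeAll(participants);
            boolean allYes = true;
            boolean allReadOnly = true;
            for (Future<Vote> reply : replies) {
                Vote vote;
                try {
                    vote = reply.get();
                } catch (ExecutionException e) {
                    vote = Vote.NO; // simplification: any failure counts as a NO vote
                }
                if (vote == Vote.NO) allYes = false;
                if (vote != Vote.READ_ONLY) allReadOnly = false;
            }
            if (!allYes) return Outcome.ROLLED_BACK;
            return allReadOnly ? Outcome.READ_ONLY : Outcome.PREPARED;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for votes", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Callable<Vote>> participants = List.of(() -> Vote.YES, () -> Vote.READ_ONLY);
        System.out.println(prepareAll(participants));
    }
}
```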

getPropagator().submitPropagationMessage(pm) is implemented as follows:

public synchronized void submitPropagationMessage ( PropagationMessage msg )
    {
            PropagatorThread t = new PropagatorThread ( msg );
            if ( threaded_ ) {
                TaskManager.SINGLETON.executeTask ( t );
            } else {
                t.run();
            }
    
    }

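So the message can be processed on another thread: when threaded_ is true the task is handed to TaskManager, otherwise t.run() is called directly on the caller's thread.

PropagatorThread.run() is implemented as follows: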

 private static class PropagatorThread implements Runnable
    {
            private PropagationMessage msg;
            
            PropagatorThread ( PropagationMessage msg ) 
            {
                this.msg = msg;
            }
            
            public void run() 
            {
                try {
                    boolean tryAgain = true;
                    do {
                        tryAgain = msg.submit();
                        if ( tryAgain  ) {
                          //wait a little before retrying
                          Thread.sleep ( RETRY_INTERVAL );
                          if ( LOGGER.isTraceEnabled() ) LOGGER.logTrace ( "Propagator: retrying " + "message: " + msg );
                        }
                    } while ( tryAgain );
                }
                catch ( Exception e ) {
                    LOGGER.logWarning ( "ERROR in propagator: " + e.getMessage () +
                            (msg != null ? " while sending message: " + msg : "") , e );
                }
            }
        
    }

msg.submit() is implemented as follows:

protected boolean submit ()
    {
        boolean failed = false;
        boolean transienterr = false;
        Exception exception = null;
        Object result = null;
        boolean retried = false;

        try {
            result = send ();
        } catch ( PropagationException e ) {
            failed = true;
            transienterr = e.isTransient ();
            exception = e.getDetail ();
        } finally {
            if ( failed && transienterr && retrycount_ < MAX_RETRIES_ON_COMM_FAILURE ) {
                retried = true;
                retrycount_++;
            }
            if ( result_ != null ) {
                result_.addReply ( new Reply ( result, exception,
                        getParticipant (), retried ) );
            }
        }
        return retried;
    }

This finally leads to PropagationMessage.send(). PropagationMessage is an abstract class with four implementations: PrepareMessage, CommitMessage, RollbackMessage and ForgetMessage. In the prepare phase, PrepareMessage.send() is the one that gets called.
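The interplay between run(), submit() and the abstract send() is a template method wrapped in a retry loop. A minimal sketch follows, assuming hypothetical names (Message, FlakyMessage, TransientFailure) and an arbitrary MAX_RETRIES of 3; the pause between retries that the real run() performs is left out.

```java
final class PropagationRetrySketch {
    static final int MAX_RETRIES = 3; // hypothetical limit, not the Atomikos constant

    /** Signals a failure that is worth retrying. */
    static final class TransientFailure extends Exception {
        private static final long serialVersionUID = 1L;
    }

    /** Hypothetical analogue of an abstract propagation message. */
    abstract static class Message {
        private int retryCount = 0;
        int attempts = 0;

        /** Subclasses do the real work, such as calling prepare or commit. */
        protected abstract void send() throws TransientFailure;

        /** Returns true if the caller should submit this message again. */
        final boolean submit() {
            attempts++;
            try {
                send();
                return false;
            } catch (TransientFailure e) {
                if (retryCount < MAX_RETRIES) {
                    retryCount++;
                    return true;
                }
                return false; // retries exhausted: give up
            }
        }
    }

    /** Test message that fails a fixed number of times before succeeding. */
    static final class FlakyMessage extends Message {
        private int failuresLeft;
        FlakyMessage(int failures) { this.failuresLeft = failures; }
        @Override protected void send() throws TransientFailure {
            if (failuresLeft > 0) {
                failuresLeft--;
                throw new TransientFailure();
            }
        }
    }

    /** Same loop shape as the run() method: keep submitting while a retry is requested. */
    static void propagate(Message msg) {
        boolean tryAgain;
        do {
            tryAgain = msg.submit();
        } while (tryAgain);
    }

    public static void main(String[] args) {
        FlakyMessage msg = new FlakyMessage(2);
        propagate(msg);
        System.out.println("attempts: " + msg.attempts);
    }
}
```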

PrepareMessage.send() is implemented as follows:

 protected Boolean send () throws PropagationException
    {
        Participant part = getParticipant ();
        int ret = 0;
        Boolean result = null;
        try {
            ret = part.prepare ();
            if ( ret == Participant.READ_ONLY )
                result = null;
            else
                result = new Boolean ( true );
        } catch ( HeurHazardException heurh ) {
            throw new PropagationException ( heurh, false );
        } catch ( RollbackException jtr ) {
            // NO vote.
            result = new Boolean ( false );
        } catch ( Exception e ) {
            // here, participant might be indoubt!
            HeurHazardException heurh = new HeurHazardException ();
            throw new PropagationException ( heurh, false );

        }
        return result;
    }

Here execution reaches the participant's prepare method, part.prepare(), which is implemented by com.atomikos.datasource.xa.XAResourceTransaction#prepare.

XAResourceTransaction.prepare is implemented as follows:

@Override
    public synchronized int prepare() throws RollbackException,
            HeurHazardException, HeurMixedException, SysException {
        int ret = 0;
        terminateInResource();

        if (TxState.ACTIVE == this.state) {
            // tolerate non-delisting apps/servers
            suspend();
        }

        // duplicate prepares can happen for siblings in serial subtxs!!!
        // in that case, the second prepare just returns READONLY
        if (this.state == TxState.IN_DOUBT)
            return Participant.READ_ONLY;
        else if (!(this.state == TxState.LOCALLY_DONE))
            throw new SysException("Wrong state for prepare: " + this.state);
        try {
            // refresh xaresource for MQSeries: seems to close XAResource after
            // suspend???
            testOrRefreshXAResourceFor2PC();
            if (LOGGER.isTraceEnabled()) {
                LOGGER.logTrace("About to call prepare on XAResource instance: "
                        + this.xaresource);
            }
            ret = this.xaresource.prepare(this.xid);

        } catch (XAException xaerr) {
            String msg = interpretErrorCode(this.resourcename, "prepare",
                    this.xid, xaerr.errorCode);
            if (XAException.XA_RBBASE <= xaerr.errorCode
                    && xaerr.errorCode <= XAException.XA_RBEND) {
                LOGGER.logWarning(msg, xaerr); // see case 84253
                throw new RollbackException(msg);
            } else {
                LOGGER.logError(msg, xaerr);
                throw new SysException(msg, xaerr);
            }
        }
        setState(TxState.IN_DOUBT);
        if (ret == XAResource.XA_RDONLY) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.logDebug("XAResource.prepare ( " + this.xidToHexString
                        + " ) returning XAResource.XA_RDONLY " + "on resource "
                        + this.resourcename
                        + " represented by XAResource instance "
                        + this.xaresource);
            }
            return Participant.READ_ONLY;
        } else {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.logDebug("XAResource.prepare ( " + this.xidToHexString
                        + " ) returning OK " + "on resource "
                        + this.resourcename
                        + " represented by XAResource instance "
                        + this.xaresource);
            }
            return Participant.READ_ONLY + 1;
        }
    }

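This finally answers the question I had wanted to understand all along: where does XAResource.prepare() get called? In JTA, every XAResource is an implementation of the XAResource interface supplied by a database vendor, so this is the point where the code starts talking to the actual database.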
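To make the vendor contract concrete, here is a sketch that drives the JDK's javax.transaction.xa.XAResource interface against an in-memory stand-in. RecordingXAResource, SimpleXid and run are hypothetical names. The sketch reuses one Xid for every resource (a real transaction manager uses a separate branch Xid per resource) and omits rollback handling.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

final class XaResourceSketch {
    /** Hypothetical resource manager that only records the XA calls it receives. */
    static final class RecordingXAResource implements XAResource {
        final List<String> calls = new ArrayList<>();
        private final int vote;
        RecordingXAResource(int vote) { this.vote = vote; }
        @Override public void start(Xid xid, int flags) { calls.add("start"); }
        @Override public void end(Xid xid, int flags) { calls.add("end"); }
        @Override public int prepare(Xid xid) { calls.add("prepare"); return vote; }
        @Override public void commit(Xid xid, boolean onePhase) {
            calls.add(onePhase ? "commit(1pc)" : "commit(2pc)");
        }
        @Override public void rollback(Xid xid) { calls.add("rollback"); }
        @Override public void forget(Xid xid) { calls.add("forget"); }
        @Override public Xid[] recover(int flag) { return new Xid[0]; }
        @Override public boolean isSameRM(XAResource other) { return other == this; }
        @Override public int getTransactionTimeout() { return 0; }
        @Override public boolean setTransactionTimeout(int seconds) { return false; }
    }

    /** Minimal Xid made of a global transaction id and a branch qualifier. */
    static final class SimpleXid implements Xid {
        private final byte[] gtrid;
        private final byte[] bqual;
        SimpleXid(String gtrid, String bqual) {
            this.gtrid = gtrid.getBytes(StandardCharsets.UTF_8);
            this.bqual = bqual.getBytes(StandardCharsets.UTF_8);
        }
        @Override public int getFormatId() { return 1; }
        @Override public byte[] getGlobalTransactionId() { return gtrid.clone(); }
        @Override public byte[] getBranchQualifier() { return bqual.clone(); }
    }

    /** Enlists each resource, prepares all of them, then commits those that voted XA_OK. */
    static void run(Xid xid, List<? extends XAResource> resources) {
        try {
            for (XAResource r : resources) {
                r.start(xid, XAResource.TMNOFLAGS);
                // ... the application's SQL work would happen here ...
                r.end(xid, XAResource.TMSUCCESS);
            }
            List<XAResource> toCommit = new ArrayList<>();
            for (XAResource r : resources) {
                // Read-only resources are finished after prepare and skip phase two.
                if (r.prepare(xid) == XAResource.XA_OK) toCommit.add(r);
            }
            for (XAResource r : toCommit) {
                r.commit(xid, false);
            }
        } catch (XAException e) {
            throw new IllegalStateException("XA call failed, errorCode=" + e.errorCode, e);
        }
    }

    public static void main(String[] args) {
        RecordingXAResource db = new RecordingXAResource(XAResource.XA_OK);
        RecordingXAResource cache = new RecordingXAResource(XAResource.XA_RDONLY);
        run(new SimpleXid("tx1", "b1"), List.of(db, cache));
        System.out.println(db.calls + " " + cache.calls);
    }
}
```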

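Only the MySQL driver jar is installed on my machine, so MySQL's XA implementation is the only one I have. Its XAResource method com.mysql.cj.jdbc.MysqlXAConnection.prepare() is implemented as follows. Note that it always returns XA_OK and never reports read-only (see the TODO):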

    @Override
    public int prepare(Xid xid) throws XAException {
        StringBuilder commandBuf = new StringBuilder(MAX_COMMAND_LENGTH);
        commandBuf.append("XA PREPARE ");
        appendXid(commandBuf, xid);

        dispatchCommand(commandBuf.toString());

        return XA_OK; // TODO: Check for read-only
    }
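The driver method above boils down to sending an XA SQL statement. The sketch below builds the statement sequence for one branch as plain strings. It does not reproduce the driver's appendXid, which the article does not show: the xid layout used here (hex gtrid, hex bqual, decimal formatID) is an assumption based on the MySQL XA statement syntax, and all class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

final class MysqlXaCommandSketch {
    /** Renders bytes as a MySQL hex literal, e.g. 0x747831. */
    static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder("0x");
        for (byte b : bytes) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    /** Assumed xid layout: gtrid, bqual, formatID. */
    static String xidLiteral(byte[] gtrid, byte[] bqual, int formatId) {
        return hex(gtrid) + "," + hex(bqual) + "," + formatId;
    }

    /** The statements one branch goes through in a successful two-phase commit. */
    static List<String> twoPhaseStatements(String xid) {
        return List.of(
                "XA START " + xid,   // associate the session with the branch
                "XA END " + xid,     // work on the branch is finished
                "XA PREPARE " + xid, // phase one
                "XA COMMIT " + xid); // phase two
    }

    public static void main(String[] args) {
        String xid = xidLiteral(
                "tx1".getBytes(StandardCharsets.UTF_8),
                "b1".getBytes(StandardCharsets.UTF_8),
                1);
        twoPhaseStatements(xid).forEach(System.out::println);
    }
}
```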

2.2 2PC: doCommit phase

In essence this phase works the same way as prepare.
Start again from terminate():

 protected void terminate ( boolean commit ) throws HeurRollbackException,
            HeurMixedException, SysException, java.lang.SecurityException,
            HeurCommitException, HeurHazardException, RollbackException,
            IllegalStateException

    {    
        synchronized ( fsm_ ) {
            if ( commit ) {
                if ( participants_.size () <= 1 ) {
                    commit ( true );
                } else {
                    int prepareResult = prepare ();
                    // make sure to only do commit if NOT read only
                    if ( prepareResult != Participant.READ_ONLY )
                        commit ( false );
                }
            } else {
                rollback ();
            }
        }
    }

The commit(false) call is the commit phase. It eventually reaches com.atomikos.icatch.imp.CommitMessage#send; CommitMessage is one of the four PropagationMessage implementations mentioned above. From there it ends up in com.atomikos.datasource.xa.XAResourceTransaction#commit:

@Override
    public synchronized void commit(boolean onePhase)
            throws HeurRollbackException, HeurHazardException,
            HeurMixedException, RollbackException, SysException {
        terminateInResource();

        if (this.state.equals(TxState.TERMINATED))
            return;
        if (this.state.equals(TxState.HEUR_MIXED))
            throw new HeurMixedException();
        if (this.state.equals(TxState.HEUR_ABORTED))
            throw new HeurRollbackException();
        if (this.xaresource == null) {
            String msg =  toString + ": no XAResource to commit?";
            LOGGER.logError(msg);
            throw new HeurHazardException(msg);
        }

        try {

            if (TxState.ACTIVE.equals(this.state)) { 
                // tolerate non-delisting apps/servers
                suspend();
            }
        } catch (ResourceException re) {
            // happens if already rolled back or something else;
            // in any case the transaction can be trusted to act
            // as if rollback already happened
            throw new com.atomikos.icatch.RollbackException(re.getMessage());
        }

        if (!(this.state.isOneOf(TxState.LOCALLY_DONE, TxState.IN_DOUBT, TxState.HEUR_HAZARD)))
            throw new SysException("Wrong state for commit: " + this.state);
        try {
            // refresh xaresource for MQSeries: seems to close XAResource after suspend???
            if (!onePhase) { // cf case 167209
                testOrRefreshXAResourceFor2PC();
            }
            if (LOGGER.isDebugEnabled()) {
                LOGGER.logDebug("XAResource.commit ( " + this.xidToHexString
                        + " , " + onePhase + " ) on resource " + this.resourcename + 
                        " represented by XAResource instance " + this.xaresource);
            }
            this.xaresource.commit(this.xid, onePhase);

        } catch (XAException xaerr) {
            String msg = interpretErrorCode(this.resourcename, "commit",
                    this.xid, xaerr.errorCode);
            LOGGER.logWarning(msg, xaerr);

            if (XAException.XA_RBBASE <= xaerr.errorCode
                    && xaerr.errorCode <= XAException.XA_RBEND) {

                if (!onePhase)
                    throw new SysException(msg, xaerr);
                else
                    throw new com.atomikos.icatch.RollbackException(
                            "Already rolled back in resource.");
            } else {
                switch (xaerr.errorCode) {
                case XAException.XA_HEURHAZ:
                    setState(TxState.HEUR_HAZARD);
                    throw new HeurHazardException();
                case XAException.XA_HEURMIX:
                    setState(TxState.HEUR_MIXED);
                    throw new HeurMixedException();
                case XAException.XA_HEURCOM:
                    forget();
                    break;
                case XAException.XA_HEURRB:
                    setState(TxState.HEUR_ABORTED);
                    throw new HeurRollbackException();
                case XAException.XAER_NOTA:
                    if (!onePhase) {
                        // see case 21552
                        LOGGER.logWarning("XAResource.commit: invalid Xid - transaction already committed in resource?");
                        setState(TxState.TERMINATED);
                        break;
                    }
                default:
                    // fix for bug 31209
                    setState(TxState.HEUR_HAZARD);
                    throw new SysException(msg, xaerr);
                }
            }
        }
        setState(TxState.TERMINATED);
    }

In the end the commit also reaches an XAResource method, xaresource.commit(xid, onePhase), which is likewise implemented by the vendor.
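The error handling in XAResourceTransaction#commit is easier to follow as a lookup. The sketch below restates the branches of the code above using the JDK's XAException constants. The Outcome enum and classify method are hypothetical; each value names either the state the branch is left in or the kind of exception raised when no state is set.

```java
import javax.transaction.xa.XAException;

final class CommitErrorSketch {
    enum Outcome { ROLLBACK_EXCEPTION, SYS_EXCEPTION, HEUR_HAZARD, HEUR_MIXED, HEUR_ABORTED, TERMINATED }

    /** Maps an XAException error code raised by commit to the branch taken above. */
    static Outcome classify(int errorCode, boolean onePhase) {
        if (XAException.XA_RBBASE <= errorCode && errorCode <= XAException.XA_RBEND) {
            // A rollback code is only legitimate for one-phase commit;
            // after a successful prepare it is treated as a system error.
            return onePhase ? Outcome.ROLLBACK_EXCEPTION : Outcome.SYS_EXCEPTION;
        }
        switch (errorCode) {
            case XAException.XA_HEURHAZ:
                return Outcome.HEUR_HAZARD;
            case XAException.XA_HEURMIX:
                return Outcome.HEUR_MIXED;
            case XAException.XA_HEURCOM:
                // Heuristically committed: forget the branch and treat it as done.
                return Outcome.TERMINATED;
            case XAException.XA_HEURRB:
                return Outcome.HEUR_ABORTED;
            case XAException.XAER_NOTA:
                // Unknown Xid in 2PC is taken to mean "already committed";
                // in 1PC it falls through to the default handling.
                return onePhase ? Outcome.HEUR_HAZARD : Outcome.TERMINATED;
            default:
                return Outcome.HEUR_HAZARD;
        }
    }

    public static void main(String[] args) {
        System.out.println(classify(XAException.XA_RBTIMEOUT, true));
        System.out.println(classify(XAException.XAER_NOTA, false));
    }
}
```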

It's late, so I'll stop here for now and add more when I have time.
