JTA-atomikos源码走读
一、JTA的核心实现类
- javax.transaction.UserTransaction的实现类,com.atomikos.icatch.jta.UserTransactionImp
- javax.transaction.TransactionManager的实现类,com.atomikos.icatch.jta.UserTransactionManager
- 最核心的类,javax.transaction.Transaction实现,com.atomikos.icatch.jta.TransactionImp。
UserTransaction和TransactionManager调用的实现,都是TransactionImp类。
参考文档:http://www.tianshouzhi.com/api/tutorials/distributed_transaction/386
XA/2PC,XA/3PC参考文档:https://www.cnblogs.com/duanxz/p/4672708.html
二、atomikos是如何实现分布式事务的2PC阶段:
2.1 2PC---prepare阶段
com.atomikos.icatch.jta.TransactionImp#commit 如下实现:
@Override
public void commit() throws javax.transaction.RollbackException,
javax.transaction.HeuristicMixedException,
javax.transaction.HeuristicRollbackException,
javax.transaction.SystemException, java.lang.SecurityException {
try {
this.compositeTransaction.commit();
} catch (HeurHazardException hh) {
rethrowAsJtaHeuristicMixedException(hh.getMessage(), hh);
} catch (HeurRollbackException hr) {
rethrowAsJtaHeuristicRollbackException(hr.getMessage(), hr);
} catch (HeurMixedException hm) {
rethrowAsJtaHeuristicMixedException(hm.getMessage(), hm);
} catch (SysException se) {
LOGGER.logError(se.getMessage(), se);
throw new ExtendedSystemException(se.getMessage(), se);
} catch (com.atomikos.icatch.RollbackException rb) {
// see case 29708: all statements have been closed
String msg = rb.getMessage();
Throwable cause = rb.getCause();
if (cause == null)
cause = rb;
rethrowAsJtaRollbackException(msg, cause);
}
}
com.atomikos.icatch.imp.CompositeTransactionImp#commit ,表示混合事务(多事务)的提交,如下实现:
public void commit () throws HeurRollbackException, HeurMixedException,
HeurHazardException, SysException, SecurityException,
RollbackException
{
doCommit ();
setSiblingInfoForIncoming1pcRequestFromRemoteClient();
if ( isRoot () ) {
try {
coordinator.terminate ( true );
}
catch ( RollbackException rb ) {
throw rb;
} catch ( HeurHazardException hh ) {
throw hh;
} catch ( HeurRollbackException hr ) {
throw hr;
} catch ( HeurMixedException hm ) {
throw hm;
} catch ( SysException se ) {
throw se;
} catch ( Exception e ) {
throw new SysException (
"Unexpected error: " + e.getMessage (), e );
}
}
}
注意其中的coordinator.terminate(true)方法,如下实现:
protected void terminate ( boolean commit ) throws HeurRollbackException,
HeurMixedException, SysException, java.lang.SecurityException,
HeurCommitException, HeurHazardException, RollbackException,
IllegalStateException
{
synchronized ( fsm_ ) {
if ( commit ) {
if ( participants_.size () <= 1 ) {
commit ( true );
} else {
int prepareResult = prepare ();
// make sure to only do commit if NOT read only
if ( prepareResult != Participant.READ_ONLY )
commit ( false );
}
} else {
rollback ();
}
}
}
注意该方法是线程安全的,只会有一个线程执行。看到participants_.size==1的时候,表示就一个参与者,无需走2PC,直接commit(true)即可。但是大于2个参与者的话,就必须老老实实的2PC。核心看prepare ()方法
prepare ()方法如下:
public int prepare () throws RollbackException,
java.lang.IllegalStateException, HeurHazardException,
HeurMixedException, SysException
{
// FIRST, TAKE CARE OF DUPLICATE PREPARES
// Recursive prepare-calls should be avoided for not deadlocking rollback/commit methods
// If a recursive prepare re-enters, then it will see a voting state -> reject.
// Note that this may also avoid some legal prepares, but only rarely
if ( getState ().equals ( TxState.PREPARING ) )
throw new RollbackException ( "Recursion detected" );
int ret = Participant.READ_ONLY + 1;
synchronized ( fsm_ ) {
ret = stateHandler_.prepare ();
if ( ret == Participant.READ_ONLY ) {
if ( LOGGER.isTraceEnabled() ) LOGGER.logTrace ( "prepare() of Coordinator " + getCoordinatorId ()
+ " returning READONLY" );
} else {
if ( LOGGER.isTraceEnabled() ) LOGGER.logTrace ( "prepare() of Coordinator " + getCoordinatorId ()
+ " returning YES vote");
}
}
return ret;
}
prepare()方法把实现委托给状态处理器,stateHandler_对象。当前2PC的第一个prepare阶段的状态处理器由ActiveStateHandler来处理。
com.atomikos.icatch.imp.ActiveStateHandler#prepare 如下实现:
protected int prepare () throws RollbackException,
java.lang.IllegalStateException, HeurHazardException,
HeurMixedException, SysException
{
int count = 0; // number of participants
PrepareResult result = null; // synchronization
boolean allReadOnly = true; // if still true at end-> readonly vote
int ret = 0; // return value
Vector<Participant> participants = getCoordinator ().getParticipants ();
CoordinatorStateHandler nextStateHandler = null;
if ( orphansExist() ) {
try {
if ( LOGGER.isTraceEnabled() ) LOGGER.logTrace ( "Orphans detected: "
+ getCoordinator ().getLocalSiblingCount () + " vs "
+ globalSiblingCount_ + " - forcing rollback." );
rollbackWithAfterCompletionNotification(new RollbackCallback() {
public void doRollback()
throws HeurCommitException,
HeurMixedException, SysException,
HeurHazardException, IllegalStateException {
rollbackFromWithinCallback(false,false);
}});
} catch ( HeurCommitException hc ) {
throw new HeurMixedException();
}
throw new RollbackException ( "Orphans detected." );
}
try {
try {
getCoordinator().setState ( TxState.PREPARING );
} catch ( RuntimeException error ) {
//See case 23334
String msg = "Error in preparing: " + error.getMessage() + " - rolling back instead";
LOGGER.logWarning ( msg , error );
try {
rollbackWithAfterCompletionNotification(new RollbackCallback() {
public void doRollback()
throws HeurCommitException,
HeurMixedException, SysException,
HeurHazardException, IllegalStateException {
rollbackFromWithinCallback(false,false);
}});
throw new RollbackException ( msg , error);
} catch ( HeurCommitException e ) {
LOGGER.logError ( "Illegal heuristic commit during rollback before prepare:" + e );
throw new HeurMixedException();
}
}
count = participants.size ();
result = new PrepareResult ( count );
Enumeration<Participant> enumm = participants.elements ();
while ( enumm.hasMoreElements () ) {
Participant p = (Participant) enumm.nextElement ();
PrepareMessage pm = new PrepareMessage ( p, result );
if ( getCascadeList () != null && p.getURI () != null ) { //null for OTS
Integer sibnum = (Integer) getCascadeList ().get ( p.getURI () );
if ( sibnum != null ) { // null for local participant!
p.setGlobalSiblingCount ( sibnum.intValue () );
}
p.setCascadeList ( getCascadeList () );
}
getPropagator ().submitPropagationMessage ( pm );
} // while
result.waitForReplies ();
boolean voteOK = result.allYes ();
setReadOnlyTable ( result.getReadOnlyTable () );
allReadOnly = result.allReadOnly ();
if ( !voteOK ) {
int res = result.getResult ();
try {
rollbackWithAfterCompletionNotification(new RollbackCallback() {
public void doRollback()
throws HeurCommitException,
HeurMixedException, SysException,
HeurHazardException, IllegalStateException {
rollbackFromWithinCallback(true,false);
}});
} catch ( HeurCommitException hc ) {
// should not happen:
// means that ALL subordinate work committed heuristically.
// this is impossible since it assumes that ALL
// participants voted YES in the first place,
// which contradicts the fact that we are dealing with
// !voteOK
throw new SysException ( "Unexpected heuristic: "
+ hc.getMessage (), hc );
}
// let recovery clean up in the background
throw new RollbackException ( "One or more resources refused to commit (possibly because of a timeout in the resource - see the log for details). This transaction has been rolled back instead." );
}
} catch ( RuntimeException runerr ) {
throw new SysException ( "Error in prepare: " + runerr.getMessage (), runerr );
} catch ( InterruptedException err ) {
// cf bug 67457
InterruptedExceptionHelper.handleInterruptedException ( err );
throw new SysException ( "Error in prepare: " + err.getMessage (), err );
}
// here we are if all yes.
if ( allReadOnly ) {
nextStateHandler = new TerminatedStateHandler ( this );
getCoordinator ().setStateHandler ( nextStateHandler );
ret = Participant.READ_ONLY;
notifySynchronizationsAfterCompletion(TxState.COMMITTING,TxState.TERMINATED); //cf bug 127485
} else {
nextStateHandler = new IndoubtStateHandler ( this );
getCoordinator ().setStateHandler ( nextStateHandler );
ret = Participant.READ_ONLY + 1;
}
return ret;
}
该方法是对prepare阶段的实现,所有的参与者都被注册到Vector数组中,什么时候注册进去的,暂时还没找到代码。设置异常处理器,然后设置状态为TxState.PREPARING。然后开始遍历每个参与者,执行各个资源管理器的prepare阶段,执行的代码在getPropagator ().submitPropagationMessage ( pm )中
getPropagator ().submitPropagationMessage ( pm )代码实现如下:
public synchronized void submitPropagationMessage ( PropagationMessage msg )
{
PropagatorThread t = new PropagatorThread ( msg );
if ( threaded_ ) {
TaskManager.SINGLETON.executeTask ( t );
} else {
t.run();
}
}
发现这里是通过多线程实现, t.run()的实现如下
PropagatorThread.run()实现如下:
private static class PropagatorThread implements Runnable
{
private PropagationMessage msg;
PropagatorThread ( PropagationMessage msg )
{
this.msg = msg;
}
public void run()
{
try {
boolean tryAgain = true;
do {
tryAgain = msg.submit();
if ( tryAgain ) {
//wait a little before retrying
Thread.sleep ( RETRY_INTERVAL );
if ( LOGGER.isTraceEnabled() ) LOGGER.logTrace ( "Propagator: retrying " + "message: " + msg );
}
} while ( tryAgain );
}
catch ( Exception e ) {
LOGGER.logWarning ( "ERROR in propagator: " + e.getMessage () +
(msg != null ? " while sending message: " + msg : "") , e );
}
}
}
msg.submit()的实现如下:
protected boolean submit ()
{
boolean failed = false;
boolean transienterr = false;
Exception exception = null;
Object result = null;
boolean retried = false;
try {
result = send ();
} catch ( PropagationException e ) {
failed = true;
transienterr = e.isTransient ();
exception = e.getDetail ();
} finally {
if ( failed && transienterr && retrycount_ < MAX_RETRIES_ON_COMM_FAILURE ) {
retried = true;
retrycount_++;
}
if ( result_ != null ) {
result_.addReply ( new Reply ( result, exception,
getParticipant (), retried ) );
}
}
return retried;
}
最终定位到PropagationMessage.send()方法,PropagationMessage是抽象类,其实现有四个类,PrepareMessage,CommitMessage,RollbackMessage,ForgetMessage。当前prepare阶段,调用PrepareMessage.send()方法
其实现如下:
protected Boolean send () throws PropagationException
{
Participant part = getParticipant ();
int ret = 0;
Boolean result = null;
try {
ret = part.prepare ();
if ( ret == Participant.READ_ONLY )
result = null;
else
result = new Boolean ( true );
} catch ( HeurHazardException heurh ) {
throw new PropagationException ( heurh, false );
} catch ( RollbackException jtr ) {
// NO vote.
result = new Boolean ( false );
} catch ( Exception e ) {
// here, participant might be indoubt!
HeurHazardException heurh = new HeurHazardException ();
throw new PropagationException ( heurh, false );
}
return result;
}
可以看到执行到了参与者的prepare方法,part.prepare()方法。由类com.atomikos.datasource.xa.XAResourceTransaction#prepare实现。
XAResourceTransaction.prepare方法实现如下:
@Override
public synchronized int prepare() throws RollbackException,
HeurHazardException, HeurMixedException, SysException {
int ret = 0;
terminateInResource();
if (TxState.ACTIVE == this.state) {
// tolerate non-delisting apps/servers
suspend();
}
// duplicate prepares can happen for siblings in serial subtxs!!!
// in that case, the second prepare just returns READONLY
if (this.state == TxState.IN_DOUBT)
return Participant.READ_ONLY;
else if (!(this.state == TxState.LOCALLY_DONE))
throw new SysException("Wrong state for prepare: " + this.state);
try {
// refresh xaresource for MQSeries: seems to close XAResource after
// suspend???
testOrRefreshXAResourceFor2PC();
if (LOGGER.isTraceEnabled()) {
LOGGER.logTrace("About to call prepare on XAResource instance: "
+ this.xaresource);
}
ret = this.xaresource.prepare(this.xid);
} catch (XAException xaerr) {
String msg = interpretErrorCode(this.resourcename, "prepare",
this.xid, xaerr.errorCode);
if (XAException.XA_RBBASE <= xaerr.errorCode
&& xaerr.errorCode <= XAException.XA_RBEND) {
LOGGER.logWarning(msg, xaerr); // see case 84253
throw new RollbackException(msg);
} else {
LOGGER.logError(msg, xaerr);
throw new SysException(msg, xaerr);
}
}
setState(TxState.IN_DOUBT);
if (ret == XAResource.XA_RDONLY) {
if (LOGGER.isDebugEnabled()) {
LOGGER.logDebug("XAResource.prepare ( " + this.xidToHexString
+ " ) returning XAResource.XA_RDONLY " + "on resource "
+ this.resourcename
+ " represented by XAResource instance "
+ this.xaresource);
}
return Participant.READ_ONLY;
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.logDebug("XAResource.prepare ( " + this.xidToHexString
+ " ) returning OK " + "on resource "
+ this.resourcename
+ " represented by XAResource instance "
+ this.xaresource);
}
return Participant.READ_ONLY + 1;
}
}
终于到了我一直想了解的问题?就是:哪里调用了XAResource的prepare()方法。因为JTA所有的XAResource,都是由各个数据库厂商去实现XAResource接口的。这里,终于看到开始和真正的数据库交互了。
由于我本地只安装了Mysql的驱动jar包,只有mysql的XA实现,mysql的XAResource的com.mysql.cj.jdbc.MysqlXAConnection.prepare()方法的实现如下:
@Override
public int prepare(Xid xid) throws XAException {
StringBuilder commandBuf = new StringBuilder(MAX_COMMAND_LENGTH);
commandBuf.append("XA PREPARE ");
appendXid(commandBuf, xid);
dispatchCommand(commandBuf.toString());
return XA_OK; // TODO: Check for read-only
}
2.2 2PC---doCommit阶段
该阶段的本质上,和prepare的一样。
先在如下:
protected void terminate ( boolean commit ) throws HeurRollbackException,
HeurMixedException, SysException, java.lang.SecurityException,
HeurCommitException, HeurHazardException, RollbackException,
IllegalStateException
{
synchronized ( fsm_ ) {
if ( commit ) {
if ( participants_.size () <= 1 ) {
commit ( true );
} else {
int prepareResult = prepare ();
// make sure to only do commit if NOT read only
if ( prepareResult != Participant.READ_ONLY )
commit ( false );
}
} else {
rollback ();
}
}
}
其中 commit(false)方法,就是提交阶段。最终,会执行到,com.atomikos.icatch.imp.CommitMessage#send。CommitMessage是上面提到的PropagationMessage 四个实现类之一。最后也到了,com.atomikos.datasource.xa.XAResourceTransaction#commit方法。
@Override
public synchronized void commit(boolean onePhase)
throws HeurRollbackException, HeurHazardException,
HeurMixedException, RollbackException, SysException {
terminateInResource();
if (this.state.equals(TxState.TERMINATED))
return;
if (this.state.equals(TxState.HEUR_MIXED))
throw new HeurMixedException();
if (this.state.equals(TxState.HEUR_ABORTED))
throw new HeurRollbackException();
if (this.xaresource == null) {
String msg = toString + ": no XAResource to commit?";
LOGGER.logError(msg);
throw new HeurHazardException(msg);
}
try {
if (TxState.ACTIVE.equals(this.state)) {
// tolerate non-delisting apps/servers
suspend();
}
} catch (ResourceException re) {
// happens if already rolled back or something else;
// in any case the transaction can be trusted to act
// as if rollback already happened
throw new com.atomikos.icatch.RollbackException(re.getMessage());
}
if (!(this.state.isOneOf(TxState.LOCALLY_DONE, TxState.IN_DOUBT, TxState.HEUR_HAZARD)))
throw new SysException("Wrong state for commit: " + this.state);
try {
// refresh xaresource for MQSeries: seems to close XAResource after suspend???
if (!onePhase) { // cf case 167209
testOrRefreshXAResourceFor2PC();
}
if (LOGGER.isDebugEnabled()) {
LOGGER.logDebug("XAResource.commit ( " + this.xidToHexString
+ " , " + onePhase + " ) on resource " + this.resourcename +
" represented by XAResource instance " + this.xaresource);
}
this.xaresource.commit(this.xid, onePhase);
} catch (XAException xaerr) {
String msg = interpretErrorCode(this.resourcename, "commit",
this.xid, xaerr.errorCode);
LOGGER.logWarning(msg, xaerr);
if (XAException.XA_RBBASE <= xaerr.errorCode
&& xaerr.errorCode <= XAException.XA_RBEND) {
if (!onePhase)
throw new SysException(msg, xaerr);
else
throw new com.atomikos.icatch.RollbackException(
"Already rolled back in resource.");
} else {
switch (xaerr.errorCode) {
case XAException.XA_HEURHAZ:
setState(TxState.HEUR_HAZARD);
throw new HeurHazardException();
case XAException.XA_HEURMIX:
setState(TxState.HEUR_MIXED);
throw new HeurMixedException();
case XAException.XA_HEURCOM:
forget();
break;
case XAException.XA_HEURRB:
setState(TxState.HEUR_ABORTED);
throw new HeurRollbackException();
case XAException.XAER_NOTA:
if (!onePhase) {
// see case 21552
LOGGER.logWarning("XAResource.commit: invalid Xid - transaction already committed in resource?");
setState(TxState.TERMINATED);
break;
}
default:
// fix for bug 31209
setState(TxState.HEUR_HAZARD);
throw new SysException(msg, xaerr);
}
}
}
setState(TxState.TERMINATED);
}
最终,commit操作也终于到了XAResource方法,也是由厂商实现。
很晚了,暂时先不写了。等有时间再补充。