fescar源码分析5

2019-02-10  本文已影响0人  leiwingqueen

一、概要

这篇文章主要针对前面提到的几个问题进行分析。

二、分析

  1. xid的生成如何保证唯一
    xid和branch id的生成依赖UUIDGenerator
/*
 *  Copyright 1999-2018 Alibaba Group Holding Ltd.
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

package com.alibaba.fescar.server;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.concurrent.atomic.AtomicLong;

import com.alibaba.fescar.common.exception.ShouldNeverHappenException;

public class UUIDGenerator {

    private static AtomicLong UUID = new AtomicLong(1000);

    private static int UUID_INTERNAL = 2000000000;

    public static long generateUUID() {
        long id = UUID.incrementAndGet();
        if (id > UUID_INTERNAL) {
            synchronized (UUID) {
                if (UUID.get() >= id) {
                    id -= UUID_INTERNAL;
                    UUID.set(id);
                }
            }
        }
        return id;
    }

    public static void init(int serverNodeId) {
        try {
            UUID.set(UUID_INTERNAL * serverNodeId);
            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
            Calendar cal = Calendar.getInstance();
            Date date = format.parse("2019-01-01");
            cal.setTime(date);
            long base = cal.getTimeInMillis();
            long current = System.currentTimeMillis();
            UUID.addAndGet((current - base) / 1000);
        } catch (ParseException e) {
            throw new ShouldNeverHappenException(e);
        }
    }
}

看代码的逻辑主要是根据初始化的serverNodeId控制不同的段。每个段的长度为UUID_INTERNAL(2亿)。
这里有几个问题。

UUID.set(UUID_INTERNAL * serverNodeId);
  1. TC的global session和branch session如何保存
    官方的demo把session分两部分数据保存,存量的数据和日志数据。
@Override
    public boolean writeSession(LogOperation logOperation, SessionStorable session) {
        TransactionWriteFuture writeFutureRequest = new TransactionWriteFuture(session, logOperation);
        try {
            if (transactionWriteFutureQueue.offer(writeFutureRequest, MAX_ENQUEUE_MILLS, TimeUnit.MILLISECONDS)) {
                return writeFutureRequest.get();
            }
        } catch (InterruptedException exx) {
            LOGGER.error("write data file error," + exx.getMessage());
        }
        return false;
    }

可以看到这里做了一个简单的优化,并没有直接写文件,而是做了一个内存队列进行缓冲,异步写入。
接下来我们看下文件写入的格式。TransactionWriteStore

@Override
    public byte[] encode() {
        byte[] bySessionRequest = this.sessionRequest.encode();
        byte byOpCode = this.getOperate().getCode();
        int len = bySessionRequest.length + 1;
        byte[] byResult = new byte[len];
        ByteBuffer byteBuffer = ByteBuffer.wrap(byResult);
        byteBuffer.put(bySessionRequest);
        byteBuffer.put(byOpCode);
        return byResult;
    }

拿GlobalSession为例,branch session雷同

@Override
    public byte[] encode() {
        ByteBuffer byteBuffer = ByteBuffer.allocate(512);
        byteBuffer.putLong(transactionId);
        byteBuffer.putInt(timeout);
        if (null != applicationId) {
            byte[] byApplicationId = applicationId.getBytes();
            byteBuffer.putShort((short)byApplicationId.length);
            byteBuffer.put(byApplicationId);
        } else {
            byteBuffer.putShort((short)0);
        }
        if (null != transactionServiceGroup) {
            byte[] byServiceGroup = transactionServiceGroup.getBytes();
            byteBuffer.putShort((short)byServiceGroup.length);
            byteBuffer.put(byServiceGroup);
        } else {
            byteBuffer.putShort((short)0);
        }
        if (null != transactionName) {
            byte[] byTxName = transactionName.getBytes();
            byteBuffer.putShort((short)byTxName.length);
            byteBuffer.put(byTxName);
        } else {
            byteBuffer.putShort((short)0);
        }
        byteBuffer.putLong(beginTime);
        byteBuffer.flip();
        byte[] result = new byte[byteBuffer.limit()];
        byteBuffer.get(result);
        return result;
    }

采用定长的二进制协议。前面512个字节为session的内容,后面1个字节为操作类型。
存量的数据以内存的形式保存到内存里面。AbstractSessionManager.sessionMap为存量的数据的保存,其实就是一个简单的map。

protected Map<Long, GlobalSession> sessionMap = new ConcurrentHashMap<>();
@Override
    public void addGlobalSession(GlobalSession session) throws TransactionException {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("MANAGER[" + name + "] SESSION[" + session + "] " + LogOperation.GLOBAL_ADD);
        }
        transactionStoreManager.writeSession(LogOperation.GLOBAL_ADD, session);
        sessionMap.put(session.getTransactionId(), session);

    }

    @Override
    public GlobalSession findGlobalSession(Long transactionId) throws TransactionException {
        return sessionMap.get(transactionId);
    }
@Override
    public void removeGlobalSession(GlobalSession session) throws TransactionException {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("MANAGER[" + name + "] SESSION[" + session + "] " + LogOperation.GLOBAL_REMOVE);
        }
        transactionStoreManager.writeSession(LogOperation.GLOBAL_REMOVE, session);
        sessionMap.remove(session.getTransactionId());

    }

session在完成(commit/rollback)之后会从内存里面删掉,因此数据量理论上不会很大。但是如果需要做高可用的话需要改为保存到redis/memcache等共享存储。

  1. TC的第二段提交主要做什么操作?
  1. 某个事务提交失败/回滚失败如何处理
    我们看TM的核心代码。TransactionalTemplate。
  1. lock-key的作用
    RM需要加锁,TM是不需要加锁。这是由于fescar设计的模型上TM是不涉及更新DB的操作。我理解这里加锁的作用是为了保证回滚操作的正确性。当然这要求接入fescar的业务必须完整接入(eg.对于一个库存表的所有更新的操作都必须接入TC,不管是否涉及到分布式事务)。
public interface LockManager {

    boolean acquireLock(BranchSession branchSession) throws TransactionException;

    boolean isLockable(long transactionId, String resourceId, String lockKey) throws TransactionException;
}

key为resourceId+表名+主键ID(hashCode除128取模)。

三、总结

总体来说fescar提供的思路我认为可以借鉴,但是需要在线上使用还是需要自己对一些组件做扩展。另外rollback失败的补偿策略不确定官方有没实现,没有的话还需要做补充。

上一篇下一篇

猜你喜欢

热点阅读