分布式环境下多实例多线程读取同一张表的分析设计

2019-05-20  本文已影响0人  梦想又照进现实

场景描述:

数据迁移,应用会部署多个实例,具体几个不确定,每个应用为提高处理效率要求使用多线程处理,读取的数据来源为同一个DB2数据库中的同一个表,表数据量1亿,读取到的数据写入到NoSQL存储中;

设计分析:

需要支持水平扩展,实例数量可以随时调整,每个应用的线程数也可以根据部署实例所在机器性能调整,读取的又是同一个大表的数据,需要考虑的问题有:
1、多实例间处理数据不能重复,线程间处理数据不能重复;
2、如果设计为外来请求触发数据迁移,需要考虑负载均衡产生请求倾斜问题,要求每个实例地位对等;

设计方案一:

数据分段示意.png

第一步,在Redis中设计一把数据分块锁,各个实例竞争这把锁,拿到锁的实例进行数据分块操作,没有拿到锁的则退出;拿到锁的实例按设置的分块大小常量,基于RowNumber对数据做分页查询(仅查主键),把查到的主键集合放到ConcurrentHashMap中,key设计为分段序号,同时为每个key创建分段锁并累加锁总数;map、分段锁和锁总数均存放于Redis中,最后释放分块锁;
第二步,根据锁总数遍历尝试竞争某个分段锁,拿到任何一个分段锁即退出竞争;
第三步,根据拿到的分段锁序号,读取该序号对应的主键集合,按主键总量和线程数取模把主键分发给各线程做数据查询(也可以平均分配);用CountDownLatch计数器等待所有线程结束后,删除Redis中ConcurrentHashMap里该key的数据,并释放分段锁;
第四步,检查Redis中ConcurrentHashMap对象的size是否为0,不为0则重复第二步,直到为0时退出;

分段锁 (1).png

附:所有的锁都设置过期时间,以防应用异常时无法释放锁;过期时间需大于单个分段的最长处理时间,否则锁提前过期,同一分段可能被另一个实例重复处理。

单机版:

代码:
SegmentDataWithLock

/**
 * Single-process model of "scheme one": the source keys are split into fixed-size segments and
 * every segment is registered together with its own lock, so that concurrent workers can claim
 * segments one at a time.
 *
 * <p>Both maps are read and modified by several worker threads at once (entries are removed while
 * other threads still read), so both are {@link ConcurrentHashMap}s.
 */
public class SegmentDataWithLock {

    // Segment name -> lock guarding that segment.
    private final Map<String, ReentrantLock> lockMap = new ConcurrentHashMap<String, ReentrantLock>();

    // Segment name -> keys belonging to that segment. Workers remove entries concurrently while
    // holding *different* segment locks, so a plain HashMap here would be a data race.
    private final Map<String, String[]> segmentMap = new ConcurrentHashMap<String, String[]>();

    // Prefix of generated segment names: seg0, seg1, ...
    private static final String SEGMENT_PIX = "seg";

    // Maximum number of keys per segment.
    private final int segmentSize = 8;

    // Number of segments produced by the most recent segmentation.
    private int segmentCnt = 0;

    /**
     * Looks up the lock of a segment, optionally creating and registering one when it is missing.
     *
     * @param segment    segment name, e.g. "seg0"
     * @param setDefault when true, create and register a new lock if none exists yet
     * @return the segment's lock, or null when no lock exists and {@code setDefault} is false
     */
    public Lock checkGetLock(String segment, boolean setDefault) {
        ReentrantLock reentrantLock = lockMap.get(segment);
        if (reentrantLock != null || !setDefault) {
            return reentrantLock;
        }
        // Re-check under the monitor so that only one thread creates the lock for this segment.
        synchronized (this) {
            reentrantLock = lockMap.get(segment);
            if (reentrantLock == null) {
                reentrantLock = new ReentrantLock();
                System.out.println("lock for " + segment + " not exists! so create a lock: " + reentrantLock);
                lockMap.put(segment, reentrantLock);
            }
            return reentrantLock;
        }
    }

    /**
     * Splits {@code arr} into segments of at most {@code segmentSize} keys and registers, for each
     * segment, a copy of its keys and a fresh lock under the names seg0, seg1, ...
     *
     * @param arr the keys to split
     */
    public void blokFullWithSegmentation(String[] arr) {
        List<List<String>> segList = new ArrayList<>();
        delivedEachSubList(Arrays.asList(arr), segList, segmentSize);

        for (int i = 0; i < segList.size(); i++) {
            String segmentName = SEGMENT_PIX.concat(String.valueOf(i));
            // toArray copies the keys, so later changes to arr do not affect the stored segment.
            segmentMap.put(segmentName, segList.get(i).toArray(new String[0]));
            lockMap.put(segmentName, new ReentrantLock());
        }
        this.segmentCnt = segList.size();
    }

    /**
     * Appends consecutive chunks of {@code mList} to {@code segList}. Every chunk has exactly
     * {@code segmentSize} elements except possibly the last, which holds the remainder. The chunks
     * are {@link List#subList} views of {@code mList}.
     *
     * @param mList       source list; an empty list produces no chunks
     * @param segList     receives the chunks, in order
     * @param segmentSize maximum chunk size
     * @throws IllegalArgumentException if {@code segmentSize} is not positive
     */
    public static void delivedEachSubList(List<String> mList, List<List<String>> segList, int segmentSize) {
        if (segmentSize <= 0) {
            throw new IllegalArgumentException("segmentSize must be positive: " + segmentSize);
        }
        int total = mList.size();
        int from = 0;
        while (from < total) {
            // Compute the end from the remaining length so the index arithmetic cannot overflow.
            int to = from + Math.min(segmentSize, total - from);
            segList.add(mList.subList(from, to));
            from = to;
        }
    }

    /**
     * Removes both the keys and the lock registered for {@code segment}.
     *
     * @param segment segment name, e.g. "seg0"
     */
    public void removeDataAndLockOfSegment(String segment) {
        segmentMap.remove(segment);
        lockMap.remove(segment);
    }

    /** @return the live segment-name -> lock map */
    public Map<String, ReentrantLock> getLockMap() {
        return lockMap;
    }

    /** @return the live segment-name -> keys map */
    public Map<String, String[]> getSegmentMap() {
        return segmentMap;
    }

    /** @return number of segments produced by the most recent segmentation */
    public int getSegmentCnt() {
        return segmentCnt;
    }
}

TransferData

/**
 * Lock-based migration driver: splits the source keys into locked segments and migrates one
 * segment per {@link #transfer(String)} call into an in-memory destination list.
 */
public class TransferData {

    // Segments of the source keys together with their per-segment locks.
    private final SegmentDataWithLock segmentDataWithLock;

    // Destination of the migrated keys. Several workers append to it at the same time while holding
    // *different* segment locks, so every write is guarded by synchronizing on the list itself.
    private final List<String> newDataContainer = new ArrayList<>();

    public TransferData() {
        segmentDataWithLock = new SegmentDataWithLock();
    }

    /**
     * Splits the source keys into segments.
     *
     * @param data source keys
     */
    public void segmentData(String[] data) {
        segmentDataWithLock.blokFullWithSegmentation(data);
    }

    /**
     * Migrates the segment named {@code key} if it still exists. The segment's lock is held while
     * its keys are copied and the segment (data and lock) is removed, so a segment is copied at
     * most once. A thread that obtains the lock after removal finds no data and copies nothing.
     *
     * @param key segment name, e.g. "seg0"
     */
    public void transfer(String key) {
        Lock lock = segmentDataWithLock.checkGetLock(key, false);
        if (lock == null) {
            // The lock is already gone, i.e. another thread has migrated this segment.
            System.out.println(key+" lock is null"+": "+Thread.currentThread().getName());
            return;
        }

        lock.lock();
        try {
            Map<String, String[]> segmentMap = segmentDataWithLock.getSegmentMap();
            System.out.println(System.currentTimeMillis()+": "+Thread.currentThread().getName()+": transfer key("+key+")-> start, lock:"+ lock.toString());
            try {
                String[] segment = segmentMap.get(key);
                if (segment != null && segment.length > 0) {
                    List<String> rows = Arrays.asList(segment);
                    System.out.println(Thread.currentThread().getName()+":"+rows);
                    synchronized (newDataContainer) {
                        newDataContainer.addAll(rows);
                    }
                }

                // Remove the data and the lock so no other thread migrates this segment again.
                segmentDataWithLock.removeDataAndLockOfSegment(key);
                // Simulated write latency of the target store.
                Thread.sleep(300L);
            } catch (InterruptedException e) {
                // Restore the interrupt status so callers can still observe the interruption.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
            System.out.println(System.currentTimeMillis()+": "+Thread.currentThread().getName()+": transfer key("+key+")-> complete "  );
        } finally {
            lock.unlock();
        }
    }

    /** @return true once every segment has been removed, i.e. migrated */
    public boolean isFinish() {
        return segmentDataWithLock.getSegmentMap().isEmpty();
    }

    /**
     * Returns the live destination list. Read it only after all workers have finished (for
     * example after their CountDownLatch has reached zero), because workers write to it
     * concurrently.
     *
     * @return the migrated keys
     */
    public List<String> getNewDataContainer() {
        return newDataContainer;
    }

    /** @return number of segments produced by {@link #segmentData(String[])} */
    public int getDataSegmentCnt() {
        return segmentDataWithLock.getSegmentCnt();
    }

}

Worker

/**
 * Worker for the lock-based scheme: walks over every segment name seg0..seg(n-1) and calls
 * {@link TransferData#transfer(String)} for each. Segments already migrated by another worker
 * are skipped inside {@code transfer}.
 */
class Worker implements Runnable {
    private final CountDownLatch downLatch;
    private final TransferData instance;

    Worker(TransferData instance, CountDownLatch downLatc) {
        this.downLatch = downLatc;
        this.instance = instance;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < instance.getDataSegmentCnt(); i++) {
                instance.transfer("seg" + i);
            }
        } finally {
            // Count down even if transfer throws; otherwise the thread awaiting the latch hangs forever.
            downLatch.countDown();
        }
    }
}

TransferDataTest

/**
 * Demo for the lock-based scheme: builds sample rows, splits them into segments, lets
 * {@code handlerThreads} workers migrate them concurrently, then prints the outcome and the
 * elapsed time.
 */
public class TransferDataTest {

    // Number of worker threads competing for segments.
    private static final int handlerThreads = 10;

    public static void main(String[] args) {
        final long begin = System.currentTimeMillis();
        final TransferData transfer = new TransferData();
        final String[] rows = buildRows(1000);

        // Before migration.
        System.out.println("老数据:" + Arrays.asList(rows));

        // Split the rows into segments.
        transfer.segmentData(rows);

        // Start the workers; the latch lets main wait until every worker has finished.
        final CountDownLatch done = new CountDownLatch(handlerThreads);
        int started = 0;
        while (started < handlerThreads) {
            new Thread(new Worker(transfer, done)).start();
            started++;
        }
        try {
            done.await();
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }

        // After migration.
        System.out.println("Finished:" + transfer.isFinish());
        System.out.println("新数据:" + transfer.getNewDataContainer());
        System.out.println("耗时:" + (System.currentTimeMillis() - begin) + "ms");
    }

    /** Builds {@code count} sample rows named TestData0 .. TestData(count-1). */
    private static String[] buildRows(int count) {
        String[] rows = new String[count];
        for (int idx = 0; idx < count; idx++) {
            rows[idx] = "TestData".concat(String.valueOf(idx));
        }
        return rows;
    }
}

设计方案二:

将方案一中存放分段数据的ConcurrentHashMap改为ConcurrentLinkedQueue:各线程通过poll()原子地取出一个分段,同一分段不会被两个线程同时取到,因此可以不再使用分段锁。

单机版

SegmentDataWithQueue

/**
 * Queue-based segmentation for "scheme two": segments are placed in a
 * {@link ConcurrentLinkedQueue}. Each {@code poll()} hands a segment to exactly one caller, so no
 * per-segment locks are needed.
 */
public class SegmentDataWithQueue {

    // Pending segments, consumed by concurrent workers.
    private final ConcurrentLinkedQueue<List<String>> dataQueue = new ConcurrentLinkedQueue<>();

    // Maximum number of keys per segment.
    private final int segmentSize = 8;

    /**
     * Splits {@code arr} into segments of at most {@code segmentSize} keys and enqueues them in
     * order. Each segment is copied, so the queue neither aliases nor keeps {@code arr} reachable
     * (the raw chunks are subList views backed by {@code arr}).
     *
     * @param arr the keys to split
     */
    public void blockFullWithSegmentation(String[] arr) {
        List<List<String>> segList = new ArrayList<>();
        SegmentDataWithLock.delivedEachSubList(Arrays.asList(arr), segList, segmentSize);

        for (List<String> segment : segList) {
            dataQueue.offer(new ArrayList<String>(segment));
        }
    }

    /**
     * Removes and returns the next segment.
     *
     * @return the next segment, or null when the queue is empty
     */
    public List<String> getNodeFromDataQueue() {
        return dataQueue.poll();
    }

    /** @return true when no segment is currently queued (a momentary snapshot under concurrency) */
    public boolean isEmptyQueue() {
        return dataQueue.isEmpty();
    }
}

TransferDat

/**
 * Queue-based migration driver: workers repeatedly poll a segment from the shared queue and append
 * its keys to an in-memory destination list until the queue is drained.
 */
public class TransferDat {

    private final SegmentDataWithQueue segmentDataWithQueue;

    // Destination of the migrated keys. Many workers append concurrently without any other lock,
    // so every write is guarded by synchronizing on the list itself.
    private final List<String> newDataContainer = new ArrayList<>();

    public TransferDat() {
        segmentDataWithQueue = new SegmentDataWithQueue();
    }

    /**
     * Splits the source keys into segments and enqueues them.
     *
     * @param data source keys
     */
    public void segmentData(String[] data) {
        segmentDataWithQueue.blockFullWithSegmentation(data);
    }

    /**
     * Migrates segments until the queue is empty. Polling until null (instead of checking
     * isEmpty first) avoids a check-then-act race between concurrent workers. Processing stops
     * early if the thread is interrupted.
     */
    public void transfer() {
        try {
            List<String> dataNodeList;
            while ((dataNodeList = segmentDataWithQueue.getNodeFromDataQueue()) != null) {
                if (dataNodeList.isEmpty()) {
                    continue;
                }
                System.out.println(System.currentTimeMillis() + ": " + Thread.currentThread().getName() +
                        ": transfer data :" + dataNodeList.toString());
                synchronized (newDataContainer) {
                    newDataContainer.addAll(dataNodeList);
                }
                // Simulated write latency of the target store.
                Thread.sleep(300L);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }

        System.out.println(System.currentTimeMillis() + ": " + Thread.currentThread().getName() + ": transfer complete ");
    }

    /** @return true once no segment is left in the queue */
    public boolean isFinish() {
        return segmentDataWithQueue.isEmptyQueue();
    }

    /**
     * Returns the live destination list. Read it only after all workers have finished, because
     * workers write to it concurrently.
     *
     * @return the migrated keys
     */
    public List<String> getNewDataContainer() {
        return newDataContainer;
    }

}

Worker

/**
 * Worker for the queue-based scheme: drains the shared segment queue through
 * {@link TransferDat#transfer()} and then signals completion on the latch.
 */
public class Worker implements Runnable {

    private final CountDownLatch downLatch;
    private final TransferDat transferDat;

    public Worker(TransferDat transferDat, CountDownLatch downLatch) {
        this.downLatch = downLatch;
        this.transferDat = transferDat;
    }

    @Override
    public void run() {
        try {
            transferDat.transfer();
        } finally {
            // Count down even if transfer throws; otherwise the thread awaiting the latch hangs forever.
            downLatch.countDown();
        }
    }
}

TransferDatTest

/**
 * Demo for the queue-based scheme: builds sample rows, enqueues them as segments, lets a fixed
 * thread pool drain the queue, then prints the outcome and the elapsed time.
 */
public class TransferDatTest {

    // Number of pool threads draining the queue.
    private static final int handlerThreads = 10;

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        int arrLength = 1000;

        String[] data = new String[arrLength];
        for (int i = 0; i < arrLength; i++) {
            data[i] = "TestData".concat(String.valueOf(i));
        }
        // Before migration.
        System.out.println("老数据:" + Arrays.asList(data));

        TransferDat transferDat = new TransferDat();
        // Split the rows into queued segments.
        transferDat.segmentData(data);

        // Run the workers on a pool; the latch lets main wait until every worker has finished.
        CountDownLatch downLatch = new CountDownLatch(handlerThreads);
        ExecutorService es = Executors.newFixedThreadPool(handlerThreads);
        try {
            for (int i = 0; i < handlerThreads; i++) {
                // Worker is already a Runnable; wrapping it in a Thread that the pool never starts is
                // pointless and misleading.
                es.submit(new Worker(transferDat, downLatch));
            }
            downLatch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            es.shutdown();
        }

        // After migration.
        System.out.println("Finished:" + transferDat.isFinish());
        System.out.println("新数据:" + transferDat.getNewDataContainer());
        System.out.println("耗时:" + (System.currentTimeMillis() - start) + "ms");
    }

}

方案三:
利用DB2数据库自带的锁,例如:
select * From RRTEST where pkID='20070223ORD01267732' for update with RS

gitee代码:
https://gitee.com/danni505/SegmentData.git

上一篇 下一篇

猜你喜欢

热点阅读