store模块阅读22:HA(1):HAConnection

2017-12-13  本文已影响31人  赤子心_d709

简介

HAConnection用于描述master和slave用于同步数据的连接
两个service如下
  ReadSocketService :读来自 Slave节点 的数据。
  WriteSocketService :写到往 Slave节点 的数据。
重要方法
  ReadSocketService#run
  ReadSocketService#processReadEvent
  WriteSocketService#run
  WriteSocketService#transferData

部分涉及HAService的代码后面再讲

master和slave同步数据的协议如下


参照refer

属性

    private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    private final HAService haService;//上层ha服务
    private final SocketChannel socketChannel;
    private final String clientAddr;//slave地址
    private WriteSocketService writeSocketService;//内部类
    private ReadSocketService readSocketService;//内部类

    private volatile long slaveRequestOffset = -1;//slave第一次请求的offset
    private volatile long slaveAckOffset = -1;//确认slave的最大位置

函数

构造函数和状态处理函数比较简单

    public HAConnection(final HAService haService, final SocketChannel socketChannel) throws IOException {
        this.haService = haService;
        this.socketChannel = socketChannel;
        this.clientAddr = this.socketChannel.socket().getRemoteSocketAddress().toString();
        this.socketChannel.configureBlocking(false);
        this.socketChannel.socket().setSoLinger(false, -1);
        this.socketChannel.socket().setTcpNoDelay(true);
        this.socketChannel.socket().setReceiveBufferSize(1024 * 64);
        this.socketChannel.socket().setSendBufferSize(1024 * 64);
        this.writeSocketService = new WriteSocketService(this.socketChannel);
        this.readSocketService = new ReadSocketService(this.socketChannel);
        this.haService.getConnectionCount().incrementAndGet();//ha服务连接数+1
    }

    public void start() {
        this.readSocketService.start();
        this.writeSocketService.start();
    }

    public void shutdown() {
        this.writeSocketService.shutdown(true);
        this.readSocketService.shutdown(true);
        this.close();
    }

    public void close() {
        if (this.socketChannel != null) {
            try {
                this.socketChannel.close();
            } catch (IOException e) {
                HAConnection.log.error("", e);
            }
        }
    }

    public SocketChannel getSocketChannel() {
        return socketChannel;
    }

内部类

ReadSocketService

读socket服务

属性以及构造函数

        private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024;
        private final Selector selector;
        private final SocketChannel socketChannel;
        private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);//读的buffer,分配1M
        private int processPostion = 0;//byteBufferRead处理到的位置
        private volatile long lastReadTimestamp = System.currentTimeMillis();//记录最后读取的时间,20s超时

        public ReadSocketService(final SocketChannel socketChannel) throws IOException {
            this.selector = RemotingUtil.openSelector();
            this.socketChannel = socketChannel;
            this.socketChannel.register(this.selector, SelectionKey.OP_READ);
            this.thread.setDaemon(true);
        }

run方法

代码,注释如下

        /**
         * while循环中
         *  1.调用processReadEvent,如果出错则break
         *  2.如果读的间隔超过了指定时间(默认20s)则break
         * 退出了break
         *  1.读写service stop
         *  2.ha服务移除当前HAConnection记录
         *  3.selector,channel关闭,清理
         */
        @Override
        public void run() {
            HAConnection.log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                try {
                    this.selector.select(1000);
                    boolean ok = this.processReadEvent();//处理读事件
                    if (!ok) {//处理错误
                        HAConnection.log.error("processReadEvent error");
                        break;
                    }

                    long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
                    if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {//心跳间隔超过了指定间隔,默认20s
                        log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);
                        break;
                    }
                } catch (Exception e) {
                    HAConnection.log.error(this.getServiceName() + " service has exception.", e);
                    break;
                }
            }

            this.makeStop();//读线程stop

            writeSocketService.makeStop();//写线程stop

            //ha服务移除当前HAConnection记录
            haService.removeConnection(HAConnection.this);

            //ha服务的连接数-1
            HAConnection.this.haService.getConnectionCount().decrementAndGet();

            //取消注册的key
            SelectionKey sk = this.socketChannel.keyFor(this.selector);
            if (sk != null) {
                sk.cancel();
            }
            //关闭socket以及channel
            try {
                this.selector.close();
                this.socketChannel.close();
            } catch (IOException e) {
                HAConnection.log.error("", e);
            }

            HAConnection.log.info(this.getServiceName() + " service end");
        }

processReadEvent

        /**
         * 处理读事件
         * 1.如果byteBufferRead写满了,就flip准备重新写,更新processPostion
         * 2.只要buffer还能写
         *  从socketChannel内容写到byteBufferRead中
         *  更新lastReadTimestamp
         *  找到pos之前最近%8==0的位置(每次传输的是一个long,刚好8个字节)
         *  读取之前8个字节,记录slaveAckOffset,代表slave发送过来的offset,即slave同步到的offset
         *  notifyTransferSome。master通知slave: "已经知道slave同步到slaveAckOffset这个位置了"
         */
        private boolean processReadEvent() {
            int readSizeZeroTimes = 0;

            if (!this.byteBufferRead.hasRemaining()) {//没有什么要读了
                this.byteBufferRead.flip();//从头读
                this.processPostion = 0;
            }

            while (this.byteBufferRead.hasRemaining()) {
                try {
                    int readSize = this.socketChannel.read(this.byteBufferRead);//channel中的内容读到byteBufferRead中
                    if (readSize > 0) {
                        readSizeZeroTimes = 0;
                        //最后读取时间
                        this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                        if ((this.byteBufferRead.position() - this.processPostion) >= 8) {//buffer最新的位置离处理的位置超过了8字节(一个long占的位置)
                            int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);//找到对应的整
                            long readOffset = this.byteBufferRead.getLong(pos - 8);//读取这个long
                            this.processPostion = pos;//记录处理到的位置
                            // 设置已确认的最大位置
                            HAConnection.this.slaveAckOffset = readOffset;
                            if (HAConnection.this.slaveRequestOffset < 0) {//初始请求的位置,代表是slave第一次请求
                                HAConnection.this.slaveRequestOffset = readOffset;
                                log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
                            }
                            // 通知目前Slave进度
                            HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
                        }
                    } else if (readSize == 0) {
                        if (++readSizeZeroTimes >= 3) {
                            break;
                        }
                    } else {
                        log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
                        return false;
                    }
                } catch (IOException e) {
                    log.error("processReadEvent exception", e);
                    return false;
                }
            }

            return true;
        }

WriteSocketService

写socket服务,完成master向slave发送数据

属性以及构造函数

        private final Selector selector;
        private final SocketChannel socketChannel;
        
        private final int headerSize = 8 + 4;//头部信息 包含 physical offset(long) + bodySize(int)
        private final ByteBuffer byteBufferHeader = ByteBuffer.allocate(headerSize);//master给slave同步数据时的头部buffer
        private long nextTransferFromWhere = -1;//传递数据从哪开始
        private SelectMappedBufferResult selectMappedBufferResult;//master给slave同步数据的内容,即body
        private boolean lastWriteOver = true;
        private long lastWriteTimestamp = System.currentTimeMillis();//最后一次写的时间

        public WriteSocketService(final SocketChannel socketChannel) throws IOException {
            this.selector = RemotingUtil.openSelector();
            this.socketChannel = socketChannel;
            this.socketChannel.register(this.selector, SelectionKey.OP_WRITE);
            this.thread.setDaemon(true);
        }

run方法

        /**
         * 正常情况下while循环
         *  1.等到slave请求,读取服务中更新slaveRequestOffset
         *  2.计算nextTransferFromWhere
         *      2.1如果为首次连接,那么slave传递的offset为0, master收到为0的offset后,从最后一个mappedFile开始复制
         *      2.2否则为slave同步过来的slaveRequestOffset
         *  3.如果之前发送给slave内容没有完成,那么一直接着发送
         *  4.如果之前发送给slave内容发完了,而且过了心跳时间(默认5s),那么只传递一个header过去(header中记录的bodySize为0)
         *  5.根据nextTransferFromWhere去mappedFile找合适大小的mappedFile内容,记录在selectMappedBufferResult
         *  6.调用transferData把头部byteBufferHeader以及body内容selectMappedBufferResult发送过去
         * 如果遇到异常,break处while循环
         *  1.读写service stop
         *  2.haService移除当前记录
         *  3.socket,channel关闭,清理
         *
         */
        @Override
        public void run() {
            HAConnection.log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                try {
                    this.selector.select(1000);

                    if (-1 == HAConnection.this.slaveRequestOffset) {//slave还未请求过
                        Thread.sleep(10);
                        continue;
                    }

                    if (-1 == this.nextTransferFromWhere) {
                        //如果为首次连接,那么offset为0, master收到为0的offset后,从最后一个mappedFile开始复制
                        if (0 == HAConnection.this.slaveRequestOffset) {
                            long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
                            masterOffset =
                                masterOffset
                                    - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                                    .getMapedFileSizeCommitLog());

                            if (masterOffset < 0) {
                                masterOffset = 0;
                            }
                            this.nextTransferFromWhere = masterOffset;
                        } else {//否则从指定的offset开始同步数据给slave
                            this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;//如果不是首次连接,就从确认的slave的offset开始
                        }

                        log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
                            + "], and slave request " + HAConnection.this.slaveRequestOffset);
                    }

                    if (this.lastWriteOver) {//上次写完了

                        long interval =
                            HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
                        //5s一次心跳,如果超时,记录的bodySize为0
                        if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                            .getHaSendHeartbeatInterval()) {

                            // Build Header
                            this.byteBufferHeader.position(0);
                            this.byteBufferHeader.limit(headerSize);
                            this.byteBufferHeader.putLong(this.nextTransferFromWhere);//头部信息 phyOffset为nextTransferFromWhere
                            this.byteBufferHeader.putInt(0);//头部信息 bodySize为0
                            this.byteBufferHeader.flip();

                            this.lastWriteOver = this.transferData();
                            if (!this.lastWriteOver)
                                continue;
                        }
                    } else {
                        this.lastWriteOver = this.transferData();
                        if (!this.lastWriteOver)
                            continue;
                    }

                    //获取指定位置之后的buffer
                    SelectMappedBufferResult selectResult =
                        HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
                    if (selectResult != null) {
                        int size = selectResult.getSize();
                        //同步数据不得超过默认32k
                        if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
                            size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
                        }

                        long thisOffset = this.nextTransferFromWhere;
                        this.nextTransferFromWhere += size;//下次传输的位置

                        selectResult.getByteBuffer().limit(size);
                        this.selectMappedBufferResult = selectResult;

                        // Build Header
                        this.byteBufferHeader.position(0);
                        this.byteBufferHeader.limit(headerSize);//头部12字节
                        this.byteBufferHeader.putLong(thisOffset);//开始位置
                        this.byteBufferHeader.putInt(size);//bodySize大小
                        this.byteBufferHeader.flip();

                        this.lastWriteOver = this.transferData();
                    } else {
                        // 没新的消息,挂起等待
                        HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
                    }
                } catch (Exception e) {

                    HAConnection.log.error(this.getServiceName() + " service has exception.", e);
                    break;
                }
            }
            //break出来,代表异常
            if (this.selectMappedBufferResult != null) {
                this.selectMappedBufferResult.release();
            }
            //停止读写service
            this.makeStop();

            readSocketService.makeStop();

            haService.removeConnection(HAConnection.this);//移除该记录

            SelectionKey sk = this.socketChannel.keyFor(this.selector);
            if (sk != null) {
                sk.cancel();
            }

            try {
                this.selector.close();
                this.socketChannel.close();
            } catch (IOException e) {
                HAConnection.log.error("", e);
            }

            HAConnection.log.info(this.getServiceName() + " service end");
        }

transferData

master给slave写数据

        /**
         * 传输数据
         * 1.写头部
         * 2.写body
         */
        private boolean transferData() throws Exception {
            int writeSizeZeroTimes = 0;//长度为0的写的次数
            // Write Header,写头部
            while (this.byteBufferHeader.hasRemaining()) {
                int writeSize = this.socketChannel.write(this.byteBufferHeader);
                if (writeSize > 0) {
                    writeSizeZeroTimes = 0;
                    //更新写时间
                    this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                } else if (writeSize == 0) {
                    if (++writeSizeZeroTimes >= 3) {
                        break;
                    }
                } else {
                    throw new Exception("ha master write header error < 0");
                }
            }

            if (null == this.selectMappedBufferResult) {
                return !this.byteBufferHeader.hasRemaining();
            }

            writeSizeZeroTimes = 0;

            // Write Body,写内容
            if (!this.byteBufferHeader.hasRemaining()) {//头部写完了
                while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
                    int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());//把master的内容传递给slave
                    if (writeSize > 0) {
                        writeSizeZeroTimes = 0;//清空次数
                        this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                    } else if (writeSize == 0) {
                        if (++writeSizeZeroTimes >= 3) {
                            break;
                        }
                    } else {
                        throw new Exception("ha master write body error < 0");
                    }
                }
            }

            boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMappedBufferResult.getByteBuffer().hasRemaining();

            if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
                this.selectMappedBufferResult.release();
                this.selectMappedBufferResult = null;//发送完了之后置为null
            }

            return result;
        }

思考

两个线程run方法的流程,思路

都写在注释上了

processReadEvent时如果byteBufferRead一次读入了大量数据怎么办

源码上


image.png

如果一下子读的内容比processPostion 大了几十几百,会怎么办,为什么只读最后8个字节就行
这里应该是HAClient的发送内容保证的,最后发送的offset一定是最大的,因此之前的offset,即使收到了没有处理,也没有关系

WriteSocketService和ReadSocketService的异同

同:都继承ServiceThread,都记录一个最后读(写)时间,出现问题时都需要关闭两个Service,处理socket等
异:

1.同步协议不一样
读数据只要读一个long,代表maxPhyOffset
写要有一个12字节的header,后面是同步的内容
2.检测的时长不一样
读数据是20s没有读算超时(houseKeep)
写是5s没有写算超时(heartBeat)

问题

processReadEvent代码重复

image.png

我并不知道为啥要这样写

备注

思考中第二个对于HAClient的讲解,源码还没看

refer

http://blog.csdn.net/quhongwei_zhanqiu/article/details/39144469
http://technoboy.iteye.com/blog/2368458

上一篇 下一篇

猜你喜欢

热点阅读