Netty 实战之 antfs文件系统的设计与实现

2018-02-28  本文已影响49人  逅弈

逅弈 转载请注明原创出处,谢谢!

上篇文章,我描述了如何“设计一个大文件存储系统”,这篇文章,我将对核心的架构和实现做下详细的描述。

系统的架构如下图所示:

antfs.png

整个系统主要有三个部分组成:

系统的执行流程如下:
1.启动一个Zk集群,或使用已存在的Zk
2.启动Queen,将自己注册到zk上去,路径为/queen,节点类型为临时节点
3.启动Worker,将字节注册到zk上去,路径为/root/worker,节点类型为临时顺序节点
4.Worker到/queen下获取Queen节点,通过netty与Queen进行heartbeat
5.Client调用Queen,进行文件的store或者restore操作
6.Queen将指令转发到具体的Worker
7.Worker进行相关的任务执行,并按条件进行数据的同步

其中,各组件之间通过自定义协议进行通讯,定义一个数据包Packet,如下:

public class Packet {
    /** the default magic number */
    public static final byte MAGIC = 0x13;
    /** the heart beat header */
    public static final Header HEART_BEAT_HEADER = new Header(MAGIC,MsgType.HEARTBEAT.getVal(),0);
    /** the heart beat packet */
    public static final Packet HEART_BEAT_PACKET = new Packet(HEART_BEAT_HEADER,null);
    /** the header size */
    public static final int HEADER_SIZE = 6;
    private Header header;
    private String body;
    // 省略get、set
    public static class Header{
        /** magic number */
        private byte magic;
        /** the message type */
        private byte msgType;
        /** packet body length */
        private int len;
        // 省略get、set
    }
}

MsgType定义如下:

public enum MsgType {
    /**
     * heart beat
     * from worker to queen
     */
    HEARTBEAT((byte)0x00),
    /**
     * file store
     * from client to queen
     * from queen to worker
     */
    STORE((byte)0x03),
    /**
     * file restore
     * from client to queen
     * from queen to worker
     */
    RESTORE((byte)0x04),
    /**
     * file chunk replication
     * from worker to worker
     */
    REPLICA((byte)0x10),
    /**
     * file meta sync
     * from queen to worker
     */
    SYNC((byte)0x11);
}

Worker与Queen进行heartbeat是通过Worker启动时,在EventLoop中调度一个定时任务实现,具体代码如下:

ChannelFuture future = bootstrap.bind(node.getPort()).sync();
LOGGER.info("WorkerServer Startup at port:{}", node.getPort());

Channel channel = future.channel();
// schedule a heartbeat runnable
channel.eventLoop().scheduleAtFixedRate(new HeartbeatClient(),0, Constants.HEART_BEAT_PERIOD,TimeUnit.SECONDS);
LOGGER.info("HeartbeatClient has scheduled");

HeartbeatClient实际是一个Runnable,具体的工作就是连接上Queen之后,向Queen发送HeartBeat的Packet,具体的核心代码如下:

public HeartbeatClient(){
    if(discovery==null){
        LOGGER.warn("discovery is null,can't get queenNode");
        return;
    }
    // connect to Queen
    connect();
}

@Override
public void run() {
    heartBeat();
}

private void heartBeat() {
    if(channel==null){
        LOGGER.warn("channel is null,can't send a heart beat packet to queen");
        return;
    }
    Packet packet = Packet.HEART_BEAT_PACKET;
    LOGGER.debug("worker send heart beat packet to queen with packet={},remoteAddress={}",packet,channel.remoteAddress());
    channel.writeAndFlush(packet);
}

整个系统的代码在github上:antfs,欢迎有兴趣的一起贡献。

更多原创好文,请关注「逅弈逐码」
上一篇下一篇

猜你喜欢

热点阅读