为初学者而来~手工最简MQ(二)Broker

2020-10-16  本文已影响0人  eSoo

本文仅展示核心代码,全部代码,请移步:git-soomq

1,服务端

服务端的设计就非常简单了,最核心的就是消息的存取,以及响应生产者和消费者的网络请求
分为2部分:

1.1 消息文件

消息的存储我们参考kafka,并简化其逻辑,因为是最简单的mq,我们只考虑单机的情况的就行,每个topic存储2个文件

topicname.index
topicname.data

.index 文件存储格式为:
消息顺序号:消息截止位置
.data 文件按照顺序存储具体的消息

文件操作:

package com.esoo.mq.server.message;

import com.alibaba.fastjson.JSON;
import com.esoo.mq.common.ProcessorCommand;

import java.io.RandomAccessFile;

/**
 * 为每个topic创建一个对象进行管理
 */
public class MessageFile {
    private String topic;
    private Long offset;
    //索引文件
    private RandomAccessFile indexFile = null ;
    //数据文件
    private RandomAccessFile dataFile = null ;

    //追加消息(生产者进行调用)
    public ProcessorCommand appendMsg(ProcessorCommand in){

        try {
            //加锁,避免竞争,文件乱码
            synchronized (in.getResult().getTopic()) {

                //读取index文件最后一行
                String lastLine = readLastLine(indexFile, null);
                int lastOffset = 1;
                //消息体追加到data文件中,并返回文件末尾位置,作为本条消息的offset
                long lastindex =  writeEndLine(dataFile, in.getResult().getBody());
                if (lastLine != null && !lastLine.equals("")) {
                    String index[] = lastLine.split(":");
                    lastOffset = Integer.valueOf(index[0]);
                    lastOffset = lastOffset + 1;
                }
                //组装本条消息index 序列号:消息体末尾位置
                String insertMsgIndex = lastOffset + ":" + lastindex + "\t\n";
                writeEndLine(indexFile, insertMsgIndex.getBytes());
                in.setSuccess(true);
            }
        }catch (Exception e){
            e.printStackTrace();

            in.setSuccess(false);
            in.setExmsg(e.getMessage());
        }
        return in;

    }

    //读取消息,消费者进行调用
    public ProcessorCommand readMsg(ProcessorCommand in){


        try {
            synchronized (in.getResult().getTopic()) {
                // 消息定位位置
                int seekIn = 0;
                // 消息体大小
                int bodySize = 0;
                //先定位到开始
                indexFile.seek(0);
                String indesMap=null;
                //遍历index文件,找到上一个消息 offset 与本消息offset 进行相减就是消息体大小
                while ((indesMap = indexFile.readLine())!=null){
                    String index[] = indesMap.split(":");
                    int inNum = Integer.valueOf(String.valueOf(index[0]).trim());
                    int off = Integer.valueOf(String.valueOf(index[1]).trim());
                    if (inNum == in.getResult().getOffset()) {
                        seekIn = off;
                    }
                    if (inNum == (in.getResult().getOffset() + 1)) {
                        bodySize = off - seekIn;
                    }
                }
                if (bodySize == 0) {
                    in.setSuccess(false);
                    in.setExmsg("offset is end");
                    return in;
                }
                //定位到具体位置
                dataFile.seek(seekIn);

                //进行消息读取
                byte[] b = new byte[bodySize];
                dataFile.read(b);
                in.getResult().setBody(b);

                in.setSuccess(true);
                System.out.println(" READ MSG IS: "+JSON.toJSONString(in));
            }
        }catch (Exception e){
            e.printStackTrace();
            in.setSuccess(false);
            in.setExmsg(e.getMessage());
        }
        return in;

    }

    //写消息到最后一行
    public static long writeEndLine(RandomAccessFile file, byte[] msg)
            throws Exception {
        // 文件长度,字节数
        long fileLength = file.length();
        // 将写文件指针移到文件尾。
        file.seek(fileLength);
        file.write(msg);
        return file.getFilePointer();

    }

    //读取最后一行的消息
    public static String readLastLine(RandomAccessFile file, String charset) throws Exception {

        long len = file.length();
        if (len == 0L) {
            return "";
        } else {
            long pos = len - 1;
            while (pos > 0) {
                pos--;
                file.seek(pos);
                if (file.readByte() == '\n') {
                    break;
                }
            }
            if (pos == 0) {
                file.seek(0);
            }
            byte[] bytes = new byte[(int) (len - pos)];
            file.read(bytes);
            if (charset == null) {
                return new String(bytes);
            } else {
                return new String(bytes, charset);
            }
        }

    }

    public static String readByOffset(RandomAccessFile file, String charset) throws Exception {

        return null;
    }



    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public Long getOffset() {
        return offset;
    }

    public void setOffset(Long offset) {
        this.offset = offset;
    }

    public RandomAccessFile getIndexFile() {
        return indexFile;
    }

    public void setIndexFile(RandomAccessFile indexFile) {
        this.indexFile = indexFile;
    }

    public RandomAccessFile getDataFile() {
        return dataFile;
    }

    public void setDataFile(RandomAccessFile dataFile) {
        this.dataFile = dataFile;
    }
}

1.2 网络编程

利用netty 开放端口,响应生产者与消费者,每个消息包装成一个commod,commod类型

网络启动

package com.esoo.mq.server;

import com.esoo.mq.server.netty.handler.NettySooMqServerHandler;
import com.esoo.mq.server.netty.handler.NettySooMqServerOutHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

public class SooMQServer {
    private static Integer serverPort=9870;
    ServerBootstrap b = new ServerBootstrap();

    public void start(){
        //创建reactor 线程组
        EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerLoopGroup = new NioEventLoopGroup();

        try {
            //1 设置reactor 线程组
            b.group(bossLoopGroup, workerLoopGroup);
            //2 设置nio类型的channel
            b.channel(NioServerSocketChannel.class);
            //3 设置监听端口
            b.localAddress(serverPort);
            //4 设置通道的参数
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

            //5 装配子通道流水线
            b.childHandler(new ChannelInitializer<SocketChannel>() {
                //有连接到达时会创建一个channel
                protected void initChannel(SocketChannel ch) throws Exception {
                    // pipeline管理子通道channel中的Handler
                    // 向子channel流水线添加一个handler处理器
                    ch.pipeline().addLast(new ObjectEncoder());
                    ch.pipeline().addLast(new ObjectDecoder(Integer.MAX_VALUE,
                            ClassResolvers.cacheDisabled(null)));
                    ch.pipeline().addLast(new NettySooMqServerOutHandler());
                    ch.pipeline().addLast(new NettySooMqServerHandler());
                }
            });
            // 6 开始绑定server
            // 通过调用sync同步方法阻塞直到绑定成功
            ChannelFuture channelFuture = b.bind().sync();
            System.out.println(" 服务器启动成功,监听端口: " +
                    channelFuture.channel().localAddress());

            // 7 等待通道关闭的异步任务结束
            // 服务监听通道会一直等待通道关闭的异步任务结束
            ChannelFuture closeFuture = channelFuture.channel().closeFuture();
            closeFuture.sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 8 优雅关闭EventLoopGroup,
            // 释放掉所有资源包括创建的线程
            workerLoopGroup.shutdownGracefully();
            bossLoopGroup.shutdownGracefully();
        }
    }

}

网络逻辑分发

注意:回写给客户端的消息体类型必须与入参保持一致,否则netty无法解析


netty
package com.esoo.mq.server.netty.handler;


import com.alibaba.fastjson.JSON;
import com.esoo.mq.common.ProcessorCommand;
import com.esoo.mq.server.processor.Processor;
import com.esoo.mq.server.processor.ProcessorFactory;
import io.netty.channel.*;

@ChannelHandler.Sharable
public class NettySooMqServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        try {
            ProcessorCommand command = (ProcessorCommand) msg;
            System.out.println("["+ctx.channel().remoteAddress()+"] msg:"+JSON.toJSONString(msg));
            Processor processor = ProcessorFactory.getProcessorInstantiate(command.getType());
            msg = processor.handle(command);
            ChannelFuture f = ctx.writeAndFlush(msg);
            f.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    System.out.println("msg ctx send");
                }
            });
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println(cause.getMessage());
        ctx.close();
    }



}

生产者

package com.esoo.mq.server.processor;

import com.esoo.mq.common.Message;
import com.esoo.mq.common.ProcessorCommand;
import com.esoo.mq.server.message.MessageFile;
import com.esoo.mq.server.message.MessageFileFactory;

public class SendMessageProcessor implements Processor<Message,Message> {

    @Override
    public ProcessorCommand handle(ProcessorCommand task) {
        MessageFile file = MessageFileFactory.getTopicFile(task.getResult().getTopic());
        task = file.appendMsg(task);
        return task;
    }


}

消费者

package com.esoo.mq.server.processor;

import com.esoo.mq.common.Message;
import com.esoo.mq.common.ProcessorCommand;
import com.esoo.mq.server.message.MessageFile;
import com.esoo.mq.server.message.MessageFileFactory;

public class ReadMessageProcessor implements Processor<Message,Message> {

    @Override
    public ProcessorCommand handle(ProcessorCommand task) {
        Message msg = task.getResult();
        MessageFile file = MessageFileFactory.getTopicFile(msg.getTopic());
        task = file.readMsg(task);
        return task;
    }


}

上一篇下一篇

猜你喜欢

热点阅读