Learning Netty: Broadcasting Events with UDP

2019-01-25  何何与呵呵呵
UDP basics
Overview of the broadcast system
import java.net.InetSocketAddress;

public class LogEvent {
    public static final byte SEPARATOR = (byte) ':';
    private final InetSocketAddress source;
    private final String logfile;
    private final String msg;
    private final long received;
    public LogEvent(String logfile, String msg) { // constructor for outgoing messages
        this(null, -1, logfile, msg);
    }
    public LogEvent(InetSocketAddress source, long received, String logfile, String msg) { // constructor for incoming messages
        this.source = source;
        this.logfile = logfile;
        this.msg = msg;
        this.received = received;
    }
    public InetSocketAddress getSource() { // returns the InetSocketAddress of the source that sent the LogEvent
        return source;
    }
    public String getLogfile() { // returns the name of the log file the LogEvent was sent for
        return logfile;
    }
    public String getMsg() { // returns the message content
        return msg;
    }
    public long getReceivedTimestamp() { // returns the time at which the LogEvent was received
        return received;
    }
}
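Netty's UDP-related classes used in the broadcaster

Netty's DatagramPacket is a simple message container that DatagramChannel implementations use to communicate with remote peers. Like the postcard in the earlier analogy, it carries the address of the recipient (and, optionally, the sender) along with the message payload itself.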

Log entries sent via DatagramPacket
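The payload layout implied by LogEvent.SEPARATOR and by the decoder further down can be sketched with plain byte arrays: the log file name, one ':' byte, then the message. This is only an illustrative sketch. The class name LogEventWireFormat and its two methods are hypothetical and do not appear in the article, and the code uses JDK classes only, not Netty.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper (not from the article): the "logfile:msg" payload layout. */
final class LogEventWireFormat {
    static final byte SEPARATOR = (byte) ':'; // same value as LogEvent.SEPARATOR

    /** Builds the payload: log file name bytes, one separator byte, message bytes. */
    static byte[] encode(String logfile, String msg) {
        byte[] file = logfile.getBytes(StandardCharsets.UTF_8);
        byte[] body = msg.getBytes(StandardCharsets.UTF_8);
        byte[] payload = new byte[file.length + 1 + body.length];
        System.arraycopy(file, 0, payload, 0, file.length);
        payload[file.length] = SEPARATOR;
        System.arraycopy(body, 0, payload, file.length + 1, body.length);
        return payload;
    }

    /** Splits at the FIRST separator and returns {logfile, msg}. */
    static String[] decode(byte[] payload) {
        int idx = -1;
        for (int i = 0; i < payload.length; i++) {
            if (payload[i] == SEPARATOR) {
                idx = i;
                break;
            }
        }
        if (idx < 0) {
            throw new IllegalArgumentException("no separator in payload");
        }
        String logfile = new String(payload, 0, idx, StandardCharsets.UTF_8);
        // The message starts after the separator and runs to the end of the payload.
        String msg = new String(payload, idx + 1, payload.length - idx - 1,
                StandardCharsets.UTF_8);
        return new String[] { logfile, msg };
    }
}
```

Calling `LogEventWireFormat.decode(LogEventWireFormat.encode("app.log", "12:00 started"))` gives back "app.log" and "12:00 started": because the split happens at the first separator, colons inside the message survive, while a colon inside the file name would not.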
LogEventBroadcaster: ChannelPipeline and LogEvent event flow
LogEventMonitor
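import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.util.CharsetUtil;
import java.util.List;

public class LogEventDecoder extends MessageToMessageDecoder<DatagramPacket> {
    @Override
    protected void decode(ChannelHandlerContext ctx,
                          DatagramPacket datagramPacket, List<Object> out) throws Exception {
        ByteBuf data = datagramPacket.content(); // reference to the payload (ByteBuf) of the DatagramPacket
        int idx = data.indexOf(0, data.readableBytes(),
                LogEvent.SEPARATOR); // index of the SEPARATOR
        if (idx < 0) {
            return; // no separator found: not a LogEvent payload, so drop it
        }
        String filename = data.slice(0, idx) // extract the log file name
                .toString(CharsetUtil.UTF_8);
        // slice(index, length) takes a length, so the message is readableBytes - idx - 1 bytes long
        String logMsg = data.slice(idx + 1,
                data.readableBytes() - idx - 1).toString(CharsetUtil.UTF_8);
        LogEvent event = new LogEvent(datagramPacket.sender(),
                System.currentTimeMillis(), filename, logMsg);
        out.add(event);
    }
}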
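import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class LogEventHandler extends SimpleChannelInboundHandler<LogEvent> {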
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,
                                Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
    @Override
    public void channelRead0(ChannelHandlerContext ctx, LogEvent event) throws Exception {
        StringBuilder builder = new StringBuilder();
        builder.append(event.getReceivedTimestamp());
        builder.append(" [");
        builder.append(event.getSource().toString());
        builder.append("] [");
        builder.append(event.getLogfile());
        builder.append("] : ");
        builder.append(event.getMsg());
        System.out.println(builder.toString());
    }
}
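import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
import java.net.InetSocketAddress;

public class LogEventMonitor {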
    private final EventLoopGroup group;
    private final Bootstrap bootstrap;
    public LogEventMonitor(InetSocketAddress address) {
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioDatagramChannel.class)
                .option(ChannelOption.SO_BROADCAST, true)
                .handler( new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel channel)
                            throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();
                        pipeline.addLast(new LogEventDecoder());
                        pipeline.addLast(new LogEventHandler());
                    }
                } )
                .localAddress(address);
    }
    public Channel bind() {
        return bootstrap.bind().syncUninterruptibly().channel();
    }
    public void stop() {
        group.shutdownGracefully();
    }
    public static void main(String[] args) throws Exception {
        // Listen on the port given as the only argument; default to 9999 when none is given
        int port = args.length == 1 ? Integer.parseInt(args[0]) : 9999;
        LogEventMonitor monitor = new LogEventMonitor(new InetSocketAddress(port));
        try {
            Channel channel = monitor.bind();
            System.out.println("LogEventMonitor running");
            channel.closeFuture().sync();
        } finally {
            monitor.stop();
        }
    }
}
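To try the monitor on its own, one option is to send it a single hand-made datagram. The sketch below is a hypothetical stand-in for a sender, not the LogEventBroadcaster: the class name LogEventTestSender and its methods are made up for illustration, and it uses the JDK's java.net classes rather than Netty. It assumes the monitor is listening on port 9999 as in the listing above and that the limited broadcast address 255.255.255.255 is reachable on your network.

```java
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

/** Hypothetical test sender (not from the article); uses java.net, not Netty. */
final class LogEventTestSender {
    /** Wraps "logfile:msg" in a JDK DatagramPacket addressed to the target. */
    static DatagramPacket buildPacket(InetSocketAddress target, String logfile, String msg) {
        byte[] payload = (logfile + ':' + msg).getBytes(StandardCharsets.UTF_8);
        return new DatagramPacket(payload, payload.length,
                target.getAddress(), target.getPort());
    }

    /** Sends one datagram; SO_BROADCAST must be on to use a broadcast address. */
    static void send(InetSocketAddress target, String logfile, String msg)
            throws IOException {
        try (DatagramSocket socket = new DatagramSocket()) {
            socket.setBroadcast(true);
            socket.send(buildPacket(target, logfile, msg));
        }
    }
}
```

With the monitor running, a call such as `LogEventTestSender.send(new InetSocketAddress("255.255.255.255", 9999), "test.log", "hello")` should make LogEventHandler print one line for test.log. If broadcast traffic is blocked, sending to 127.0.0.1 on the same machine is a simpler check.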
Result

With this in place, you can monitor the log output of a running program and have it delivered to your client, which is pretty nice.
