Netty学习--使用UDP广播事件
2019-01-25 本文已影响0人
何何与呵呵呵
UDP 的基础知识
-
UDP 的基础知识
TCP 连接就像打电话,其中一系列的有序消息将会在两个方向上流动。相反,UDP 则类似于往邮箱中投入一叠明信片。你无法知道它们将以何种顺序到达它们的目的地,或者它们是否所有的都能够到达它们的目的地。 -
UDP 广播
UDP 提供了向多个接收者发送消息的额外传输模式:
1.多播——传输到一个预定义的主机组;
2.广播——传输到网络(或者子网)上的所有主机。 -
UDP 示例应用程序
发布/订阅模式:一个生产者或者服务发布事件,而多个客户端进行订阅以接收它们。
广播系统概览
- 消息POJO: LogEvent
public class LogEvent {
public static final byte SEPARATOR = (byte) ':';
private final InetSocketAddress source;
private final String logfile;
private final String msg;
private final long received;
public LogEvent(String logfile, String msg) { // 用于传出消息的构造函数
this(null, -1, logfile, msg);
}
public LogEvent(InetSocketAddress source, long received, String logfile, String msg) { // 用于 传入消息的构造函数
this.source = source;
this.logfile = logfile;
this.msg = msg;
this.received = received;
}
public InetSocketAddress getSource() { // 返回发送LogEvent 的源的InetSocketAddress
return source;
}
public String getLogfile() { // 返回所发送的LogEvent 的日志文件的名称
return logfile;
}
public String getMsg() { // 返回消息内容
return msg;
}
public long getReceivedTimestamp() { // 返回接收LogEvent的时间
return received;
}
}
- 编写广播者
在广播者中使用的Netty 的UDP 相关类
Netty 的DatagramPacket 是一个简单的消息容器,DatagramChannel 实现用它来和远程节点通信。类似于在我们先前的类比中的明信片,它包含了接收者(和可选的发送者)的地址以及消息的有效负载本身。
通过DatagramPacket 发送的日志条目
LogEventBroadcaster:ChannelPipeline 和LogEvent 事件流
-
编写监视器
(1)接收由LogEventBroadcaster 广播的UDP DatagramPacket;
(2)将它们解码为LogEvent 消息;
(3)将LogEvent 消息写出到System.out。
LogEventMonitor
public class LogEventDecoder extends MessageToMessageDecoder<DatagramPacket> {
@Override
protected void decode(ChannelHandlerContext ctx,
DatagramPacket datagramPacket, List<Object> out) throws Exception {
ByteBuf data = datagramPacket.content(); // 获取对DatagramPacket 中的数据(ByteBuf)的引用
int idx = data.indexOf(0, data.readableBytes(),
LogEvent.SEPARATOR); // 获取该SEPARATOR的索引
String filename = data.slice(0, idx) // 提取日志消息
.toString(CharsetUtil.UTF_8);
String logMsg = data.slice(idx + 1,
data.readableBytes()).toString(CharsetUtil.UTF_8);
LogEvent event = new LogEvent(datagramPacket.sender(),
System.currentTimeMillis(), filename, logMsg);
out.add(event);
}
}
public class LogEventHandler extends SimpleChannelInboundHandler<LogEvent> {
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
@Override
public void channelRead0(ChannelHandlerContext ctx, LogEvent event) throws Exception {
StringBuilder builder = new StringBuilder();
builder.append(event.getReceivedTimestamp());
builder.append(" [");
builder.append(event.getSource().toString());
builder.append("] [");
builder.append(event.getLogfile());
builder.append("] : ");
builder.append(event.getMsg());
System.out.println(builder.toString());
}
}
public class LogEventMonitor {
private final EventLoopGroup group;
private final Bootstrap bootstrap;
public LogEventMonitor(InetSocketAddress address) {
group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.handler( new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel)
throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new LogEventDecoder());
pipeline.addLast(new LogEventHandler());
}
} )
.localAddress(address);
}
public Channel bind() {
return bootstrap.bind().syncUninterruptibly().channel();
}
public void stop() {
group.shutdownGracefully();
}
public static void main(String[] args) throws Exception {
// if (args.length != 1) {
// throw new IllegalArgumentException(
// "Usage: LogEventMonitor <port>");
// }
LogEventMonitor monitor = new LogEventMonitor(new InetSocketAddress(9999));
try {
Channel channel = monitor.bind();
System.out.println("LogEventMonitor running");
channel.closeFuture().sync();
} finally {
monitor.stop();
}
}
}
效果图
这样就可以监听你运行程序的log信息,并将信息返回到你的客服端,是不是很nice.