用MINA实现UDP通信的例子
概述:
Apache Mina Server 是一个网络通信应用框架,也就是说,它主要是对基于TCP/IP、UDP/IP协议栈的通信框架(当然,也可以提供JAVA 对象的序列化服务、虚拟机管道通信服务等),Mina 可以帮助我们快速开发高性能、高扩展性的网络通信应用,Mina 提供了事件驱动、异步(Mina 的异步IO 默认使用的是JAVA NIO 作为底层支持)操作的编程模型。
UDP通信实现:
1、pom.xml,需要依赖:
<!-- mina核心源码 -->
<dependency>
<groupId>org.apache.mina</groupId>
<artifactId>mina-core</artifactId>
<version>2.0.16</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.7</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-jdk14 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
<version>1.7.7</version>
<scope>test</scope>
</dependency>
2、MinaServer.java
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.Executors;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.future.WriteFuture;
import org.apache.mina.core.session.ExpiringSessionRecycler;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.transport.socket.DatagramSessionConfig;
import org.apache.mina.transport.socket.nio.NioDatagramAcceptor;
public class MinaServer {
static NioDatagramAcceptor acceptor = new NioDatagramAcceptor();
public static void main(String[] args) throws Exception {
acceptor.getFilterChain().addLast("threadPool", new ExecutorFilter(Executors.newCachedThreadPool()));
// 设置handler,把这个IoHandler 注册到IoService
acceptor.setHandler(new MinaServerHandler());
acceptor.setSessionRecycler(new ExpiringSessionRecycler(15 * 1000));
DatagramSessionConfig dcfg = acceptor.getSessionConfig();
dcfg.setReuseAddress(true);
dcfg.setReceiveBufferSize(1024);
dcfg.setSendBufferSize(1024);
//绑定端口
acceptor.bind(new InetSocketAddress(2222));
acceptor.bind(new InetSocketAddress(6000));
System.out.println("Server Listening...");
}
public static Boolean SendMsg(String message, String sessionip, int port) {
try {
SocketAddress ra = new InetSocketAddress(sessionip, port);
SocketAddress loc = new InetSocketAddress(2222);
IoSession MySession = acceptor.newSession(ra, loc);
byte[] res = hex2Byte(message);
IoBuffer buf = IoBuffer.wrap(res);
WriteFuture future = MySession.write(buf, ra);
future.awaitUninterruptibly(100);
if (future.isWritten()) {
MySession.closeOnFlush();
return true;
} else {
MySession.closeOnFlush();
return false;
}
} catch (Exception ex) {
return false;
}
}
public static byte[] hex2Byte(String hex) {
String digital = "0123456789ABCDEF";
char[] hex2char = hex.toCharArray();
byte[] bytes = new byte[hex.length() / 2];
int temp;
for (int i = 0; i < bytes.length; i++) {
temp = digital.indexOf(hex2char[2 * i]) * 16;
temp += digital.indexOf(hex2char[2 * i + 1]);
bytes[i] = (byte) (temp & 0xff);
}
return bytes;
}
}
3、MinaServerHandler.java
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
public class MinaServerHandler extends IoHandlerAdapter {
public static String HEX = "0123456789ABCDEF";
public static int i = 0;
public static final CharsetDecoder decoder = (Charset.forName("UTF-8")).newDecoder();
@Override
public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
cause.printStackTrace();
super.exceptionCaught(session, cause);
}
@Override
public void messageReceived(IoSession session, Object message) throws Exception {
//IoBuffer buffer = (IoBuffer) message;
//byte[] bb = buffer.array();
//String msg = bytesToHex(bb);
System.out.println("i get msg:"+message.toString() + " ======> " + i++);
}
@Override
public void messageSent(IoSession session, Object message) throws Exception {
super.messageSent(session, message);
}
public static String bytesToHex(byte[] bytes) {
StringBuffer sb = new StringBuffer(bytes.length * 2);
String Hex;
for (byte b : bytes) {
sb.append(HEX.charAt((b >> 4) & 0x0f));
sb.append(HEX.charAt(b & 0x0f));
}
Hex=sb.toString();
return Hex;
}
}
方法解释:
(1.) IoService:最底层的是IOService,负责具体的IO相关工作。这一层的典型代表有IOSocketAcceptor和IOSocketChannel,分别对应TCP协议下的服务端和客户端的IOService。IOService的意义在于隐藏底层IO的细节,对上提供统一的基于事件的异步IO接口。每当有数据到达时,IOService会先调用底层IO接口读取数据,封装成IoBuffer,之后以事件的形式通知上层代码,从而将Java NIO的同步IO接口转化成了异步IO。所以从图上看,进来的low-level IO经过IOService层后变成IO Event。具体的代码可以参考org.apache.mina.core.polling.AbstractPollingIoProcessor的私有内部类Processor。
(2.) IoProcessor:这个接口在另一个线程上,负责检查是否有数据在通道上读写,也就是说它也拥有自己的Selector,这是与我们使用JAVA NIO 编码时的一个不同之处,通常在JAVA NIO 编码中,我们都是使用一个Selector,也就是不区分IoService与IoProcessor 两个功能接口。另外,IoProcessor 负责调用注册在IoService 上的过滤器,并在过滤器链之后调用IoHandler。
(3.) IoFilter:这个接口定义一组拦截器,这些拦截器可以包括日志输出、黑名单过滤、数据的编码(write 方向)与解码(read 方向)等功能,其中数据的encode 与decode是最为重要的、也是你在使用Mina 时最主要关注的地方。
(4.) IoHandler:这个接口负责编写业务逻辑,也就是接收、发送数据的地方。需要有开发者自己来实现这个接口。IoHandler可以看成是Mina处理流程的终点,每个IoService都需要指定一个IoHandler。
(5.)IoSession:是对底层连接(服务器与客户端的特定连接,该连接由服务器地址、端口以及客户端地址、端口来决定)的封装,一个IoSession对应于一个底层的IO连接(在Mina中UDP也被抽象成了连接)。通过IoSession,可以获取当前连接相关的上下文信息,以及向远程peer发送数据。发送数据其实也是个异步的过程。发送的操作首先会逆向穿过IoFilterChain,到达IoService。但IoService上并不会直接调用底层IO接口来将数据发送出去,而是会将该次调用封装成一个WriteRequest,放入session的writeRequestQueue中,最后由IoProcessor线程统一调度flush出去。所以发送操作并不会引起上层调用线程的阻塞。具体代码可以参考org.apache.mina.core.filterchain.DefaultIoFilterChain的内部类HeadFilter的filterWrite方法。