软件工程师成长日记

用MINA实现UDP通信的例子

2018-08-13  本文已影响1人  麦克劳林

概述:

Apache Mina Server 是一个网络通信应用框架,也就是说,它主要是对基于TCP/IP、UDP/IP协议栈的通信框架(当然,也可以提供JAVA 对象的序列化服务、虚拟机管道通信服务等),Mina 可以帮助我们快速开发高性能、高扩展性的网络通信应用,Mina 提供了事件驱动、异步(Mina 的异步IO 默认使用的是JAVA NIO 作为底层支持)操作的编程模型。

UDP通信实现:

1、pom.xml,需要依赖:

               <!-- mina核心源码 -->
    <dependency>
        <groupId>org.apache.mina</groupId>
        <artifactId>mina-core</artifactId>
        <version>2.0.16</version>
    </dependency>

<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-api</artifactId>
  <version>1.7.7</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-jdk14 -->
<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-jdk14</artifactId>
  <version>1.7.7</version>
  <scope>test</scope>
</dependency>

2、MinaServer.java

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.Executors;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.future.WriteFuture;
import org.apache.mina.core.session.ExpiringSessionRecycler;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.transport.socket.DatagramSessionConfig;
import org.apache.mina.transport.socket.nio.NioDatagramAcceptor;

public class MinaServer {
static NioDatagramAcceptor acceptor = new NioDatagramAcceptor();
public static void main(String[] args) throws Exception {
    acceptor.getFilterChain().addLast("threadPool", new ExecutorFilter(Executors.newCachedThreadPool()));
    // 设置handler,把这个IoHandler 注册到IoService
    acceptor.setHandler(new MinaServerHandler());
    acceptor.setSessionRecycler(new ExpiringSessionRecycler(15 * 1000));
    DatagramSessionConfig dcfg = acceptor.getSessionConfig();
    dcfg.setReuseAddress(true);
    dcfg.setReceiveBufferSize(1024);
    dcfg.setSendBufferSize(1024);
     //绑定端口
    acceptor.bind(new InetSocketAddress(2222));
    acceptor.bind(new InetSocketAddress(6000));

    System.out.println("Server Listening...");
}

public static Boolean SendMsg(String message, String sessionip, int port) {
    try {
        SocketAddress ra = new InetSocketAddress(sessionip, port);
        SocketAddress loc = new InetSocketAddress(2222);
        IoSession MySession = acceptor.newSession(ra, loc);
        byte[] res = hex2Byte(message);
        IoBuffer buf = IoBuffer.wrap(res);
        WriteFuture future = MySession.write(buf, ra);
        future.awaitUninterruptibly(100);
        if (future.isWritten()) {
            MySession.closeOnFlush();
            return true;
        } else {
            MySession.closeOnFlush();
            return false;
        }
    } catch (Exception ex) {
        return false;
    }
}

public static byte[] hex2Byte(String hex) {
    String digital = "0123456789ABCDEF";
    char[] hex2char = hex.toCharArray();
    byte[] bytes = new byte[hex.length() / 2];
    int temp;
    for (int i = 0; i < bytes.length; i++) {
        temp = digital.indexOf(hex2char[2 * i]) * 16;
        temp += digital.indexOf(hex2char[2 * i + 1]);
        bytes[i] = (byte) (temp & 0xff);
    }
    return bytes;
  }
}

3、MinaServerHandler.java

import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;


public class MinaServerHandler extends IoHandlerAdapter {

public static String HEX = "0123456789ABCDEF";
public static int i = 0;
public static final CharsetDecoder decoder = (Charset.forName("UTF-8")).newDecoder();

@Override
public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
    cause.printStackTrace();
    super.exceptionCaught(session, cause);
}

@Override
public void messageReceived(IoSession session, Object message) throws Exception {
    //IoBuffer buffer = (IoBuffer) message;
    //byte[] bb = buffer.array();
    //String msg = bytesToHex(bb);
    System.out.println("i get msg:"+message.toString() + " ======> " + i++);
}

@Override
public void messageSent(IoSession session, Object message) throws Exception {
    super.messageSent(session, message);
}

public static String bytesToHex(byte[] bytes) {
    StringBuffer sb = new StringBuffer(bytes.length * 2);
    String Hex;
    for (byte b : bytes) {
        sb.append(HEX.charAt((b >> 4) & 0x0f));
        sb.append(HEX.charAt(b & 0x0f));
    }
    Hex=sb.toString();
    return Hex;
}
}

方法解释:

(1.) IoService:最底层的是IOService,负责具体的IO相关工作。这一层的典型代表有IOSocketAcceptor和IOSocketChannel,分别对应TCP协议下的服务端和客户端的IOService。IOService的意义在于隐藏底层IO的细节,对上提供统一的基于事件的异步IO接口。每当有数据到达时,IOService会先调用底层IO接口读取数据,封装成IoBuffer,之后以事件的形式通知上层代码,从而将Java NIO的同步IO接口转化成了异步IO。所以从图上看,进来的low-level IO经过IOService层后变成IO Event。具体的代码可以参考org.apache.mina.core.polling.AbstractPollingIoProcessor的私有内部类Processor。

(2.) IoProcessor:这个接口在另一个线程上,负责检查是否有数据在通道上读写,也就是说它也拥有自己的Selector,这是与我们使用JAVA NIO 编码时的一个不同之处,通常在JAVA NIO 编码中,我们都是使用一个Selector,也就是不区分IoService与IoProcessor 两个功能接口。另外,IoProcessor 负责调用注册在IoService 上的过滤器,并在过滤器链之后调用IoHandler。
(3.) IoFilter:这个接口定义一组拦截器,这些拦截器可以包括日志输出、黑名单过滤、数据的编码(write 方向)与解码(read 方向)等功能,其中数据的encode 与decode是最为重要的、也是你在使用Mina 时最主要关注的地方。
(4.) IoHandler:这个接口负责编写业务逻辑,也就是接收、发送数据的地方。需要有开发者自己来实现这个接口。IoHandler可以看成是Mina处理流程的终点,每个IoService都需要指定一个IoHandler。

(5.)IoSession:是对底层连接(服务器与客户端的特定连接,该连接由服务器地址、端口以及客户端地址、端口来决定)的封装,一个IoSession对应于一个底层的IO连接(在Mina中UDP也被抽象成了连接)。通过IoSession,可以获取当前连接相关的上下文信息,以及向远程peer发送数据。发送数据其实也是个异步的过程。发送的操作首先会逆向穿过IoFilterChain,到达IoService。但IoService上并不会直接调用底层IO接口来将数据发送出去,而是会将该次调用封装成一个WriteRequest,放入session的writeRequestQueue中,最后由IoProcessor线程统一调度flush出去。所以发送操作并不会引起上层调用线程的阻塞。具体代码可以参考org.apache.mina.core.filterchain.DefaultIoFilterChain的内部类HeadFilter的filterWrite方法。

上一篇下一篇

猜你喜欢

热点阅读