java 编程

Netty 模拟心跳通信

2018-02-22  本文已影响0人  真海ice

在长连接场景下,经常需要通过定时发送心跳来检测连接是否存活,例如服务注册与发现等场景。

一、服务器端

ServerNetty:心跳服务端
ServerHeartBeatHandler:处理某个客户端的心跳通信

package com.test.thread.netty.heartBeat;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import com.test.thread.netty.MarshallingCodefactory;
import com.test.thread.utils.Constant;

/**
 * Server side of the Netty heartbeat demo.
 *
 * <p>Binds a listening socket on the configured port and installs, for every accepted
 * connection, a Marshalling decoder/encoder pair followed by a {@link ServerHeartBeatHandler}.
 *
 * @author zhb
 */
public class ServerNetty {

    /** TCP port the server listens on. */
    private final int port;

    /**
     * @param port TCP port to bind when {@link #action()} is called
     */
    public ServerNetty(int port) {
        this.port = port;
    }

    /**
     * Starts the server and blocks until the listening channel is closed.
     * Both event loop groups are shut down on the way out, whether the method
     * returns normally or throws.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting for
     *         the bind to complete or for the server channel to close
     */
    public void action() throws InterruptedException {

        // Accepts incoming connections.
        EventLoopGroup acceptors = new NioEventLoopGroup();
        // Serves the connections that the acceptor group hands over.
        EventLoopGroup workers = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = configure(acceptors, workers);

            System.err.println("server 开启--------------");

            // Bind the port and wait until the bind has completed.
            ChannelFuture bound = bootstrap.bind(port).sync();

            // Park here until the listening channel is closed.
            bound.channel().closeFuture().sync();
        } finally {
            acceptors.shutdownGracefully();
            workers.shutdownGracefully();
        }
    }

    /**
     * Builds the server bootstrap: NIO transport, a backlog of 128 on the listening socket,
     * keep-alive on accepted sockets, and the per-connection handler pipeline.
     */
    private static ServerBootstrap configure(EventLoopGroup acceptors, EventLoopGroup workers) {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(acceptors, workers);
        bootstrap.channel(NioServerSocketChannel.class);
        bootstrap.option(ChannelOption.SO_BACKLOG, 128);
        bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                // Handlers run in the order they are added; more can be appended here.
                socketChannel.pipeline()
                        // Marshalling: bytes -> objects
                        .addLast(MarshallingCodefactory.buildDecoder())
                        // Marshalling: objects -> bytes
                        .addLast(MarshallingCodefactory.buildEncoder())
                        // Business logic for decoded requests
                        .addLast(new ServerHeartBeatHandler());
            }
        });
        return bootstrap;
    }

    /** Entry point: runs the server on {@code Constant.serverSocketPort}. */
    public static void main(String[] args) throws InterruptedException {
        ServerNetty server = new ServerNetty(Constant.serverSocketPort);
        server.action();
    }

}

package com.test.thread.netty.heartBeat;

import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.HashMap;

/**
 * Server-side inbound handler for a single client connection.
 *
 * <p>Message handling as implemented below:
 * <ul>
 *   <li>a {@code String} is treated as an authentication request of the form
 *       {@code "<ip>,<key>"};</li>
 *   <li>a {@link RequestInfo} is treated as a heartbeat report and is only accepted after
 *       this connection has authenticated successfully;</li>
 *   <li>anything else is answered with an error message and the connection is closed.</li>
 * </ul>
 *
 * <p>The authentication state is kept in an instance field, so this handler assumes that a
 * new instance is installed for every channel (the class is not annotated as sharable).
 *
 * @author zhb
 */
public class ServerHeartBeatHandler extends ChannelInboundHandlerAdapter {

    // Credentials that are allowed to connect: client ip -> expected key.
    // Filled once in the static initializer below and only read afterwards.
    private static final HashMap<String, Object> auth_map = new HashMap<String, Object>();

    // Reply sent to the client when authentication succeeds.
    private static final String SUCCESS_KEY = "auth_success_key";

    static {
        // More entries can be configured here.
        auth_map.put("127.0.0.1", "1234");
        auth_map.put("192.168.56.1", "1234");
    }

    // True once this connection has passed auth(); heartbeats are rejected until then.
    private boolean authenticated;

    /**
     * Dispatches an inbound message to authentication or heartbeat handling.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg decoded inbound message
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("--------------------------------------------");
        if (msg instanceof String) {
            // A string message is an authentication request.
            auth(ctx, msg);
        } else if (msg instanceof RequestInfo) {
            if (authenticated) {
                handlerHeartBeatInfo(ctx, msg);
            } else {
                // Heartbeat sent without a prior successful authentication: refuse it.
                ctx.writeAndFlush("auth failure !").addListener(ChannelFutureListener.CLOSE);
            }
        } else {
            ctx.writeAndFlush("info error!").addListener(ChannelFutureListener.CLOSE);
        }
    }

    /**
     * Prints the CPU and memory figures carried by a heartbeat and acknowledges it.
     *
     * @param ctx context used to send the acknowledgement
     * @param msg the heartbeat; must be a {@link RequestInfo}
     */
    private void handlerHeartBeatInfo(ChannelHandlerContext ctx, Object msg) {

        RequestInfo info = (RequestInfo) msg;
        System.out.println("--------------------------------------------");
        System.out.println("当前主机ip为: " + info.getIp());
        System.out.println("当前主机cpu情况: ");
        // A report without a map is printed as all-null values instead of failing with an NPE.
        HashMap<String, Object> cpu = orEmpty(info.getCpuPercMap());
        System.out.println("总使用率: " + cpu.get("combined"));
        System.out.println("用户使用率: " + cpu.get("user"));
        System.out.println("系统使用率: " + cpu.get("sys"));
        System.out.println("等待率: " + cpu.get("wait"));
        System.out.println("空闲率: " + cpu.get("idle"));

        System.out.println("当前主机memory情况: ");
        HashMap<String, Object> memory = orEmpty(info.getMemoryMap());
        System.out.println("内存总量: " + memory.get("total"));
        System.out.println("当前内存使用量: " + memory.get("used"));
        System.out.println("当前内存剩余量: " + memory.get("free"));
        // Acknowledge the heartbeat.
        ctx.writeAndFlush("info received!");
    }

    /** Returns the given map, or a new empty map when it is {@code null}. */
    private static HashMap<String, Object> orEmpty(HashMap<String, Object> map) {
        return map != null ? map : new HashMap<String, Object>();
    }

    /**
     * Checks the credentials sent by the client and records the outcome for this connection.
     * On success {@link #SUCCESS_KEY} is sent back; on failure (unknown ip, wrong key, or a
     * request that does not contain both parts) a failure message is sent and the connection
     * is closed.
     *
     * @param ctx context used to send the reply
     * @param msg the authentication request; must be a {@code String} of the form
     *            {@code "<ip>,<key>"}
     */
    private void auth(ChannelHandlerContext ctx, Object msg) {

        String[] authStr = ((String) msg).split(",");
        boolean accepted = false;
        // Both the ip and the key must be present; a malformed request is a failed login,
        // not an ArrayIndexOutOfBoundsException.
        if (authStr.length >= 2) {
            String authKey = (String) auth_map.get(authStr[0]);
            // The key registered for the claimed ip must match the one that was sent.
            accepted = authKey != null && authKey.equals(authStr[1]);
        }

        authenticated = accepted;
        if (accepted) {
            ctx.writeAndFlush(SUCCESS_KEY);
        } else {
            // Report the failure and close the connection once the reply has been written.
            ctx.writeAndFlush("auth failure !").addListener(ChannelFutureListener.CLOSE);
        }
    }

    /** Called when the current batch of inbound data has been read; only logs. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.err.println("服务端读取数据完毕");
    }

    /** Logs the failure, including its cause, and closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.err.println("server 读取数据出现异常");
        // Keep the cause visible; without it the failure cannot be diagnosed.
        cause.printStackTrace();
        ctx.close();
    }

}

二、客户端

ClientNetty:心跳测试客户端
ClientHeartBeatHandler:客户端心跳测试处理类
RequestInfo:发送心跳信息的实体类
HeartBeatTask:发送心跳信息任务类

package com.test.thread.netty.heartBeat;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.io.UnsupportedEncodingException;

import com.test.thread.netty.MarshallingCodefactory;
import com.test.thread.utils.Constant;

/**
 * Client side of the Netty heartbeat demo.
 *
 * <p>Connects to the given server and installs a Marshalling decoder/encoder pair followed
 * by a {@link ClientHeartBeatHandler} on the connection.
 *
 * @author zhb
 */
public class ClientNetty {

    // Address of the server to connect to.
    private final String ip;
    // Port of the server to connect to.
    private final int port;

    /**
     * @param ip   address of the server
     * @param port port of the server
     */
    public ClientNetty(String ip, int port) {
        this.ip = ip;
        this.port = port;
    }

    /**
     * Connects to the server and blocks until the connection is closed.
     * The event loop group is shut down on every exit path, including a failed connect
     * or an interrupt.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting for
     *         the connect to complete or for the channel to close
     * @throws UnsupportedEncodingException declared for compatibility with existing callers
     */
    private void action() throws InterruptedException, UnsupportedEncodingException {

        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bs = new Bootstrap();

            bs.group(group)
              .channel(NioSocketChannel.class)
              .option(ChannelOption.SO_KEEPALIVE, true)
              .handler(new ChannelInitializer<SocketChannel>() {
                  @Override
                  protected void initChannel(SocketChannel socketChannel) throws Exception {

                      // Marshalling: bytes -> objects
                      socketChannel.pipeline().addLast(MarshallingCodefactory.buildDecoder());
                      // Marshalling: objects -> bytes
                      socketChannel.pipeline().addLast(MarshallingCodefactory.buildEncoder());

                      // Handles the replies coming back from the server.
                      socketChannel.pipeline().addLast(new ClientHeartBeatHandler());
                  }
              });

            // Connect and wait until the connect has completed.
            ChannelFuture cf = bs.connect(ip, port).sync();
            // Wait until the connection is closed.
            cf.channel().closeFuture().sync();
        } finally {
            // Must also run when connect() fails, otherwise the event loop threads
            // are never released.
            group.shutdownGracefully();
        }
    }

    /** Entry point: connects to {@code 127.0.0.1} on {@code Constant.serverSocketPort}. */
    public static void main(String[] args) throws UnsupportedEncodingException, InterruptedException {
        new ClientNetty("127.0.0.1", Constant.serverSocketPort).action();
    }

}

package com.test.thread.netty.heartBeat;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
 * 处理客户端的心跳测试
 * @author zhb
 *
 */
public class ClientHeartBeatHandler extends ChannelInboundHandlerAdapter {
    
    private InetAddress addr;
    
    private static final String SUCCESS_KEY = "auth_success_key";
    
    private ScheduledFuture<?> heartBeat;
    
    // 心跳发送执行
    private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); 

    //当channel已激活就执行的方法
    public void channelActive(ChannelHandlerContext ctx) throws UnknownHostException{
        
        
        addr = InetAddress.getLocalHost();
        String ip = addr.getHostAddress();
        String key = "1234";
        
        // 相当于认证证书
        String auth = ip + "," + key;
        System.err.println("client 发送认证给你信息: " + auth);
        // 首先向服务器发送认证
        ctx.writeAndFlush(auth);
        
    }
    
    // 读取服务端响应的信息
    public void channelRead(ChannelHandlerContext ctx, Object msg){
        try {
            if(msg instanceof String){
                System.err.println("client 收到服务端的响应信息:" +(String)msg);
                if(((String)msg).equals(SUCCESS_KEY)){
                    // 如果服务器端认证成功,开始执行心跳信息 每5秒执行一次
                    heartBeat = scheduler.scheduleWithFixedDelay(new HeartBeatTask(ctx, addr), 0, 5, TimeUnit.SECONDS);
                }
            }else{
                System.err.println(msg);
            }
        } finally {
            // 没有返回信息,要释放msg
            ReferenceCountUtil.safeRelease(msg);
        }
    }

    
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.err.println("服务端出现异常");
        
        cause.printStackTrace();
        if (heartBeat != null) {
            heartBeat.cancel(true);
            heartBeat = null;
        }
        ctx.fireExceptionCaught(cause);
    }   

    // 数据读取完毕的处理
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.err.println("服务端读取数据完毕");
    }
        
}

package com.test.thread.netty.heartBeat;

import java.net.InetAddress;
import java.util.HashMap;

import org.hyperic.sigar.CpuPerc;
import org.hyperic.sigar.Mem;
import org.hyperic.sigar.Sigar;
import org.hyperic.sigar.SigarException;

import io.netty.channel.ChannelHandlerContext;
/**
 * Task that builds one heartbeat report and sends it to the server.
 *
 * <p>Each run samples CPU and memory figures through Sigar, packs them into a
 * {@link RequestInfo} together with the local ip, and writes it to the channel.
 *
 * @author zhb
 */
public class HeartBeatTask implements Runnable {

    // Channel context the heartbeat is written to.
    private final ChannelHandlerContext ctx;
    // Local address reported as the sender's ip.
    private final InetAddress addr;

    /**
     * @param ctx  channel context used to send the heartbeat
     * @param addr local address whose host address is reported in the heartbeat
     */
    public HeartBeatTask(ChannelHandlerContext ctx, InetAddress addr) {
        this.ctx = ctx;
        this.addr = addr;
    }

    /**
     * Collects the current figures and sends one heartbeat. Failures are printed and
     * not rethrown, so that a periodic schedule running this task is not cancelled by
     * a single bad run.
     */
    @Override
    public void run() {
        try {
            RequestInfo reqInfo = new RequestInfo();
            reqInfo.setIp(addr.getHostAddress());

            // Sigar reads system metrics of the local machine.
            Sigar sigar = new Sigar();
            try {
                // CPU usage
                CpuPerc cpuPerc = sigar.getCpuPerc();
                HashMap<String, Object> cpuPercMap = new HashMap<String, Object>();
                cpuPercMap.put("combined", cpuPerc.getCombined());
                cpuPercMap.put("user", cpuPerc.getUser());
                cpuPercMap.put("sys", cpuPerc.getSys());
                cpuPercMap.put("wait", cpuPerc.getWait());
                cpuPercMap.put("idle", cpuPerc.getIdle());

                // Memory usage; Sigar's values are divided by 1024 before being reported.
                Mem mem = sigar.getMem();
                HashMap<String, Object> memoryMap = new HashMap<String, Object>();
                memoryMap.put("total", mem.getTotal() / 1024L);
                memoryMap.put("used", mem.getUsed() / 1024L);
                memoryMap.put("free", mem.getFree() / 1024L);

                reqInfo.setCpuPercMap(cpuPercMap);
                reqInfo.setMemoryMap(memoryMap);
            } finally {
                // A new Sigar is created on every run; release it right away instead of
                // leaving the cleanup to garbage collection.
                sigar.close();
            }

            // Send the heartbeat.
            ctx.writeAndFlush(reqInfo);

        } catch (SigarException e) {
            e.printStackTrace();
        } catch (RuntimeException e) {
            // An exception escaping run() would suppress all further executions when this
            // task is scheduled periodically, so it is reported here instead.
            e.printStackTrace();
        }

    }


}

package com.test.thread.netty.heartBeat; 

import java.io.Serializable;
import java.util.HashMap;
/**
 * Heartbeat payload exchanged between client and server.
 *
 * <p>A plain serializable bean: every property is stored and returned as-is, without
 * copying or validation.
 *
 * @author zhb
 */
public class RequestInfo implements Serializable {

    /** Ip of the reporting host. */
    private String ip ;
    /** CPU figures of the reporting host, keyed by metric name. */
    private HashMap<String, Object> cpuPercMap ;
    /** Memory figures of the reporting host, keyed by metric name. */
    private HashMap<String, Object> memoryMap;
    // Further fields can be added here.

    // ---- setters ----

    /** @param ip ip of the reporting host */
    public void setIp(String ip) {
        this.ip = ip;
    }

    /** @param cpuPercMap CPU figures to carry; the map is stored by reference */
    public void setCpuPercMap(HashMap<String, Object> cpuPercMap) {
        this.cpuPercMap = cpuPercMap;
    }

    /** @param memoryMap memory figures to carry; the map is stored by reference */
    public void setMemoryMap(HashMap<String, Object> memoryMap) {
        this.memoryMap = memoryMap;
    }

    // ---- getters ----

    /** @return ip of the reporting host, or {@code null} if it was never set */
    public String getIp() {
        return this.ip;
    }

    /** @return the CPU figures map that was set, or {@code null} if none was set */
    public HashMap<String, Object> getCpuPercMap() {
        return this.cpuPercMap;
    }

    /** @return the memory figures map that was set, or {@code null} if none was set */
    public HashMap<String, Object> getMemoryMap() {
        return this.memoryMap;
    }

}

上一篇下一篇

猜你喜欢

热点阅读