技术博客我的心里路程Java

自己写RPC

2018-01-05  本文已影响18人  老生住长亭

1、基于:Spring + Protobuf + Netty + ZooKeeper + Proxy + FastClass(反射机制)

定义使用工具图

工具图

2.服务器架构

基础架构

3、代码

关键点: 序列化和反序列化

package com.nestoop.org.net.rpc.cluster.util.serialize;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;import org.objenesis.Objenesis;import org.objenesis.ObjenesisStd;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import com.dyuproject.protostuff.LinkedBuffer;import com.dyuproject.protostuff.ProtostuffIOUtil;import com.dyuproject.protostuff.Schema;import com.dyuproject.protostuff.runtime.RuntimeSchema;/** * 自定义格式化 * @author xbao * */public class RpcSerializationUtil {//protobuf 格式化public static class ProtoBufSerialiable{public static final Logger logger = LoggerFactory.getLogger(ProtoBufSerialiable.class); private static Map, Schema?> cachedSchemaMap = new ConcurrentHashMap(); private static Objenesis objenesis = new ObjenesisStd(true); private ProtoBufSerialiable() {} //获取schema private staticSchemagetSchema(Classclassz){ //缓存 @SuppressWarnings("unchecked")Schemaschema=(Schema) cachedSchemaMap.get(classz); //为空时,需要创建schema if(schema == null ){//创建schema schema=RuntimeSchema.createFrom(classz); if(schema !=null){ cachedSchemaMap.put(classz, schema); } } return schema; } //获取序列化的数据 public staticbyte[] serialize(T serializeObjcet){ Classclassz=(Class) serializeObjcet.getClass(); //设置缓冲区,使用512,还有个256大小的缓冲区 LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); try{ //获取Schema Schemaschema =getSchema(classz); return ProtostuffIOUtil.toByteArray(serializeObjcet, schema, buffer); }catch(Exception e){ logger.debug("protobuf 序列化出现异常..................."); e.printStackTrace(); }finally{ //清除缓冲区 buffer.clear(); } return null; } //反序列化 public staticT deSerialize(byte[] bytes,Classclassz){ try{ T serializeObject=(T) objenesis.newInstance(classz); Schema schema=getSchema(classz);

ProtostuffIOUtil.mergeFrom(bytes, serializeObject, schema);

return serializeObject;

}catch(Exception e){

logger.debug("protobuf 反序列化出现异常...................");

e.printStackTrace();

}finally{

}

return null;

}

}

}

关键点:zk创建节点和删除节点

server端:

package com.nestoop.org.net.rpc.cluster.registry;

import java.io.IOException;

import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;

import org.apache.zookeeper.KeeperException;

import org.apache.zookeeper.ZooDefs;

import org.apache.zookeeper.ZooKeeper;

import org.apache.zookeeper.data.Stat;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import com.nestoop.org.net.rpc.cluster.constant.RpcClusterConstant;

import com.nestoop.org.net.rpc.cluster.watcher.RpClusterWatcher;

/**

* RPC 注册服务

* @author xbao

*

*/

public class RpcServiceRegistry {

private static final Logger logger = LoggerFactory.getLogger(RpcServiceRegistry.class);

//

private String registryAddress;

private CountDownLatch latch = new CountDownLatch(1);

public RpcServiceRegistry(String registryAddress) {

this.registryAddress = registryAddress;

}

public  void  register(String data){

if(data !=null){

//创建zookeeper

ZooKeeper zookeeper=connectServer();

//注册到zookeeper

createNode(zookeeper,data);

}

}

/**

* 创建zookeeper

* @return

*/

private ZooKeeper connectServer() {

logger.debug("创建zookeeper....................");

ZooKeeper zk=null;

try {

zk=new ZooKeeper(registryAddress, RpcClusterConstant.ZK_SESSION_TIMEOUT, new RpClusterWatcher(latch));

} catch (IOException e) {

logger.debug("创建zookeeper出现异常 ,exception message:{}",e.getMessage());

e.printStackTrace();

}

try {

latch.await();

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

return zk;

}

//创建znode

private void createNode(ZooKeeper zookeeper,String data){

logger.debug("zookeeper创建znode....................");

try{

byte[]  bytes=data.getBytes();

//path

String path=zookeeper.create(RpcClusterConstant.ZK_DATA_PATH, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

logger.debug("zookeeper创建Node:(path=>data) =>({} => {})", path, data);

}catch(Exception e){

logger.debug("zookeeper创建Node.出现异常,exception message:{}",e.getMessage());

checkRootPathExits(zookeeper,RpcClusterConstant.ZK_REGISTRY_PATH,data);

}

}

private void checkRootPathExits(ZooKeeper zookeeper,String rootPath,String data){

logger.debug("Server 检查zookeeper 是否存在rootPath:{}",rootPath);

try {

//是否存在是否实时指定的rootpath

Stat stat=zookeeper.exists(rootPath, true);

logger.debug("Server 端检查zookeeper 是否存在rootPath:{}",rootPath);

if(stat ==null){

logger.debug("Server 端创建在zookeeper 的rootPath节点:{}",rootPath);

zookeeper.create(rootPath, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

}

} catch (KeeperException e) {

e.printStackTrace();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

client端:

package com.nestoop.yelibar.org.rpc.client.registry;import java.io.IOException;import java.util.ArrayList;import java.util.List;import java.util.concurrent.CountDownLatch;import java.util.concurrent.ThreadLocalRandom;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooKeeper;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import com.nestoop.yelibar.org.rpc.client.until.RpcClientConstant;/** * 客户端发现注册器 * @author Administrator * */public class RpcClientServiceRegistry {private static final Logger logger = LoggerFactory.getLogger(RpcClientServiceRegistry.class);private CountDownLatch latch = new CountDownLatch(1);private volatile ListdataList = new ArrayList();private String registryAddress; public RpcClientServiceRegistry() {}public RpcClientServiceRegistry(String registryAddress) {this.registryAddress = registryAddress;ZooKeeper zk = connectServer(); if (zk != null) { watchNode(zk,true); }}//发现zookeeper中的服务 public String discover() { String logStr=null; String data = null; int size = dataList.size(); logger.debug("zookeeper can used znode size: {}", size); if (size > 0) { if (size == 1) { data = dataList.get(0); logStr=String.format("zookeeper 可以使用的个数:%s,zonde的数据是:%s", size,data); } else { data = dataList.get(ThreadLocalRandom.current().nextInt(size)); logger.debug("zookeeper using random data: {}", data); logStr=String.format("zookeeper 可以使用的个数:%s,zonde的数据是:%s", size,data); } } logger.debug(logStr); return data; } //根据地址,客户端寻找zookeeper的实例,创建链接private ZooKeeper connectServer() {logger.info("Rpc客户端链接创建zookeeper:"); ZooKeeper zookeeper = null; try { zookeeper = new ZooKeeper(registryAddress, RpcClientConstant.ZK_SESSION_TIMEOUT, new Watcher() { public void process(WatchedEvent event) { if (event.getState() == Event.KeeperState.SyncConnected) { latch.countDown(); } } }); latch.await(); } catch (IOException e) { logger.error("链接创建zookeeper 出现异常:{}", 
e.getMessage()); } catch (InterruptedException e) {e.printStackTrace();} return zookeeper; }//监控zookeeper的znodeprivate void watchNode(final ZooKeeper zk,boolean iswatch) {logger.info("Rpc客户端监控zookeeper:");ListnodeList=null; try { if(!iswatch){ nodeList = zk.getChildren(RpcClientConstant.ZK_REGISTRY_PATH, new Watcher() { public void process(WatchedEvent event) { if (event.getType() == Event.EventType.NodeChildrenChanged) { watchNode(zk,false); } } }); }else{ nodeList = zk.getChildren(RpcClientConstant.ZK_REGISTRY_PATH,true); }// ListnodeList = zk.getChildren(RpcClientConstant.ZK_REGISTRY_PATH,true);// // ListdataList = new ArrayList();

//           

            for (String node : nodeList) {

                byte[] bytes = zk.getData(RpcClientConstant.ZK_REGISTRY_PATH + "/" + node, false, null);

                dataList.add(new String(bytes));

            }

            logger.debug("node data: {}", dataList);

            this.dataList = dataList;

        } catch (KeeperException e) {

        logger.error("watchNode KeeperException 出现异常信息:{}", e.getMessage());

        } catch (InterruptedException e) {

        logger.error("watchNode InterruptedException出现异常信息:{}", e.getMessage());

e.printStackTrace();

}

    }

}

动态代理:

package com.nestoop.yelibar.org.rpc.client.proxy;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;import java.lang.reflect.Proxy;import java.util.UUID;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import com.nestoop.org.net.rpc.cluster.entity.RpcClusterRequest;import com.nestoop.org.net.rpc.cluster.entity.RpcClusterResponse;import com.nestoop.yelibar.org.rpc.client.netty.handler.RpcClientHandler;import com.nestoop.yelibar.org.rpc.client.registry.RpcClientServiceRegistry;/** * 客户端代理 * @author xbao * */public class RpcProxy {private RpcClientServiceRegistry clientRegistry;private String registryAddress;public RpcProxy(RpcClientServiceRegistry clientRegistry) {this.clientRegistry = clientRegistry;}public RpcProxy(String registryAddress) {this.registryAddress = registryAddress;}public RpcProxy(RpcClientServiceRegistry clientRegistry,String registryAddress) {this.clientRegistry = clientRegistry;this.registryAddress = registryAddress;}public RpcProxy() {}/** * 使用动态了道理模式创建实体 */@SuppressWarnings("unchecked")public T createRpcService(Class interfaceClass){

return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),new Class[]{interfaceClass}, new RpcProxyInvocationHandler(registryAddress,clientRegistry));

}

public static class RpcProxyInvocationHandler implements InvocationHandler{

private static final Logger logger = LoggerFactory.getLogger(RpcProxyInvocationHandler.class);

private RpcClientServiceRegistry clientRegistry;

private String registryAddress;

public RpcProxyInvocationHandler(String registryAddress,RpcClientServiceRegistry clientRegistry){

this.registryAddress=registryAddress;

this.clientRegistry=clientRegistry;

}

public Object invoke(Object proxy, Method method, Object[] parameters) throws Throwable {

//创建一个请求

RpcClusterRequest request=new RpcClusterRequest();

request.setClassName(method.getDeclaringClass().getName());

request.setMethodName(method.getName());

request.setParameters(parameters);

request.setParameterTypes(method.getParameterTypes());

request.setRequestId(UUID.randomUUID().toString());

String serverAddress="";

//寻找zookeeper

if(clientRegistry != null){

serverAddress=clientRegistry.discover();

}

logger.debug("Rpc Client 从zookeeper发现服务端的调用 Server Address:{}", serverAddress);

//分割zookeeper地址,找到zookeeper

String[] array = serverAddress.split(":");

            String host = array[0];

            int port = Integer.parseInt(array[1]);

            //创建rpc client;

            RpcClientHandler rpcClienthandler=new RpcClientHandler(host, port);

            //发送消息

            RpcClusterResponse response = rpcClienthandler.sendMessageToServer(request);

if(response == null ){

logger.debug("Rpc 调用server端返回的对象response", response);

return null;

}else{

            if(response.getError() !=null){

            logger.error("Rpc Client Proxy 动态代理出现错误 ,错误信息:{}", response.getError());

            throw response.getError();

            }else{

            logger.debug("Rpc Client Proxy 非常正确 ,正确返回对象是:{}", response.getResult());

            return response.getResult();

            }

}

}

}

}

netty:

server 端:服务端的编解码——对返回的响应进行编码,对请求进行解码

package com.nestoop.org.net.rpc.cluster.netty.decodeorcode;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.MessageToByteEncoder;import com.nestoop.org.net.rpc.cluster.entity.RpcClusterRequest;import com.nestoop.org.net.rpc.cluster.entity.RpcClusterResponse;import com.nestoop.org.net.rpc.cluster.util.serialize.RpcSerializationUtil.ProtoBufSerialiable;/** * 使用Netty编码功能 集成MessageToByteEncoder * @author xbao * */public class RpcClusterEnCoder extends MessageToByteEncoder{

public static final Logger logger = LoggerFactory.getLogger(RpcClusterEnCoder.class);

private Class classz;

public RpcClusterEnCoder(Class classz) {

this.classz = classz;

}

@Override

protected void encode(ChannelHandlerContext handlerContext, RpcClusterResponse inobject, ByteBuf outBuf)throws Exception {

logger.debug("服务端编码对象:{}",inobject);

if(classz.isInstance(inobject)){

byte[] dataByteArray=ProtoBufSerialiable.serialize(inobject);

outBuf.writeInt(dataByteArray.length);

outBuf.writeBytes(dataByteArray);

}

}

}

客户端的编解码:对请求进行编码,对服务端返回的响应进行解码

package com.nestoop.yelibar.org.rpc.client.netty.decodeorcode;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.MessageToByteEncoder;import com.nestoop.org.net.rpc.cluster.entity.RpcClusterRequest;import com.nestoop.yelibar.org.rpc.client.serialize.RpcSerializationUtil.ProtoBufSerialiable;/** * 使用Netty编码功能 集成MessageToByteEncoder * @author xbao * */public class RpcClusterEnCoder extends MessageToByteEncoder{

public static final Logger logger = LoggerFactory.getLogger(RpcClusterEnCoder.class);

private Class classz;

public RpcClusterEnCoder(Class classz) {

this.classz = classz;

}

@Override

protected void encode(ChannelHandlerContext handlerContext, RpcClusterRequest inobject, ByteBuf outBuf)throws Exception {

logger.debug("客户端请求编码对象:{}",inobject);

if(classz.isInstance(inobject)){

byte[] dataByteArray=ProtoBufSerialiable.serialize(inobject);

outBuf.writeInt(dataByteArray.length);

outBuf.writeBytes(dataByteArray);

}

}

}

上一篇下一篇

猜你喜欢

热点阅读