自己写RPC
1、基于的技术:Spring + Protobuf + Netty + ZooKeeper + Proxy(动态代理)+ FastClass(反射机制)
定义使用工具图
工具图
2、服务器架构
基础架构
3、代码
关键点: 序列化和反序列化
package com.nestoop.org.net.rpc.cluster.util.serialize;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;import org.objenesis.Objenesis;import org.objenesis.ObjenesisStd;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import com.dyuproject.protostuff.LinkedBuffer;import com.dyuproject.protostuff.ProtostuffIOUtil;import com.dyuproject.protostuff.Schema;import com.dyuproject.protostuff.runtime.RuntimeSchema;/** * 自定义格式化 * @author xbao * */public class RpcSerializationUtil {//protobuf 格式化public static class ProtoBufSerialiable{public static final Logger logger = LoggerFactory.getLogger(ProtoBufSerialiable.class); private static Map, Schema?> cachedSchemaMap = new ConcurrentHashMap(); private static Objenesis objenesis = new ObjenesisStd(true); private ProtoBufSerialiable() {} //获取schema private staticSchemagetSchema(Classclassz){ //缓存 @SuppressWarnings("unchecked")Schemaschema=(Schema) cachedSchemaMap.get(classz); //为空时,需要创建schema if(schema == null ){//创建schema schema=RuntimeSchema.createFrom(classz); if(schema !=null){ cachedSchemaMap.put(classz, schema); } } return schema; } //获取序列化的数据 public staticbyte[] serialize(T serializeObjcet){ Classclassz=(Class) serializeObjcet.getClass(); //设置缓冲区,使用512,还有个256大小的缓冲区 LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); try{ //获取Schema Schemaschema =getSchema(classz); return ProtostuffIOUtil.toByteArray(serializeObjcet, schema, buffer); }catch(Exception e){ logger.debug("protobuf 序列化出现异常..................."); e.printStackTrace(); }finally{ //清除缓冲区 buffer.clear(); } return null; } //反序列化 public staticT deSerialize(byte[] bytes,Classclassz){ try{ T serializeObject=(T) objenesis.newInstance(classz); Schema schema=getSchema(classz);
ProtostuffIOUtil.mergeFrom(bytes, serializeObject, schema);
return serializeObject;
}catch(Exception e){
logger.debug("protobuf 反序列化出现异常...................");
e.printStackTrace();
}finally{
}
return null;
}
}
}
关键点:zk创建节点和删除节点
server端:
package com.nestoop.org.net.rpc.cluster.registry;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.nestoop.org.net.rpc.cluster.constant.RpcClusterConstant;
import com.nestoop.org.net.rpc.cluster.watcher.RpClusterWatcher;
/**
* RPC 注册服务
* @author xbao
*
*/
public class RpcServiceRegistry {
private static final Logger logger = LoggerFactory.getLogger(RpcServiceRegistry.class);
//
private String registryAddress;
private CountDownLatch latch = new CountDownLatch(1);
public RpcServiceRegistry(String registryAddress) {
this.registryAddress = registryAddress;
}
public void register(String data){
if(data !=null){
//创建zookeeper
ZooKeeper zookeeper=connectServer();
//注册到zookeeper
createNode(zookeeper,data);
}
}
/**
* 创建zookeeper
* @return
*/
private ZooKeeper connectServer() {
logger.debug("创建zookeeper....................");
ZooKeeper zk=null;
try {
zk=new ZooKeeper(registryAddress, RpcClusterConstant.ZK_SESSION_TIMEOUT, new RpClusterWatcher(latch));
} catch (IOException e) {
logger.debug("创建zookeeper出现异常 ,exception message:{}",e.getMessage());
e.printStackTrace();
}
try {
latch.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return zk;
}
//创建znode
private void createNode(ZooKeeper zookeeper,String data){
logger.debug("zookeeper创建znode....................");
try{
byte[] bytes=data.getBytes();
//path
String path=zookeeper.create(RpcClusterConstant.ZK_DATA_PATH, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
logger.debug("zookeeper创建Node:(path=>data) =>({} => {})", path, data);
}catch(Exception e){
logger.debug("zookeeper创建Node.出现异常,exception message:{}",e.getMessage());
checkRootPathExits(zookeeper,RpcClusterConstant.ZK_REGISTRY_PATH,data);
}
}
private void checkRootPathExits(ZooKeeper zookeeper,String rootPath,String data){
logger.debug("Server 检查zookeeper 是否存在rootPath:{}",rootPath);
try {
//是否存在是否实时指定的rootpath
Stat stat=zookeeper.exists(rootPath, true);
logger.debug("Server 端检查zookeeper 是否存在rootPath:{}",rootPath);
if(stat ==null){
logger.debug("Server 端创建在zookeeper 的rootPath节点:{}",rootPath);
zookeeper.create(rootPath, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
client端:
package com.nestoop.yelibar.org.rpc.client.registry;import java.io.IOException;import java.util.ArrayList;import java.util.List;import java.util.concurrent.CountDownLatch;import java.util.concurrent.ThreadLocalRandom;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooKeeper;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import com.nestoop.yelibar.org.rpc.client.until.RpcClientConstant;/** * 客户端发现注册器 * @author Administrator * */public class RpcClientServiceRegistry {private static final Logger logger = LoggerFactory.getLogger(RpcClientServiceRegistry.class);private CountDownLatch latch = new CountDownLatch(1);private volatile ListdataList = new ArrayList();private String registryAddress; public RpcClientServiceRegistry() {}public RpcClientServiceRegistry(String registryAddress) {this.registryAddress = registryAddress;ZooKeeper zk = connectServer(); if (zk != null) { watchNode(zk,true); }}//发现zookeeper中的服务 public String discover() { String logStr=null; String data = null; int size = dataList.size(); logger.debug("zookeeper can used znode size: {}", size); if (size > 0) { if (size == 1) { data = dataList.get(0); logStr=String.format("zookeeper 可以使用的个数:%s,zonde的数据是:%s", size,data); } else { data = dataList.get(ThreadLocalRandom.current().nextInt(size)); logger.debug("zookeeper using random data: {}", data); logStr=String.format("zookeeper 可以使用的个数:%s,zonde的数据是:%s", size,data); } } logger.debug(logStr); return data; } //根据地址,客户端寻找zookeeper的实例,创建链接private ZooKeeper connectServer() {logger.info("Rpc客户端链接创建zookeeper:"); ZooKeeper zookeeper = null; try { zookeeper = new ZooKeeper(registryAddress, RpcClientConstant.ZK_SESSION_TIMEOUT, new Watcher() { public void process(WatchedEvent event) { if (event.getState() == Event.KeeperState.SyncConnected) { latch.countDown(); } } }); latch.await(); } catch (IOException e) { logger.error("链接创建zookeeper 出现异常:{}", 
e.getMessage()); } catch (InterruptedException e) {e.printStackTrace();} return zookeeper; }//监控zookeeper的znodeprivate void watchNode(final ZooKeeper zk,boolean iswatch) {logger.info("Rpc客户端监控zookeeper:");ListnodeList=null; try { if(!iswatch){ nodeList = zk.getChildren(RpcClientConstant.ZK_REGISTRY_PATH, new Watcher() { public void process(WatchedEvent event) { if (event.getType() == Event.EventType.NodeChildrenChanged) { watchNode(zk,false); } } }); }else{ nodeList = zk.getChildren(RpcClientConstant.ZK_REGISTRY_PATH,true); }// ListnodeList = zk.getChildren(RpcClientConstant.ZK_REGISTRY_PATH,true);// // ListdataList = new ArrayList();
//
for (String node : nodeList) {
byte[] bytes = zk.getData(RpcClientConstant.ZK_REGISTRY_PATH + "/" + node, false, null);
dataList.add(new String(bytes));
}
logger.debug("node data: {}", dataList);
this.dataList = dataList;
} catch (KeeperException e) {
logger.error("watchNode KeeperException 出现异常信息:{}", e.getMessage());
} catch (InterruptedException e) {
logger.error("watchNode InterruptedException出现异常信息:{}", e.getMessage());
e.printStackTrace();
}
}
}
动态代理:
package com.nestoop.yelibar.org.rpc.client.proxy;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;import java.lang.reflect.Proxy;import java.util.UUID;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import com.nestoop.org.net.rpc.cluster.entity.RpcClusterRequest;import com.nestoop.org.net.rpc.cluster.entity.RpcClusterResponse;import com.nestoop.yelibar.org.rpc.client.netty.handler.RpcClientHandler;import com.nestoop.yelibar.org.rpc.client.registry.RpcClientServiceRegistry;/** * 客户端代理 * @author xbao * */public class RpcProxy {private RpcClientServiceRegistry clientRegistry;private String registryAddress;public RpcProxy(RpcClientServiceRegistry clientRegistry) {this.clientRegistry = clientRegistry;}public RpcProxy(String registryAddress) {this.registryAddress = registryAddress;}public RpcProxy(RpcClientServiceRegistry clientRegistry,String registryAddress) {this.clientRegistry = clientRegistry;this.registryAddress = registryAddress;}public RpcProxy() {}/** * 使用动态了道理模式创建实体 */@SuppressWarnings("unchecked")public T createRpcService(Class interfaceClass){
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),new Class[]{interfaceClass}, new RpcProxyInvocationHandler(registryAddress,clientRegistry));
}
public static class RpcProxyInvocationHandler implements InvocationHandler{
private static final Logger logger = LoggerFactory.getLogger(RpcProxyInvocationHandler.class);
private RpcClientServiceRegistry clientRegistry;
private String registryAddress;
public RpcProxyInvocationHandler(String registryAddress,RpcClientServiceRegistry clientRegistry){
this.registryAddress=registryAddress;
this.clientRegistry=clientRegistry;
}
public Object invoke(Object proxy, Method method, Object[] parameters) throws Throwable {
//创建一个请求
RpcClusterRequest request=new RpcClusterRequest();
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParameters(parameters);
request.setParameterTypes(method.getParameterTypes());
request.setRequestId(UUID.randomUUID().toString());
String serverAddress="";
//寻找zookeeper
if(clientRegistry != null){
serverAddress=clientRegistry.discover();
}
logger.debug("Rpc Client 从zookeeper发现服务端的调用 Server Address:{}", serverAddress);
//分割zookeeper地址,找到zookeeper
String[] array = serverAddress.split(":");
String host = array[0];
int port = Integer.parseInt(array[1]);
//创建rpc client;
RpcClientHandler rpcClienthandler=new RpcClientHandler(host, port);
//发送消息
RpcClusterResponse response = rpcClienthandler.sendMessageToServer(request);
if(response == null ){
logger.debug("Rpc 调用server端返回的对象response", response);
return null;
}else{
if(response.getError() !=null){
logger.error("Rpc Client Proxy 动态代理出现错误 ,错误信息:{}", response.getError());
throw response.getError();
}else{
logger.debug("Rpc Client Proxy 非常正确 ,正确返回对象是:{}", response.getResult());
return response.getResult();
}
}
}
}
}
netty:
server 端:服务端的编解码——对返回的响应进行编码,对收到的请求进行解码
package com.nestoop.org.net.rpc.cluster.netty.decodeorcode;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.MessageToByteEncoder;import com.nestoop.org.net.rpc.cluster.entity.RpcClusterRequest;import com.nestoop.org.net.rpc.cluster.entity.RpcClusterResponse;import com.nestoop.org.net.rpc.cluster.util.serialize.RpcSerializationUtil.ProtoBufSerialiable;/** * 使用Netty编码功能 集成MessageToByteEncoder * @author xbao * */public class RpcClusterEnCoder extends MessageToByteEncoder{
public static final Logger logger = LoggerFactory.getLogger(RpcClusterEnCoder.class);
private Class classz;
public RpcClusterEnCoder(Class classz) {
this.classz = classz;
}
@Override
protected void encode(ChannelHandlerContext handlerContext, RpcClusterResponse inobject, ByteBuf outBuf)throws Exception {
logger.debug("服务端编码对象:{}",inobject);
if(classz.isInstance(inobject)){
byte[] dataByteArray=ProtoBufSerialiable.serialize(inobject);
outBuf.writeInt(dataByteArray.length);
outBuf.writeBytes(dataByteArray);
}
}
}
客户端的编码和解码:对请求进行编码,对服务端返回的响应进行解码
package com.nestoop.yelibar.org.rpc.client.netty.decodeorcode;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.MessageToByteEncoder;import com.nestoop.org.net.rpc.cluster.entity.RpcClusterRequest;import com.nestoop.yelibar.org.rpc.client.serialize.RpcSerializationUtil.ProtoBufSerialiable;/** * 使用Netty编码功能 集成MessageToByteEncoder * @author xbao * */public class RpcClusterEnCoder extends MessageToByteEncoder{
public static final Logger logger = LoggerFactory.getLogger(RpcClusterEnCoder.class);
private Class classz;
public RpcClusterEnCoder(Class classz) {
this.classz = classz;
}
@Override
protected void encode(ChannelHandlerContext handlerContext, RpcClusterRequest inobject, ByteBuf outBuf)throws Exception {
logger.debug("客户端请求编码对象:{}",inobject);
if(classz.isInstance(inobject)){
byte[] dataByteArray=ProtoBufSerialiable.serialize(inobject);
outBuf.writeInt(dataByteArray.length);
outBuf.writeBytes(dataByteArray);
}
}
}