基于BIO的RPC实现(2.0)带服务注册中心
2019-07-08 本文已影响0人
吗丁啉要餐前吃
在1.0版本的基础上,这次将服务注册中心抽取出来作为一个单独的module.
先看服务注册中心的实现
1.创建用于保存服务信息的实体
public class RegistServiceEntity implements Serializable {
private String host;
private int port;
public String getHost() {
return host;
}
public int getPort() {
return port;
}
public RegistServiceEntity(String host, int port) {
this.host = host;
this.port = port;
}
}
2.创建服务注册中心框架类
package com.zhang.frame;
import com.zhang.entity.RegistServiceEntity;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Service registry: a blocking-I/O server that records which host/port pairs
 * provide each named service and answers lookups for them.
 *
 * <p>Each request uses its own TCP connection and Java object streams. The peer
 * first writes a boolean:
 * <ul>
 *   <li>{@code true} (register): followed by the service name (UTF), the provider
 *       host (UTF) and the provider port (int). The registry replies with the
 *       boolean {@code true}.</li>
 *   <li>{@code false} (lookup): followed by the service name (UTF). The registry
 *       replies with the service name (UTF) and the set of registered providers
 *       as an object, or {@code null} when nothing is registered under that name.</li>
 * </ul>
 */
public class RpcRegistCenter {
    // Pool that runs one task per accepted connection, sized to the number of
    // available processors. (The original author notes that creating the pool
    // this way is not recommended.)
    private final ExecutorService executorService =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    // Port the registry listens on.
    private final int port;

    // Registered providers per service name. Shared by all pool threads, so every
    // access to the map and to the sets stored in it is guarded by the map's monitor.
    private final static Map<String, Set<RegistServiceEntity>> serviceHolder = new HashMap<>();

    /**
     * @param port TCP port the registry will listen on
     */
    public RpcRegistCenter(int port) {
        this.port = port;
    }

    /**
     * Handles one accepted connection: reads the request type and either
     * registers a provider or answers a lookup.
     */
    private static class SocketTask implements Runnable {
        private Socket socket;

        public SocketTask(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            // The socket itself is a managed resource so it is closed even when
            // constructing one of the object streams fails. The input stream is
            // created first, which waits for the peer's stream header.
            try (Socket connection = socket;
                 ObjectInputStream inputStream = new ObjectInputStream(connection.getInputStream());
                 ObjectOutputStream outputStream = new ObjectOutputStream(connection.getOutputStream())) {
                // true means "register a service", false means "look up a service".
                boolean isRegist = inputStream.readBoolean();
                if (isRegist) {
                    registService(inputStream);
                    outputStream.writeBoolean(true);
                    outputStream.flush();
                } else {
                    getService(inputStream, outputStream);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        /**
         * Answers a lookup: reads the service name, then writes the name back
         * followed by the providers registered for it.
         *
         * @param inputStream  stream positioned just after the request-type flag
         * @param outputStream stream the reply is written to
         * @throws IOException if reading the request or writing the reply fails
         */
        private void getService(ObjectInputStream inputStream, ObjectOutputStream outputStream) throws IOException {
            String serviceName = inputStream.readUTF();
            Set<RegistServiceEntity> addressSet;
            synchronized (serviceHolder) {
                Set<RegistServiceEntity> registered = serviceHolder.get(serviceName);
                // Serialize a private snapshot so a concurrent registration cannot
                // modify the set while it is being written. Unknown services are
                // still reported as null, as before.
                addressSet = registered == null ? null : new HashSet<>(registered);
            }
            outputStream.writeUTF(serviceName);
            outputStream.writeObject(addressSet);
            outputStream.flush();
            System.out.println("Service: "+serviceName +" has been call by client;");
        }

        /**
         * Registers a provider: reads the service name, host and port and adds
         * the address to that service's provider set.
         *
         * @param inputStream stream positioned just after the request-type flag
         * @throws IOException if reading the request fails
         */
        private void registService(ObjectInputStream inputStream) throws IOException {
            String serviceName = inputStream.readUTF();
            String host = inputStream.readUTF();
            int port = inputStream.readInt();
            RegistServiceEntity address = new RegistServiceEntity(host, port);
            synchronized (serviceHolder) {
                Set<RegistServiceEntity> serviceEntities = serviceHolder.get(serviceName);
                if (serviceEntities == null) {
                    serviceEntities = new HashSet<>();
                    serviceHolder.put(serviceName, serviceEntities);
                }
                // NOTE(review): suppressing duplicate addresses depends on
                // RegistServiceEntity implementing equals/hashCode.
                serviceEntities.add(address);
            }
            System.out.println("Service: "+host+":"+port+"/"+serviceName+"has been regist.");
        }
    }

    /**
     * Binds the listening socket and serves connections until accepting fails.
     * Each accepted connection is handed to the worker pool.
     *
     * @throws IOException if the port cannot be bound or accepting a connection fails
     */
    public void startService() throws IOException {
        try (ServerSocket serverSocket = new ServerSocket(port)) {
            System.out.println("注册中心启动");
            while (true) {
                Socket socket = serverSocket.accept();
                executorService.execute(new SocketTask(socket));
            }
        }
    }

    /**
     * Starts a registry on port 8888 in a new thread.
     */
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    RpcRegistCenter rpcRegistCenter = new RpcRegistCenter(8888);
                    rpcRegistCenter.startService();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}
服务端的实现
1.服务端框架类
package com.zhang.frame;
import java.io.*;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * RPC provider side: registers service implementations with the registry and
 * serves remote method invocations over blocking sockets.
 *
 * <p>A call request consists of the service (interface) name (UTF), the method
 * name (UTF), the parameter types ({@code Class<?>[]}) and the argument values
 * ({@code Object[]}). The reply is the method's return value written as an object.
 */
public class RpcServerFrame {
    // Pool that runs one task per accepted client connection.
    private static ExecutorService executorService
            = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors());

    // Implementation class for each registered service interface name.
    // NOTE(review): a plain HashMap that is read by the pool threads — this
    // presumes all registrations finish before startService() is called.
    private static final Map<String, Class<?>> serviceHolder = new HashMap<>();

    // Port this RPC server listens on.
    private int port;

    /**
     * @param port TCP port this RPC server will listen on
     */
    public RpcServerFrame(int port) {
        this.port = port;
    }

    /**
     * Registers a service with the registry at 127.0.0.1:8888 and remembers its
     * implementation class locally. The address announced to the registry is
     * 127.0.0.1 plus this server's port.
     *
     * <p>Registration is best effort: an I/O failure is printed and not rethrown.
     * The implementation is cached locally only after the registry connection
     * has been established.
     *
     * @param serviceInterface interface whose name identifies the service
     * @param impl             class that is instantiated to serve each call
     * @throws IOException kept in the signature for compatibility with existing callers
     */
    public void registServerToCenter(Class<?> serviceInterface, Class<?> impl) throws IOException {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress("127.0.0.1", 8888));
            // The output stream must be created first: it sends the stream header
            // that the registry's input stream waits for before it answers.
            try (ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream());
                 ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream())) {
                serviceHolder.put(serviceInterface.getName(), impl);
                outputStream.writeBoolean(true);
                outputStream.writeUTF(serviceInterface.getName());
                outputStream.writeUTF("127.0.0.1");
                outputStream.writeInt(port);
                outputStream.flush();
                if (inputStream.readBoolean()) {
                    System.out.println(serviceInterface.getName() + "regist success");
                } else {
                    System.out.println(serviceInterface.getName() + "regist failed");
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Serves one client connection: reads a call request, invokes the method on
     * a fresh instance of the registered implementation and writes the result.
     */
    private static class ServerTask implements Runnable {
        private Socket client = null;

        public ServerTask(Socket client) {
            this.client = client;
        }

        @Override
        public void run() {
            // The socket is a managed resource so it is closed on every path,
            // including a failure while constructing the object streams.
            try (Socket connection = client;
                 ObjectInputStream inputStream =
                         new ObjectInputStream(connection.getInputStream());
                 ObjectOutputStream outputStream =
                         new ObjectOutputStream(connection.getOutputStream())) {
                String serviceName = inputStream.readUTF();
                String methodName = inputStream.readUTF();
                // SECURITY(review): readObject() deserializes whatever the peer
                // sends. Only expose this port to trusted clients, or install an
                // ObjectInputFilter.
                Class<?>[] paramTypes = (Class<?>[]) inputStream.readObject();
                Object[] args = (Object[]) inputStream.readObject();

                Class<?> serviceClass = serviceHolder.get(serviceName);
                if (serviceClass == null) {
                    throw new ClassNotFoundException(serviceName + " Not Found");
                }
                Method method = serviceClass.getMethod(methodName, paramTypes);
                // getDeclaredConstructor().newInstance() replaces the deprecated
                // Class.newInstance(), which rethrows constructor exceptions unwrapped.
                Object target = serviceClass.getDeclaredConstructor().newInstance();
                Object result = method.invoke(target, args);
                outputStream.writeObject(result);
                outputStream.flush();
            } catch (Exception e) {
                // No error reply is defined by the protocol; the failure is logged
                // and the connection is closed.
                e.printStackTrace();
            }
        }
    }

    /**
     * Binds the listening socket and serves client connections until accepting
     * fails. Each accepted connection is handed to the worker pool.
     *
     * @throws IOException if the port cannot be bound or accepting a connection fails
     */
    public void startService() throws IOException {
        // bind() is inside the try so the socket is closed if binding fails.
        try (ServerSocket serverSocket = new ServerSocket()) {
            serverSocket.bind(new InetSocketAddress(port));
            System.out.println("RPC server on:" + port + ":运行");
            while (true) {
                executorService.execute(new ServerTask(serverSocket.accept()));
            }
        }
    }
}
2.创建具体service类和序列化实体类
与1.0创建的一样,不罗列代码
3.启动RPC服务端(先向注册中心注册服务,再开始监听客户端请求)
/**
 * Launcher for the SMS RPC provider: on a new thread it creates an RPC server
 * on port 9001, registers {@code SendSms} with its {@code SendSmsImpl}
 * implementation, and then starts serving requests.
 */
public class SendSmsRpc {
    public static void main(String[] args) {
        Runnable bootstrap = new Runnable() {
            @Override
            public void run() {
                try {
                    RpcServerFrame frame = new RpcServerFrame(9001);
                    // Register first, then serve: startService() does not return
                    // while the server is accepting connections.
                    frame.registServerToCenter(SendSms.class, SendSmsImpl.class);
                    frame.startService();
                } catch (IOException ioFailure) {
                    ioFailure.printStackTrace();
                }
            }
        };
        Thread serverThread = new Thread(bootstrap);
        serverThread.start();
    }
}
客户端代码实现
1.客户端rpc框架代码
package com.zhang.frame;
import com.zhang.entity.RegistServiceEntity;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Random;
import java.util.Set;
/**
 * RPC consumer side: creates dynamic proxies whose method calls are resolved
 * through the registry and executed on a remote provider.
 */
public class RpcClientFrame {
    // One shared generator instead of a new Random per call.
    private static final Random RANDOM = new Random();

    /**
     * Creates a proxy for a remote service. The registry is expected at 127.0.0.1:8888.
     *
     * @param serviceInterface interface the proxy implements; its name is the service name
     * @param <T>              type the caller assigns the proxy to; it must be {@code serviceInterface}
     * @return a proxy that forwards every method call to a remote provider
     */
    @SuppressWarnings("unchecked") // the proxy implements serviceInterface, which the caller names as T
    public static <T> T getRemoteProxyObj(final Class<?> serviceInterface) {
        InetSocketAddress addr = new InetSocketAddress("127.0.0.1", 8888);
        return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(),
                new Class<?>[]{serviceInterface},
                new DynProxy(serviceInterface, addr));
    }

    /**
     * Invocation handler that looks up providers in the registry on every call,
     * picks one at random and performs the call over a new socket.
     */
    public static class DynProxy implements InvocationHandler {
        private final Class<?> serviceInterface;
        // Address of the registry.
        private final InetSocketAddress socketAddress;
        // Providers returned by the last successful lookup; used as a fallback
        // when the registry cannot be reached.
        private RegistServiceEntity[] serviceArray;

        /**
         * @param serviceInterface interface whose name identifies the remote service
         * @param socketAddress    address of the registry
         */
        public DynProxy(Class<?> serviceInterface, InetSocketAddress socketAddress) {
            this.serviceInterface = serviceInterface;
            this.socketAddress = socketAddress;
        }

        /**
         * Forwards the call to a randomly chosen provider of the service.
         *
         * @return the value returned by the remote method
         * @throws IllegalStateException if no provider is known for the service, either
         *         because none is registered or because the registry is unreachable
         *         and nothing is cached
         * @throws IOException if talking to the chosen provider fails
         * @throws ClassNotFoundException if a reply contains a class unknown to this JVM
         */
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            RegistServiceEntity[] providers = lookupProviders();
            if (providers == null || providers.length == 0) {
                // Previously this surfaced as a NullPointerException or as an
                // IllegalArgumentException from Random.nextInt(0).
                throw new IllegalStateException(
                        "No provider available for service " + serviceInterface.getName());
            }
            RegistServiceEntity chosen = providers[RANDOM.nextInt(providers.length)];
            InetSocketAddress socketAddr = new InetSocketAddress(chosen.getHost(), chosen.getPort());
            return callProvider(socketAddr, method, args);
        }

        /**
         * Asks the registry for the providers of the service and refreshes the
         * local cache. On an I/O failure the error is printed and the previously
         * cached providers (possibly none) are returned.
         */
        private RegistServiceEntity[] lookupProviders() throws ClassNotFoundException {
            try (Socket socket = new Socket()) {
                socket.connect(socketAddress);
                try (ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream())) {
                    // false asks the registry for a lookup, not a registration.
                    outputStream.writeBoolean(false);
                    outputStream.writeUTF(serviceInterface.getName());
                    outputStream.flush();
                    try (ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream())) {
                        System.out.println("Get services from registered center success:"+inputStream.readUTF());
                        // SECURITY(review): readObject() trusts the registry's reply.
                        @SuppressWarnings("unchecked") // the registry protocol sends a Set<RegistServiceEntity> or null
                        Set<RegistServiceEntity> result = (Set<RegistServiceEntity>) inputStream.readObject();
                        // The registry sends null when nothing is registered under the name.
                        serviceArray = result == null
                                ? new RegistServiceEntity[0]
                                : result.toArray(new RegistServiceEntity[0]);
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            return serviceArray;
        }

        /**
         * Sends one call request (service name, method name, parameter types,
         * arguments) to the provider and returns the object it replies with.
         */
        private Object callProvider(InetSocketAddress provider, Method method, Object[] args)
                throws IOException, ClassNotFoundException {
            try (Socket socket = new Socket()) {
                socket.connect(provider);
                try (ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream())) {
                    outputStream.writeUTF(serviceInterface.getName());
                    outputStream.writeUTF(method.getName());
                    outputStream.writeObject(method.getParameterTypes());
                    outputStream.writeObject(args);
                    outputStream.flush();
                    // The input stream is created only after the request is flushed,
                    // matching the order in which the provider reads and replies.
                    try (ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream())) {
                        // SECURITY(review): readObject() trusts the provider's reply.
                        return inputStream.readObject();
                    }
                }
            }
        }
    }
}
2.创建与服务端一致的service接口和序列化实体类
从服务端复制service接口和entity;从注册中心复制entity(RegistServiceEntity),各模块中的类必须保持一致
3.客户端调用
/**
 * Demo consumer: obtains a remote proxy for {@code SendSms}, calls
 * {@code sendMail} with a sample user and prints the result.
 */
public class Client {
    public static void main(String[] args) {
        // Sample recipient passed to the remote call.
        UserInfo recipient = new UserInfo("张三","1359999999");
        SendSms smsService = RpcClientFrame.getRemoteProxyObj(SendSms.class);
        // The remote call happens here, while the output line is being built.
        String outcome = "Send mail: "+ smsService.sendMail(recipient);
        System.out.println(outcome);
    }
}