使用socket实现RPC远程调用升级版

2020-06-22  本文已影响0人  奋斗的韭菜汪

客户端改造:将spring项目修改成springboot项目,使用自定义注解WzxAutowired对服务端订单接口进行注入,通过spring前置处理器BeanPostProcessor.postProcessBeforeInitialization将所有加了WzxAutowired的注解的bean,注入spring容器,然后针对加了WzxAutowired的注解bean,设置了一个代理,这个代理实现是RemoteInvocationHandler,RemoteInvocationHandler实现远程调用order-service。
服务端改造:使用WzxRemoteService注解标记远程调用的接口,通过该spring后置处理器
BeanPostProcessor.postProcessAfterInitialization将所有加了WzxRemoteService的注解的bean,注入spring容器,并通过Mediator发布到远程公客户端调用
客户端:

@Component
public class AutowiredInvokeProxy implements BeanPostProcessor {
    @Autowired
    RemoteInvocationHandler invocationHandler;
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        //将所有加了WzxAutowired的注解的bean,注入spring容器
        Field[] fields = bean.getClass().getDeclaredFields();
        for(Field field : fields){
            if(field.isAnnotationPresent(WzxAutowired.class)){
                //针对这个加了WzxAutowired注解的字段,设置为一个代理的值
                Object proxy = Proxy.newProxyInstance(field.getType().getClassLoader(), new Class<?>[]{field.getType()}, invocationHandler);
                try {
                    //相当于针对加了WzxAutowired的注解,设置了一个代理,这个代理实现是RemoteInvocationHandler
                    field.set(bean, proxy);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                }
            }
        }
        return null;
    }
}
@Component
public class RemoteInvocationHandler implements InvocationHandler {
    @Value("${wzx.host}")
    private String host;
    @Value("${wzx.port}")
    private int port;
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        //先建立远程连接
        RpcNetTransport rpcNetTransport = new RpcNetTransport(host, port);
        //传递数据(调用哪个接口,方法,参数),服务端接收到这些数据可以基于这些数据反射调用
        RpcRequest rpcRequest = new RpcRequest();
        rpcRequest.setClassName(method.getDeclaringClass().getName());
        rpcRequest.setArgs(args);
        rpcRequest.setMethodName(method.getName());
        rpcRequest.setTypes(method.getParameterTypes());
        return rpcNetTransport.send(rpcRequest);
    }
}
public class RpcNetTransport {
    private String host;
    private int port;
    public RpcNetTransport(String host, int port) {
        this.host = host;
        this.port = port;
    }
    public Socket createSocket() throws IOException {
        Socket socket = new Socket(host, port);
        return socket;
    }
    public Object send(RpcRequest request) throws IOException, ClassNotFoundException {
        ObjectInputStream inputStream = null;
        ObjectOutputStream outputStream = null;
        Socket socket = createSocket();
        //IO操作
        socket.getOutputStream();
        outputStream = new ObjectOutputStream(socket.getOutputStream());
        outputStream.writeObject(request);
        //情况缓冲区
        outputStream.flush();
        //读取服务端返回数据
        inputStream = new ObjectInputStream(socket.getInputStream());
        return inputStream.readObject();
    }
}
@RestController
public class UserController {
    @WzxAutowired
    IOrderService orderService;
    @GetMapping("/test")
    public String test(){
       return orderService.queryOrderList();
    }
}
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface WzxAutowired {
}
@SpringBootApplication
@Component("com.wzx.example")
public class UserServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run(UserServiceApplication.class);
    }
}

application.yml

spring:
  application:
    name: user-service
server:
  port: 8080

wzx:
  host: localhost
  port: 8888
image.png

运行结果:


image.png

服务端:

@Component
public class InitialMediator implements BeanPostProcessor {

    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        //加了WzxRemoteService的bean进行远程发布
        if(bean.getClass().isAnnotationPresent(WzxRemoteService.class)){
            Method[] methods = bean.getClass().getDeclaredMethods();
            for(Method method : methods){
                String key = bean.getClass().getInterfaces()[0].getName() + "." + method.getName();
                BeanMethod beanMethod = new BeanMethod();
                beanMethod.setBean(bean);
                beanMethod.setMethod(method);
                //完成需要发布的bean的存储
                Mediator.map.put(key, beanMethod);
            }
        }
        return bean;
    }
}
/**
 * 定义一个中间类
 */
public class Mediator {

    //用来存储所有发布服务的实例,也就是所有加了WzxRemoteService注解的Bean
    public static Map<String, BeanMethod> map = new ConcurrentHashMap<String, BeanMethod>();
    private volatile static Mediator instance;
    private Mediator() {
    }
    public static Mediator getInstance(){
        if(instance == null){
            synchronized (Mediator.class){
                if(instance == null){
                    instance = new Mediator();
                }
            }
        }
        return instance;
    }
    public Object processor(RpcRequest request){
        String key  = request.getClassName() + "." + request.getMethodName();
        //beanMethod已经通过后置处理器加载到了map中,直接去map中获取
        BeanMethod beanMethod = map.get(key);
        if (beanMethod == null){
            return null;
        }
        Object bean = beanMethod.getBean();
        Method method = beanMethod.getMethod();
        try {
            return method.invoke(bean, request.getArgs());
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } catch (InvocationTargetException e) {
            e.printStackTrace();
        }
        return null;
    }
}
public class RpcRequest implements Serializable {
    private String className;
    private String methodName;
    //参数
    private Object[] args;
    //参数类型
    private Class[] types;

    public String getClassName() {
        return className;
    }
    public void setClassName(String className) {
        this.className = className;
    }
    public String getMethodName() {
        return methodName;
    }
    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }
    public Object[] getArgs() {
        return args;
    }
    public void setArgs(Object[] args) {
        this.args = args;
    }
    public Class[] getTypes() {
        return types;
    }
    public void setTypes(Class[] types) {
        this.types = types;
    }
}
public class BeanMethod {
    private Object Bean;

    private Method method;

    public Object getBean() {
        return Bean;
    }

    public void setBean(Object bean) {
        Bean = bean;
    }

    public Method getMethod() {
        return method;
    }

    public void setMethod(Method method) {
        this.method = method;
    }
}
//spring容器启动完成以后会发布一个ContextRefreshedEvent
@Component
public class SocketServerInitial implements ApplicationListener<ContextRefreshedEvent> {
    public final ExecutorService executorService = Executors.newCachedThreadPool();
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        //启动服务
        ServerSocket serverSocket = null;
        try {
            serverSocket = new ServerSocket(8888);
            //不断循环去监听客户端请求
            for(;;){
                Socket socket = serverSocket.accept();
                //通过Processorhandler解决IO阻塞
                executorService.execute(new ProcessorHandler(socket));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            try {
                serverSocket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
public class ProcessorHandler implements Runnable{
    private Socket socket;

    public ProcessorHandler(Socket socket) {
        this.socket = socket;

    }
    public Socket getSocket() {
        return socket;
    }
    public void setSocket(Socket socket) {
        this.socket = socket;
    }
    public void run() {
        ObjectInputStream inputStream = null;
        ObjectOutputStream outputStream = null;
        try {
            inputStream = new ObjectInputStream(socket.getInputStream());
            RpcRequest request = (RpcRequest)inputStream.readObject();
            //路由
            Mediator mediator = Mediator.getInstance();
            Object rs = mediator.processor(request);
            System.out.println("服务端处理的结果:" + rs);
            //结果写回去
            outputStream = new ObjectOutputStream(socket.getOutputStream());
            outputStream.writeObject(rs);
            outputStream.flush();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if(outputStream != null) {
                    outputStream.close();
                }
                if(inputStream != null) {
                    inputStream.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface WzxRemoteService {
}
@Configuration
@ComponentScan(value = "com.wzx.example")
public class Bootstrap {
    public static void main(String[] args) {
        ApplicationContext applicationContext = new AnnotationConfigApplicationContext(Bootstrap.class);
    }
}
//通过这个注解自动发布服务
@WzxRemoteService
public class OrderServiceImpl implements IOrderService {
    public String queryOrderInfo(String id) {
        return "this is queryOrderInfo";
    }
    public String queryOrderList() {
        return "this is queryOrderList";
    }
}
public interface IOrderService {
    String queryOrderInfo(String id);
    String queryOrderList();
}
image.png
上一篇下一篇

猜你喜欢

热点阅读