Android inter-process communication with RabbitMQ

2020-09-10  一个冬季
References

https://github.com/Harry-III/RabbitMQ-Android

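上手了RabbitMQ?再来看看它的交换机(Exchange)吧 (Got the hang of RabbitMQ? Now take a look at its exchanges)
RabbitMQ的Java应用(1) -- Rabbit Java Client使用 (RabbitMQ in Java (1): using the Rabbit Java Client)
RabbitMQ(三)入门 —— RabbitMQ的五种模式和四种交换机 (RabbitMQ (3), getting started: RabbitMQ's five patterns and four exchange types)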

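Brief notes

This example is adapted from the GitHub repository linked above.

What problem RabbitMQ solved for me

1. The Android client no longer polls the server. It behaves much like push: messages from the server arrive immediately.

What I changed

1. Moved RabbitMQ into a separate process
2. Redefined some of the methods

Summary

1. Across processes, the client hands its own Messenger to the Service through the message.replyTo field, so that the Service has a way to reply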

// ... omitted
 override fun onServiceConnected(name: ComponentName?, iBinder: IBinder?) {
            try {
                // ... omitted
                // Pass the client's Messenger to the Service so that both sides can talk to each other
                message.replyTo = mClientMessenger
                // ... omitted
                mServiceMessenger?.send(message)
            } catch (e: RemoteException) {
                e.printStackTrace()
            }
        }

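2. The RabbitMQ connection and channel must be created on a worker thread, otherwise an error is thrown (Android does not allow network access on the main thread)
3. If two users consume from the same queue (say queue A), each message the server puts into A goes to only one of them, and whichever user processes messages faster receives more. It is therefore best to give each user their own queue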

[Figure: sending messages to the queue]
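
Point 3 can be illustrated without a broker. The following is only a plain-Java simulation under stated assumptions: `SharedQueueDemo`, `drain`, and the per-round speeds are hypothetical and are not RabbitMQ APIs. It models one shared queue drained by a fast and a slow consumer, where every message goes to exactly one of them.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Broker-free simulation: two consumers drain the same queue at different speeds. */
final class SharedQueueDemo {

    /**
     * Each round, the fast consumer takes up to fastPerRound messages and the
     * slow consumer up to slowPerRound. Returns {fastCount, slowCount}.
     * At least one of the two speeds must be positive.
     */
    static int[] drain(int totalMessages, int fastPerRound, int slowPerRound) {
        Queue<Integer> queue = new ArrayDeque<>();
        for (int i = 0; i < totalMessages; i++) {
            queue.add(i);
        }
        int fast = 0;
        int slow = 0;
        while (!queue.isEmpty()) {
            // poll() removes one message; it returns null once the queue is empty
            for (int i = 0; i < fastPerRound && queue.poll() != null; i++) {
                fast++;
            }
            for (int i = 0; i < slowPerRound && queue.poll() != null; i++) {
                slow++;
            }
        }
        return new int[] {fast, slow};
    }

    public static void main(String[] args) {
        int[] counts = drain(100, 3, 1);
        System.out.println("fast=" + counts[0] + ", slow=" + counts[1]);
    }
}
```

With 100 messages and speeds of 3 and 1, the fast consumer ends up with 75 messages and the slow one with 25. With one queue per user there is nothing to compete for, so each user sees every message routed to their own queue.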
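4. If you pass data through a Messenger, keep it under 1 MB, otherwise the app crashes. The Binder transaction buffer is about 1 MB and is shared by all transactions in progress in the process, so the practical limit is lower
5. A routing key works like an admission ticket: only messages holding the matching ticket get through
6. The 5.x release series of this library requires JDK 8 to compile and run. On Android, that means only Android 7.0 or later is supported. The 4.x release series supports JDK 6 and Android versions before 7.0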
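
For point 4, one possible way to stay below the Binder limit is to split a large payload into smaller pieces and send each piece in its own Message. The sketch below is plain Java and makes assumptions: `PayloadChunker` and the 256 KB chunk size are hypothetical choices, not Android APIs or documented limits, and the Messenger calls themselves are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: splits a payload into chunks and reassembles it. */
final class PayloadChunker {

    /** Assumed per-message budget, chosen to stay well below the ~1 MB Binder buffer. */
    static final int MAX_CHUNK_BYTES = 256 * 1024;

    /** Splits payload into consecutive chunks of at most maxChunkBytes each. */
    static List<byte[]> split(byte[] payload, int maxChunkBytes) {
        if (maxChunkBytes <= 0) {
            throw new IllegalArgumentException("maxChunkBytes must be positive");
        }
        List<byte[]> chunks = new ArrayList<>();
        for (int start = 0; start < payload.length; start += maxChunkBytes) {
            int end = Math.min(payload.length, start + maxChunkBytes);
            chunks.add(Arrays.copyOfRange(payload, start, end));
        }
        return chunks;
    }

    /** Concatenates the chunks, in order, back into a single payload. */
    static byte[] join(List<byte[]> chunks) {
        int total = 0;
        for (byte[] chunk : chunks) {
            total += chunk.length;
        }
        byte[] out = new byte[total];
        int offset = 0;
        for (byte[] chunk : chunks) {
            System.arraycopy(chunk, 0, out, offset, chunk.length);
            offset += chunk.length;
        }
        return out;
    }
}
```

The receiving side would need to know how many chunks to expect and in which order, for example by carrying an index and a total count alongside each chunk. That bookkeeping is not shown here.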

GitHub repository for this project

The wrapped RabbitMQ code

RabbitMQClient.java

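import android.text.TextUtils;
import android.util.Log;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class RabbitMQClient {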
    private final String TAG = "RabbitMQ";
    private final String FLAG_SEND = "send";
    private final String FLAG_RECEIVE = "receive";

    private final ConnectionFactory factory;
    private Connection connection;
    private Map<String, Channel> channelMap = new HashMap<>();

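    public static final String EXCHANGETYPE_FANOUT = "fanout";   // ignores the routing key; delivers to every queue bound to the exchange
    public static final String EXCHANGETYPE_DIRECT = "direct";  // delivers to bound queues whose binding key equals the routing key
    public static final String EXCHANGETYPE_TOPIC = "topic";  // delivers to bound queues whose binding pattern (wildcards * and #) matches the routing key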


    public RabbitMQClient(String hostIp, int port, String username, String password) {
        factory = new ConnectionFactory();
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setHost(hostIp);
        factory.setPort(port);
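        factory.setVirtualHost("/");  // a virtual host is comparable to a database on a database server
        factory.setConnectionTimeout(15 * 1000);         // connection timeout: 15 seconds
        factory.setAutomaticRecoveryEnabled(true);   // automatically recover the connection and its channels
        factory.setTopologyRecoveryEnabled(true);    // also recover exchanges, queues, bindings, and so on
        factory.setNetworkRecoveryInterval(5 * 1000);    // interval between recovery attempts: 5 seconds (also the default)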
    }


    /**
     * Publishes a message directly to a queue through the default (nameless) exchange.
     *
     * @param message   the message to send
     * @param queueName the queue name
     * @author gaoxiaoxiong
     * @date created 2020/9/8
     */
    public void sendQueueMessage(String message, String queueName) throws IOException, TimeoutException, AlreadyClosedException {
        if (connection == null || !connection.isOpen()) {
            connection = factory.newConnection();
        }
        if (!channelMap.containsKey(FLAG_SEND + queueName)) {
            Channel channel = connection.createChannel();
            channel.queueDeclare(queueName, false, false, false, null);
            channelMap.put(FLAG_SEND + queueName, channel);
        }
        // With the default (empty-name) exchange a routing key is required, and it is used as the queue name
        channelMap.get(FLAG_SEND + queueName).basicPublish("", queueName, null, message.getBytes());
    }


    /**
     * Publishes a message to a direct exchange.
     *
     * @param exchangeName the exchange name
     * @param message      the message to send
     * @param queueName    the queue name
     * @param routingKey   the routing key
     * @author gaoxiaoxiong
     * @date created 2020/9/8
     */
    public void sendDirectTypeMessage(String exchangeName, String message, String queueName, String routingKey) throws IOException, TimeoutException, AlreadyClosedException {
        if (connection == null || !connection.isOpen()) {
            connection = factory.newConnection();
        }
        if (!channelMap.containsKey(FLAG_SEND + exchangeName + EXCHANGETYPE_DIRECT + queueName)) {
            Channel channel = connection.createChannel();
            channel.queueDeclare(queueName, false, false, false, null);
            channel.exchangeDeclare(exchangeName, EXCHANGETYPE_DIRECT);
            channelMap.put(FLAG_SEND + exchangeName + EXCHANGETYPE_DIRECT + queueName, channel);
        }
        channelMap.get(FLAG_SEND + exchangeName + EXCHANGETYPE_DIRECT + queueName).basicPublish(exchangeName, routingKey, null, message.getBytes());
    }

    /**
     * Publishes a message to a fanout exchange.
     *
     * @param exchangeName the exchange name
     * @param queueName    the queue name
     * @param message      the message to send
     * @author gaoxiaoxiong
     * @date created 2020/9/8
     */
    public void sendFanoutTypeMessage(String exchangeName, String queueName, String message) throws IOException, TimeoutException, AlreadyClosedException {
        if (connection == null || !connection.isOpen()) {
            connection = factory.newConnection();
        }
        if (!channelMap.containsKey(FLAG_SEND + exchangeName + EXCHANGETYPE_FANOUT + queueName)) {
            Channel channel = connection.createChannel();
            channel.queueDeclare(queueName, false, false, false, null);
            channel.exchangeDeclare(exchangeName, EXCHANGETYPE_FANOUT);
            channelMap.put(FLAG_SEND + exchangeName + EXCHANGETYPE_FANOUT + queueName, channel);
        }
        channelMap.get(FLAG_SEND + exchangeName + EXCHANGETYPE_FANOUT + queueName).basicPublish(exchangeName, "", null, message.getBytes());
    }

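    /**
     * Publishes a message to an exchange of the given type (fanout, direct, or topic).
     *
     * @param exchangeName the exchange name
     * @param exchangeType the exchange type
     * @param queueName    the queue name
     * @param message      the message to send
     * @param routingKey   the routing key
     * @author gaoxiaoxiong
     * @date created 2020/9/8
     */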
    public void sendExchangeNameQueueMessage(String exchangeName, String exchangeType, String message, String queueName, String routingKey) throws IOException, TimeoutException, AlreadyClosedException {
        if (connection == null || !connection.isOpen()) {
            connection = factory.newConnection();
        }
        if (!channelMap.containsKey(FLAG_SEND + exchangeName + exchangeType + queueName)) {
            Channel channel = connection.createChannel();
            channel.queueDeclare(queueName, false, false, false, null);
            channel.exchangeDeclare(exchangeName, exchangeType);
            channelMap.put(FLAG_SEND + exchangeName + exchangeType + queueName, channel);
        }
        if (exchangeType.equals(EXCHANGETYPE_FANOUT)) {
            channelMap.get(FLAG_SEND + exchangeName + exchangeType + queueName).basicPublish(exchangeName, "", null, message.getBytes());
        } else if (exchangeType.equals(EXCHANGETYPE_DIRECT)) {
            channelMap.get(FLAG_SEND + exchangeName + exchangeType + queueName).basicPublish(exchangeName, routingKey, null, message.getBytes());
        } else if (exchangeType.equals(EXCHANGETYPE_TOPIC)) {
            channelMap.get(FLAG_SEND + exchangeName + exchangeType + queueName).basicPublish(exchangeName, routingKey, null, message.getBytes());
        }
    }


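    /**
     * Consumes messages from a queue without binding it to a named exchange.
     *
     * @param queueName the queue name
     * @param listener  callback invoked with each received message
     * @author gaoxiaoxiong
     * @date created 2020/9/8
     */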
    public void receiveQueueMessage(final String queueName, final ResponseListener listener)
            throws IOException, TimeoutException, AlreadyClosedException {
        receiveQueueRoutingKeyMessage(queueName, "", "", "", listener);
    }


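    /**
     * Consumes messages from a queue, optionally bound to an exchange with a routing key.
     *
     * @param queueName    the queue name
     * @param routingKey   the routing key
     * @param exchangeName the exchange name
     * @param exchangeType the exchange type
     * @param listener     callback invoked with each received message
     * @author gaoxiaoxiong
     * @date created 2020/9/8
     */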
    public void receiveQueueRoutingKeyMessage(String queueName, final String routingKey, String exchangeName, String exchangeType, final ResponseListener listener)
            throws IOException, TimeoutException, AlreadyClosedException {

        if (exchangeType.equals(EXCHANGETYPE_DIRECT) || exchangeType.equals(EXCHANGETYPE_TOPIC)) {
            if (TextUtils.isEmpty(routingKey)) {
                throw new IllegalArgumentException("routing key must not be empty");
            }
        }

        if (!TextUtils.isEmpty(routingKey)) {
            if (TextUtils.isEmpty(exchangeName)) {
                throw new IllegalArgumentException("exchange name must not be empty");
            }
        }

        if (!channelMap.containsKey(FLAG_RECEIVE + routingKey + queueName)) {
            if (connection == null || !connection.isOpen()) {
                connection = factory.newConnection();
            }

            final Channel channel = connection.createChannel();
            channel.queueDeclare(queueName, false, false, false, null);
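            // Bind the queue whenever an exchange is named, so a fanout exchange
            // (which uses an empty routing key) is bound as well
            if (!TextUtils.isEmpty(exchangeName)) {
                channel.exchangeDeclare(exchangeName, exchangeType);
                channel.queueBind(queueName, exchangeName, routingKey);  // create the binding
            }
            // Start consuming from the queue with manual acknowledgement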
            channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    if (listener != null) {
                        listener.receive(message);
                    }
                    channel.basicAck(envelope.getDeliveryTag(), false);  // acknowledge the message
                }
            });
            channelMap.put(FLAG_RECEIVE + routingKey + queueName, channel);
            Log.i(TAG, "Connected, queue name: " + queueName);
        }
    }


    /**
     * Closes all resources
     */
    public void close() {
        for (Channel next : channelMap.values()) {
            if (next != null && next.isOpen()) {
                try {
                    next.close();
                } catch (IOException | TimeoutException e) {
                    e.printStackTrace();
                }
            }
        }
        channelMap.clear();
        if (connection != null && connection.isOpen()) {
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }


    public interface ResponseListener {
        void receive(String message);
    }
}
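
The comments on the three EXCHANGETYPE_ constants describe how each exchange type decides whether a bound queue receives a message. As a rough, broker-free illustration, the plain-Java sketch below mimics those rules. `RoutingSketch` and its methods are hypothetical helpers, not part of the RabbitMQ client, and edge cases such as empty keys are not guaranteed to behave the way a real broker does. In a topic binding, `*` stands for exactly one dot-separated word and `#` for zero or more words.

```java
/** Hypothetical, simplified model of fanout, direct, and topic routing. */
final class RoutingSketch {

    /** Returns true if a queue bound with bindingKey would receive a message sent with routingKey. */
    static boolean routes(String exchangeType, String bindingKey, String routingKey) {
        switch (exchangeType) {
            case "fanout":
                return true;                              // routing key is ignored
            case "direct":
                return bindingKey.equals(routingKey);     // exact match only
            case "topic":
                return topicMatches(bindingKey, routingKey);
            default:
                throw new IllegalArgumentException("unsupported exchange type: " + exchangeType);
        }
    }

    /** Matches a dot-separated binding pattern against a dot-separated routing key. */
    static boolean topicMatches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    private static boolean match(String[] pattern, int pi, String[] words, int wi) {
        if (pi == pattern.length) {
            return wi == words.length;                    // both fully consumed
        }
        if (pattern[pi].equals("#")) {
            // '#' absorbs zero or more words: try every possible number of words
            for (int next = wi; next <= words.length; next++) {
                if (match(pattern, pi + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (wi == words.length) {
            return false;                                 // pattern expects another word
        }
        if (pattern[pi].equals("*") || pattern[pi].equals(words[wi])) {
            return match(pattern, pi + 1, words, wi + 1); // consume exactly one word
        }
        return false;
    }
}
```

For example, the binding `stock.*` accepts `stock.us` but not `stock.us.nyse`, while `stock.#` accepts both, as well as plain `stock`.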

RabbitMQUtil.java

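import android.os.SystemClock;
import android.text.TextUtils;

import com.rabbitmq.client.AlreadyClosedException;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;

public class RabbitMQUtil {
    private volatile boolean isRunning = true;  // volatile: written by close(), read on the executor thread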
    private RabbitMQClient rabbitMQ;
    private ExecutorService executor;


    public RabbitMQUtil(String hostIp, int port, String username, String password) {
        rabbitMQ = new RabbitMQClient(hostIp, port, username, password);
        executor = Executors.newSingleThreadExecutor();  // adjust the number of threads to your project's needs
    }

    /**
     * Sends a message directly to a queue on the executor thread.
     *
     * @param message   the message to send
     * @param queueName the queue name
     * @author gaoxiaoxiong
     * @date created 2020/9/8
     */
    public void sendMessage(final String message, final String queueName, final SendMessageListener sendMessageListener,final ErrorMessageListener errorMessageListener) {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    rabbitMQ.sendQueueMessage(message, queueName);
                    if (sendMessageListener != null) sendMessageListener.sendMessage(true);
                } catch (IOException | TimeoutException | AlreadyClosedException e) {
                    e.printStackTrace();
                    if (errorMessageListener!=null){
                        errorMessageListener.errorMessage(e);
                    }
                    if (sendMessageListener != null) sendMessageListener.sendMessage(false);
                }
            }
        });
    }

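    /**
     * Sends a message to an exchange of the given type on the executor thread.
     *
     * @param message      the message to send
     * @param exchangeName the exchange name
     * @param exchangeType the exchange type
     * @param queueName    the queue name
     * @param routingKey   the routing key
     * @author gaoxiaoxiong
     * @date created 2020/9/8
     */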
    public void sendMessage(final String message, final String exchangeName, final String exchangeType, final String queueName, final String routingKey, final SendMessageListener sendMessageListener,final ErrorMessageListener errorMessageListener) {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    rabbitMQ.sendExchangeNameQueueMessage(exchangeName, exchangeType, message, queueName, routingKey);
                    if (sendMessageListener != null) sendMessageListener.sendMessage(true);
                } catch (IOException | TimeoutException | AlreadyClosedException e) {
                    e.printStackTrace();
                    if (errorMessageListener!=null){
                        errorMessageListener.errorMessage(e);
                    }
                    if (sendMessageListener != null) sendMessageListener.sendMessage(false);
                }
            }
        });
    }

    /**
     * Sends a message to a fanout exchange on the executor thread.
     *
     * @param exchangeName the exchange name
     * @param message      the message to send
     * @param queueName    the queue name
     * @author gaoxiaoxiong
     * @date created 2020/9/8
     */
    public void sendFanoutTypeMessage(final String exchangeName, final String message, final String queueName, final SendMessageListener sendMessageListener,final ErrorMessageListener errorMessageListener) {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    rabbitMQ.sendFanoutTypeMessage(exchangeName, queueName, message);
                    if (sendMessageListener != null) sendMessageListener.sendMessage(true);
                } catch (IOException | TimeoutException | AlreadyClosedException e) {
                    e.printStackTrace();
                    if (errorMessageListener!=null){
                        errorMessageListener.errorMessage(e);
                    }
                    if (sendMessageListener != null) sendMessageListener.sendMessage(false);
                }
            }
        });
    }

    /**
     * Sends a message to a direct exchange on the executor thread.
     *
     * @param exchangeName the exchange name
     * @param queueName    the queue name
     * @param message      the message to send
     * @param routingKey   the routing key
     * @author gaoxiaoxiong
     * @date created 2020/9/8
     */
    public void sendDirectTypeMessage(final String exchangeName, final String queueName, final String message, final String routingKey, final SendMessageListener sendMessageListener,final ErrorMessageListener errorMessageListener) {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    // RabbitMQClient.sendDirectTypeMessage expects (exchangeName, message, queueName, routingKey)
                    rabbitMQ.sendDirectTypeMessage(exchangeName, message, queueName, routingKey);
                    if (sendMessageListener != null) sendMessageListener.sendMessage(true);
                } catch (IOException | TimeoutException | AlreadyClosedException e) {
                    e.printStackTrace();
                    if (errorMessageListener!=null){
                        errorMessageListener.errorMessage(e);
                    }
                    if (sendMessageListener != null) sendMessageListener.sendMessage(false);
                }
            }
        });
    }

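    /**
     * Starts consuming from a queue, retrying every five seconds if the connection fails.
     *
     * @param queueName the queue name; a random name is generated if it is empty
     * @author gaoxiaoxiong
     * @date created 2020/9/8
     */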
    public void receiveQueueMessage(String queueName, final ReceiveMessageListener listener,final ErrorMessageListener errorMessageListener) {
        String newQueueName = null;
        if (TextUtils.isEmpty(queueName)){
            newQueueName = createDefaultQueueName(queueName);
        }else {
            newQueueName = queueName;
        }
        final String finalNewQueueName = newQueueName;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                while (isRunning) {
                    try {
                        rabbitMQ.receiveQueueMessage(finalNewQueueName, new RabbitMQClient.ResponseListener() {
                            @Override
                            public void receive(String message) {
                                if (listener != null) listener.receiveMessage(message);
                            }
                        });
                        break;  // consumer registered: stop looping so the executor thread is not kept busy
                    } catch (IOException | TimeoutException | AlreadyClosedException e) {
                        if (errorMessageListener!=null){
                            errorMessageListener.errorMessage(e);
                        }
                        e.printStackTrace();
                        SystemClock.sleep(5000);
                    }
                }
            }
        });
    }

    public void receiveQueueRoutingKeyMessage(String queueName, final String routingKey, final String exchangeName, final String exchangeType, final ReceiveMessageListener listener,final ErrorMessageListener errorMessageListener) {
        String newQueueName = null;
        if (TextUtils.isEmpty(queueName)){
            newQueueName = createDefaultQueueName(queueName);
        }else {
            newQueueName = queueName;
        }
        final String finalNewQueueName = newQueueName;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                while (isRunning) {
                    try {
                        rabbitMQ.receiveQueueRoutingKeyMessage(finalNewQueueName, routingKey, exchangeName, exchangeType, new RabbitMQClient.ResponseListener() {
                            @Override
                            public void receive(String message) {
                                if (listener != null) listener.receiveMessage(message);
                            }

                        });
                        break;  // consumer registered: stop looping so the executor thread is not kept busy
                    } catch (IOException | TimeoutException | AlreadyClosedException e) {
                        if (errorMessageListener!=null){
                            errorMessageListener.errorMessage(e);
                        }
                        e.printStackTrace();
                        SystemClock.sleep(5000);  // wait five seconds before retrying
                    }
                }
            }
        });
    }

    public String createDefaultQueueName(String routingKey) {
        if (TextUtils.isEmpty(routingKey)){
            routingKey = "";
        }
        return routingKey + "@" + UUID.randomUUID();
    }

    /**
     * Recommendation:
     * call this from the Application class, or when the work is finished
     */
    public void close() {
        isRunning = false;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                rabbitMQ.close();
                executor.shutdownNow();
            }
        });
    }


    public interface ReceiveMessageListener {
        void receiveMessage(String message);
    }

    public interface SendMessageListener {
        void sendMessage(boolean isSuccess);
    }

    public interface ErrorMessageListener{
        void errorMessage(Exception e);
    }
}
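
RabbitMQUtil runs every task on one single-threaded executor, so a task that keeps looping (for example a receive task that is still retrying) holds back every task queued behind it. The plain-Java sketch below demonstrates that effect in isolation. `SingleThreadStarvationDemo` and its stand-in tasks are hypothetical and use no RabbitMQ or Android classes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Shows that a looping task on a single-threaded executor delays later tasks. */
final class SingleThreadStarvationDemo {

    /** Returns {sendRanWhileLoopWasActive, sendRanAfterLoopStopped}. */
    static boolean[] run() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicBoolean running = new AtomicBoolean(true);
        CountDownLatch sendDone = new CountDownLatch(1);

        // Stand-in for a receive task that keeps looping on the only thread
        executor.execute(() -> {
            while (running.get()) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        // Stand-in for a send task queued behind the loop
        executor.execute(sendDone::countDown);

        try {
            boolean ranWhileLooping = sendDone.await(300, TimeUnit.MILLISECONDS);
            running.set(false);  // let the loop finish
            boolean ranAfterStop = sendDone.await(2, TimeUnit.SECONDS);
            return new boolean[] {ranWhileLooping, ranAfterStop};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            executor.shutdown();
        }
    }
}
```

The send task only runs once the loop has ended. If sends must not wait for receivers, one option is to give each its own executor, or to use a pool with more threads.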
