android 进程间通信 rabbitmq
2020-09-10 本文已影响0人
一个冬季
参考网站
https://github.com/Harry-III/RabbitMQ-Android
上手了RabbitMQ?再来看看它的交换机(Exchange)吧
RabbitMQ的Java应用(1) -- Rabbit Java Client使用
RabbitMQ(三)入门 —— RabbitMQ的五种模式和四种交换机
简单说明
本例子是改编自上面的github链接
rabbitmq解决了我什么问题
1、android端不采用轮询的方式请求服务器,有点类似推送的感觉,能即时收到服务器的信息
我修改了哪些地方
1、将rabbitmq放到单独的进程中
2、重新定义一些方法
总结
1、在多进程中通过 message.replyTo 方法将通信方式传递给 Service端
...省略
override fun onServiceConnected(name: ComponentName?, iBinder: IBinder?) {
try {
...省略
将客户端的Msssenger对象传递给Service,用于相互通信使用
message.replyTo = mClientMessenger;
...省略
mServiceMessenger?.send(message)
} catch (e: RemoteException) {
e?.printStackTrace();
}
}
2、rabbitmq的管道创建是要在线程里面,否则会报错
3、如果有2个用户都采用一个管道(管道名 A),当服务器将信息都输送到A管道后,哪个用户处理消息快,哪个用户得到的信息就多,所以最好就是每个用户一个管道
4、如果采用Messger传递信息,传递数据不能超过1M大小的,否则会导致崩溃,因为当前进程共享该大小
5、路由的意思,类似门票,只有持有该门票的人才可以通过
6、该库的5.x版本系列 需要JDK 8 进行编译和运行。在Android上,这意味着仅支持Android 7.0或更高版本。4.x发布系列支持7.0之前的[JDK 6]和Android版本。
本项目github
封装的rabbitMq代码
RabbitMQClient .java
public class RabbitMQClient {
private final String TAG = "RabbitMQ";
private final String FLAG_SEND = "send";
private final String FLAG_RECEIVE = "receive";
private final ConnectionFactory factory;
private Connection connection;
private Map<String, Channel> channelMap = new HashMap<>();
public static final String EXCHANGETYPE_FANOUT = "fanout"; //不用匹配路由,发送给所有绑定转换器的队列
public static final String EXCHANGETYPE_DIRECT = "direct"; //匹配路由一致,才发送给绑定转换器队列
public static final String EXCHANGETYPE_TOPIC = "topic"; // 通配符* 和 # 匹配路由一致,才发送给绑定转换器队列
public RabbitMQClient(String hostIp, int port, String username, String password) {
factory = new ConnectionFactory();
factory.setUsername(username);
factory.setPassword(password);
factory.setHost(hostIp);
factory.setPort(port);
factory.setVirtualHost("/");//类似数据库的意思
factory.setConnectionTimeout(15 * 1000); //连接时间设置为10秒
factory.setAutomaticRecoveryEnabled(true); //恢复连接,通道
factory.setTopologyRecoveryEnabled(true); //恢复通道中 转换器,队列,绑定关系等
factory.setNetworkRecoveryInterval(5 * 1000); //恢复连接间隔,默认5秒
}
/**
* @param message 需要发送的消息
* @param queueName 管道名称
* @date 创建时间:2020/9/8 0008
* @auther gaoxiaoxiong
* @Descriptiion
**/
public void sendQueueMessage(String message, String queueName) throws IOException, TimeoutException, AlreadyClosedException {
if (connection == null || !connection.isOpen()) {
connection = factory.newConnection();
}
if (!channelMap.containsKey(FLAG_SEND + queueName)) {
Channel channel = connection.createChannel();
channel.queueDeclare(queueName, false, false, false, null);
channelMap.put(FLAG_SEND + queueName, channel);
}
//空名字的交换机,需要设置routingKey,此时会将routingKey 作为 队列名使用
channelMap.get(FLAG_SEND + queueName).basicPublish("", queueName, null, message.getBytes());
}
/**
* @param exchangeName 交换机名称
* @param message 需要发送的消息
* @param queueName 队列名称
* @param routingKey 路由规则
* @date 创建时间:2020/9/8 0008
* @auther gaoxiaoxiong
* @Descriptiion 发送 exchangeType direct 类型的信息
**/
public void sendDirectTypeMessage(String exchangeName, String message, String queueName, String routingKey) throws IOException, TimeoutException, AlreadyClosedException {
if (connection == null || !connection.isOpen()) {
connection = factory.newConnection();
}
if (!channelMap.containsKey(FLAG_SEND + exchangeName + EXCHANGETYPE_DIRECT + queueName)) {
Channel channel = connection.createChannel();
channel.queueDeclare(queueName, false, false, false, null);
channel.exchangeDeclare(exchangeName, EXCHANGETYPE_DIRECT);
channelMap.put(FLAG_SEND + exchangeName + EXCHANGETYPE_DIRECT + queueName, channel);
}
channelMap.get(FLAG_SEND + exchangeName + EXCHANGETYPE_DIRECT + queueName).basicPublish(exchangeName, routingKey, null, message.getBytes());
}
/**
* @param exchangeName 交换机名称
* @param queueName 队列名称
* @param message 发送的消息
* @date 创建时间:2020/9/8 0008
* @auther gaoxiaoxiong
* @Descriptiion 发送 exchangeType fanout 类型的信息
**/
public void sendFanoutTypeMessage(String exchangeName, String queueName, String message) throws IOException, TimeoutException, AlreadyClosedException {
if (connection == null || !connection.isOpen()) {
connection = factory.newConnection();
}
if (!channelMap.containsKey(FLAG_SEND + exchangeName + EXCHANGETYPE_FANOUT + queueName)) {
Channel channel = connection.createChannel();
channel.queueDeclare(queueName, false, false, false, null);
channel.exchangeDeclare(exchangeName, EXCHANGETYPE_FANOUT);
channelMap.put(FLAG_SEND + exchangeName + EXCHANGETYPE_FANOUT + queueName, channel);
}
channelMap.get(FLAG_SEND + exchangeName + EXCHANGETYPE_FANOUT + queueName).basicPublish(exchangeName, "", null, message.getBytes());
}
/**
* @param exchangeName 交换机名称
* @param exchangeType 模式
* @param queueName 队列名称
* @param message 需要发送的消息
* @param routingKey 路由规则
* @date 创建时间:2020/9/8 0008
* @auther gaoxiaoxiong
* @Descriptiion
**/
public void sendExchangeNameQueueMessage(String exchangeName, String exchangeType, String message, String queueName, String routingKey) throws IOException, TimeoutException, AlreadyClosedException {
if (connection == null || !connection.isOpen()) {
connection = factory.newConnection();
}
if (!channelMap.containsKey(FLAG_SEND + exchangeName + exchangeType + queueName)) {
Channel channel = connection.createChannel();
channel.queueDeclare(queueName, false, false, false, null);
channel.exchangeDeclare(exchangeName, exchangeType);
channelMap.put(FLAG_SEND + exchangeName + exchangeType + queueName, channel);
}
if (exchangeType.equals(EXCHANGETYPE_FANOUT)) {
channelMap.get(FLAG_SEND + exchangeName + exchangeType + queueName).basicPublish(exchangeName, "", null, message.getBytes());
} else if (exchangeType.equals(EXCHANGETYPE_DIRECT)) {
channelMap.get(FLAG_SEND + exchangeName + exchangeType + queueName).basicPublish(exchangeName, routingKey, null, message.getBytes());
} else if (exchangeType.equals(EXCHANGETYPE_TOPIC)) {
channelMap.get(FLAG_SEND + exchangeName + exchangeType + queueName).basicPublish(exchangeName, routingKey, null, message.getBytes());
}
}
/**
* @param queueName 队列名称
* @date 创建时间:2020/9/8 0008
* @auther gaoxiaoxiong
* @Descriptiion
**/
public void receiveQueueMessage(final String queueName, final ResponseListener listener)
throws IOException, TimeoutException, AlreadyClosedException {
receiveQueueRoutingKeyMessage(queueName, "", "", "", listener);
}
/**
* @param queueName 队列名称
* @param routingKey 路由规则
* @param exchangeName 交换机名称
* @param exchangeType 交换机类型
* @date 创建时间:2020/9/8 0008
* @auther gaoxiaoxiong
* @Descriptiion
**/
public void receiveQueueRoutingKeyMessage(String queueName, final String routingKey, String exchangeName, String exchangeType, final ResponseListener listener)
throws IOException, TimeoutException, AlreadyClosedException {
if (exchangeType.equals(EXCHANGETYPE_DIRECT) || exchangeType.equals(EXCHANGETYPE_TOPIC)) {
if (TextUtils.isEmpty(routingKey)) {
throw new NullPointerException("路由规则不能为空");
}
}
if (!TextUtils.isEmpty(routingKey)) {
if (TextUtils.isEmpty(exchangeName)) {
throw new NullPointerException("交换机名称不能为空");
}
}
if (!channelMap.containsKey(FLAG_RECEIVE + routingKey + queueName)) {
if (connection == null || !connection.isOpen()) {
connection = factory.newConnection();
}
final Channel channel = connection.createChannel();
channel.queueDeclare(queueName, false, false, false, null);
//绑定转换器,使用路由筛选消息
if (!TextUtils.isEmpty(routingKey)) {
channel.exchangeDeclare(exchangeName, exchangeType);
channel.queueBind(queueName, exchangeName, routingKey); //设置绑定
}
//监听队列
channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
if (listener != null) {
listener.receive(message);
}
channel.basicAck(envelope.getDeliveryTag(), false); //消息应答
}
});
channelMap.put(FLAG_RECEIVE + routingKey + queueName, channel);
Log.e(TAG,"已经连接上了,队列名称:" + queueName);
}
}
/**
* 关闭所有资源
*/
public void close() {
for (Channel next : channelMap.values()) {
if (next != null && next.isOpen()) {
try {
next.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
channelMap.clear();
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public interface ResponseListener {
void receive(String message);
}
}
RabbitMQUtil .java
public class RabbitMQUtil {
private boolean isRunning = true;
private RabbitMQClient rabbitMQ;
private ExecutorService executor;
public RabbitMQUtil(String hostIp, int port, String username, String password) {
rabbitMQ = new RabbitMQClient(hostIp, port, username, password);
executor = Executors.newSingleThreadExecutor(); //根据项目需要设置常用线程个数
}
/**
* @param message 发送的消息
* @param queueName 队列名称
* @date 创建时间:2020/9/8 0008
* @auther gaoxiaoxiong
* @Descriptiion
**/
public void sendMessage(final String message, final String queueName, final SendMessageListener sendMessageListener,final ErrorMessageListener errorMessageListener) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
rabbitMQ.sendQueueMessage(message, queueName);
if (sendMessageListener != null) sendMessageListener.sendMessage(true);
} catch (IOException | TimeoutException | AlreadyClosedException e) {
e.printStackTrace();
if (errorMessageListener!=null){
errorMessageListener.errorMessage(e);
}
if (sendMessageListener != null) sendMessageListener.sendMessage(false);
}
}
});
}
/**
* @param message 发送的消息
* @param exchangeName 交换机名称
* @param queueName 队列名称
* @date 创建时间:2020/9/8 0008
* @auther gaoxiaoxiong
* @Descriptiion
**/
public void sendMessage(final String message, final String exchangeName, final String exchangeType, final String queueName, final String routingKey, final SendMessageListener sendMessageListener,final ErrorMessageListener errorMessageListener) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
rabbitMQ.sendExchangeNameQueueMessage(exchangeName, exchangeType, message, queueName, routingKey);
if (sendMessageListener != null) sendMessageListener.sendMessage(true);
} catch (IOException | TimeoutException | AlreadyClosedException e) {
e.printStackTrace();
if (errorMessageListener!=null){
errorMessageListener.errorMessage(e);
}
if (sendMessageListener != null) sendMessageListener.sendMessage(false);
}
}
});
}
/**
* @param exchangeName 交换机名称
* @param queueName 队列名称
* @param message 需要发送的消息
* @date 创建时间:2020/9/8 0008
* @auther gaoxiaoxiong
* @Descriptiion
**/
public void sendFanoutTypeMessage(final String exchangeName, final String message, final String queueName, final SendMessageListener sendMessageListener,final ErrorMessageListener errorMessageListener) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
rabbitMQ.sendFanoutTypeMessage(exchangeName, queueName, message);
if (sendMessageListener != null) sendMessageListener.sendMessage(true);
} catch (IOException | TimeoutException | AlreadyClosedException e) {
e.printStackTrace();
if (errorMessageListener!=null){
errorMessageListener.errorMessage(e);
}
if (sendMessageListener != null) sendMessageListener.sendMessage(false);
}
}
});
}
/**
* @param exchangeName 交换机名称
* @param message 需要发送的消息
* @param queueName 队列名称
* @param routingKey 路由规则
* @date 创建时间:2020/9/8 0008
* @auther gaoxiaoxiong
* @Descriptiion 发送 exchangeType direct 类型的信息
**/
public void sendDirectTypeMessage(final String exchangeName, final String queueName, final String message, final String routingKey, final SendMessageListener sendMessageListener,final ErrorMessageListener errorMessageListener) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
rabbitMQ.sendDirectTypeMessage(exchangeName, queueName, message, routingKey);
if (sendMessageListener != null) sendMessageListener.sendMessage(true);
} catch (IOException | TimeoutException | AlreadyClosedException e) {
e.printStackTrace();
if (errorMessageListener!=null){
errorMessageListener.errorMessage(e);
}
if (sendMessageListener != null) sendMessageListener.sendMessage(false);
}
}
});
}
/**
* @param queueName 队列名称
* @date 创建时间:2020/9/8 0008
* @auther gaoxiaoxiong
* @Descriptiion
**/
public void receiveQueueMessage(String queueName, final ReceiveMessageListener listener,final ErrorMessageListener errorMessageListener) {
String newQueueName = null;
if (TextUtils.isEmpty(queueName)){
newQueueName = createDefaultQueueName(queueName);
}else {
newQueueName = queueName;
}
final String finalNewQueueName = newQueueName;
executor.execute(new Runnable() {
@Override
public void run() {
while (isRunning) {
try {
rabbitMQ.receiveQueueMessage(finalNewQueueName, new RabbitMQClient.ResponseListener() {
@Override
public void receive(String message) {
if (listener != null) listener.receiveMessage(message);
}
});
} catch (IOException | TimeoutException | AlreadyClosedException e) {
if (errorMessageListener!=null){
errorMessageListener.errorMessage(e);
}
e.printStackTrace();
SystemClock.sleep(5000);
}
}
}
});
}
public void receiveQueueRoutingKeyMessage(String queueName, final String routingKey, final String exchangeName, final String exchangeType, final ReceiveMessageListener listener,final ErrorMessageListener errorMessageListener) {
String newQueueName = null;
if (TextUtils.isEmpty(queueName)){
newQueueName = createDefaultQueueName(queueName);
}else {
newQueueName = queueName;
}
final String finalNewQueueName = newQueueName;
executor.execute(new Runnable() {
@Override
public void run() {
while (isRunning) {
try {
rabbitMQ.receiveQueueRoutingKeyMessage(finalNewQueueName, routingKey, exchangeName, exchangeType, new RabbitMQClient.ResponseListener() {
@Override
public void receive(String message) {
if (listener != null) listener.receiveMessage(message);
}
});
} catch (IOException | TimeoutException | AlreadyClosedException e) {
if (errorMessageListener!=null){
errorMessageListener.errorMessage(e);
}
e.printStackTrace();
SystemClock.sleep(5000); //等待五秒
}
}
}
});
}
public String createDefaultQueueName(String routingKey) {
if (TextUtils.isEmpty(routingKey)){
routingKey = "";
}
return routingKey + "@" + UUID.randomUUID();
}
/**
* 建议:
* 在application中关闭或者在结束工作时关闭
*/
public void close() {
isRunning = false;
executor.execute(new Runnable() {
@Override
public void run() {
rabbitMQ.close();
executor.shutdownNow();
}
});
}
public interface ReceiveMessageListener {
void receiveMessage(String message);
}
public interface SendMessageListener {
void sendMessage(boolean isSuccess);
}
public interface ErrorMessageListener{
void errorMessage(Exception e);
}
}