rocketmq consumer优雅停机的探索
背景
最近在项目(一个dubbo服务)发布的时候经常报这个错
nested exception is org.apache.ibatis.exceptions.PersistenceException: ### Error querying database. Cause: org.springframework.jdbc.CannotGetJdbcConnectionException: Could not get JDBC Connection; nested exception is com.alibaba.druid.pool.DataSourceClosedException: dataSource already closed
简单看来是数据库连接已经关闭了,但还在执行逻辑。很奇怪,发布时,dubbo是会先被从zk中摘除掉,再停止服务,按道理说是不会有这种错报出来,接着看调用栈发现是rocketmq consumer中报出来的,原来rocketmq consumer线程一直在跑,并没有停止,所以在spring容器关闭的时候,还在执行的逻辑会报错。
其中consumer启动代码大致如下:
public void init() throws MQClientException {
consumer = new DefaultMQPushConsumer(groupName);
consumer.setNamesrvAddr(namesrvAddr); consumer.subscribe(topic, tag);
consumer.setConsumeMessageBatchMaxSize(getConsumeMessageBatchMaxSize());
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.registerMessageListener(new MsgListener());
if (getInstanceName() != null) {
consumer.setInstanceName(getInstanceName());
}
consumer.start();
Thread shutdownThread = new Thread(new Runnable() {
public void run() {
if (consumer != null) {
consumer.shutdown();
}
}
}, this.getClass().getName() + "MsgListener");
shutdownThread.setPriority(Thread.MAX_PRIORITY);
}
...
public void destroy() {
if (consumer != null) {
consumer.shutdown();
}
}
虽然注册了一个shutdown的hook,而且设置了最高优先级,但从实际中看来是没有达到预期的效果的。
查了资料,有这么一段描述:
- Thread.setPriority()可能根本不做任何事情,这跟你的操作系统和虚拟机版本有关
- 线程优先级对于不同的线程调度器可能有不同的含义,可能并不是你直观的推测。特别地,优先级并不一定是指CPU的分享。在UNIX系统,优先级或多或少可以认为是CPU的分配,但Windows不是这样
- 线程的优先级通常是全局的和局部的优先级设定的组合。Java的setPriority()方法只应用于局部的优先级。换句话说,你不能在整个可能的范围 内设定优先级。(这通常是一种保护的方式,你大概不希望鼠标指针的线程或者处理音频数据的线程被其它随机的用户线程所抢占)
- 不同的系统有不同的线程优先级的取值范围,但是Java定义了10个级别(1-10)。这样就有可能出现几个线程在一个操作系统里有不同的优先级,在另外一个操作系统里却有相同的优先级(并因此可能有意想不到的行为)
- 操作系统可能(并通常这么做)根据线程的优先级给线程添加一些专有的行为(例如”only give a quantum boost if the priority is below X“)。这里再重复一次,优先级的定义有部分在不同系统间有差别。
- 大多数操作系统的线程调度器实际上执行的是在战略的角度上对线程的优先级做临时操作(例如当一个线程接收到它所等待的一个事件或者I/O),通常操作系统知道最多,试图手工控制优先级可能只会干扰这个系统。
- 你的应用程序通常不知道有哪些其它进程运行的线程,所以对于整个系统来说,变更一个线程的优先级所带来的影响是难于预测的。例如你可能发现,你有一个预期 为偶尔在后台运行的低优先级的线程几乎没有运行,原因是一个病毒监控程序在一个稍微高一点的优先级(但仍然低于普通的优先级)上运行,并且无法预计你程序 的性能,它会根据你的客户使用的防病毒程序不同而不同。
总结一句话就是优先级高的在全局范围内不一定会优先执行。
注册的spring bean 的destroy执行顺序也有问题,他不一定是在db connect关闭前执行,也就是说两个bean的销毁并没有先后顺序,这点看下面的分析。
spring依赖调整尝试
由于整个项目是基于spring容器管理的,所以第一想到的是调整spring销毁bean的顺序,如果最先销毁consumer这个bean问题就解决了。我们看下面这个例子,有两个bean,A和B,A依赖B
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="a" class="depend.A" init-method="init" destroy-method="destroy" >
</bean>
<bean id="b" class="depend.B" init-method="init" destroy-method="destroy">
</bean>
</beans>
package depend;
import javax.annotation.Resource;
public class A {
@Resource
private B b;
public B getB() {
return b;
}
public void setB(B b) {
this.b = b;
}
public void init() {
System.out.println("A init");
}
public void destroy() {
System.out.println("A destroy");
}
public void doSomething() {
System.out.println("A doSomething");
}
}
package depend;
public class B {
public void init() {
System.out.println("B init");
}
public void destroy() {
System.out.println("B destroy");
}
public void doSomething() {
System.out.println("B doSomething");
}
}
package main;
import depend.A;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class DependMain {
public static void main(String[] args) {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("Depend.xml");
A a = (A) context.getBean("a");
a.doSomething();
context.registerShutdownHook();
}
}
项目中也是用@Resource来注入依赖的,我们执行一下main发现可能会有如下的输出:
A init
B init
A doSomething
B destroy
A destroy
说明A在初始化时并没有在B之后,而且A也是在B销毁后才销毁的,说明了@Resource并没有处理依赖关系。
稍微修改一下xml配置文件:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="a" class="depend.A" init-method="init" destroy-method="destroy" depends-on="b">
</bean>
<bean id="b" class="depend.B" init-method="init" destroy-method="destroy">
</bean>
</beans>
加一个depend-on,发现输出是这样的:
B init
A init
A doSomething
A destroy
B destroy
说明依赖生效了,同时我发现这样写也是可以处理依赖关系的:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="a" class="depend.A" init-method="init" destroy-method="destroy">
<property name="b" ref="b"/>
</bean>
<bean id="b" class="depend.B" init-method="init" destroy-method="destroy">
</bean>
</beans>
这也解释了为什么只有Dao调用db connect时报错,Dao就是用property依赖db connect的,Spring在关闭时一定是先关闭db connect,再关闭Dao,而Spring的bean在还有引用的时候是不会被回收的,所以只有db connect断开连接会报错。
拿到业务上来说,mq consumer依赖biz1Service,biz1Service依赖biz2Service,biz2Service依赖Dao,Dao依赖db connect,这要是梳理出这个depend-on实在是有点麻烦,于是放弃了这个想法。
设置环境变量尝试
找公司的java大神问了一下,他给出了一个建议是在应用启动时设置一个环境变量,假设叫MQ_RUN=1,当rocketmq消费消息时读取该变量,判断是MQ_RUN==1,成立则继续执行,否则
return ConsumeConcurrentlyStatus.RECONSUME_LATER,保证消息不丢失,当应用关闭时设置这个环境变量为MQ_RUN=0,sleep个几秒钟,再去真正关闭应用。思路大致是有了,试一下发现环境变量不是这么好设置的,因为export MQ_RUN=1定义的变量,会对自己所在的shell进程及其子进程生效,启动时好说,应用是启动脚本的子进程,MQ_RUN=1设置没问题,停止时,停止脚本是发一个信号给应用,这两个进程没有关系,设置MQ_RUN=0也就无效了。
信号尝试
由于上面的铺垫,想到了使用类似健康检查的方式来停止consumer的消费,写一个接口来设置这个环境变量,关闭应用脚本时执行一下,但是这样做会有风险,接口是对外暴露的,任何人都可以请求这个接口来关闭consumer的消费。于是又想到了信号,参考我之前的文章《如何正确地杀死你的进程》,在应用中监听一个信号,应用关闭脚本中先向应用发送一个信号,kill -x $pid,应用中监听这个信号设置MQ_RUN=0,让消费停止,再去关闭应用即可。
最后说一句
其实这个报错对线上一点都不影响,rocketmq采取的是消费ack的方式来确定消息是否消费,当报错时消息又会被塞回broker,下次会继续消费,但这个问题也是有点意思,所以记录下来,最后的解决方式我觉得还是不是很优雅,目前能想到的也只有这样,我希望的是spring能有一种严格的依赖关系,并且不需要逐个设置,只需要打开全局配置即可。