SparkStream消费kafka消息delay,但job处理
在场景系统中,通过SparkStream直接消费kafka数据,出现处理逻辑耗时在毫秒级,但是很多的job delay。
示例代码如下:
valbrokerList = ConfigLoader.get(KafkaConfig.runDataBrokerListKey)
valtopics = ConfigLoader.get(KafkaConfig.runDataTopic)
valtopicSet = topics.split(",").toSet
valkafkaParams =Map("metadata.broker.list"-> brokerList)
logger.info(s"设备运行数据kafka brokerList:$brokerList, topics:$topics")
valssc =newStreamingContext(sparkConf,Seconds(ConfigLoader.get(CommonConfig.batchDuration).toInt))
//采用直接消费的方式,每次只会消费最新的数据,对于当前实时业务适用
valdata = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topicSet)
data.map(dataParse).print()
代码逻辑非常简单,在dataParse方法中也只是进行json的解析,但是一个任务处理都达到4s,而SparkStream设置为了1s一个批次,从而导致越来越多的job等待,如下图:
从上图发现存在stage持续时间为4s,故查看其详细信息,发现当前stage存在很长时间的空闲,如下图:
故查看executor端日志,发现11:09:09完成task计算后到11:09:12期间,executor处于空闲状态,日志如下:
此时,追踪driver端日志,试图从driver发现当前任务在进行怎样的操作,发现这段一段日志:
17/09/29 11:09:12 DEBUG scheduler.TaskSetManager: Moving to RACK_LOCAL after waiting for 3000ms,在等待3000ms后移动到机架本地模式,继续追查当前stage启动时间,找到日志如下:
也就是说,在9~12这个时间点中,当前task都在进行一个等待操作,而超时间为3000ms,超时后执行了Moving to RACK_LOCAL操作,并检测到本地级别的机架本地没有任务,所以移动到Any级别。
追踪源码试图找出当前job进行了什么样的操作,定位到源码如下:
从else的判断条件可知,当(当前时间 - 最新Task启动时间) > 本地等待时间,即会答应当前log,继续追踪发现源码内容:
至此我们已经找到了上面task等待3s的原因,在设置sparkConf的时候,并没有设置当前三个参数,则取默认值,但是这个配置又是做什么的呢?
查找了相关资料,并向大牛请教后得到这样的解释:
spark在消费数据时,优先采用节点本地模式,即NODE_LOCAL(节点本地模式)>RACK_LOCAL(机架本地模式)>ANY(任意),这样在大数据量时可以做到减少网络io,每一批数据默认会等待三秒,如果三秒后数据所在节点上依旧没有启动task后,会修改为RACK_LOCAL,并且提交任务,失败后立马改为ANY模式。
而基于当前业务,SparkStream必须每1s处理一批数据,并且只给定了一个executor,所以大部分的节点上是不存在task的,如果每批数据等待节点本地启动task,这样会导致越来越多的job delay。故只能修改相关参数的默认值,跳过wait,直接将模式设置为ANY,修改代码如下:
valsparkConf =newSparkConf().setAppName("Scene")
sparkConf.set("spark.locality.wait.process","0")
sparkConf.set("spark.locality.wait.node","0")
sparkConf.set("spark.locality.wait.rack","0")
valbrokerList = ConfigLoader.get(KafkaConfig.runDataBrokerListKey)
valtopics = ConfigLoader.get(KafkaConfig.runDataTopic)
valtopicSet = topics.split(",").toSet
valkafkaParams =Map("metadata.broker.list"-> brokerList)
logger.info(s"设备运行数据kafka brokerList:$brokerList, topics:$topics")
valssc =newStreamingContext(sparkConf,Seconds(ConfigLoader.get(CommonConfig.batchDuration).toInt))
//采用直接消费的方式,每次只会消费最新的数据,对于当前实时业务适用
valdata = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topicSet)
data.map(dataParse).print()
问题圆满解决。