数据分析实践 | flink | 流程优化篇
2018-11-25 本文已影响0人
Sevsea
0x01flink执行流程了解一下
流程如下:
flink执行流程由一个Source数据处理,结果分发到四个窗口进行处理。
0x02表象:
flink需要优化,最先表现出来的现状就是:
窗口中使用metric体现出每秒的数据处理量很低,或停止。
1.代码中添加metric使用方法可参考:https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html
2.如果使用flink dashboard也可以使用metric�功能进行统计
此处以flink dashboard为例。
0x03问题点及优化:
1.数据反压
数据体现(背压(Backpressure)机制):
-> 一个window中数据处理的速率慢
-> 导致Source数据处理过程越来越慢
-> 再导致所有窗口处理越来越慢。
dashboard体现:
dashboard可以在背压这里看到HIGH时,则存在数据反压问题。
flink数据反压
反压逻辑:
若流程为A-B-C-D-E-F ,ABCDE出现反压(即这里status为high),则表示F处理流程导致E -> D-> C->B ->A 相继慢。
优化方式:
1.数据标记分流[详细代码见通用优化]
2.窗口优化[详细代码见通用优化]
2.数据倾斜
在多进程环境下:
数据体现:
-> 每个窗口中所有数据的分布不平均,某个窗口处理数据量太大导致速率慢。
-> 导致Source数据处理过程越来越慢
-> 再导致所有窗口处理越来越慢。
dashboard体现:
dashboard中Subtasks中打开每个窗口可以看到每个窗口进程的运行情况:
flink数据倾斜
如上图,数据分布很不均匀,导致部分窗口数据处理缓慢。
优化方式:
1.数据标记分流[详细代码见通用优化]
2.窗口优化[详细代码见通用优化]
3.在不影响逻辑的前提下,keyby对数据分流时选择较为均匀的数据。
3.消费滞后
尚未出现数据反压和数据倾斜的状况,但是flink的watermarks追不上实时时间,不能实时处理。
需单进程确认点:
1. flink读取的数据是否产生的及时。
2. 窗口Aggregate处理是否存在死循环或较慢的点
(如:正则/redis/http等)
3. flink计算结果的输出处理慢。
(如:使用.disablechain.addsink()后再在dashboard中查看窗口和输出分别处理的速率)
可优化点:
- 将窗口的处理逻辑优化的简单一些,将较长时间的处理放在数据处理部分或windowFunction部分。
4.需在窗口内做大量的外连情况,如redis/es等,redis连接过多会慢或直接报错。[2019.11.17更新]
解决方案:
1.可以在窗口外面申请全局redis连接池作为全局变量。
class MyProcessWindowFunction extends RichWindowFunction[Accumulator,String,String,TimeWindow] {
@transient var config_redis = new JedisPoolConfig()
config_redis.setMaxTotal(300)
config_redis.setMaxWaitMillis (2*1000)
@transient var jedisPool = new JedisPool(config_redis,"10.10.10.10.",1234,0,"yourpassword")
@transient var client = Esinit() // 此处为es外联的申明
@transient var log = LoggerFactory.getLogger(getClass)
//其他的一些全局变量也可以在这里定义,如log
LoginCheck_api.KeepSession()
//检查保持状态的函数也可以在这里处理,这样不会每个窗口都处理一遍。
override def apply (key: String, window: TimeWindow, input: Iterable[Accumulator], out: Collector[String]): Unit = {
...
//窗口如果定义为null则重新做定义
if(jedisPool==null){
w_log = LoggerFactory.getLogger(getClass)
config_redis = new JedisPoolConfig()
config_redis.setMaxTotal(300)
config_redis.setMaxWaitMillis (2*1000)
jedisPool = new JedisPool(config_redis,"10.10.10.10.",1234,0,"yourpassword")
LoginCheck_api.KeepSession()
}
if(client==null){
client = Esinit()
}
...
2.网络延时问题[2019.12.4更新]
场景:flink反压,且排查redis无太多慢查日志
检查提交集群对redis的延时情况,正常应该在0.099ms以内不会影响到程序的处理过程。
3.将对外操作放进单独多线程操作(如果上述两个问题都解决不了问题)[2019.12.4更新]
以redis举例:
import java.util.concurrent.{Callable, ExecutorService, Executors, FutureTask}
import redis.clients.jedis.{JedisPool, JedisPoolConfig}
object ThreadPool {
var config_redis = new JedisPoolConfig()
config_redis.setMaxTotal(500)
config_redis.setMaxIdle(500)
config_redis.setBlockWhenExhausted(false)
config_redis.setMaxWaitMillis (1000)
config_redis.setMinEvictableIdleTimeMillis(6000)
config_redis.setTimeBetweenEvictionRunsMillis(3000)
var jedisPool = new JedisPool(config_redis,"10.10.10.10",1234,0,"yourpassword")
val threadPool:ExecutorService=Executors.newFixedThreadPool(500)
def sadd(key:String,value:String):Int= {
var r = 1
try {
val future=new FutureTask[String](new Callable[String] {
override def call():String = {
var isexists = 1L //sadd返回1为添加成功,0为已存在/添加不成功
var jedis = jedisPool.getResource
try{
isexists = jedis.sadd(bolt_url,id_str)
}catch{
case e=>
}finally {
jedis.close()
}
return isexists.toString
}
})
threadPool.execute(future)
r = future.get().toInt //导出结果
if(r==1){
...//逻辑操作
}else{
...//逻辑操作
}
}finally {
// threadPool.shutdown()
}
return r//可选择是否返回结果
}
def main (args: Array[String]): Unit = {
var t =sadd("a","b")
println(t)
threadPool.shutdown()
}
}
而后在窗口中调用ThreadPool.sadd方法,获取到redis操作结果后的逻辑操作也可在窗口外进行,窗口只负责调度。
5.通用优化:
1.数据标记分流:
使用数据标记过滤进入窗口的数据,
而非使用filter,map等方式去筛选数据。
split分流 select选择分流.
val frequency_ = Features.split(
(s:Map[String,Any])=>
s.get("method").get.toString match{
case "a"|"b"|"c"|
=> List("str")
case "1"|"2"
=>List("int")
case _
=>List("normal")
}
)
val all = frequency_.select("str","int").assignTimestampsAndWatermarks(new TimestampExtractor())
all.keyby().aggregate()
...
Ps. https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/
2.窗口聚合计算
window apply窗口最后触发时进行一次性计算 aggregate来一条数据计算一次。
Ps.https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/
3.keyby关键词无法自行选择较均匀的情况下,
可以采用keyby(Random(20)+key)的形式进行分配窗口。
最好的方式:
原有DataStream中添加专门用于分窗口的字段,但是可能会影响你窗口聚合的结果。
def dealing_input(str):(String,String){
val keyby_key = scala.util.Random.nextInt(20).toString+"-"+key
return (data,keyby_key)
}
input.keyby(_._2).window().xxx
如何在处理完将随时数去掉请参考另一篇文章:
https://www.jianshu.com/p/1bca3c2758c1
遇坑待更新