SparkStreaming和kafka整合
1.SparkStreaming 1.6 + kafka 0.8.2 Receiver模式
1):采用receiver模式,SparkStreaming 需要Executor线程池开启一个线程接收kafka数据;
2):接收到的数据,会在spark其他节点上备份,存储级别是MEMORY_AND_DISK_SER_2;
3):zookeeper负责更新维护消费者offset;
4):Executor向Driver端汇报数据位置;
5):Driver端发送task到Executor执行;
总结:
解决Driver端宕机造成的未处理完数据丢失,要开启WAL(WriteAheadLog)机制,接收到的数据在hdfs中也保存
需要设置checkpoint;
可以降低存储级别,去掉"_2" 级别
不过这样又会有新的问题,增加了备份hdfs,造成数据处理的延迟增加;
Receiver模式底层读取Kafka采用了是Hight consumer api 实现,不关心offset,只要数据,无法做到人为来管理
Receiver模式的并行度:offsetspark.streaming.blockInterval = 200ms 每隔blockInterval 将接收来的数据生成一个block,block组成batch,对应RDD中的一个个的partition;
2.SparkStreaming 1.6 + kafka 0.8.2 Direct模式
总结:
1):Direct模式没有采用Receiver接收器模式,不需要将所有批次数据接收到Spark中,需要数据时从kafka中取;
2):Direct模式使用Spark自己维护消费者offset,如果设置SparkStreaming.checkpoint,早checkpoint中也有一份;
3):Direct模式相对Recevier模式简化了并行度,生成的DStream的并行度与读取的topic的partition一一对应;
4):Direct 模式底层读取Kafka数据采用了Simple Consumer API ,可以做到从每批次获取offset,也可以将offset设置给SparkStreaming接着上次的位置处理数据---可以手动维护消费者offset,offset保存spark内存中不需要借助zookeeper;
3.SparkStreaming 2.3 + kafka 0.11 Direct模式
SparkStreaming2.3版本 读取kafka 中数据 :
1.采用了新的消费者api实现,类似于1.6中SparkStreaming 读取 kafka Direct模式。并行度 一样。
2.因为采用了新的消费者api实现,所有相对于1.6的Direct模式【simple api实现】 ,api使用上有很大差别。未来这种api有可能继续变化
3.kafka中有两个参数:
heartbeat.interval.ms:这个值代表 kafka集群与消费者之间的心跳间隔时间,kafka 集群确保消费者保持连接的心跳通信时间间隔。这个时间默认是3s.这个值必须设置的比session.timeout.ms 小,一般设置不大于 session.timeout.ms 的1/3
session.timeout.ms :这个值代表消费者与kafka之间的session 会话超时时间,如果在这个时间内,kafka 没有接收到消费者的心跳,那么kafka将移除当前的消费者。这个时间默认是30s。这个时间位于配置 group.min.session.timeout.ms【6s】 和 group.max.session.timeout.ms【300s】之间的一个参数,
如果SparkSteaming 批次间隔时间大于5分钟,也就是大于300s,那么就要相应的调大group.max.session.timeout.ms 这个值。
4.SparkStreaming消费策略:大多数情况下,SparkStreaming读取数据使用 LocationStrategies.PreferConsistent 这种策略,这种策略会将分区均匀的分布在集群的Executor之间。
如果Executor在kafka 集群中的某些节点上,可以使用 LocationStrategies.PreferBrokers 这种策略,那么当前这个Executor 中的数据会来自当前broker节点。
如果节点之间的分区有明显的分布不均,可以使用 LocationStrategies.PreferFixed 这种策略,可以通过一个map 指定将topic分区分布在哪些节点中。
5.新的消费者api 可以将kafka 中的消息预读取到缓存区中,默认大小为64k。默认缓存区在 Executor 中,加快处理数据速度。
可以通过参数 spark.streaming.kafka.consumer.cache.maxCapacity 来增大,也可以通过spark.streaming.kafka.consumer.cache.enabled 设置成false 关闭缓存机制。
"注意:官网中描述这里建议关闭,在读取kafka时如果开启会有重复读取同一个topic partition 消息的问题,报错:KafkaConsumer is not safe for multi-threaded access"
6.关于消费者offset
1).如果设置了checkpoint ,那么offset 将会存储在checkpoint中。
这种有缺点: 第一,当从checkpoint中恢复数据时,有可能造成重复的消费,需要我们写代码来保证数据的输出幂等。
第二,当代码逻辑改变时,无法从checkpoint中来恢复offset.
2).依靠kafka 来存储消费者offset,kafka 中有一个特殊的topic 来存储消费者offset。新的消费者api中,会定期自动提交offset。这种情况有可能也不是我们想要的,
因为有可能消费者自动提交了offset到kafka特殊的topic中,但是后期SparkStreaming 没有将接收来的数据及时处理保存造成数据丢失的情况。这里也就是为什么会在配置中将enable.auto.commit 设置成false的原因。
这种消费模式也称最多消费一次,默认sparkStreaming 拉取到数据之后就可以更新offset,无论是否消费成功。自动提交offset的频率由参数auto.commit.interval.ms 决定,默认5s。
如果我们能保证完全处理完业务之后,可以后期异步的手动提交消费者offset.
注意:kafka集群重启后会清空所有消费者组保存的信息,这也是这种模式的弊端
3).自己存储offset,这样在处理逻辑时,保证数据处理的事务,如果处理数据失败,就不保存offset,处理数据成功则保存offset.这样可以做到精准的处理一次处理数据。