3、Kafka/flume概要
七、Kafka
·kafka是一个分布式消息系统。具有高性能、持久化、多副本备份、横向扩展能力。将消息保存在磁盘中,以顺序读写方式访问磁盘,避免随机读写导致性能瓶颈。生产者往队列里写消息,消费者从队列里取消息进行业务逻辑。
·Kafka集群包含一个或多个服务器,服务器节点称为broker,broker存储topic的数据。broker可分为Controller与follower。Controller管理集群broker的上下线,所有topic的分区副本分配和leaderPartition选举等工作
·每条发布到Kafka集群的消息都有一个类别Topic,Topic像一个消息队列,每个topic包含一个或多个partition,Kafka分配的单位是partition。每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition,其他partition为flower作为备用选主。当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从“in sync replicas”(ISR)列表中删除,重新创建一个Follower。
·offset:消费者在对应分区上已经消费的消息数(位置),kafka0.8 版本之前offset保存在zookeeper上。之后offset保存在kafka集群上。
1、kafka的文件存储机制
一个topic,有多个不同的partition,每个partition为一个目录。partition命名的规则是,topic的名称加上一个序号,序号从0开始。
每一个partition目录下的文件,被平均切割成大小相等的数据文件(每一个数据文件都被称为一个段(segment file);每一个segment段消息,数量不一定相等,使得老的segment可以被快速清除。默认保留7天的数据,每次满1G后,在写入到一个新的文件中。
每一个partition只需要支持顺序读写就可以,也就是说它只会往文件的末尾追加数据,这就是顺序写的过程,生产者只会对每一个partition做数据的追加(写操作)。
在partition目录下,有两类文件,一类是以log为后缀的文件,一类是以index为后缀的文件,每一个log文件和一个index文件相对应,这一对文件就是一个segment file,也就是一个段。log文件:就是数据文件,里面存放的就是消息, index文件:是索引文件,记录元数据信息。
元数据指向,对应的数据文件(log文件)中消息的物理偏移地址。log文件达到1个G后滚动重新生成新的log文件。
2、Kafka内部数据不丢失
调整Producer,consumer,broker的各项参数,保证Kafka内部数据不丢失
①producer:acks参数、retry参数、
1) 高可用型,配置:acks = all,retries > 0 retry.backoff.ms=100(毫秒) (并根据实际情况设置retry可能恢复的间隔时间)
优点:这样保证了producer端每发送一条消息都要成功,如果不成功并将消息缓存起来,等异常恢复后再次发送。缺点:这样保证了高可用,但是这会导致集群的吞吐量不是很高,因为数据发送到broker之后,leader要将数据同步到fllower上,如果网络带宽、不稳定等情况时,ack响应时间会更长
2)折中型,配置:acks = 1 ,retries > 0 retries时间间隔设置 (并根据实际情况设置retries可能恢复的间隔时间)
优点:保证了消息的可靠性和吞吐量,是个折中的方案;缺点:性能处于2者中间
3)高吞吐型,配置:acks = 0
优点:可以相对容忍一些数据的丢失,吞吐量大,可以接收大量请求;缺点:不知道发送的消息是否成功
② Consumer: group.id 、auto.offset.reset 、enable.auto.commit
1)设置consumergroup分组的id,group.id:如果为空,则会报异常
2)设置从何处开始进行消费auto.offset.reset = earliest(最早) /latest(最晚)
3)设置是否开启自动提交消费位移的功能,默认开启 enable.auto.commit= true/false(默认true)
③Broker:replication-factor、min.insync.replicas、unclean.leander.election.enable
1)replication-factor >=2
在创建topic时会通过replication-factor来创建副本的个数,它提高了kafka的高可用性,同时,它允许n-1台broker挂掉,设置好合理的副本因子对kafka整体性能是非常有帮助的,通常是3个,极限是5个,如果多了也会影响开销。
2)min.insync.replicas = 2
分区ISR队列集合中最少有多少个副本,默认值是1
3)unclean.leander.election.enable = false
是否允许从ISR队列中选举leader副本,默认值是false,如果设置成true,则可能会造成数据丢失。
3、kafka调优,提升生产者的吞吐量
1)设置发送消息的缓冲区buffer.memory:默认32MB。如果发送消息速度小于写入消息速度,就会导致缓冲区写满,此时生产消息就会阻塞住,所以说这里就应该多做一些压测,尽可能保证说这块缓冲区不会被写满导致生产行为被阻塞住;
2)设置压缩compression.type,默认是none,不压缩,但是也可以使用lz4压缩,效率还是不错的,压缩之后可以减小数据量,提升吞吐量,但是会加大producer端的cpu开销。
3)设置batch的大小batch.size,默认16kb,就是一个batch满了16kb就发送出去,一般在实际生产环境,这个batch的值可以增大一些来提升吞吐量。如果batch太小,会导致频繁网络请求,吞吐量下降;如果batch太大,会导致一条消息需要等待很久才能被发送出去,而且会让内存缓冲区有很大压力,过多数据缓冲在内存里
4)设置消息的发送延迟linger.ms,这个值默认是0,意思就是消息必须立即被发送,但是这是不对的。一般设置一个100毫秒之内的,这样的话就是说,这个消息被发送出去后进入一个batch,如果100毫秒内,这个batch满了16kb,自然就会发送出去。但是如果100毫秒内,batch没满,那么也必须把消息发送出去了,不能让消息的发送延迟时间太长,也避免给内存造成过大的一个压力。
4、sparkStreaming整合kafka
sparkStreaming对接kafka两种方式:
(1)Receiver模式,由kafka将数据发送数据,Spark Streaming被动接收数据;
在spark的executor当中启动了一些receiver的线程,专门去kafka拉取数据,拉取回来的数据这些receiver不会处理,然后另外一些线程专门来处理数据,基于kafka的high level API进行消费,offset自动保存到了zk当中去了,不用我们主动去维护offset的值
问题:拉取数据线程以及处理数据线程互相不会通信,造成问题:处理数据线程挂掉了,拉取数据的线程还在继续拉取数据,数据全部都堆积在execotr里面了
(2)Direct模式,由Spark Streaming主动去kafka中拉取数据。
不再单独启动线程去拉取数据,获取到的数据也不用保存在executor内存里面了,获取到的数据直接就进行处理。
问题:使用kafka的low level API进行消费,需要自己手动的维护offset值
sparkStreaming整合kafka官网提供两个jar包:
一个是基于0.8版本整合:提供两种方式整合,receiver和direct方式;
一个是基于0.10版本整合:只提供了direct方式整合。
5、在Kafka中broker的意义是什么?
在Kafka集群中,broker指Kafka服务器。
Topic主题,可以理解为一个队列。
Partition分区,为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序
Offset偏移量,kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka
Broker一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic
Producer消息生产者,向kafka broker发消息的客户端
Consumer消息消费者,向kafka broker取消息的客户端
Consumer Group消费者组,这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partion只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic;
6、Kafka服务器能接收到的最大信息是多少?
Kafka服务器可以接收到的消息的最大大小是1000000字节。
7、Kafka中的ZooKeeper是什么?Kafka是否可以脱离ZooKeeper独立运行?
Zookeeper是一个开放源码的、高性能的协调服务,它用于Kafka的分布式应用。
不可以,不可能越过Zookeeper直接联系Kafka broker,一旦Zookeeper停止工作,它就不能服务客户端请求。
Zookeeper主要用于在集群中不同节点之间进行通信,在Kafka中,它被用于提交偏移量,因此如果节点在任何情况下都失败了,它都可以从之前提交的偏移量中获取,除此之外,它还执行其他活动,如: leader检测、分布式同步、配置管理、识别新节点何时离开或连接、集群、节点实时状态等等。
8、解释Kafka的用户如何消费信息?
在Kafka中传递消息是通过使用sendfile API完成的。它支持将字节Socket转移到磁盘,通过内核空间保存副本,并在内核用户之间调用内核。
9、解释如何提高远程用户的吞吐量?
如果用户位于与broker不同的数据中心,则可能需要调优Socket缓冲区大小,以对长网络延迟进行摊销。
10、解释一下,在数据制作过程中,你如何能从Kafka得到准确的信息?
在数据中,为了精确地获得Kafka的消息,你必须遵循两件事: 在数据消耗期间避免重复,在数据生产过程中避免重复。
这里有两种方法,可以在数据生成时准确地获得一个语义:
每个分区使用一个单独的写入器,每当你发现一个网络错误,检查该分区中的最后一条消息,以查看您的最后一次写入是否成功
在消息中包含一个主键(UUID或其他),并在用户中进行反复制
11、解释如何减少ISR中的扰动?broker什么时候离开ISR?(☆☆☆☆☆)
ISR是一组与leaders完全同步的消息副本,也就是说ISR中包含了所有提交的消息。ISR应该总是包含所有的副本,直到出现真正的故障。如果一个副本从leader中脱离出来,将会从ISR中删除。
12、Kafka为什么需要复制?
Kafka的信息复制确保了任何已发布的消息不会丢失,并且可以在机器错误、程序错误或更常见些的软件升级中使用。
13、如果副本在ISR中停留了很长时间表明什么?
如果一个副本在ISR中保留了很长一段时间,那么它就表明,跟踪器无法像在leader收集数据那样快速地获取数据。
14、请说明如果首选的副本不在ISR中会发生什么?
如果首选的副本不在ISR中,控制器将无法将leadership转移到首选的副本。
15、Kafka有可能在生产后发生消息偏移吗?
在大多数队列系统中,作为生产者的类无法做到这一点,它的作用是触发并忘记消息。broker将完成剩下的工作,比如使用id进行适当的元数据处理、偏移量等。
作为消息的用户,你可以从Kafka broker中获得补偿。如果你注视SimpleConsumer类,你会注意到它会获取包括偏移量作为列表的MultiFetchResponse对象。此外,当你对Kafka消息进行迭代时,你会拥有包括偏移量和消息发送的MessageAndOffset对象。
16、kafka 的消息投递保证机制以及如何实现?(☆☆☆☆☆)
Kafka支持三种消息投递语义:
① At most once 消息可能会丢,但绝不会重复传递
② At least one 消息绝不会丢,但可能会重复传递
③ Exactly once 每条消息肯定会被传输一次且仅传输一次,很多时候这是用户想要的。
consumer在从broker读取消息后,可以选择commit,该操作会在Zookeeper中存下该consumer在该partition下读取的消息的offset,该consumer下一次再读该partition时会从下一条开始读取。如未commit,下一次读取的开始位置会跟上一次commit之后的开始位置相同。
可以将consumer设置为autocommit,即consumer一旦读到数据立即自动commit。如果只讨论这一读取消息的过程,那Kafka是确保了Exactly once。但实际上实际使用中consumer并非读取完数据就结束了,而是要进行进一步处理,而数据处理与commit的顺序在很大程度上决定了消息从broker和consumer的delivery guarantee semantic。
·读完消息先commit再处理消息。这种模式下,如果consumer在commit后还没来得及处理消息就crash了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,这就对应于At most once。
·读完消息先处理再commit消费状态(保存offset)。这种模式下,如果在处理完消息之后commit之前Consumer crash了,下次重新开始工作时还会处理刚刚未commit的消息,实际上该消息已经被处理过了,这就对应于At least once。
·如果一定要做到Exactly once,就需要协调offset和实际操作的输出。经典的做法是引入两阶段提交,但由于许多输出系统不支持两阶段提交,更为通用的方式是将offset和操作输入存在同一个地方。比如,consumer拿到数据后可能把数据放到HDFS,如果把最新的offset和数据本身一起写到HDFS,那就可以保证数据的输出和offset的更新要么都完成,要么都不完成,间接实现Exactly once。(目前就high level API而言,offset是存于Zookeeper中的,无法存于HDFS,而low level API的offset是由自己去维护的,可以将之存于HDFS中)。
总之,Kafka默认保证At least once,并且允许通过设置producer异步提交来实现At most once,而Exactly once要求与目标存储系统协作,Kafka提供的offset可以较为容易地实现这种方式。
17、如何保证Kafka的消息有序(☆☆☆☆☆)
Kafka对于消息的重复、丢失、错误以及顺序没有严格的要求。
Kafka只能保证一个partition中的消息被某个consumer消费时是顺序的,事实上,从Topic角度来说,当有多个partition时,消息仍然不是全局有序的。
18、kafka数据丢失问题,及如何保证
1)数据丢失:
acks=1的时候(只保证写入leader成功),如果刚好leader挂了。数据会丢失。
acks=0的时候,使用异步模式的时候,该模式下kafka无法保证消息,有可能会丢。
2)brocker如何保证不丢失:
acks=all: 所有副本都写入成功并确认。
retries = 一个合理值。
min.insync.replicas=2 消息至少要被写入到这么多副本才算成功。
unclean.leader.election.enable=false 关闭unclean leader选举,即不允许非ISR中的副本被选举为leader,以避免数据丢失。
3)Consumer如何保证不丢失
如果在消息处理完成前就提交了offset,那么就有可能造成数据的丢失。
enable.auto.commit=false 关闭自动提交offset
处理完数据之后手动提交。
19、kafka的balance是怎么做的
官方原文
Producers publish data to the topics of their choice. The producer is able to choose which message to assign to which partition within the topic. This can be done in a round-robin fashion simply to balance load or it can be done according to some semantic partition function (say based on some key in the message). More on the use of partitioning in a second.
翻译:
生产者将数据发布到他们选择的主题。生产者可以选择在主题中分配哪个分区的消息。这可以通过循环的方式来完成,只是为了平衡负载,或者可以根据一些语义分区功能(比如消息中的一些键)来完成。更多关于分区在一秒钟内的使用。
20、kafka的消费者方式
consumer采用pull(拉)模式从broker中读取数据。
push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。
对于Kafka而言,pull模式更合适,它可简化broker的设计,consumer可自主控制消费消息的速率,同时consumer可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。
pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直等待数据到达。为了避免这种情况,我们在我们的拉请求中有参数,允许消费者请求在等待数据到达的“长轮询”中进行阻塞。
21、为什么kafka可以实现高吞吐?单节点kafka的吞吐量也比其他消息队列大,为什么?
九、Flume(☆☆☆☆)
1、Flume使用场景(☆☆☆☆☆)
线上数据一般主要是落地(存储到磁盘)或者通过socket传输给另外一个系统,这种情况下,你很难推动线上应用或服务去修改接口,实现直接向kafka里写数据,这时候你可能就需要flume这样的系统帮你去做传输。
2、Flume丢包问题(☆☆☆☆☆)
单机upd的flume source的配置,100+M/s数据量,10w qps flume就开始大量丢包,因此很多公司在搭建系统时,抛弃了Flume,自己研发传输系统,但是往往会参考Flume的Source-Channel-Sink模式。
一些公司在Flume工作过程中,会对业务日志进行监控,例如Flume agent中有多少条日志,Flume到Kafka后有多少条日志等等,如果数据丢失保持在1%左右是没有问题的,当数据丢失达到5%左右时就必须采取相应措施。
3、Flume与Kafka的选取
采集层主要可以使用Flume、Kafka两种技术。
·Flume 是管道流方式,提供了很多的默认实现,让用户通过参数部署,及扩展API;Kafka是一个可持久化的分布式的消息队列。
Kafka 是一个非常通用的系统。你可以有许多生产者和很多的消费者共享多个主题Topics。相比之下,Flume是一个专用工具被设计为旨在往HDFS,HBase发送数据。它对HDFS有特殊的优化,并且集成了Hadoop的安全特性。所以,Cloudera 建议如果数据被多个系统消费的话,使用kafka;如果数据被设计给Hadoop使用,使用Flume。
Flume内置很多的source和sink组件。然而,Kafka明显有一个更小的生产消费者生态系统,并且Kafka的社区支持不好。希望将来这种情况会得到改善,但是目前:使用Kafka意味着你准备好了编写你自己的生产者和消费者代码。如果已经存在的Flume Sources和Sinks满足你的需求,并且你更喜欢不需要任何开发的系统,请使用Flume。
Flume可以使用拦截器实时处理数据。这些对数据屏蔽或者过量是很有用的。Kafka需要外部的流处理系统才能做到。
Kafka和Flume都是可靠的系统,通过适当的配置能保证零数据丢失。然而,Flume不支持副本事件。于是,如果Flume代理的一个节点奔溃了,即使使用了可靠的文件管道方式,你也将丢失这些事件直到你恢复这些磁盘。如果你需要一个高可靠行的管道,那么使用Kafka是个更好的选择。
Flume和Kafka可以很好地结合起来使用。如果你的设计需要从Kafka到Hadoop的流数据,使用Flume代理并配置Kafka的Source读取数据也是可行的:你没有必要实现自己的消费者。你可以直接利用Flume与HDFS及HBase的结合的所有好处。你可以使用Cloudera Manager对消费者的监控,并且你甚至可以添加拦截器进行一些流处理。
4、数据怎么采集到Kafka,实现方式
使用官方提供的flumeKafka插件,插件的实现方式是自定义了flume的sink,将数据从channle中取出,通过kafka的producer写入到kafka中,可以自定义分区等。
5、flume管道内存,flume宕机了数据丢失怎么解决
1)Flume的channel分为很多种,可以将数据写入到文件。
2)防止非首个agent宕机的方法数可以做集群或者主备
6、flume配置方式,flume集群(问的很详细)
Flume的配置围绕着source、channel、sink叙述,flume的集群是做在agent上的,而非机器上。
7、flume不采集Nginx日志,通过Logger4j采集日志,优缺点是什么?
优点:Nginx的日志格式是固定的,但是缺少sessionid,通过logger4j采集的日志是带有sessionid的,而session可以通过redis共享,保证了集群日志中的同一session落到不同的tomcat时,sessionId还是一样的,而且logger4j的方式比较稳定,不会宕机。
缺点:不够灵活,logger4j的方式和项目结合过于紧密,而flume的方式比较灵活,拔插式比较好,不会影响项目性能。
8、flume和kafka采集日志区别,采集日志时中间停了,怎么记录之前的日志。
Flume采集日志是通过流的方式直接将日志收集到存储层,而kafka试讲日志缓存在kafka集群,待后期可以采集到存储层。
Flume采集中间停了,可以采用文件的方式记录之前的日志,而kafka是采用offset的方式记录之前的日志。
9、flume有哪些组件,flume的source、channel、sink具体是做什么的
1)source:用于采集数据,Source是产生数据流的地方,同时Source会将产生的数据流传输到Channel,这个有点类似于Java IO部分的Channel。
2)channel:用于桥接Sources和Sinks,类似于一个队列。
3)sink:从Channel收集数据,将数据写到目标源(可以是下一个Source,也可以是HDFS或者HBase)。
10、你是如何实现flume数据传输的监控的
11、你们的Flume怎么做数据监听?有没有做ETL?