面试题2
17.分区分桶的区别,为什么要分区
分区表:原来的一个大表存储的时候分成不同的数据目录进行存储。如果说是单分区表,那么在表的目录下就只有一级子目录,如果说是多分区表,那么在表的目录下有多少分区就有多少级子目录。不管是单分区表,还是多分区表,在表的目录下,和非最终分区目录下是不能直接存储数据文件的
分桶表:原理和hashpartitioner 一样,将hive中的一张表的数据进行归纳分类的时候,归纳分类规则就是hashpartitioner。(需要指定分桶字段,指定分成多少桶)
分区表和分桶的区别除了存储的格式不同外,最主要的是作用:
分区表:细化数据管理,缩小mapreduce程序 需要扫描的数据量。
分桶表:提高join查询的效率,在一份数据会被经常用来做连接查询的时候建立分桶,分桶字段就是连接字段;提高采样的效率。
有了分区为什么还要分桶?
(1)获得更高的查询处理效率。桶为表加上了额外的结构,Hive在处理有些查询时能利用这个结构。
(2)使取样( sampling)更高效。在处理大规模数据集时,在开发和修改査询的阶段,如果能在数据集的一小部分数据上试运行查询,会带来很多方便。
分桶是相对分区进行更细粒度的划分。分桶将表或者分区的某列值进行hash值进行区分,如要安装name属性分为3个桶,就是对name属性值的hash值对3取摸,按照取模结果对数据分桶。
与分区不同的是,分区依据的不是真实数据表文件中的列,而是我们指定的伪列,但是分桶是依据数据表中真实的列而不是伪列
18.mapjoin的原理
![](https://img.haomeiwen.com/i3109223/234587de4b0ae540.png)
MapJoin通常用于一个很小的表和一个大表进行join的场景,具体小表有多小,由参数hive.mapjoin.smalltable.filesize来决定,该参数表示小表的总大小,默认值为25000000字节,即25M。
Hive0.7之前,需要使用hint提示 *+ mapjoin(table) */才会执行MapJoin,否则执行Common Join,但在0.7版本之后,默认自动会转换Map Join,由参数hive.auto.convert.join来控制,默认为true.
假设a表为一张大表,b为小表,并且hive.auto.convert.join=true,那么Hive在执行时候会自动转化为MapJoin。
MapJoin简单说就是在Map阶段将小表读入内存,顺序扫描大表完成Join。减少昂贵的shuffle操作及reduce操作
MapJoin分为两个阶段:
通过MapReduce Local Task,将小表读入内存,生成HashTableFiles上传至Distributed Cache中,这里会HashTableFiles进行压缩。
MapReduce Job在Map阶段,每个Mapper从Distributed Cache读取HashTableFiles到内存中,顺序扫描大表,在Map阶段直接进行Join,将数据传递给下一个MapReduce任务。
19.在hive的row_number中distribute by 和 partition by的区别
20.hive开发中遇到什么问题?
21.什么时候使用内部表,什么时候使用外部表
22.hive都有哪些函数,你平常工作中用到哪些
数学函数
round(DOUBLE a)
floor(DOUBLE a)
ceil(DOUBLE a)
rand()
集合函数
size(Map<K.V>)
map_keys(Map<K.V>)
map_values(Map<K.V>)
array_contains(Array<T>, value)
sort_array(Array<T>)
类型转换函数
cast(expr as <type>)
日期函数
date_format函数(根据格式整理日期)
date_add、date_sub函数(加减日期)
next_day函数
last_day函数(求当月最后一天日期)
collect_set函数
get_json_object解析json函数
from_unixtime(bigint unixtime, string format)
to_date(string timestamp)
year(string date)
month(string date)
hour(string date)
weekofyear(string date)
datediff(string enddate, string startdate)
add_months(string start_date, int num_months)
date_format(date/timestamp/string ts, string fmt)
条件函数
if(boolean testCondition, T valueTrue, T valueFalseOrNull)
nvl(T value, T default_value)
COALESCE(T v1, T v2, ...)
CASE a WHEN b THEN c [WHEN d THEN e]* [ELSE f] END
isnull( a )
isnotnull ( a )
字符函数
concat(string|binary A, string|binary B...)
concat_ws(string SEP, string A, string B...)
get_json_object(string json_string, string path)
length(string A)
lower(string A) lcase(string A)
parse_url(string urlString, string partToExtract [, string keyToExtract])
regexp_replace(string INITIAL_STRING, string PATTERN, string REPLACEMENT)
reverse(string A)
split(string str, string pat)
substr(string|binary A, int start) substring(string|binary A, int start)
聚合函数
count sum min max avg
表生成函数
explode(array<TYPE> a)
explode(ARRAY)
json_tuple(jsonStr, k1, k2, ...)
parse_url_tuple(url, p1, p2, ...)
23.手写sql,连续活跃用户
24.left semi join和left join区别
25.group by为什么要排序
26.说说印象最深的一次优化场景,hive常见的优化思路
27.聊聊hive的执行引擎,spark和mr的区别?
28.hive的join底层mr是如何实现的?
29.sql问题,连续几天活跃的用户?
30.建好了外部表,用什么语句把数据文件加载到表里
31.Hive的执行流程?
32.hive的元数据信息存储在哪?
33.sql语句的执行顺序from-where-group by-having -select-order by -limit
34.on和where的区别
35.hive和传统数据库之间的区别
1、写时模式和读时模式
传统数据库是写时模式,在load过程中,提升了査询性能,因为预先解析之后可以对列建立索引,并压缩,但这样也会花费更多的加载时间。
Hive是读时模式,1 oad data非常迅速,因为它不需要读取数据进行解析,仅仅进行文件的复制或者移动。
2、数据格式。Hive中没有定义专门的数据格式,由用户指定,需要指定三个属性:列分隔符,行分隔符,以及读取文件数据的方法。数据库中,存储引擎定义了自己的数据格式。所有数据都会按照一定的组织存储
3、数据更新。Hive的内容是读多写少的,因此,不支持对数据的改写和删除,数据都在加载的时候中确定好的。数据库中的数据通常是需要经常进行修改
4、执行延迟。Hive在查询数据的时候,需要扫描整个表(或分区),因此延迟较高,只有在处理大数据是才有优势。数据库在处理小数据是执行延迟较低。
5、索引。Hive比较弱,不适合实时查询。数据库有。
6、执行。Hive是 Mapreduce,数据库是 Executor
7、可扩展性。Hive高,数据库低
8、数据规模。Hive大,数据库小
36.hive中导入数据的4种方式
从本地导入:load data local inpath home/liuzc into table ods.test
从hdfs导入:load data inpath user/hive/warehouse/a.txt into ods.test
查询导入:create table tmp_test as select * from ods.test
查询结果导入:insert into table tmp.test select * from ods.test
三.Spark
1.rdd的属性
![](https://img.haomeiwen.com/i3109223/e70b8400ee495c2b.png)
一组分片(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。
一个计算每个分区的函数。Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。
RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。
一个Partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。
一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。
2.算子分为哪几类(RDD支持哪几种类型的操作)
转换(Transformation) 现有的RDD通过转换生成一个新的RDD。lazy模式,延迟执行。
转换函数包括:map,filter,flatMap,groupByKey,reduceByKey,aggregateByKey,union,join, coalesce 等等。
动作(Action) 在RDD上运行计算,并返回结果给驱动程序(Driver)或写入文件系统。
动作操作包括:reduce,collect,count,first,take,countByKey以及foreach等等。
collect 该方法把数据收集到driver端 Array数组类型
所有的transformation只有遇到action才能被执行。
当触发执行action之后,数据类型不再是rdd了,数据就会存储到指定文件系统中,或者直接打印结 果或者收集起来。
3.创建rdd的几种方式
1.集合并行化创建(有数据)
val arr = Array(1,2,3,4,5)
val rdd = sc.parallelize(arr)
val rdd =sc.makeRDD(arr)
2.读取外部文件系统,如hdfs,或者读取本地文件(最常用的方式)(没数据)
val rdd2 = sc.textFile("hdfs://hdp-01:9000/words.txt")
// 读取本地文件
val rdd2 = sc.textFile(“file:///root/words.txt”)
3.从父RDD转换成新的子RDD
调用Transformation类的方法,生成新的RDD
4.spark运行流程
![](https://img.haomeiwen.com/i3109223/ebb597906275bdd0.png)
Worker的功能:定时和master通信;调度并管理自身的executor
executor:由Worker启动的,程序最终在executor中运行,(程序运行的一个容器)
spark-submit命令执行时,会根据master地址去向 Master发送请求,
Master接收到Dirver端的任务请求之后,根据任务的请求资源进行调度,(打散的策略),尽可能的 把任务资源平均分配,然后向WOrker发送指令
Worker收到Master的指令之后,就根据相应的资源,启动executor(cores,memory)
executor会向dirver端建立请求,通知driver,任务已经可以运行了
driver运行任务的时候,会把任务发送到executor中去运行。
5.Spark中coalesce与repartition的区别
1)关系:
两者都是用来改变 RDD 的 partition 数量的,repartition 底层调用的就是 coalesce 方法:coalesce(numPartitions, shuffle = true)
2)区别:
repartition 一定会发生 shuffle,coalesce 根据传入的参数来判断是否发生 shuffle
一般情况下增大 rdd 的 partition 数量使用 repartition,减少 partition 数量时使用coalesce
6.sortBy 和 sortByKey的区别
sortBy既可以作用于RDD[K] ,还可以作用于RDD[(k,v)]
sortByKey 只能作用于 RDD[K,V] 类型上。
7.map和mapPartitions的区别
![](https://img.haomeiwen.com/i3109223/249423cdc7aa3504.png)
8.数据存入Redis 优先使用map mapPartitions foreach foreachPartions哪个
使用 foreachPartition
* 1,map mapPartition 是转换类的算子, 有返回值
* 2, 写mysql,redis 的连接
foreach * 100万 100万次的连接
foreachPartions * 200 个分区 200次连接 一个分区中的数据,共用一个连接
foreachParititon 每次迭代一个分区,foreach每次迭代一个元素。
该方法没有返回值,或者Unit
主要作用于,没有返回值类型的操作(打印结果,写入到mysql数据库中)
在写入到redis,mysql的时候,优先使用foreachPartititon
9.reduceByKey和groupBykey的区别
![](https://img.haomeiwen.com/i3109223/c884d3a8028e37d4.png)
reduceByKey会传一个聚合函数, 相当于 groupByKey + mapValues
reduceByKey 会有一个分区内聚合,而groupByKey没有 最核心的区别
结论:reduceByKey有分区内聚合,更高效,优先选择使用reduceByKey。
10.cache和checkPoint的比较
都是做 RDD 持久化的
1.缓存,是在触发action之后,把数据写入到内存或者磁盘中。不会截断血缘关系
(设置缓存级别为memory_only:内存不足,只会部分缓存或者没有缓存,缓存会丢失,memory_and_disk :内存不足,会使用磁盘)
2.checkpoint 也是在触发action之后,执行任务。单独再启动一个job,负责写入数据到hdfs中。(把rdd中的数据,以二进制文本的方式写入到hdfs中,有几个分区,就有几个二进制文件)
3.某一个RDD被checkpoint之后,他的父依赖关系会被删除,血缘关系被截断,该RDD转换成了CheckPointRDD,以后再对该rdd的所有操作,都是从hdfs中的checkpoint的具体目录来读取数据。缓存之后,rdd的依赖关系还是存在的。
11.spark streaming流式统计单词数量代码
object WordCountAll {
newValues当前批次的出现的单词次数, runningCount表示之前运行的单词出现的结果
* def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount = newValues.sum + runningCount.getOrElse(0)// 将历史前几个批次的值和当前批次的值进行累加返回当前批次最终的结果
Some(newCount)
}*/
**
* String : 单词 hello
* Seq[Int] :单词在当前批次出现的次数
* Option[Int] :历史结果
*/
val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
iter.flatMap(it=>Some(it._2.sum + it._3.getOrElse(0)).map(x=>(it._1,x)))
iter.flatMap{case(x,y,z)=>Some(y.sum + z.getOrElse(0)).map(m=>(x, m))}
}
屏蔽日志
Logger.getLogger("org.apache").setLevel(Level.ERROR)
def main(args: Array[String]) {
必须要开启2个以上的线程,一个线程用来接收数据,另外一个线程用来计算
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
设置sparkjob计算时所采用的序列化方式
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.rdd.compress", "true") 节约大量的内存内容
如果你的程序出现垃圾回收时间过程,可以设置一下java的垃圾回收参数
同时也会创建sparkContext对象
批次时间 >= 批次处理的总时间 (批次数据量,集群的计算节点数量和配置)
val ssc = new StreamingContext(conf, Seconds(5))
做checkpoint 写入共享存储中
ssc.checkpoint("c://aaa")
创建一个将要连接到 hostname:port 的 DStream,如 localhost:9999
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.175.101", 44444)
updateStateByKey结果可以累加但是需要传入一个自定义的累加函数:updateFunc
val results = lines.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
打印结果到控制台
results.print()
开始计算
ssc.start()
等待停止
ssc.awaitTermination()
}
}
12.简述map和flatMap的区别和应用场景
map是对每一个元素进行操作,flatmap是对每一个元素操作后并压平
13.计算曝光数和点击数
![](https://img.haomeiwen.com/i3109223/df3950ade6f8ba2f.png)
14.分别列出几个常用的transformation和action算子
转换算子:map,map,filter,reduceByKey,groupByKey,groupBy
行动算子:foreach,foreachpartition,collect,collectAsMap,take,top,first,count,countByKey
15.按照需求使用spark编写以下程序,要求使用scala语言
当前文件a.txt的格式,请统计每个单词出现的次数
A,b,c
B,b,f,e
objectWordCount {
def main(args: Array[String]):Unit= {
valconf = new SparkConf()
.setAppName(this.getClass.getSimpleName)
.setMaster("local[*]")
valsc = new SparkContext(conf)
varsData: RDD[String] = sc.textFile("a.txt")
valsortData: RDD[(String,Int)] = sData.flatMap(_.split(",")).map((_,1)).reduceByKey(_+_)
sortData.foreach(print)
}
}
16.spark应用程序的执行命令是什么?
/usr/local/spark-current2.3/bin/spark-submit \
--class com.wedoctor.Application \
--master yarn \
--deploy-mode client \
--driver-memory 1g \
--executor-memory 2g \
--queue root.wedw \
--num-executors 200 \
--jars home/pgxl/liuzc/config-1.3.0.jar,/home/pgxl/liuzc/hadoop-lzo-0.4.20.jar,/home/pgxl/liuzc/elasticsearch-hadoop-hive-2.3.4.jar \
/home/pgxl/liuzc/sen.jar
17.Spark应用执行有哪些模式,其中哪几种是集群模式
本地local模式
standalone模式
spark on yarn模式
spark on mesos模式
其中,standalone模式,spark on yarn模式,spark on mesos模式是集群模式
18.请说明spark中广播变量的用途
使用广播变量,每个 Executor 的内存中,只驻留一份变量副本,而不是对 每个 task 都传输一次大变量,省了很多的网络传输, 对性能提升具有很大帮助, 而且会通过高效的广播算法来减少传输代价。
19.以下代码会报错吗?如果会怎么解决 val arr = new ArrayList[String]; arr.foreach(println)
val arr = new ArrayList[String]; 这里会报错,需要改成 val arr: Array[String] = new Array[String](10)
arr.foreach(println)打印不会报空指针
20.写出你用过的spark中的算子,其中哪些会产生shuffle过程
reduceBykey:
groupByKey:
…ByKey:
21.Spark中rdd与partition的区别
22.请写出创建Dateset的几种方式
23.描述一下RDD,DataFrame,DataSet的区别?
1)RDD
优点:
编译时类型安全
编译时就能检查出类型错误
面向对象的编程风格
直接通过类名点的方式来操作数据
缺点:
序列化和反序列化的性能开销
无论是集群间的通信, 还是 IO 操作都需要对对象的结构和数据进行序列化和反序列化。
GC 的性能开销,频繁的创建和销毁对象, 势必会增加 GC
2)DataFrame
DataFrame 引入了 schema 和 off-heap
schema : RDD 每一行的数据, 结构都是一样的,这个结构就存储在 schema 中。Spark 通过 schema 就能够读懂数据, 因此在通信和 IO 时就只需要序列化和反序列化数据, 而结构的部分就可以省略了。
3)DataSet
DataSet 结合了 RDD 和 DataFrame 的优点,并带来的一个新的概念 Encoder。
当序列化数据时,Encoder 产生字节码与 off-heap 进行交互,能够达到按需访问数据的效果,而不用反序列化整个对象。Spark 还没有提供自定义 Encoder 的 API,但是未来会加入。
三者之间的转换:
![](https://img.haomeiwen.com/i3109223/aaa4d50f6f9930df.png)
24.描述一下Spark中stage是如何划分的?描述一下shuffle的概念
25.Spark 在yarn上运行需要做哪些关键的配置工作?如何kill -个Spark在yarn运行中Application
26.通常来说,Spark与MapReduce相比,Spark运行效率更高。请说明效率更高来源于Spark内置的哪些机制?并请列举常见spark的运行模式?
27.RDD中的数据在哪?
RDD中的数据在数据源,RDD只是一个抽象的数据集,我们通过对RDD的操作就相当于对数据进行操作。
28.如果对RDD进行cache操作后,数据在哪里?
数据在第一执行cache算子时会被加载到各个Executor进程的内存中,第二次就会直接从内存中读取而不会区磁盘。
29.Spark中Partition的数量由什么决定
和Mr一样,但是Spark默认最少有两个分区。
30.Scala里面的函数和方法有什么区别
31.SparkStreaming怎么进行监控?
32.Spark判断Shuffle的依据?
父RDD的一个分区中的数据有可能被分配到子RDD的多个分区中
33.Scala有没有多继承?可以实现多继承么?
34.Sparkstreaming和flink做实时处理的区别
35.Sparkcontext的作用
36.Sparkstreaming读取kafka数据为什么选择直连方式
37.离线分析什么时候用sparkcore和sparksq
38.Sparkstreaming实时的数据不丢失的问题
39.简述宽依赖和窄依赖概念,groupByKey,reduceByKey,map,filter,union五种操作哪些会导致宽依赖,哪些会导致窄依赖
40.数据倾斜可能会导致哪些问题,如何监控和排查,在设计之初,要考虑哪些来避免
41.有一千万条短信,有重复,以文本文件的形式保存,一行一条数据,请用五分钟时间,找出重复出现最多的前10条
42.现有一文件,格式如下,请用spark统计每个单词出现的次数
18619304961,18619304064,186193008,186193009
18619304962,18619304065,186193007,186193008
18619304963,18619304066,186193006,186193010
43.共享变量和累加器
累加器(accumulator)是 Spark 中提供的一种分布式的变量机制,其原理类似于mapreduce,即分布式的改变,然后聚合这些改变。累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数。而广播变量用来高效分发较大的对象。
共享变量出现的原因:
通常在向 Spark 传递函数时,比如使用 map() 函数或者用 filter() 传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。
Spark 的两个共享变量,累加器与广播变量,分别为结果聚合与广播这两种常见的通信模式突破了这一限制。
44.当 Spark 涉及到数据库的操作时,如何减少 Spark 运行中的数据库连接数?
使用 foreachPartition 代替 foreach,在 foreachPartition 内获取数据库的连接。
45.特别大的数据,怎么发送到excutor中?
46.spark调优都做过哪些方面?
47.spark任务为什么会被yarn kill掉?
48.Spark on Yarn作业执行流程?yarn-client和yarn-cluster有什么区别?
49.Flatmap底层编码实现?
Spark flatMap 源码:
/**
* Return a new RDD by first applying a function to all elements of this
* RDD, and then flattening the results.
*/
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
Scala flatMap 源码:
/** Creates a new iterator by applying a function to all values produced by this iterator
* and concatenating the results.
*
* @param f the function to apply on each element.
* @return the iterator resulting from applying the given iterator-valued function
* `f` to each value produced by this iterator and concatenating the results.
* @note Reuse: $consumesAndProducesIterator
*/
def flatMap[B](f: A => GenTraversableOnce[B]): Iterator[B] = new AbstractIterator[B] {
private var cur: Iterator[B] = empty
private def nextCur() { cur = f(self.next()).toIterator }
def hasNext: Boolean = {
Equivalent to cur.hasNext || self.hasNext && { nextCur(); hasNext }
but slightly shorter bytecode (better JVM inlining!)
while (!cur.hasNext) {
if (!self.hasNext) return false
nextCur()
}
true
}
def next(): B =<span style="color:#ffffff"> <span style="background-color:rgb(255,0,0)">(if (hasNext) cur else empty).next()</span></span>
}
flatMap其实就是将RDD里的每一个元素执行自定义函数f,这时这个元素的结果转换成iterator,最后将这些再拼接成一个
新的RDD,也可以理解成原本的每个元素由横向执行函数f后再变为纵向。画红部分一直在回调,当RDD内没有元素为止。
50.spark_1.X与spark_2.X区别
51.说说spark与flink
52.spark streaming如何保证7*24小时运行机制?
53.spark streaming是Exactly-Once吗?
四.Kafka
1.Kafka名词解释和工作方式
Producer :消息生产者,就是向kafka broker发消息的客户端。
Consumer :消息消费者,向kafka broker取消息的客户端
Topic :咋们可以理解为一个队列。
Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partion只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic。
Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序。
Offset:kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka
2.Consumer与topic关系
本质上kafka只支持Topic;
每个group中可以有多个consumer,每个consumer属于一个consumer group;
通常情况下,一个group中会包含多个consumer,这样不仅可以提高topic中消息的并发消费能力,而且还能提高"故障容错"性,如果group中的某个consumer失效那么其消费的partitions将会有其他consumer自动接管。
对于Topic中的一条特定的消息,只会被订阅此Topic的每个group中的其中一个consumer消费,此消息不会发送给一个group的多个consumer;
那么一个group中所有的consumer将会交错的消费整个Topic,每个group中consumer消息消费互相独立,我们可以认为一个group是一个"订阅"者。
在kafka中,一个partition中的消息只会被group中的一个consumer消费(同一时刻);
一个Topic中的每个partions,只会被一个"订阅者"中的一个consumer消费,不过一个consumer可以同时消费多个partitions中的消息。
kafka的设计原理决定,对于一个topic,同一个group中不能有多于partitions个数的consumer同时消费,否则将意味着某些consumer将无法得到消息。
kafka只能保证一个partition中的消息被某个consumer消费时是顺序的;事实上,从Topic角度来说,当有多个partitions时,消息仍不是全局有序的。
3.kafka中生产数据的时候,如何保证写入的容错性?
设置发送数据是否需要服务端的反馈,有三个值0,1,-1
0: producer不会等待broker发送ack
1: 当leader接收到消息之后发送ack
-1: 当所有的follower都同步消息成功后发送ack
request.required.acks=0
4.如何保证kafka消费者消费数据是全局有序的
伪命题
每个分区内,每条消息都有一个offset,故只能保证分区内有序。
如果要全局有序的,必须保证生产有序,存储有序,消费有序。
由于生产可以做集群,存储可以分片,消费可以设置为一个consumerGroup,要保证全局有序,就需要保证每个环节都有序。
只有一个可能,就是一个生产者,一个partition,一个消费者。这种场景和大数据应用场景相悖。
5.有两个数据源,一个记录的是广告投放给用户的日志,一个记录用户访问日志,另外还有一个固定的用户基础表记录用户基本信息(比如学历,年龄等等)。现在要分析广告投放对与哪类用户更有效,请采用熟悉的技术描述解决思路。另外如果两个数据源都是实时数据源(比如来自kafka),他们数据在时间上相差5分钟,需要哪些调整来解决实时分析问题?
6.Kafka和SparkStreaing如何集成?
7.列举Kafka的优点,简述Kafka为什么可以做到每秒数十万甚至上百万消息的高效分发?
8.为什么离线分析要用kafka?
Kafka的作用是解耦,如果直接从日志服务器上采集的话,实时离线都要采集,等于要采集两份数据,而使用了kafka的话,只需要从日志服务器上采集一份数据,然后在kafka中使用不同的两个组读取就行了
9.Kafka怎么进行监控?
Kafka Manager
10.Kafka与传统的消息队列服务有很么不同
11.Kafka api low-level与high-level有什么区别,使用low-level需要处理哪些细节
12.Kafka的ISR副本同步队列
ISR(In-Sync Replicas),副本同步队列。ISR中包括Leader和Follower。如果Leader进程挂掉,会在ISR队列中选择一个服务作为新的Leader。有replica.lag.max.messages(延迟条数)和replica.lag.time.max.ms(延迟时间)两个参数决定一台服务是否可以加入ISR副本队列,在0.10版本移除了replica.lag.max.messages参数,防止服务频繁的进去队列。
任意一个维度超过阈值都会把Follower剔除出ISR,存入OSR(Outof-Sync Replicas)列表,新加入的Follower也会先存放在OSR中。
13.Kafka消息数据积压,Kafka消费能力不足怎么处理?
1)如果是Kafka消费能力不足,则可以考虑增加Topic的分区数,并且同时提升消费组的消费者数量,消费者数=分区数。(两者缺一不可)
2)如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间<生产速度),使处理的数据小于生产的数据,也会造成数据积压。
14.Kafka中的ISR、AR又代表什么?
ISR:in-sync replica set (ISR),与leader保持同步的follower集合
AR:分区的所有副本
15.Kafka中的HW、LEO等分别代表什么?
LEO:每个副本的最后条消息的offset
HW:一个分区中所有副本最小的offset
16.哪些情景会造成消息漏消费?
先提交offset,后消费,有可能造成数据的重复
17.当你使用kafka-topics.sh创建了一个topic之后,Kafka背后会执行什么逻辑?
1)会在zookeeper中的/brokers/topics节点下创建一个新的topic节点,如:/brokers/topics/first
2)触发Controller的监听程序
3)kafka Controller 负责topic的创建工作,并更新metadata cache
18.topic的分区数可不可以增加?如果可以怎么增加?如果不可以,那又是为什么?
可以增加
bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic topic-config --partitions 3
19.topic的分区数可不可以减少?如果可以怎么减少?如果不可以,那又是为什么?
不可以减少,被删除的分区数据难以处理。
20.Kafka有内部的topic吗?如果有是什么?有什么所用?
__consumer_offsets,保存消费者offset
21.聊一聊Kafka Controller的作用?
负责管理集群broker的上下线,所有topic的分区副本分配和leader选举等工作。
22.失效副本是指什么?有那些应对措施?
不能及时与leader同步,暂时踢出ISR,等其追上leader之后再重新加入
23.Kafka 都有哪些特点?
高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作。
可扩展性:kafka集群支持热扩展
持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
高并发:支持数千个客户端同时读写
24.请简述下你在哪些场景下会选择 Kafka?
日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、HBase、Solr等。
消息系统:解耦和生产者和消费者、缓存消息等。
用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
流式处理:比如spark streaming和 Flink
25.Kafka 的设计架构你知道吗?
简单架构如下
![](https://img.haomeiwen.com/i3109223/e43ca9277f1cc711.png)
详细如下
![](https://img.haomeiwen.com/i3109223/20133b53a6863f38.png)
Kafka 架构分为以下几个部分
Producer :消息生产者,就是向 kafka broker 发消息的客户端。
Consumer :消息消费者,向 kafka broker 取消息的客户端。
Topic :可以理解为一个队列,一个 Topic 又分为一个或多个分区。
Consumer Group:这是 kafka 用来实现一个 topic 消息的广播(发给所有的 consumer)和单播(发给任意一个 consumer)的手段。一个 topic 可以有多个 Consumer Group。
Broker :一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker 可以容纳多个 topic。
Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker上,每个 partition 是一个有序的队列。partition 中的每条消息都会被分配一个有序的id(offset)。将消息发给 consumer,kafka 只保证按一个 partition 中的消息的顺序,不保证一个 topic 的整体(多个 partition 间)的顺序。
Offset:kafka 的存储文件都是按照 offset.kafka 来命名,用 offset 做名字的好处是方便查找。例如你想找位于 2049 的位置,只要找到 2048.kafka 的文件即可。当然 the first offset 就是 00000000000.kafka。
26.Kafka 分区的目的?
分区对于 Kafka 集群的好处是:实现负载均衡。分区对于消费者来说,可以提高并发度,提高效率。
27.你知道 Kafka 是如何做到消息的有序性?
kafka 中的每个 partition 中的消息在写入时都是有序的,而且消息带有offset偏移量,消费者按偏移量的顺序从前往后消费,从而保证了消息的顺序性。但是分区之间的消息是不保证有序的。
28.Kafka 的高可靠性是怎么实现的?
kafka通过分区的多副本机制来保证消息的可靠性。1. 每个分区可以设置多个副本,这些副本分布在不同的broker上;2. 相同partition的多个副本能动态选举leader来对外服务和管理内部数据同步。这样,即使有broker出现故障,受影响的partition也会在其他broker上重新选举出新的leader来继续服务
更具体来说,可参看下文:
Kafka 的分区多副本架构是 Kafka 可靠性保证的核心,把消息写入多个副本可以使 Kafka 在发生崩溃时仍能保证消息的持久性。
Producer 往 Broker 发送消息
如果我们要往 Kafka 对应的主题发送消息,我们需要通过 Producer 完成。前面我们讲过 Kafka 主题对应了多个分区,每个分区下面又对应了多个副本;为了让用户设置数据可靠性, Kafka 在 Producer 里面提供了消息确认机制。也就是说我们可以通过配置来决定消息发送到对应分区的几个副本才算消息发送成功。可以在定义 Producer 时通过 acks 参数指定(在 0.8.2.X 版本之前是通过 request.required.acks 参数设置的,详见 KAFKA-3043)。这个参数支持以下三种值:
acks = 0:意味着如果生产者能够通过网络把消息发送出去,那么就认为消息已成功写入 Kafka 。在这种情况下还是有可能发生错误,比如发送的对象无能被序列化或者网卡发生故障,但如果是分区离线或整个集群长时间不可用,那就不会收到任何错误。在 acks=0 模式下的运行速度是非常快的(这就是为什么很多基准测试都是基于这个模式),你可以得到惊人的吞吐量和带宽利用率,不过如果选择了这种模式, 一定会丢失一些消息。
acks = 1:意味若 Leader 在收到消息并把它写入到分区数据文件(不一定同步到磁盘上)时会返回确认或错误响应。在这个模式下,如果发生正常的 Leader 选举,生产者会在选举时收到一个 LeaderNotAvailableException 异常,如果生产者能恰当地处理这个错误,它会重试发送悄息,最终消息会安全到达新的 Leader 那里。不过在这个模式下仍然有可能丢失数据,比如消息已经成功写入 Leader,但在消息被复制到 follower 副本之前 Leader发生崩溃。
acks = all(这个和 request.required.acks = -1 含义一样):意味着 Leader 在返回确认或错误响应之前,会等待所有同步副本都收到悄息。如果和 min.insync.replicas 参数结合起来,就可以决定在返回确认前至少有多少个副本能够收到悄息,生产者会一直重试直到消息被成功提交。不过这也是最慢的做法,因为生产者在继续发送其他消息之前需要等待所有副本都收到当前的消息。
根据实际的应用场景,我们设置不同的 acks,以此保证数据的可靠性。
另外,Producer 发送消息还可以选择同步(默认,通过 producer.type=sync 配置) 或者异步(producer.type=async)模式。如果设置成异步,虽然会极大的提高消息发送的性能,但是这样会增加丢失数据的风险。如果需要确保消息的可靠性,必须将 producer.type 设置为 sync。
Leader 选举
在介绍 Leader 选举之前,让我们先来了解一下 ISR(in-sync replicas)列表。每个分区的 leader 会维护一个 ISR 列表,ISR 列表里面就是 follower 副本的 Borker 编号,只有跟得上 Leader 的 follower 副本才能加入到 ISR 里面,这个是通过 replica.lag.time.max.ms 参数配置的,具体可以参见 《一文了解 Kafka 的副本复制机制》。只有 ISR 里的成员才有被选为 leader 的可能。
所以当 Leader 挂掉了,而且 unclean.leader.election.enable=false 的情况下,Kafka 会从 ISR 列表中选择第一个 follower 作为新的 Leader,因为这个分区拥有最新的已经 committed 的消息。通过这个可以保证已经 committed 的消息的数据可靠性。
综上所述,为了保证数据的可靠性,我们最少需要配置一下几个参数:
producer 级别:acks=all(或者 request.required.acks=-1),同时发生模式为同步 producer.type=sync
topic 级别:设置 replication.factor>=3,并且 min.insync.replicas>=2;
broker 级别:关闭不完全的 Leader 选举,即 unclean.leader.election.enable=false;
29.请谈一谈 Kafka 数据一致性原理
一致性就是说不论是老的 Leader 还是新选举的 Leader,Consumer 都能读到一样的数据。
![](https://img.haomeiwen.com/i3109223/da9b9467645b2f68.png)
假设分区的副本为3,其中副本0是 Leader,副本1和副本2是 follower,并且在 ISR 列表里面。虽然副本0已经写入了 Message4,但是 Consumer 只能读取到 Message2。因为所有的 ISR 都同步了 Message2,只有 High Water Mark 以上的消息才支持 Consumer 读取,而 High Water Mark 取决于 ISR 列表里面偏移量最小的分区,对应于上图的副本2,这个很类似于木桶原理。
这样做的原因是还没有被足够多副本复制的消息被认为是“不安全”的,如果 Leader 发生崩溃,另一个副本成为新 Leader,那么这些消息很可能丢失了。如果我们允许消费者读取这些消息,可能就会破坏一致性。试想,一个消费者从当前 Leader(副本0) 读取并处理了 Message4,这个时候 Leader 挂掉了,选举了副本1为新的 Leader,这时候另一个消费者再去从新的 Leader 读取消息,发现这个消息其实并不存在,这就导致了数据不一致性问题。
当然,引入了 High Water Mark 机制,会导致 Broker 间的消息复制因为某些原因变慢,那么消息到达消费者的时间也会随之变长(因为我们会先等待消息复制完毕)。延迟时间可以通过参数 replica.lag.time.max.ms 参数配置,它指定了副本在复制消息时可被允许的最大延迟时间。