Spark-Streaming Kafka In Kerbero
最近在HDP2.6的环境里尝试了Kerberos,在各组件运行正常的情况下最终成功运行spark-streaming应用,总结一下就是一叶障目,不见泰山,坑多梯子少。尤其在国内,关于Kerberos的资料较少,但在生产环境中,Kerberos又是如鲠在喉,无法忽视。
因此分享这篇文章,希望能给还在苦苦爬坑的小伙伴们一点帮助。
- 我们的HDP为单用户ocsp安装,多用户需要根据以下步骤进行细微修改
确认OCSP各组件的Kerberos工作正常
1. Kafka
-
使用kafka-topics.sh创建topic
-
使用kafka producer和consumer需要先kinit
kinit -kt /etc/security/keytabs/kafka.service.keytab ocsp/host-10-1-236-122@ASIAINFO.COM
-
使用producer发送消息,consumer消费消息
-
kafka producer
/usr/hdp/2.6.0.3-8/kafka/bin/kafka-console-producer.sh --topic kerin --broker-list host-10-1-236-122:6667 --security-protocol PLAINTEXTSASL
-
kafka consumer
/usr/hdp/2.6.0.3-8/kafka/bin/kafka-console-consumer.sh --topic kerin --security-protocol PLAINTEXTSASL --bootstrap-server host-10-1-236-122:6667
-
FAQ:
- 使用kafka producer和consumer需要先kinit kinit -kt /etc/security/keytabs/kafka.service.keytab ocsp/<hostname>@ASIAINFO.COM
- 否则:
-
kafka producer 报错:
[2017-07-19 10:44:56,582] WARN Error while fetching metadata with correlation id 0 : {kertest=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
-
kafka consumer 报错:
javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner authentication information from the user
-
2. Hive
- kinit
- 使用beeline登录
3. Phoenix
- 使用sqlline与principal,keytab登录
进行Spark,Kafka针对Kerberos相关配置
1. 先放上最后提交任务的命令
spark-submit --class <classname> --master yarn --deploy-mode client --executor-memory 2g --executor-cores 2 --driver-memory 2g --num-executors 2 --queue default --principal ocsp-yg@ASIAINFO.COM --keytab /etc/security/keytabs/hdfs.headless.keytab --files "/usr/OCSP/conf/kafka_client_jaas.conf,/usr/OCSP/conf/ocsp.keytab" --driver-java-options "-Djava.security.auth.login.config=/usr/OCSP/conf/kafka_client_jaas.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_client_jaas.conf" --jars <your jars>,/usr/OCSP/lib/spark-kafka-0-10-connector-assembly_2.10-1.0.1.jar /usr/OCSP/lib/ocsp-core_1.6-2.1.0.jar
-
--principal与--keytab这两个参数为spark需要的Kerberos认证信息
-
--driver-java-options "-Djava.security.auth.login.config=/usr/OCSP/conf/kafka_client_jaas.conf"为driver连接kafka用到的认证信息,因此使用本地绝对路径
-
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_client_jaas.conf"为executor连接kafka用到的Kerberos认证信息,因此使用container中的相对路径./
-
jaas文件中定义了principal与keytab,由于我们使用了yarn-client模式,driver需要的文件在本地文件系统,executor需要的文件需要我们使用--files的方式上传,即--files "/usr/OCSP/conf/kafka_client_jaas.conf,/usr/OCSP/conf/ocsp.keytab"
-
有的文档中说--files中传keytab文件会与spark本身的--keytab 冲突,其实是因为他们对spark和kafka使用了相同的principal和keytab,在上述命令中我为了清晰起见,让spark使用了principal ocsp-yg@ASIAINFO.COM,keytab hdfs.headless.keytab,让spark连接kafka时使用了principal ocsp/ASIAINFO.COM(principal其实是在jaas文件中指定的,3中详细讲jaas文件) keytab ocsp.keytab,当spark提交任务时,yarn会将--keytab后面的keytab文件与--files里的文件先后上传,即 hdfs.headless.keytab与ocsp.keytab均会被上传,spark与kafka各取所需,即可正常工作。当spark与kafka要使用相同的keytab文件时,比如都用ocsp.keytab,那么yarn会先后上传两次ocsp.keytab,在spark正使用的时候更新了keytab,造成异常退出
-
因此如果spark与kafka需要使用相同的keytab文件,我们只需要在--files里不要上传keytab即可避免冲突
spark-submit --class <classname> --master yarn --deploy-mode client --executor-memory 2g --executor-cores 2 --driver-memory 2g --num-executors 2 --queue default --principal ocsp@ASIAINFO.COM --keytab /etc/security/keytabs/ocsp.keytab --files "/usr/OCSP/conf/kafka_client_jaas.conf" --driver-java-options "-Djava.security.auth.login.config=/usr/OCSP/conf/kafka_client_jaas.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_client_jaas.conf" --jars <your jars>,/usr/OCSP/lib/spark-kafka-0-10-connector-assembly_2.10-1.0.1.jar /usr/OCSP/lib/ocsp-core_1.6-2.1.0.jar
- 还有一个问题是本例中drvier和executor使用了相同的kafka_client_jaas.conf,这也会造成一些问题,3中会详细说明
2. 生成keytab和principal
- 在KDC Server上执行
kadmin -p admin/admin@ASIAINFO.COM
- 生成principal,principal最好使用ocsp的用户名+domain
addprinc -randkey ocsp/ASIAINFO.COM
- 生成keytab
ktadd -k /data/ocsp.keytab ocsp/ASIAINFO.COM
- 将keytab文件copy到spark driver所在的机器(因为OCSP默认使用yarn-client模式)
3. 创建spark读取kafka的jaas配置文件
- 配置文件kafka_client_jaas.conf样例如下:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useTicketCache=false
useKeyTab=true
principal="ocsp@ASIAINFO.COM"
keyTab="./ocsp.keytab"
renewTicket=true
storeKey=true
serviceName="ocsp";
};
-
其中useTicketCache指从系统的cash中读取credential信息,useKeyTab指从指定的keyTab文件读取credential
-
principal和keytab用第二步生成的principal与keytab,注意:k�eytab的路径
- 如果这个conf文件是给driver读取,则我们要用keytab文件在本地的绝对路径
- 如果这个conf文件是executor读取,则我们要用keytab文件在container中的相对路径,即./ocsp.keytab
- 如果为了方便起见,drvier与executor要使用相同的jaas文件,路径配置为./ocsp.keytab,我们需要将keytab文件copy到运行spark-submit的当前路径
- 如果driver和executor要使用不同的jaas文件,则driver的jaas文件中,keytab应为本地绝对路径,executor的jaas文件中,keytab应为相对路径./
4. 配置spark1.6+kafka0.10 jar包
- 在我们的应用中有两部分需要修改,一个是处理之前从kafka读取数据,一个是处理结束后向kafka写数据
- 由于kafka0.10版本后才支持Kerberos,而官方Spark2.* 之后才适配kafka0.10,但我们目前使用HDP2.6中spark1.6与2.* 双版本,Spark 1.6 + Kafka 0.10就需要使用HDP提供的spark-kafka-0-10-connector包,官网说明如下:https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.0/bk_spark-component-guide/content/using-spark-streaming.html#spark-streaming-jar
- 通过官方maven的方式没有添加成功,可能是网络原因,因此我是从https://github.com/hortonworks-spark/skc下载源码,本地编译,然后添加本地jar包进我们的项目
<dependency>
<groupId>com.hortonworks</groupId>
<artifactId>spark-kafka-0-10-connector-main_2.10</artifactId>
<version>1.0.1</version>
<scope>system</scope>
<systemPath>${project.basedir}/../lib/spark-kafka-0-10-connector_2.10-1.0.1.jar</systemPath>
</dependency>
5. 修改Spark读取Kafka部分
- 需要import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
- 我们使用的DirectApi读取kafka
KafkaUtils.createDirectStream[String, String](
SSC,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](TopicsSet, KafkaParams))
KafkaParams配置如下:
val KafkaParams = Map[String, Object]("auto.offset.reset" -> "latest"
, "key.deserializer" -> classOf[StringDeserializer]
, "value.deserializer" -> classOf[StringDeserializer]
, "security.protocol" -> "SASL_PLAINTEXT"
, "bootstrap.servers" -> "kafka-server1:6667"
, "group.id" -> "test")
6. 修改Spark写Kafka部分
- 写kafka调用的是kafka官方的库
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.1.1</version>
</dependency>
- 代码中需要import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerConfig, ProducerRecord}
val props = new Properties()
props.put("bootstrap.servers", dsConf.get("metadata.broker.list", ""))
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
if (MainFrameConf.KERBEROS_ENABLE == "true"){
props.put("security.protocol","SASL_PLAINTEXT")
}
new KafkaProducer[String, String](props)