
Spark-Streaming Kafka in Kerberos

2017-07-28  杨光明子

I recently tried out Kerberos in an HDP 2.6 environment and, with every component running normally, finally got a Spark Streaming application working. Looking back, it was a case of not seeing the mountain for a single leaf: plenty of pits and very few ladders. Documentation on Kerberos is scarce, especially in Chinese, yet in a production environment Kerberos sits like a fishbone in the throat and cannot be ignored.

So I am sharing this article in the hope that it gives a little help to anyone still climbing out of these pits.

Verify that Kerberos works correctly for each OCSP component

1. Kafka
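
As a minimal sketch of a command-line check (the topic name test, the client config path under /tmp, and the HDP tool locations are illustrative assumptions), first obtain a ticket, then drive the console producer and consumer through SASL:

# assumption: topic "test" exists and the JAAS file from the later section is in place
kinit -kt /etc/security/keytabs/ocsp.keytab ocsp@ASIAINFO.COM

# client properties for the console tools
cat > /tmp/client.properties <<'EOF'
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
EOF

# the console scripts pick up KAFKA_OPTS, which points them at the JAAS login config
export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/OCSP/conf/kafka_client_jaas.conf"

/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh \
  --broker-list kafka-server1:6667 --topic test \
  --producer.config /tmp/client.properties
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka-server1:6667 --topic test --from-beginning \
  --consumer.config /tmp/client.properties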

FAQ:


2. Hive
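
For a quick sanity check of Hive (the HiveServer2 host is an assumption; the principal in the URL is the server's, with _HOST expanded automatically), get a ticket and connect through beeline:

kinit -kt /etc/security/keytabs/ocsp.keytab ocsp@ASIAINFO.COM
# a Kerberized HiveServer2 is addressed by appending its principal to the JDBC URL
beeline -u "jdbc:hive2://hive-server1:10000/default;principal=hive/_HOST@ASIAINFO.COM" -e "show databases;"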


3. Phoenix
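
Similarly for Phoenix (the ZooKeeper quorum zk1 is an assumption; /hbase-secure is HDP's default znode for a secured HBase): sqlline.py accepts a connection string that carries the principal and keytab, so no prior kinit is needed:

# connection string format: <zk quorum>:<port>:<znode>:<principal>:<keytab>
/usr/hdp/current/phoenix-client/bin/sqlline.py "zk1:2181:/hbase-secure:ocsp@ASIAINFO.COM:/etc/security/keytabs/ocsp.keytab"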


Kerberos-related configuration for Spark and Kafka

1. To start with, the final job-submission commands

Both variants below work. The first authenticates Spark to the cluster with the hdfs headless keytab and ships ocsp.keytab separately via --files for the Kafka JAAS login; the second uses the single ocsp principal and keytab for both purposes.

# Variant 1: separate principals (Spark uses the hdfs headless keytab; the Kafka JAAS login uses ocsp.keytab shipped via --files)
spark-submit --class <classname> --master yarn --deploy-mode client \
  --executor-memory 2g --executor-cores 2 --driver-memory 2g --num-executors 2 --queue default \
  --principal ocsp-yg@ASIAINFO.COM --keytab /etc/security/keytabs/hdfs.headless.keytab \
  --files "/usr/OCSP/conf/kafka_client_jaas.conf,/usr/OCSP/conf/ocsp.keytab" \
  --driver-java-options "-Djava.security.auth.login.config=/usr/OCSP/conf/kafka_client_jaas.conf" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_client_jaas.conf" \
  --jars <your jars>,/usr/OCSP/lib/spark-kafka-0-10-connector-assembly_2.10-1.0.1.jar \
  /usr/OCSP/lib/ocsp-core_1.6-2.1.0.jar

# Variant 2: the ocsp principal and keytab serve both Spark and the Kafka JAAS login
spark-submit --class <classname> --master yarn --deploy-mode client \
  --executor-memory 2g --executor-cores 2 --driver-memory 2g --num-executors 2 --queue default \
  --principal ocsp@ASIAINFO.COM --keytab /etc/security/keytabs/ocsp.keytab \
  --files "/usr/OCSP/conf/kafka_client_jaas.conf" \
  --driver-java-options "-Djava.security.auth.login.config=/usr/OCSP/conf/kafka_client_jaas.conf" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_client_jaas.conf" \
  --jars <your jars>,/usr/OCSP/lib/spark-kafka-0-10-connector-assembly_2.10-1.0.1.jar \
  /usr/OCSP/lib/ocsp-core_1.6-2.1.0.jar

2. Generate the principal and keytab
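
A minimal sketch using MIT Kerberos kadmin.local on the KDC (the realm and keytab path match the ones used in the rest of this post):

kadmin.local -q "addprinc -randkey ocsp@ASIAINFO.COM"
kadmin.local -q "xst -norandkey -k /etc/security/keytabs/ocsp.keytab ocsp@ASIAINFO.COM"

# verify the keytab entries and that the principal can actually log in
klist -kt /etc/security/keytabs/ocsp.keytab
kinit -kt /etc/security/keytabs/ocsp.keytab ocsp@ASIAINFO.COM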


3. Create the JAAS config file that Spark uses to read Kafka

Note the relative keyTab path: on executors, --files drops ocsp.keytab into the YARN container's working directory, so ./ocsp.keytab resolves there; on the driver (client mode), launch from a directory that contains ocsp.keytab, or use an absolute path.

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=false
  useKeyTab=true
  principal="ocsp@ASIAINFO.COM"
  keyTab="./ocsp.keytab"
  renewTicket=true
  storeKey=true
  // serviceName must match the brokers' service principal (kafka/_HOST@REALM on HDP), not the client principal
  serviceName="kafka";
};

4. Configure the Spark 1.6 + Kafka 0.10 connector jar

Spark 1.6's bundled Kafka support speaks only the old 0.8 API, which has no SASL/Kerberos support; the Hortonworks spark-kafka-0-10-connector backports the new 0.10 consumer API to Spark 1.6, so the job depends on it directly:

<dependency>
  <groupId>com.hortonworks</groupId>
  <artifactId>spark-kafka-0-10-connector-main_2.10</artifactId>
  <version>1.0.1</version>
  <scope>system</scope>
  <systemPath>${project.basedir}/../lib/spark-kafka-0-10-connector_2.10-1.0.1.jar</systemPath>
</dependency>

5. Modify the Spark code that reads from Kafka

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

KafkaUtils.createDirectStream[String, String](
  SSC,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](TopicsSet, KafkaParams))

KafkaParams is configured as follows:

val KafkaParams = Map[String, Object](
  "auto.offset.reset" -> "latest",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "security.protocol" -> "SASL_PLAINTEXT",
  "bootstrap.servers" -> "kafka-server1:6667",
  "group.id" -> "test")

6. Modify the Spark code that writes to Kafka

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <version>0.10.1.1</version>
</dependency>

import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer

val props = new Properties()
props.put("bootstrap.servers", dsConf.get("metadata.broker.list", ""))
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
if (MainFrameConf.KERBEROS_ENABLE == "true") {
  // SASL_PLAINTEXT makes the producer authenticate through the KafkaClient JAAS section as well
  props.put("security.protocol", "SASL_PLAINTEXT")
}
val producer = new KafkaProducer[String, String](props)
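
A quick way to exercise the producer (the topic name test and the message contents are assumptions for illustration):

import org.apache.kafka.clients.producer.ProducerRecord

producer.send(new ProducerRecord[String, String]("test", "key", "hello from a Kerberized producer"))
producer.flush()  // force delivery so authentication problems surface immediately
producer.close()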