
Kudu: Operating Kudu with Spark SQL

2020-12-16  xiaogp

Summary: Spark SQL, Kudu

Reference: https://github.com/xieenze/SparkOnKudu/blob/master/src/main/scala/com/spark/test/KuduCRUD.scala

Dependencies

Add the spark-core_2.11, spark-sql_2.11, kudu-spark2_2.11, and hadoop-client dependencies.

<!-- Spark dependencies -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-client</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>guava</artifactId>
                </exclusion>
            </exclusions>
<!--            <scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

<!-- kudu -->
        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-spark2_2.11</artifactId>
            <version>1.6.0-cdh5.14.2</version>
        </dependency>

<!-- Hadoop dependencies -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

Reading data

Specify kudu.master and kudu.table; if reads time out, add the kudu.operation.timeout.ms parameter.

import org.apache.kudu.spark.kudu._

val df = spark.read.options(Map(
      "kudu.master" -> "cloudera01:7051,cloudera02:7051,cloudera03:7051",
      "kudu.table" -> "impala::default.students",
      "kudu.operation.timeout.ms" -> "1000000")).kudu

Or, equivalently:

val df = spark.read.format("org.apache.kudu.spark.kudu")
    .option("kudu.table", "impala::default.students")
    .option("kudu.master","cloudera01:7051")
    .option("kudu.operation.timeout.ms", "100000").load
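
The loaded result is an ordinary DataFrame, so it can be registered as a temporary view and queried with Spark SQL. A minimal sketch (the column names used in the query are assumptions for illustration):

// register the Kudu-backed DataFrame as a temporary view
df.createOrReplaceTempView("students")
// query it with Spark SQL; name and age are assumed columns of the students table
spark.sql("SELECT name, age FROM students WHERE age > 18").show()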

Writing data

Data can be written either with the DataFrame write method or with the KuduContext updateRows, insertRows, upsertRows, and insertIgnoreRows methods.

(1) Insert if the key does not exist, update if it does

Call the DataFrame write method directly, specifying kudu.master and kudu.table. Only append mode is supported, and rows whose keys already exist are updated automatically.

val df = Seq(("张六", "男"), ("李四", "女")).toDF("name", "sex")
df.write.options(Map(
      "kudu.master" -> "cloudera01:7051",
      "kudu.table" -> "impala::test_gp.student",
      "kudu.operation.timeout.ms" -> "1000000")).mode("append").kudu

Calling the KuduContext upsertRows method has the same effect as the DataFrame write in append mode.

    import org.apache.spark.sql.SparkSession
    import org.apache.kudu.spark.kudu.KuduContext

    val spark = SparkSession.builder().master("local[*]").appName("spark_kudu").getOrCreate()
    import spark.implicits._
    // rows to write
    val df = Seq(("张六", "男"), ("李四", "女")).toDF("name", "sex")
    val sc = spark.sparkContext
    val kuduMasters = "cloudera01:7051"
    val kuduContext = new KuduContext(kuduMasters, sc)
    kuduContext.upsertRows(df, "impala::test_gp.student")
(2) Insert only

Call the KuduContext insertRows or insertIgnoreRows method. If a key to be inserted already exists, insertRows throws an error, whereas insertIgnoreRows skips existing keys and inserts only those that do not exist.

    val spark = SparkSession.builder().master("local[*]").appName("spark_kudu").getOrCreate()
    // rows to write
    val df = Seq(("张六", "男"), ("李四", "女")).toDF("name", "sex")
    val sc = spark.sparkContext
    val kuduMasters = "cloudera01:7051"
    val kuduContext = new KuduContext(kuduMasters, sc)
    kuduContext.insertRows(df, "impala::test_gp.student")
    // kuduContext.insertIgnoreRows(df, "impala::test_gp.student")
(3) Update only

Call the KuduContext updateRows method to update rows whose keys already exist; if a key does not exist, an error is thrown.

    val spark = SparkSession.builder().master("local[*]").appName("spark_kudu").getOrCreate()
    // rows to write
    val df = Seq(("张六", "男"), ("李四", "女")).toDF("name", "sex")
    val sc = spark.sparkContext
    val kuduMasters = "cloudera01:7051"
    val kuduContext = new KuduContext(kuduMasters, sc)
    kuduContext.updateRows(df, "impala::test_gp.student")

Creating and dropping tables

Create a table using an existing DataFrame's schema

def createTable1(spark: SparkSession, kuduContext: KuduContext) = {
    import spark.implicits._
    import scala.collection.JavaConverters._
    import org.apache.kudu.client.CreateTableOptions
    val df = Seq(("王二", 3), ("李振", 4)).toDF("name", "age")
    val kuduTableSchema = df.schema                // reuse the DataFrame schema as the table schema
    val kuduTablePrimaryKey = Seq("name")          // define the primary key
    val kuduTableOptions = new CreateTableOptions()
    kuduTableOptions
      .setNumReplicas(1)                           // set the number of replicas
      .addHashPartitions(List("name").asJava, 20)  // hash-partition on name to spread the data across tablets
    // create the table
    kuduContext.createTable("impala::test_gp.student_info", kuduTableSchema,
      kuduTablePrimaryKey, kuduTableOptions)
  }

Define a custom schema with StructType

def createTable2(spark: SparkSession, kuduContext: KuduContext) = {
    import scala.collection.JavaConverters._
    import org.apache.kudu.client.CreateTableOptions
    import org.apache.spark.sql.types._
    val schema = new StructType()
      .add(StructField("name", StringType, false))  // primary key column must be non-nullable
      .add(StructField("age", IntegerType, true))
    val kuduTablePrimaryKey = Seq("name")           // define the primary key
    val kuduTableOptions = new CreateTableOptions()
    kuduTableOptions
      .setNumReplicas(1)                            // set the number of replicas
      .addHashPartitions(List("name").asJava, 20)   // hash-partition on name to spread the data across tablets
    // create the table
    kuduContext.createTable("impala::test_gp.student_info", schema,
      kuduTablePrimaryKey, kuduTableOptions)
  }
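
A short usage sketch tying the pieces together (assuming the createTable1 helper above sits in the same object): it builds the SparkSession and KuduContext, creates the table if it does not already exist, and writes a few rows into it.

    val spark = SparkSession.builder().master("local[*]").appName("spark_kudu").getOrCreate()
    val kuduContext = new KuduContext("cloudera01:7051", spark.sparkContext)
    // create the table only when it does not exist yet
    if (!kuduContext.tableExists("impala::test_gp.student_info")) {
      createTable1(spark, kuduContext)
    }
    // write rows whose schema matches the table created above
    import spark.implicits._
    val data = Seq(("王二", 3), ("李振", 4)).toDF("name", "age")
    kuduContext.insertRows(data, "impala::test_gp.student_info")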


Drop a table and check whether it exists

def deleteTable(kuduContext: KuduContext): Unit = {
    val kuduTableName: String = "impala::test_gp.student_info"
    // deleting a table that does not exist throws an error, so check first
    if (kuduContext.tableExists(kuduTableName)) {
      kuduContext.deleteTable(kuduTableName)
    }
  }