Kudu: Operating on Kudu with Spark SQL
2020-12-16 xiaogp
Abstract: Spark SQL, Kudu
Reference: https://github.com/xieenze/SparkOnKudu/blob/master/src/main/scala/com/spark/test/KuduCRUD.scala
Dependencies
Add the spark-core_2.11, spark-sql_2.11, kudu-spark2_2.11, and hadoop-client dependencies.
<!-- Spark dependencies -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>${spark.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
        </exclusion>
    </exclusions>
    <!-- <scope>provided</scope> -->
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
<!-- Kudu -->
<dependency>
    <groupId>org.apache.kudu</groupId>
    <artifactId>kudu-spark2_2.11</artifactId>
    <version>1.6.0-cdh5.14.2</version>
</dependency>
<!-- Hadoop dependency -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
</dependency>
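The pom excerpt above references ${spark.version} and ${hadoop.version} without defining them; a minimal properties block might look like the following. The exact versions are assumptions and must match your cluster (values here are in the style of CDH 5.14.2). Since the Kudu artifact carries a cdh version suffix, the Cloudera Maven repository also needs to be configured.
<properties>
    <!-- assumed example versions; use the ones matching your cluster -->
    <spark.version>2.3.0</spark.version>
    <hadoop.version>2.6.0-cdh5.14.2</hadoop.version>
</properties>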
Reading data
Specify kudu.master and kudu.table; if reads time out, add the kudu.operation.timeout.ms parameter.
import org.apache.kudu.spark.kudu._
val df = spark.read.options(Map(
"kudu.master" -> "cloudera01:7051,cloudera02:7051,cloudera03:7051",
"kudu.table" -> "impala::default.students",
"kudu.operation.timeout.ms" -> "1000000")).kudu
Or:
val df = spark.read.format("org.apache.kudu.spark.kudu")
.option("kudu.table", "impala::default.students")
.option("kudu.master","cloudera01:7051")
.option("kudu.operation.timeout.ms", "100000").load
Writing data
Data can be written either with the DataFrame write method or with the KuduContext methods updateRows, insertRows, upsertRows, and insertIgnoreRows.
(1) Insert when the key does not exist, update when it does
Call the DataFrame write method directly, specifying kudu.master and kudu.table. Only append mode is supported; rows whose keys already exist are updated automatically.
val df = Seq(("张六", "男"), ("李四", "女")).toDF("name", "sex")
df.write.options(Map(
"kudu.master" -> "cloudera01:7051",
"kudu.table" -> "impala::test_gp.student",
"kudu.operation.timeout.ms" -> "1000000")).mode("append").kudu
Calling the KuduContext upsertRows method has the same effect as calling DataFrame write in append mode.
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("spark_kudu").getOrCreate()
import spark.implicits._ // needed for toDF
// upsert
val df = Seq(("张六", "男"), ("李四", "女")).toDF("name", "sex")
val sc = spark.sparkContext
val kuduMasters = "cloudera01:7051"
val kuduContext = new KuduContext(kuduMasters, sc)
kuduContext.upsertRows(df, "impala::test_gp.student")
(2) Insert only
Call the KuduContext insertRows or insertIgnoreRows methods. If a row's key already exists, insertRows throws an error, while insertIgnoreRows skips the existing keys and inserts only the new ones.
val spark = SparkSession.builder().master("local[*]").appName("spark_kudu").getOrCreate()
// insert
val df = Seq(("张六", "男"), ("李四", "女")).toDF("name", "sex")
val sc = spark.sparkContext
val kuduMasters = "cloudera01:7051"
val kuduContext = new KuduContext(kuduMasters, sc)
kuduContext.insertRows(df, "impala::test_gp.student")
// kuduContext.insertIgnoreRows(df, "impala::test_gp.student")
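A small sketch (reusing df and kuduContext from above) of the difference in behavior when the same rows are inserted twice; the Try wrapper is only for illustration.
import scala.util.{Failure, Success, Try}

// insertRows fails if any of the keys already exist ...
Try(kuduContext.insertRows(df, "impala::test_gp.student")) match {
  case Success(_) => println("insertRows succeeded: all keys were new")
  case Failure(e) => println(s"insertRows failed on duplicate keys: ${e.getMessage}")
}
// ... while insertIgnoreRows silently skips the duplicates
kuduContext.insertIgnoreRows(df, "impala::test_gp.student")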
(3) Update only
Call the KuduContext updateRows method to update rows whose keys already exist; if a key does not exist, an error is thrown.
val spark = SparkSession.builder().master("local[*]").appName("spark_kudu").getOrCreate()
// update
val df = Seq(("张六", "男"), ("李四", "女")).toDF("name", "sex")
val sc = spark.sparkContext
val kuduMasters = "cloudera01:7051"
val kuduContext = new KuduContext(kuduMasters, sc)
kuduContext.updateRows(df, "impala::test_gp.student")
Creating and dropping tables
Create a table from the schema of an existing DataFrame:
import org.apache.kudu.client.CreateTableOptions

def createTable1(spark: SparkSession, kuduContext: KuduContext) = {
  import spark.implicits._
  import scala.collection.JavaConverters._
  val df = Seq(("王二", 3), ("李振", 4)).toDF("name", "age")
  val kuduTableSchema = df.schema
  val kuduTablePrimaryKey = Seq("name") // define the primary key
  val kuduTableOptions = new CreateTableOptions()
  kuduTableOptions
    .setNumReplicas(1) // set the replica count
    .addHashPartitions(List("name").asJava, 20) // hash-partition on name: spreads rows across 20 buckets
  // create the table
  kuduContext.createTable("impala::test_gp.student_info", kuduTableSchema,
    kuduTablePrimaryKey, kuduTableOptions)
}
Define a custom schema with StructType:
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

def createTable2(spark: SparkSession, kuduContext: KuduContext) = {
  import scala.collection.JavaConverters._
  val schema = new StructType()
    .add(StructField("name", StringType, false)) // primary key columns must be non-nullable
    .add(StructField("age", IntegerType, true))
  val kuduTablePrimaryKey = Seq("name") // define the primary key
  val kuduTableOptions = new CreateTableOptions()
  kuduTableOptions
    .setNumReplicas(1) // set the replica count
    .addHashPartitions(List("name").asJava, 20) // hash-partition on name: spreads rows across 20 buckets
  // create the table
  kuduContext.createTable("impala::test_gp.student_info", schema,
    kuduTablePrimaryKey, kuduTableOptions)
}
Drop a table, checking first whether it exists:
def deleteTable(kuduContext: KuduContext): Unit = {
  val kuduTableName: String = "impala::test_gp.student_info"
  // deleting a table that does not exist throws an error, so check first
  if (kuduContext.tableExists(kuduTableName)) {
    kuduContext.deleteTable(kuduTableName)
  }
}
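Finally, a minimal end-to-end sketch tying the pieces together; the table name, master address, and columns are the example values used throughout this post. It creates the table, upserts a DataFrame, reads it back through the DataSource API, and drops it again.
import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.spark.kudu._
import org.apache.spark.sql.SparkSession
import scala.collection.JavaConverters._

object KuduRoundTrip {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("kudu_round_trip").getOrCreate()
    import spark.implicits._
    val kuduContext = new KuduContext("cloudera01:7051", spark.sparkContext)
    val tableName = "impala::test_gp.student_info"

    val df = Seq(("王二", 3), ("李振", 4)).toDF("name", "age")
    if (!kuduContext.tableExists(tableName)) {
      val options = new CreateTableOptions()
        .setNumReplicas(1)
        .addHashPartitions(List("name").asJava, 20)
      kuduContext.createTable(tableName, df.schema, Seq("name"), options)
    }
    // upsert: insert new keys, update existing ones
    kuduContext.upsertRows(df, tableName)

    // read back through the Kudu DataSource
    val readBack = spark.read.options(Map(
      "kudu.master" -> "cloudera01:7051",
      "kudu.table" -> tableName)).kudu
    readBack.show()

    kuduContext.deleteTable(tableName)
    spark.stop()
  }
}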