大数据,机器学习,人工智能玩转大数据大数据

Kudu与Spark 生产最佳实践

2019-05-16  本文已影响8人  bd8941f5f5cc

一.环境

1

22.11.8

32.2.0

41.5.0

5

二.测试代码

1importorg.apache.spark.sql.SparkSession

2importorg.apache.spark.sql.types.{StringType, StructField, StructType}

3importorg.apache.kudu.client._

4importcollection.JavaConverters._

5objectKuduApp {

6def main(args: Array[String]):Unit= {

7valspark = SparkSession.builder().appName("KuduApp").master("local[2]").getOrCreate()

8//Read a table from Kudu

9valdf = spark.read

10.options(Map("kudu.master"->"10.19.120.70:7051","kudu.table"->"test_table"))

11.format("kudu").load

12df.schema.printTreeString()

13//    // Use KuduContext to create, delete, or write to Kudu tables

14//    val kuduContext = new KuduContext("10.19.120.70:7051", spark.sparkContext)

对大数据以及人工智能概念都是模糊不清的,该按照什么线路去学习,学完往哪方面发展,想深入了解,想学习的同学欢迎加入大数据学习qq群:458345782,有大量干货(零基础以及进阶的经典实战)分享给大家,让大家了解到目前国内最完整的大数据高端实战实用学习流程体系 。从java和linux入手,其后逐步的深入到HADOOP-hive-oozie-web-flume-python-hbase-kafka-scala-SPARK等相关知识一一分享!

15//

16//

17//    // The schema is encoded in a string

18//    val schemalString="id,age,name"

19//

20//    // Generate the schema based on the string of schema

21//    val fields=schemalString.split(",").map(filedName=>StructField(filedName,StringType,nullable =true ))

22//    val schema=StructType(fields)

23//

24//

25//    val KuduTable = kuduContext.createTable(

26//    "test_table", schema, Seq("id"),

27//    new CreateTableOptions()

28//      .setNumReplicas(1)

29//      .addHashPartitions(List("id").asJava, 3)).getSchema

30//

31//    val  id  = KuduTable.getColumn("id")

32//    print(id)

33//

34//    kuduContext.tableExists("test_table")

35}

36}

现象:通过spark sql 操作报如下错误:

1Exceptioninthread"main"java.lang.ClassNotFoundException:Failed to find datasource:kudu. Please find packages athttp://spark.apache.org/third-party-projects.html

2at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:549)

3at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:86)

4at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:86)

5at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:301)

6at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)

7at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)

8at cn.zhangyu.KuduApp$.main(KuduApp.scala:18)

9at cn.zhangyu.KuduApp.main(KuduApp.scala)

10Causedby:java.lang.ClassNotFoundException:kudu.DefaultSource

11at java.net.URLClassLoader.findClass(URLClassLoader.java:381)

12at java.lang.ClassLoader.loadClass(ClassLoader.java:424)

13at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)

14at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

15at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$21$$anonfun$apply$12.apply(DataSource.scala:533)

16at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$21$$anonfun$apply$12.apply(DataSource.scala:533)

17at scala.util.Try$.apply(Try.scala:192)

18at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$21.apply(DataSource.scala:533)

19at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$21.apply(DataSource.scala:533)

20at scala.util.Try.orElse(Try.scala:84)

21at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:533)

22...7more

而通过KuduContext是可以操作的没有报错,代码为上面注解部分

三.解决思路

查询kudu官网:https://kudu.apache.org/docs/developing.html

官网中说出了版本的问题:

如果将Spark 2与Scala 2.11一起使用,请使用kudu-spark2_2.11工件。

kudu-spark版本1.8.0及更低版本的语法略有不同。

有关有效示例,请参阅您的版本的文档。可以在发布页面上找到版本化文档。

spark-shell --packages org.apache.kudu:kudu-spark2_2.11:1.9.0

看到了 官网使用的是1.9.0的版本.

但是但是但是:

官网下面说到了下面几个集成问题:

Spark 2.2+在运行时需要Java 8,即使Kudu Spark 2.x集成与Java 7兼容。Spark 2.2是Kudu 1.5.0的默认依赖版本。

当注册为临时表时,必须为名称包含大写或非ascii字符的Kudu表分配备用名称。

包含大写或非ascii字符的列名的Kudu表不能与SparkSQL一起使用。可以在Kudu中重命名列以解决此问题。

<>并且OR谓词不会被推送到Kudu,而是由Spark任务进行评估。只有LIKE带有后缀通配符的谓词才会被推送到Kudu,这意味着它LIKE "FOO%"被推下但LIKE "FOO%BAR"不是。

Kudu不支持Spark SQL支持的每种类型。例如, Date不支持复杂类型。

Kudu表只能在SparkSQL中注册为临时表。使用HiveContext可能无法查询Kudu表。

那就很奇怪了我用的1.5.0版本报错为:找不到类,数据源有问题

但是把kudu改成1.9.0 问题解决

运行结果:

1root

2|--id: string (nullable=false)

3|-- age: string (nullable=true)

4|-- name: string (nullable=true)

四.Spark集成最佳实践

每个群集避免多个Kudu客户端。

一个常见的Kudu-Spark编码错误是实例化额外的KuduClient对象。在kudu-spark中,a KuduClient属于KuduContext。Spark应用程序代码不应创建另一个KuduClient连接到同一群集。相反,应用程序代码应使用KuduContext访问KuduClient使用

1KuduContext#syncClient。

2//UseKuduContexttocreate,delete,orwritetoKudutables

3val kuduContext =newKuduContext("10.19.120.70:7051", spark.sparkContext)

4vallist= kuduContext.syncClient.getTablesList.getTablesList

5if(list.iterator().hasNext){

6print(list.iterator().next())

7}

要诊断KuduClientSpark作业中的多个实例,请查看主服务器的日志中的符号,这些符号会被来自不同客户端的许多GetTableLocations或 GetTabletLocations请求过载,通常大约在同一时间。这种症状特别适用于Spark Streaming代码,其中创建KuduClient每个任务将导致来自新客户端的主请求的周期性波。

五.Spark操作kudu(Scala demo)

1packagecn.zhangyu

2importorg.apache.kudu.spark.kudu._

3importorg.apache.spark.sql.{Row, SparkSession}

4importorg.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

5importorg.slf4j.LoggerFactory

6importorg.apache.kudu.client._

7importcollection.JavaConverters._

8objectSparkTest {

9//kuduMasters and tableName

10valkuduMasters ="192.168.13.130:7051"

11valtableName ="kudu_spark_table"

12//table column

13validCol ="id"

14valageCol ="age"

15valnameCol ="name"

16//replication

17valtableNumReplicas = Integer.getInteger("tableNumReplicas",1)

18vallogger = LoggerFactory.getLogger(SparkTest.getClass)

19def main(args: Array[String]):Unit= {

20//create SparkSession

21valspark = SparkSession.builder().appName("KuduApp").master("local[2]").getOrCreate()

22//create kuduContext

23valkuduContext = new KuduContext(kuduMasters,spark.sparkContext)

24//schema

25valschema = StructType(

26List(

27StructField(idCol, IntegerType,false),

28StructField(nameCol, StringType,false),

29StructField(ageCol,StringType,false)

30)

31)

32vartableIsCreated =false

33try{

34// Make sure the table does not exist

35if(kuduContext.tableExists(tableName)) {

36thrownew RuntimeException(tableName +": table already exists")

37}

38//create

39kuduContext.createTable(tableName, schema, Seq(idCol),

40new CreateTableOptions()

41.addHashPartitions(List(idCol).asJava,3)

42.setNumReplicas(tableNumReplicas))

43tableIsCreated =true

44importspark.implicits._

45//write

46logger.info(s"writing to table '$tableName'")

47valdata= Array(Person(1,"12","zhangsan"),Person(2,"20","lisi"),Person(3,"30","wangwu"))

48valpersonRDD = spark.sparkContext.parallelize(data)

49valpersonDF = personRDD.toDF()

50kuduContext.insertRows(personDF,tableName)

51//useing SparkSQL read table

52valsqlDF = spark.sqlContext.read

53.options(Map("kudu.master"-> kuduMasters,"kudu.table"-> tableName))

54.format("kudu").kudu

55sqlDF.createOrReplaceTempView(tableName)

56spark.sqlContext.sql(s"SELECT * FROM$tableName").show

57//upsert some rows

58valupsertPerson = Array(Person(1,"10","jack"))

59valupsertPersonRDD = spark.sparkContext.parallelize(upsertPerson)

60valupsertPersonDF = upsertPersonRDD.toDF()

61kuduContext.updateRows(upsertPersonDF,tableName)

62//useing RDD read table

63valreadCols = Seq(idCol,ageCol,nameCol)

64valreadRDD = kuduContext.kuduRDD(spark.sparkContext, tableName, readCols)

65valuserTuple = readRDD.map { case Row( id:Int,age: String,name: String) => (id,age,name) }

66println("count:"+userTuple.count())

67userTuple.collect().foreach(println(_))

68//delete table

69kuduContext.deleteTable(tableName)

70}catch{

71// Catch, log and re-throw. Not the best practice, but this is a very

72// simplistic example.

73case unknown : Throwable => logger.error(s"got an exception: "+ unknown)

74throwunknown

75}finally{

76// Clean up.

77if(tableIsCreated) {

78logger.info(s"deleting table '$tableName'")

79kuduContext.deleteTable(tableName)

80}

81logger.info(s"closing down the session")

82spark.close()

83}

84}

85}

86caseclassPerson(id:Int,age: String,name: String)

对大数据以及人工智能概念都是模糊不清的,该按照什么线路去学习,学完往哪方面发展,想深入了解,想学习的同学欢迎加入大数据学习qq群:458345782,有大量干货(零基础以及进阶的经典实战)分享给大家,让大家了解到目前国内最完整的大数据高端实战实用学习流程体系 。从java和linux入手,其后逐步的深入到HADOOP-hive-oozie-web-flume-python-hbase-kafka-scala-SPARK等相关知识一一分享!

上一篇 下一篇

猜你喜欢

热点阅读