spark_SQL 学习

2019-11-13  本文已影响0人  BitGuo

历经版本迭代更新,spark sql中原本带有模式信息的RDD即SchemaRDD,在spark1.3之后变成了新的数据结构 DataFrame
RDD是风不是的java对象的集合,RDD无法知道RDD内部存储的数据结构的详细模式信息。
DataFrame是以RDD为基础的分布式数据集,也就是分布式的Row对象的集合(每个Row对象代表一行记录),提供详细的结构信息,也就是我们常说的数据库模式信息。Spark SQL可以清除的知道该数据集中包含哪些列,每列的名称和类型。

从RDD转换得到DataFrame

1.利用反射机制,即toDF()方法

正常的创建了一个RDD对象后,对其调用toDF()方法

peopleDF = sc.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt").map(lambda line : line.split(',')).map(lambda x: Row(**f(x))).toDF()
#接着必须将其注册为临时表才可以被查询
peopleDF.createOrReplaceTempView("people") 
personsDF = spark.sql("select * from people")
personsDF.rdd.map(lambda t : "Name:"+t[0]+","+"Age:"+t[1]).foreach(print)
2.利用编程方式
>>>  from pyspark.sql.types import Row
>>>  from pyspark.sql.types import StructType
>>> from pyspark.sql.types import StructField
>>> from pyspark.sql.types import StringType
 
//生成 RDD
>>> peopleRDD = sc.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
 
//定义一个模式字符串
>>> schemaString = "name age"
 
//根据模式字符串生成模式
>>> fields = list(map( lambda fieldName : StructField(fieldName, StringType(), nullable = True), schemaString.split(" ")))
>>> schema = StructType(fields)
//从上面信息可以看出,schema描述了模式信息,模式中包含name和age两个字段
 
 
>>> rowRDD = peopleRDD.map(lambda line : line.split(',')).map(lambda attributes : Row(attributes[0], attributes[1]))
 
>>> peopleDF = spark.createDataFrame(rowRDD, schema)
 
//必须注册为临时表才能供下面查询使用
scala> peopleDF.createOrReplaceTempView("people")
 
>>> results = spark.sql("SELECT * FROM people")
>>> results.rdd.map( lambda attributes : "name: " + attributes[0]+","+"age:"+attributes[1]).foreach(print)
 
name: Michael,age: 29
name: Andy,age: 30
name: Justin,age: 19
上一篇下一篇

猜你喜欢

热点阅读