大数据开发

大数据开发:Spark核心数据抽象之DataFrame

2021-05-20  本文已影响0人  成都加米谷大数据

之前对于Spark的核心数据模型,我们早就讲过了RDD,事实上,除了RDD之外,DataFrame、DataSet也是重要的数据抽象概念。今天的大数据开发学习分享,我们就主要来讲讲Spark核心数据抽象之DataFrame。

Spark 的DataFrame,其实依然与RDD有着紧密的联系。相比RDD,DataFrame增加了scheme概念,从这个角度看,DataFrame有点类似于关系型数据库中表的概念。

一、DataFrame特点

DataFrames和Datasets是spark中结构化数据的主要抽象形式。

DataFrames将结构化数据以表格形式表示;

DataFrames类似于RDBMS中的表格;

DataFrames由一组松散类型的行对象组成,行包含一组有序的值,他们通过schema被描述为列,每个DataFrame都有一个对应的schema, schmea定义了列的名称和类型;

列的类型可以是基础类型(integers,strings,floats)或者组合类型(Arrays, lists)。

Datasets将数据表示为指定类型的对象的集合;

Datasets是强类型的,在编译时就将强制进行类型检查,而不是等到运行时间;

schema将对象属性映射为表的行和列;

数据集仅在Scala和Java中定义;

DataFrame是Dataset [Row]的别名,他是包含Row的数据集。

二、DataFrame创建

Spark内部运行当中,从RDD里可以生成DataFrame,同时可以方便地在上面完成各种操作。

1、构建SparkSession

Spark SQL中所有功能的入口点是 SparkSession 类. 要创建一个 SparkSession, 仅使用 SparkSession.builder()就可以了:

from pyspark.sql import SparkSession

spark = SparkSession \

    .builder \

    .appName("Python Spark SQL") \

    .config("spark.some.config.option", "some-value") \

    .getOrCreate()

2、创建Dataframes

在一个 SparkSession中, 应用程序可以从一个已经存在的 RDD 或者 hive表, ,从Spark数据源中创建一个DataFrames。

举个例子,下面就是基于一个JSON文件创建一个DataFrame:

!cat data/people.json

{"name":"Michael"}

{"name":"Andy", "age":30}

{"name":"Justin", "age":19}

df = spark.read.json("data/people.json")

df

df.show()

3、Dataframe 操作

DataFrames 提供了一个特定的语法用在 Scala, Java, Python and R中机构化数据的操作。

在Python中,可以通过(df.age) 或者(df['age'])来获取DataFrame的列。虽然前者便于交互式操作, 但是还是建议用户使用后者, 这样不会破坏列名,也能引用DataFrame的类。

4、注意以下的Select操作

df.printSchema()

df.select("name").show()

df.select(["name",'age']).show()

df.select(df['name'], df['age'] + 1).show()

5、以下操作的filter做数据选择

df.filter(df['age'] > 21).show()df.groupBy("age").count().show()

三、DataFrame操作

(1)展示DataFrame

spark_df.show()

打印DataFrame的Schema信息

spark_df.printSchema()

显示前n行

spark_df.head(5)

显示数据长度与列名

df.count()

df.columns

(2)操作DataFrame列

选择列

ml_dataset=spark_df.select("features", "label")

增加/产生新的一列

from pyspark.sql.functions import *

#注意这个*号,这里是导入了sql.functions中所有的函数,所以下文的abs就是由此而来

df2 = spark_df.withColumn("abs_age", abs(df2.age))

删除列

df3= spark_df.drop("age")

筛选

df4= spark_df.where(spark_df["age"]>20)

关于大数据开发学习,Spark核心数据抽象之DataFrame,以上就为大家做了基本的介绍了。Spark DataFrame作为核心数据抽象,与RDD关联紧密,可以结合到前面讲过的RDD去理解和掌握。

上一篇下一篇

猜你喜欢

热点阅读