Spark SQL rdd转换dataframe 执行sql

2020-08-23  本文已影响0人  nil_ddea

Spark SQL中所有功能的入口点是 SparkSession
可以利用创建rdd的SparkContext.SparkConf创建
pyspark中的api是:

from pyspark import SparkSession, SparkConf
SparkSession.getOrCreate(SparkConf)

将rdd转换到dataframe时 需要给rdd中的数据指定schema

from pyspark.sql.types import StructField, StructType, StringType, TimestampType, IntegerType, FloatType
Schema = StructType([
    StructField("hostID", IntegerType(), True),
    StructField("uid", StringType(), True),
    StructField("ts", IntegerType(), True),
    StructField("price", FloatType(), True),
    StructField("nickname", StringType(), True)
])
df = SparkSession.createDataFrame(rdd, schema=douyuGiftSchema)
df.createOrReplaceTempView("table1")

指定schema后将rdd映射在dataframe中,createOrReplaceTempView函数利用dataframe创建一个tmpView,类似关系型数据库的表,只在同一个连接(SparkSession)中可见。

result = SparkSession.sql("SELECT hostID,count(distinct(uid)) AND price >0 FROM table1 GROUP BY hostID")

在这个SparkSession中可以执行sql查询,指定table为已创建的tmpView,返回一个结果集。

上一篇 下一篇

猜你喜欢

热点阅读