10.1 PySpark (01)

2023-09-29  山猪打不过家猪

PySpark video series

0. Creating a DataFrame

data=[(1,'jay'),(2,'tom')]
schema=['id', 'name']

df = spark.createDataFrame(data=data,schema=schema)
df.show()
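
On recent Spark versions the schema can also be passed as a DDL-formatted string, which fixes the column types up front; a minimal sketch with the same data:

# a DDL-style schema string sets explicit column types
df = spark.createDataFrame(data, schema='id INT, name STRING')
df.printSchema()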

1. Reading/writing CSV

df = spark.read.csv(path='dbfs:/FileStore/data/employee1.csv', header=True)
df.display()
df.printSchema()
df = spark.read.csv(path=['dbfs:/FileStore/data/employee1.csv', 'dbfs:/FileStore/data/employee2.csv'], header=True)
df = spark.read.csv(path='dbfs:/FileStore/data/', header=True)
from pyspark.sql.types import *

schema = (StructType()
          .add(field='id', data_type=IntegerType())
          .add(field='name', data_type=StringType())
          .add(field='gender', data_type=StringType())
          .add(field='salary', data_type=StringType()))
df = spark.read.csv(path='dbfs:/FileStore/data/', schema=schema, header=True)
df.printSchema()
df.write.csv(path='dbfs:/FileStore/data/', header=True, mode='ignore')
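
Reader behaviour such as the delimiter and schema inference is controlled with option(); a minimal sketch, assuming a ';'-separated variant of the same file (the separator is hypothetical):

df = (spark.read
      .option('delimiter', ';')     # hypothetical: file uses ';' instead of ','
      .option('inferSchema', True)  # let Spark infer column types
      .csv('dbfs:/FileStore/data/employee1.csv', header=True))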

2. Reading/writing JSON to a DataFrame

df = spark.read.json(path='dbfs:/FileStore/data/emps.json', multiLine=True)
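
Writing back out is symmetric; a minimal sketch (the output path is hypothetical):

df.write.json(path='dbfs:/FileStore/data/emps_out.json', mode='overwrite')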

3. Reading/writing Parquet

df = spark.read.parquet('dbfs:/FileStore/data/demo.parquet')
df.write.parquet('dbfs:/FileStore/data/demo2.parquet')

4. Displaying data

df.show(truncate=False)
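
show() also accepts a row count and a vertical layout, which helps with wide rows; a small sketch:

df.show(n=5)                             # only the first 5 rows
df.show(truncate=False, vertical=True)   # one field per line, nothing truncated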

5. Column operations with withColumn

Add a new column

df = df.withColumn('age_plus_10', df['age'] + 10)

Replace an existing column

df = df.withColumn('age', df['age'] ** 2)

Drop a column

df = df.drop('age_plus_10')

Type casting

Cast an entire column to a new type

# cast the "count" column from string to integer
df = df.withColumn("count", df["count"].cast("integer"))

# cast the "amount" column from float to integer
df = df.withColumn("amount", df["amount"].cast("integer"))

Rename a column

df = df.withColumnRenamed("gender", "sex")

Conditional logic

from pyspark.sql.functions import when

df = df.withColumn("age_group", when(df["age"] < 30, "Young").otherwise("Old"))

Date handling

from pyspark.sql.functions import current_date, datediff, year, month
df = df.withColumn("days_since_birth", datediff(current_date(), df["birth_date"]))

Window functions

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# define the window specification
window_spec = Window.partitionBy("department").orderBy("salary")

# use a window function to rank rows within each department as a new column
df = df.withColumn("rank_in_department", row_number().over(window_spec))

Using UDFs (user-defined functions)

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# define a plain Python function
def double_age(age):
    return age * 2

# register it as a UDF
double_age_udf = udf(double_age, IntegerType())

# use the UDF to create a new column
df = df.withColumn("double_age", double_age_udf(df["age"]))

6. Defining nested structures

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType

data = [(1, ('Maheer', 'Shaik'), 3000), (2, ('Wafa', 'Shaik'), 4000)]

structName = StructType([StructField('firstName', StringType()), StructField('lastName', StringType())])

schema = StructType([StructField(name='id', dataType=IntegerType()),
                     StructField(name='name', dataType=structName),
                     StructField(name='salary', dataType=IntegerType())])

df = spark.createDataFrame(data, schema)
display(df)
df.printSchema()
data = [('abc',[1,2]),('mon',[4,5]),('xyz',[6,7])]

schema = StructType([StructField('id', StringType()), StructField('numbers', ArrayType(IntegerType()))])

df = spark.createDataFrame(data,schema)
df.show()
df.printSchema()
display(df)
df.withColumn('firstNumber', df.numbers[0]).show()

7. explode, split, array

from pyspark.sql.functions import explode
# create a DataFrame containing an array column
data = [(1, [1, 2, 3]), (2, [4, 5]), (3, [6])]
df = spark.createDataFrame(data, ["id", "values"])

# use explode to expand the array column "values" into one row per element
exploded_df = df.select("id", explode("values").alias("value"))
exploded_df.show()
from pyspark.sql.functions import split

# create a DataFrame containing a string column
data = [(1, "apple,banana,cherry"), (2, "grape,kiwi"), (3, "orange")]
df = spark.createDataFrame(data, ["id", "fruits"])

# use split to turn the string column "fruits" into an array
df = df.withColumn("fruit_list", split(df["fruits"], ","))

df.show()
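
The heading also mentions array(), which goes the opposite way: it packs several columns into a single array column. A minimal sketch with hypothetical columns n1 and n2:

from pyspark.sql.functions import array

# combine two numeric columns into one array column
data = [(1, 10, 20), (2, 30, 40)]
df = spark.createDataFrame(data, ["id", "n1", "n2"])
df.withColumn("nums", array(df["n1"], df["n2"])).show()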

8. Creating a DataFrame with a map (dictionary) column

from pyspark.sql.types import StructType, StructField, StringType, MapType

data =[('maher',{'hair':'black','eye':'brown'}),('wafa',{'hair':'black','eye':'blue'})]
schema = StructType([StructField('name', StringType()), StructField('properties', MapType(StringType(), StringType()))])
df = spark.createDataFrame(data,schema)
df.show(truncate = False)
df.printSchema()
display(df)
df1 = df.withColumn('hair', df.properties.getItem('hair'))
df1.show(truncate=False)
from pyspark.sql.functions import map_keys, map_values

df1 = df.withColumn('keys', map_keys(df.properties))
df1.show(truncate=False)
df1 = df.withColumn('values', map_values(df.properties))
df1.show(truncate=False)

9. Creating a DataFrame with Row()

from pyspark.sql import Row

row1 = Row(name='maheer', salary= 2000)
row2 = Row(name= 'wafa', salary= 30000)
data = [row1,row2]
df = spark.createDataFrame(data)
df.show()
from pyspark.sql import Row

data = [Row(name='maheer',prop=Row(hair='black',eye='blue')),Row(name='wafa',prop=Row(hair='grey',eye='black'))]

df=spark.createDataFrame(data)
df.printSchema()

10. Column operations

from pyspark.sql.functions import col
# way 1:
df.select(df.gender).show()
# way 2:
df.select(df['gender']).show()
# way 3:
df.select(col('gender')).show()
from pyspark.sql.functions import col

df.select(df.props.hair).show()
df.select(df['props.hair']).show()
df.select(col('props.hair')).show()

11. when() & otherwise()

from pyspark.sql.functions import when

df1 = df.select(df.id, df.name, when(df.gender == 'M', 'male').when(df.gender == 'F', 'female').otherwise('unknown').alias('gender'))
df1.show()

12. alias, cast

df.select(df.id.alias('emp_id'), df.name.alias('emp_name')).show()
df = df.withColumn('id', df.id.cast('int'))

13. filter, where

df.filter(df.age>3).show()
df.filter(df.name.like('ga%')).show()
df.where(df.age>3).show()
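
Conditions combine with & and |, with each clause in parentheses; isin() tests membership. A minimal sketch, assuming the gender column from earlier examples:

df.filter((df.age > 3) & (df.gender == 'M')).show()
df.filter((df.age > 3) | df.name.isin('jay', 'tom')).show()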

14. Deduplication

# distinct() takes no arguments and removes fully duplicate rows
distinct_df = df.distinct()
# dropDuplicates() deduplicates on a subset of columns
dedup_df = df.dropDuplicates(["first_name", "last_name", "age"])

15. Sorting

df.orderBy(df.age.desc(), df.salary.asc()).show()

16. Grouping with groupBy

df1 = df.groupBy('dep', 'gender').count()
df2 = df.groupBy('dep').min('salary')
from pyspark.sql.functions import count, min, max

df.groupBy('dep').agg(count('*').alias('countOfEmps'), min('salary').alias('minSal'), max('salary').alias('maxSal')).show()

17. Combining DataFrames with the same schema: union & unionAll

newDf = df1.union(df2)
# in Spark 2+, unionAll is an alias of union; both keep duplicate rows
newDf = df1.unionAll(df2)
newDf.distinct().show()

18. Combining DataFrames with different schemas: unionByName()

newDf = df1.unionByName(df2, allowMissingColumns=True)
newDf.show()

19. Selecting columns with select()

df.select('id','name').show()
df.select(df.id,df.name).show()
df.select(df['id'],df['name']).show()

20. Inner join, left join

empDf.join(depDf,empDf.dep==depDf.id,'inner').show()
empDf.join(depDf,empDf.dep==depDf.id,'left').show()
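
A full outer join keeps unmatched rows from both sides; a minimal sketch with the same DataFrames:

# 'full' (also spelled 'outer' or 'full_outer') keeps non-matching rows from both tables
empDf.join(depDf, empDf.dep == depDf.id, 'full').show()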

21. leftsemi, leftanti

# leftsemi keeps only the left-side rows that have a match on the right (right columns are dropped)
empDf.join(depDf, empDf.dep == depDf.id, 'leftsemi').show()
# leftanti keeps only the left-side rows that have no match on the right
empDf.join(depDf, empDf.dep == depDf.id, 'leftanti').show()

22. Pivoting rows to columns with pivot

df.groupBy('dep').pivot('gender').count().show()
df.groupBy('dep').pivot('gender', ['male']).count().show()
from pyspark.sql.functions import expr

# unpivot: stack folds the pivoted 'male' and 'female' columns back into rows
unpivotDf = df.select('dep', expr("stack(2, 'M', male, 'F', female) as (gender, count)"))
unpivotDf.show()

23. Filling null values

df.fillna('unknown').show()
df.na.fill('unknown', ['dep', 'gender']).show()
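
The counterpart to filling nulls is dropping them; a minimal sketch:

# drop rows containing any null; subset= restricts the check to specific columns
df.na.drop().show()
df.na.drop(subset=['dep']).show()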

24. Creating a temporary view

df.createOrReplaceTempView('employees')
%sql
select id, name from employees
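
The same query can also be issued from Python with spark.sql(), which returns a DataFrame:

df2 = spark.sql('select id, name from employees')
df2.show()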

25. Custom UDFs

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

@udf(returnType=IntegerType())
def totalPayment(a, b):
    return a + b

df.withColumn('totalPay', totalPayment(df.salary, df.bonus)).show()

26. Partitioned Parquet storage with partitionBy

df.write.parquet('/FileStore/employees', mode='overwrite', partitionBy='dep')
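
Reading the partitioned output back lets Spark prune partitions when filtering on the partition column; a minimal sketch (the department value 'IT' is hypothetical):

df = spark.read.parquet('/FileStore/employees')
df.filter(df.dep == 'IT').show()   # only matching partition directories are scanned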

27. Parsing JSON with MapType/StructType

data = [('maheer','{"hair": "black", "eye": "brown"}')]
schema = ['name', 'props']
df = spark.createDataFrame(data, schema)
df.show(truncate=False)
from pyspark.sql.functions import from_json
from pyspark.sql.types import MapType, StructType, StructField, StringType

MapTypeSchema = MapType(StringType(), StringType())
df1 = df.withColumn('propsMap', from_json(df.props, MapTypeSchema))
df1.show()
df1.printSchema()
df2 = df1.withColumn('hair', df1.propsMap.hair).withColumn('eye', df1.propsMap.eye)
df2.show()
StructTypeSchema = StructType([StructField('hair', StringType()), StructField('eye', StringType())])
df1 = df.withColumn('propsStruct', from_json(df.props, StructTypeSchema))
df1.show(truncate = False)
df1.printSchema()

df2 = df1.withColumn('hair', df1.propsStruct.hair).withColumn('eye', df1.propsStruct.eye)
df2.show()
from pyspark.sql.functions import to_json

# convert the parsed struct column back into a JSON string
df3 = df1.withColumn('propsJsonString', to_json(df1.propsStruct))
from pyspark.sql.functions import json_tuple

# json_tuple extracts several keys from a JSON string column in one pass
df.select('name', json_tuple(df.props, 'hair', 'eye').alias('hair', 'eye')).show()

from pyspark.sql.functions import get_json_object

# get_json_object extracts a single value by JSON path
df1 = df.select('name', get_json_object('props', '$.hair').alias('hair'))
df1.show()

28. Date conversion and formatting

from pyspark.sql.functions import current_date, date_format

df1 = df.withColumn('currentDate', current_date())
df2 = df1.withColumn('formattedDate', date_format(df1.currentDate, 'yyyy-MM-dd'))
df2.show()
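
Going the other way, to_date() and to_timestamp() parse strings back into date/timestamp columns; a minimal sketch reusing the formatted column above:

from pyspark.sql.functions import to_date

df3 = df2.withColumn('asDate', to_date(df2.formattedDate, 'yyyy-MM-dd'))
df3.printSchema()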

29. Date arithmetic

from pyspark.sql.functions import datediff, months_between, add_months

df = spark.createDataFrame([('2023-01-13', '2023-03-12')], ['d1', 'd2'])

# difference in days
df.withColumn('datediff', datediff(df.d2, df.d1)).show()
# difference in months
df.withColumn('monthsBetween', months_between(df.d2, df.d1)).show()
# add months
df.withColumn('addmonths', add_months(df.d2, 3)).show()
# subtract months
df.withColumn('submonths', add_months(df.d2, -3)).show()
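
Day-level arithmetic works the same way via date_add() and date_sub(); a minimal sketch:

from pyspark.sql.functions import date_add, date_sub

df.withColumn('plus10days', date_add(df.d2, 10)).show()
df.withColumn('minus10days', date_sub(df.d2, 10)).show()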