P2 PySpark Project (Processing Product Information)

2023-12-29  山猪打不过家猪

0. Key Points

  1. Upload local files to DBFS
  2. Create a schema and read the CSV file
from pyspark.sql.types import StructType,StructField,IntegerType,StringType,DecimalType,DateType,FloatType

schema = StructType([
            StructField('product_id',IntegerType(),True),
            StructField('product_name',StringType(),True),
            StructField('price',FloatType(),True),
])
  3. Joins and grouping in PySpark: table1.join(table2, on_column, how) (see the sketch after this list)
  4. Displaying data in Databricks with display()
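
A minimal sketch of the join-and-group pattern from point 3; df1, df2, 'key_col', 'group_col', and 'value_col' are placeholder names for illustration only, not from the project:

# Hypothetical join + groupBy pattern; all names below are placeholders.
joined = df1.join(df2, 'key_col', 'inner')               # join on a shared column
grouped = joined.groupBy('group_col').sum('value_col')   # aggregate per group
display(grouped)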

1. Project Requirements


2. Project Preparation

Upload the two tables to the Databricks file system (DBFS).
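
This can be done through the Databricks UI; alternatively, a minimal sketch with dbutils, assuming the files are already on the driver node under /tmp (the local paths are assumptions):

# Sketch: copy local files on the driver into DBFS (local paths are illustrative).
dbutils.fs.cp('file:/tmp/sales_csv.txt', 'dbfs:/FileStore/tables/sales_csv.txt')
dbutils.fs.cp('file:/tmp/menu_csv.txt', 'dbfs:/FileStore/tables/menu_csv.txt')
display(dbutils.fs.ls('dbfs:/FileStore/tables/'))  # confirm the files landed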



3. PySpark

3.1 Create the sales DataFrame

from pyspark.sql.types import StructType,StructField,IntegerType,StringType,DecimalType,DateType,FloatType

# Schema for the sales table; the date column must be named 'order_date',
# since the next step references df.order_date.
schema = StructType([
            StructField('product_id',IntegerType(),True),
            StructField('customer_id',StringType(),True),
            StructField('order_date',DateType(),True),
            StructField('location',StringType(),True),
            StructField('source_order',StringType(),True)
])

df = spark.read.csv('/FileStore/tables/sales_csv.txt',header=True,schema=schema)
display(df)

3.2 Add year, month, and quarter columns

from pyspark.sql.functions import year,month,quarter

df = df.withColumn('order_year',year(df.order_date))
df = df.withColumn('order_month',month(df.order_date))
df = df.withColumn('order_quarter',quarter(df.order_date))
display(df)

3.3 Create the menu DataFrame

from pyspark.sql.types import StructType,StructField,IntegerType,StringType,DecimalType,DateType,FloatType

schema = StructType([
            StructField('product_id',IntegerType(),True),
            StructField('product_name',StringType(),True),
            StructField('price',FloatType(),True),
])

df_menu = spark.read.csv('/FileStore/tables/menu_csv.txt',header=True,schema = schema)
display(df_menu)

3.4 Process the data according to the KPI requirements

  1. Total amount spent by each customer
from pyspark.sql.functions import col, sum

total_amount_spent = (df.join(df_menu,'product_id').groupBy('customer_id').sum('price').orderBy('customer_id'))
display(total_amount_spent)
  2. Total amount spent by each food category
total_amount_food_category = (df.join(df_menu,'product_id').groupBy('product_name').sum('price'))
display(total_amount_food_category)
  3. Yearly sales
year_sales = df.join(df_menu,'product_id').groupBy('order_year').sum('price')
display(year_sales)
  4. Quarterly sales
quarter_sales = df.join(df_menu,'product_id').groupBy('order_quarter').sum('price').orderBy('order_quarter')
display(quarter_sales)
  5. Total number of orders in each category
from pyspark.sql.functions import count,desc

times_purchased = df.join(df_menu,'product_id').groupBy('product_name').agg(count('product_id').alias('product_count')).orderBy(desc('product_count'))
display(times_purchased)
  6. Top 5 ordered items
Top_5 = df.join(df_menu,'product_id').groupBy('product_name').agg(count('product_id').alias('product_count')).orderBy(desc('product_count')).limit(5).drop('product_count')
display(Top_5)
  7. Frequency of customer visits
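The original post gives no code for this KPI. A hedged sketch of one plausible reading, counting distinct order dates per customer; restricting to source_order == 'Restaurant' is an assumption, not from the source:

from pyspark.sql.functions import countDistinct

# Sketch: visits per customer, counted as distinct order dates.
# The filter on source_order == 'Restaurant' is an assumed definition of a "visit".
visit_frequency = (df.filter(df.source_order == 'Restaurant')
                     .groupBy('customer_id')
                     .agg(countDistinct('order_date').alias('visit_count'))
                     .orderBy('customer_id'))
display(visit_frequency)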

  8. Total sales by each country
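Also left without code in the post; a sketch that assumes the 'location' column holds the country:

# Sketch: total sales per location, assuming 'location' is the country column.
country_sales = (df.join(df_menu,'product_id')
                   .groupBy('location')
                   .sum('price')
                   .orderBy('location'))
display(country_sales)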
