pyspark学习

PySpark NoteBook-5

2018-01-11  本文已影响19人  7125messi

dtypes, udf, drop, groupBy, agg, withColumn, dateFormat, select

# Build a small demo DataFrame of daily production counts per person.
# NOTE(review): relies on a live SparkContext `sc` (PySpark shell / notebook session).
import datetime
from pyspark.sql import Row
from pyspark.sql.functions import col

# Row factory: each record is (date as "MM/DD/YYYY" string, worker name, units produced).
row = Row("date", "name", "production")

# Parallelize the Python rows and convert to a DataFrame; Spark infers the
# schema from the Row factory (strings for date/name, bigint for production).
df = sc.parallelize([
    row("08/01/2014", "Kim", 5),
    row("08/02/2014", "Kim", 14),
    row("08/01/2014", "Bob", 6),
    row("08/02/2014", "Bob", 3),
    row("08/01/2014", "Sue", 0),
    row("08/02/2014", "Sue", 22),
    row("08/01/2014", "Dan", 4),
    row("08/02/2014", "Dan", 4),
    row("08/01/2014", "Joe", 37),
    row("09/01/2014", "Kim", 6),
    row("09/02/2014", "Kim", 6),
    row("09/01/2014", "Bob", 4),
    row("09/02/2014", "Bob", 20),
    row("09/01/2014", "Sue", 11),
    row("09/02/2014", "Sue", 2),
    row("09/01/2014", "Dan", 1),
    row("09/02/2014", "Dan", 3),
    row("09/02/2014", "Joe", 29)
    ]).toDF()
df.show()
+----------+----+----------+
|      date|name|production|
+----------+----+----------+
|08/01/2014| Kim|         5|
|08/02/2014| Kim|        14|
|08/01/2014| Bob|         6|
|08/02/2014| Bob|         3|
|08/01/2014| Sue|         0|
|08/02/2014| Sue|        22|
|08/01/2014| Dan|         4|
|08/02/2014| Dan|         4|
|08/01/2014| Joe|        37|
|09/01/2014| Kim|         6|
|09/02/2014| Kim|         6|
|09/01/2014| Bob|         4|
|09/02/2014| Bob|        20|
|09/01/2014| Sue|        11|
|09/02/2014| Sue|         2|
|09/01/2014| Dan|         1|
|09/02/2014| Dan|         3|
|09/02/2014| Joe|        29|
+----------+----+----------+
# Inspect the inferred schema: 'date' is still a plain string at this point.
df.dtypes
[('date', 'string'), ('name', 'string'), ('production', 'bigint')]
from pyspark.sql.functions import udf
def split_date(whole_date):
    """Reduce a 'MM/DD/YYYY' date string to its 'MM/YYYY' month-year part.

    Returns the literal string 'error' when the input does not split into
    exactly three '/'-separated fields.
    """
    parts = whole_date.split('/')
    if len(parts) != 3:
        return 'error'
    month, _day, year = parts
    return month + '/' + year

# Wrap the plain Python function as a Spark UDF (return type defaults to StringType).
udf_split_date = udf(split_date)

# Derive a 'month_year' column ("MM/YYYY") from the raw date string.
df_new = df.withColumn('month_year', udf_split_date('date'))

df_new.show()

+----------+----+----------+----------+
|      date|name|production|month_year|
+----------+----+----------+----------+
|08/01/2014| Kim|         5|   08/2014|
|08/02/2014| Kim|        14|   08/2014|
|08/01/2014| Bob|         6|   08/2014|
|08/02/2014| Bob|         3|   08/2014|
|08/01/2014| Sue|         0|   08/2014|
|08/02/2014| Sue|        22|   08/2014|
|08/01/2014| Dan|         4|   08/2014|
|08/02/2014| Dan|         4|   08/2014|
|08/01/2014| Joe|        37|   08/2014|
|09/01/2014| Kim|         6|   09/2014|
|09/02/2014| Kim|         6|   09/2014|
|09/01/2014| Bob|         4|   09/2014|
|09/02/2014| Bob|        20|   09/2014|
|09/01/2014| Sue|        11|   09/2014|
|09/02/2014| Sue|         2|   09/2014|
|09/01/2014| Dan|         1|   09/2014|
|09/02/2014| Dan|         3|   09/2014|
|09/02/2014| Joe|        29|   09/2014|
+----------+----+----------+----------+
# The raw date column is no longer needed once 'month_year' exists.
df_new = df_new.drop('date')

# Total production per person per month; the dict-style agg names the
# result column 'sum(production)'.
df_agg = df_new.groupBy('month_year', 'name').agg({'production' : 'sum'})

df_agg.show()
+----------+----+---------------+
|month_year|name|sum(production)|
+----------+----+---------------+
|   09/2014| Sue|             13|
|   09/2014| Kim|             12|
|   09/2014| Bob|             24|
|   09/2014| Joe|             29|
|   09/2014| Dan|              4|
|   08/2014| Kim|             19|
|   08/2014| Joe|             37|
|   08/2014| Dan|              8|
|   08/2014| Sue|             22|
|   08/2014| Bob|              9|
+----------+----+---------------+
from pyspark.sql.functions import udf
from pyspark.sql.types import DateType
from datetime import datetime

# Parse the "MM/DD/YYYY" string column into a real DateType column.
# BUG FIX: the original format string was '%M/%d/%Y' — '%M' is the MINUTE
# directive, so strptime treated "08"/"09" as minutes and the month defaulted
# to 1; every row came back as January (the article's own take(1) output shows
# date(2014, 1, 1) for "08/01/2014"). '%m' is the zero-padded month directive.
dateFormat = udf(lambda x: datetime.strptime(x, '%m/%d/%Y'), DateType())

df_d = df.withColumn('new_date', dateFormat(col('date')))
# Confirm the schema: 'new_date' is now a true DateType column.
df_d.dtypes
[('date', 'string'),
 ('name', 'string'),
 ('production', 'bigint'),
 ('new_date', 'date')]

# Peek at one parsed value to sanity-check the conversion.
df_d.select('new_date').take(1)
[Row(new_date=datetime.date(2014, 1, 1))]
上一篇下一篇

猜你喜欢

热点阅读