PySpark NoteBook-1
使用推断和指定模式的一个例子可能是一个大的,陌生的数据集,你知道你需要加载和重复使用。
第一次加载它使用inferSchema
,然后记下它指定的dtypes。
使用这些信息构建自定义模式custom_schema,以便将来加载数据时避免推理所需的额外处理时间。
from pyspark.sql.types import DateType, TimestampType, IntegerType, FloatType, LongType, DoubleType
from pyspark.sql.types import StructType, StructField
custom_schema = StructType([StructField('_c0', DateType(), True),
StructField('_c1', StringType(), True),
StructField('_c2', DoubleType(), True),
StructField('_c3', DoubleType(), True),
StructField('_c4', DoubleType(), True),
StructField('_c5', IntegerType(), True),
...
StructField('_c27', StringType(), True)])
df = spark.read.csv('s3://ui-spark-social-science-public/data/Performance_2015Q1.txt', \
header=False, \
schema=custom_schema, sep='|')
df.count()
> 3526154
df.dtypes
> [('_c0', 'bigint'), ('_c1', 'string'), ('_c2', 'string'), ('_c3', 'double'), ('_c4', 'double'), ('_c5', 'int'), ('_c6', 'int'), ('_c7', 'int'), ('_c8', 'string'), ('_c9', 'int'), ('_c10', 'string'), ('_c11', 'string'), ('_c12', 'int'), ('_c13', 'string'), ('_c14', 'string'), ('_c15', 'string'), ('_c16', 'string'), ('_c17', 'string'), ('_c18', 'string'), ('_c19', 'string'), ('_c20', 'string'), ('_c21', 'string'), ('_c22', 'string'), ('_c23', 'string'), ('_c24', 'string'), ('_c25', 'string'), ('_c26', 'int'), ('_c27', 'string')]
对于每个配对(Python中的一个“tuple”对象,由圆括号表示),第一个条目是列名,第二个是dtype。
注意,这个数据没有标题(我们在加载时指定了headers = False),所以Spark使用默认的命名约定_c0,_c1,... _cn。
单独重命名一列名字
df_lim = df.select(['_c0','_c1','_c2', '_c3', '_c4', '_c5', '_c6', '_c7', '_c8', '_c9', '_c10', '_c11', '_c12', '_c13'])
df_lim = df_lim.withColumnRenamed('_c0','loan_id').withColumnRenamed('_c1','period')
df_lim
> DataFrame[loan_id: bigint, period: string, _c2: string, _c3: double, _c4: double, _c5: int, _c6: int, _c7: int, _c8: string, _c9: int, _c10: string, _c11: string, _c12: int, _c13: string]
批量重命名多列名字
old_names = ['_c2', '_c3', '_c4', '_c5', '_c6', '_c7', '_c8', '_c9', '_c10', '_c11', '_c12', '_c13']
new_names = ['servicer_name', 'new_int_rt', 'act_endg_upb', 'loan_age', 'mths_remng', 'aj_mths_remng', \
'dt_matr', 'cd_msa', 'delq_sts', 'flag_mod', 'cd_zero_bal', 'dt_zero_bal']
for old, new in zip(old_names, new_names):
df_lim = df_lim.withColumnRenamed(old, new)
df_lim.columns
> ['loan_id', 'period', 'servicer_name', 'new_int_rt', 'act_endg_upb', 'loan_age', 'mths_remng', 'aj_mths_remng', 'dt_matr', 'cd_msa', 'delq_sts', 'flag_mod', 'cd_zero_bal', 'dt_zero_bal']
描述列信息
df_described = df_lim.describe('servicer_name', 'new_int_rt', 'loan_age')
df_described.show()
新增列、groupBy、avg运算
df_lim = df_lim.withColumn('loan_length', df_lim['loan_age'] + df_lim['mths_remng'])
df_grp = df_lim.groupBy('servicer_name')
df_avg = df_grp.avg('loan_age', 'mths_remng', 'loan_length')
在这里,我们执行了一个简单的数学运算(在mnths_remng中增加loan_age),然后对servicer_name中的条目执行groupBy操作(更多的是在groupBy中),同时要求它计算三个数字列的平均值 每个服务人员。然而,如果你真的运行了代码,你可能会注意到代码块几乎立即完成 - 尽管有超过350万行的数据。 这是惰性计算的一个例子 - 这里没有任何实际的计算。 目前,我们只是创建一个指令列表。 所有PySpark确实是确保它们是有效的指示。
df_avg.show()
+--------------------+--------------------+------------------+------------------+
| servicer_name| avg(loan_age)| avg(mths_remng)| avg(loan_length)|
+--------------------+--------------------+------------------+------------------+
| QUICKEN LOANS INC.|-0.08899247348614438| 358.5689787889155|358.47998631542936|
|NATIONSTAR MORTGA...| 0.39047125841532887| 359.5821853961678| 359.9726566545831|
| null| 5.6264681794400015|354.21486809483747| 359.8413362742775|
|WELLS FARGO BANK,...| 0.6704475572258285|359.25937820293814|359.92982576016396|
|FANNIE MAE/SETERU...| 9.333333333333334| 350.6666666666667| 360.0|
|DITECH FINANCIAL LLC| 5.147629653197582| 354.7811008590519|359.92873051224944|
|SENECA MORTGAGE S...| -0.2048814025438295|360.20075627363354| 359.9958748710897|
|SUNTRUST MORTGAGE...| 0.8241234756097561| 359.1453887195122| 359.969512195122|
|ROUNDPOINT MORTGA...| 5.153408024034549| 354.8269387244163|359.98034674845087|
| PENNYMAC CORP.| 0.14966740576496673| 359.8470066518847|359.99667405764967|
|PHH MORTGAGE CORP...| 0.9780420860018298|359.02195791399816| 360.0|
|MATRIX FINANCIAL ...| 6.566794707639778| 353.4229620145113|359.98975672215107|
| OTHER| 0.11480465916297489| 359.8345750772193|359.94937973638224|
| CITIMORTGAGE, INC.| 0.338498789346247|359.41670702179175| 359.755205811138|
|PINGORA LOAN SERV...| 7.573573382530696|352.40886824861633| 359.982441631147|
|JP MORGAN CHASE B...| 1.6553418987669224| 358.3384495990342|359.99379149780117|
| PNC BANK, N.A.| 1.1707779886148009|358.78747628083494| 359.9582542694497|
|FREEDOM MORTGAGE ...| 8.56265812109968|351.29583403609377|359.85849215719344|
+--------------------+--------------------+------------------+------------------+
这需要更长的时间来运行,因为当你执行show时,你要求返回一个数据帧,这意味着 Spark返回并且结束了之前的三个操作你可以完成任意数量的中间 类似于调用show之前的那些步骤,他们都将是几乎立即完成的懒惰的操作,直到show将它们全部运行。
df_sum = df_grp.sum('new_int_rt', 'loan_age', 'mths_remng', 'cd_zero_bal', 'loan_length')
df_sum.show()
+--------------------+--------------------+-------------+---------------+----------------+----------------+
| servicer_name| sum(new_int_rt)|sum(loan_age)|sum(mths_remng)|sum(cd_zero_bal)|sum(loan_length)|
+--------------------+--------------------+-------------+---------------+----------------+----------------+
| QUICKEN LOANS INC.| 101801.76500000055| -2081| 8384777| null| 8382696|
|NATIONSTAR MORTGA...| 40287.497999999934| 3770| 3471766| 2| 3475536|
| null|1.3139130895007337E7| 17690263| 1113692280| 16932| 1131382543|
|WELLS FARGO BANK,...| 187326.36499999996| 29436| 15773283| null| 15802719|
|FANNIE MAE/SETERU...| 26.6| 56| 2104| null| 2160|
|DITECH FINANCIAL LLC| 39531.70999999991| 48537| 3345231| 41| 3393768|
|SENECA MORTGAGE S...| 24093.55999999997| -1192| 2095648| null| 2094456|
|SUNTRUST MORTGAGE...| 21530.767999999884| 4325| 1884795| null| 1889120|
|ROUNDPOINT MORTGA...| 67708.25999999994| 82336| 5669070| 74| 5751406|
| PENNYMAC CORP.| 15209.139999999992| 540| 1298328| null| 1298868|
|PHH MORTGAGE CORP...| 9086.066000000006| 2138| 784822| null| 786960|
|MATRIX FINANCIAL ...| 19212.93299999999| 30772| 1656140| 16| 1686912|
| OTHER| 904855.043999986| 25163| 78868902| 21| 78894065|
| CITIMORTGAGE, INC.| 16939.329999999998| 1398| 1484391| null| 1485789|
|PINGORA LOAN SERV...| 64224.70499999985| 119049| 5539515| 111| 5658564|
|JP MORGAN CHASE B...| 50187.154999999984| 19197| 4155651| null| 4174848|
| PNC BANK, N.A.| 6911.725| 1851| 567243| 1| 569094|
|FREEDOM MORTGAGE ...| 24800.60499999998| 50768| 2082833| 60| 2133601|
+--------------------+--------------------+-------------+---------------+----------------+----------------+
持久化,增加运算速度
df_keep = df_lim.withColumn('loan_length', df_lim['loan_age'] + df_lim['mths_remng'])
df_keep.persist()
df_grp = df_keep.groupBy('servicer_name')
df_avg = df_grp.avg('loan_age', 'mths_remng', 'loan_length')
df_avg.show()
df_sum = df_grp.sum('new_int_rt', 'loan_age', 'mths_remng', 'cd_zero_bal', 'loan_length')
df_sum.show()
这比显示平均值的计算速度快得多。 这是因为Spark将中间结果保存为persist(),从计算平均值开始,因此只需运行之后的代码即可。
如果不存在分支,则不需要坚持。 实际上,正如我们所看到的那样,persist为进程增加了一些开销,如果不打算利用分支点,实际上是一个障碍。 作为一个良好的习惯,为了释放更多的资源,你可以调用一个持久化对象上的.unpersist()来完成这个任务。
df_keep.unpersist()
还要注意`cache()本质上是persist()的一个同义词,除了指定将检查点存储在内存中以便最快的调用,而持久化允许Spark在必要时将一些检查点交换到磁盘。 很显然cache()只有在你强迫它保存的数据帧足够小以至于它可以放入每个节点的内存时才起作用,所以小心使用它。