PySpark NoteBook-6
2018-01-11
7125messi
Summary Statistics
# Built-in summary statistics: count, mean, stddev, min, and max per column
df_described = df.describe()
df_described.show()
+-------+--------------------+--------------------+-------------------+------------------+------------------+-----------------+
|summary| _c0| _c2| _c3| _c4| _c5| _c6|
+-------+--------------------+--------------------+-------------------+------------------+------------------+-----------------+
| count| 3526154| 382039| 3526154| 1580402| 3526154| 3526154|
| mean|5.503885995001908E11| null| 4.178168090219519|234846.78065481762| 5.134865351881966|354.7084951479714|
| stddev|2.596112361975214...| null|0.34382335723646673|118170.68592261661|3.3833930336063465| 4.01181251079202|
| min| 100002091588| CITIMORTGAGE, INC.| 2.75| 0.85| -1| 292|
| max| 999995696635|WELLS FARGO BANK,...| 6.125| 1193544.39| 34| 480|
+-------+--------------------+--------------------+-------------------+------------------+------------------+-----------------+
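describe() also accepts column names, which keeps the output manageable on wide tables. A minimal sketch, assuming the same df:

df.describe('_c3', '_c4').show()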
# skewness and kurtosis live in pyspark.sql.functions, alongside many other
# aggregates (not all of the imports below are used in this notebook)
from pyspark.sql.functions import skewness, kurtosis
from pyspark.sql.functions import var_pop, var_samp, stddev, stddev_pop, sumDistinct, ntile

df.select(skewness('_c3')).show()
+------------------+
| skewness(_c3)|
+------------------+
|0.5197993394959904|
+------------------+
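Several aggregates can be requested in one select, so both moments of _c3 come back in a single pass; a minimal sketch over the same df:

df.select(skewness('_c3'), kurtosis('_c3')).show()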
from pyspark.sql import Row

columns = df_described.columns
funcs = [skewness, kurtosis]
fnames = ['skew', 'kurtosis']

def new_item(func, column):
    # Return the aggregate as a string, matching the all-string
    # columns produced by df.describe()
    return str(df.select(func(column)).collect()[0][0])

new_data = []
for func, fname in zip(funcs, fnames):
    row_dict = {'summary': fname}
    for column in columns[1:]:  # skip the 'summary' column itself
        row_dict[column] = new_item(func, column)
    new_data.append(Row(**row_dict))
print(new_data)
[Row(_c0='-0.00183847089866', _c2='None', _c3='0.519799339496', _c4='0.758411576756', _c5='0.286480156084', _c6='-2.69765201567', summary='skew'),
 Row(_c0='-1.19900726351', _c2='None', _c3='0.126057726847', _c4='0.576085602656', _c5='0.195187780089', _c6='24.7237858944', summary='kurtosis')]
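Each new_item call launches its own Spark job. A sketch of a cheaper variant (same df, funcs, fnames, and columns assumed) that gathers every column's statistic in one agg pass per function:

# One job per statistic instead of one per (statistic, column) pair
for func, fname in zip(funcs, fnames):
    row = df.agg(*[func(c).alias(c) for c in columns[1:]]).collect()[0]
    print(fname, row.asDict())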
df_described.collect()
[Row(summary=u'count', _c0=u'3526154', _c2=u'382039', _c3=u'3526154', _c4=u'1580402', _c5=u'3526154', _c6=u'3526154'),
Row(summary=u'mean', _c0=u'5.503885995001908E11', _c2=None, _c3=u'4.178168090219519', _c4=u'234846.78065481762', _c5=u'5.134865351881966', _c6=u'354.7084951479714'),
Row(summary=u'stddev', _c0=u'2.5961123619752148E11', _c2=None, _c3=u'0.34382335723646673', _c4=u'118170.68592261661', _c5=u'3.3833930336063465', _c6=u'4.01181251079202'),
Row(summary=u'min', _c0=u'100002091588', _c2=u'CITIMORTGAGE, INC.', _c3=u'2.75', _c4=u'0.85', _c5=u'-1', _c6=u'292'),
Row(summary=u'max', _c0=u'999995696635', _c2=u'WELLS FARGO BANK, N.A.', _c3=u'6.125', _c4=u'1193544.39', _c5=u'34', _c6=u'480')]
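Note that describe() reports every statistic as a string, so numeric work on these values needs an explicit cast. A minimal sketch, relying on the row order shown above:

rows = df_described.collect()
mean_c3 = float(rows[1]['_c3'])  # 'mean' is the second summary row
print(mean_c3)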
# Promote the computed rows to a DataFrame, align its column order with
# df.describe(), and append it (unionAll is a deprecated alias of union
# since Spark 2.0; both keep duplicates)
new_describe = sc.parallelize(new_data).toDF()
new_describe = new_describe.select(df_described.columns)
expanded_describe = df_described.unionAll(new_describe)
expanded_describe.show()
+--------+--------------------+--------------------+-------------------+------------------+------------------+-----------------+
| summary| _c0| _c2| _c3| _c4| _c5| _c6|
+--------+--------------------+--------------------+-------------------+------------------+------------------+-----------------+
| count| 3526154| 382039| 3526154| 1580402| 3526154| 3526154|
| mean|5.503885995001908E11| null| 4.178168090219519|234846.78065481762| 5.134865351881966|354.7084951479714|
| stddev|2.596112361975214...| null|0.34382335723646673|118170.68592261661|3.3833930336063465| 4.01181251079202|
| min| 100002091588| CITIMORTGAGE, INC.| 2.75| 0.85| -1| 292|
| max| 999995696635|WELLS FARGO BANK,...| 6.125| 1193544.39| 34| 480|
| skew| -0.00183847089866| None| 0.519799339496| 0.758411576756| 0.286480156084| -2.69765201567|
|kurtosis| -1.19900726351| None| 0.126057726847| 0.576085602656| 0.195187780089| 24.7237858944|
+--------+--------------------+--------------------+-------------------+------------------+------------------+-----------------+
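On Spark 2.x the RDD detour is unnecessary; a sketch of the same construction, assuming a SparkSession bound to the usual name spark:

new_describe = spark.createDataFrame(new_data)
expanded_describe = df_described.union(new_describe.select(df_described.columns))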