PySpark

2020-01-16  本文已影响0人  奇而思

在PySpark中使用现有列创建新列

假如现在有如下DataFrame:


image.png

创建新列,使其变成这样


image.png
做法:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

sqlContext = SparkSession.builder.appName("test").enableHiveSupport().getOrCreate()
data = [('x1-y1', 3,'z1'),
        ('x2-y2', 2,'z2'),
        ('x3-y3', 1,'z3')]
test_df = sqlContext.createDataFrame(data, schema=['_1', '_2', '_3'])

test_df = test_df.withColumn('_4', F.regexp_replace('_1', '-', ''))
test_df = test_df.withColumn('_5', F.concat(F.regexp_replace('_1', '-', '='),F.lit('='),F.col('_3')))
test_df.show()

这里使用了pyspark.sql.functions中的concat, regexp_replace, lit, col等函数

StringIndexer对多列进行使用

目前只能通过pipeline的形式

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(df) for column in list(set(df.columns)-set(['date'])) ]


pipeline = Pipeline(stages=indexers)
df_r = pipeline.fit(df).transform(df)

df_r.show()
上一篇下一篇

猜你喜欢

热点阅读