PySpark
2020-01-16 本文已影响0人
奇而思
在PySpark中使用现有列创建新列
假如现在有如下DataFrame:

创建新列,使其变成这样

做法:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
sqlContext = SparkSession.builder.appName("test").enableHiveSupport().getOrCreate()
data = [('x1-y1', 3,'z1'),
('x2-y2', 2,'z2'),
('x3-y3', 1,'z3')]
test_df = sqlContext.createDataFrame(data, schema=['_1', '_2', '_3'])
test_df = test_df.withColumn('_4', F.regexp_replace('_1', '-', ''))
test_df = test_df.withColumn('_5', F.concat(F.regexp_replace('_1', '-', '='),F.lit('='),F.col('_3')))
test_df.show()
这里使用了pyspark.sql.functions中的concat, regexp_replace, lit, col等函数
StringIndexer对多列进行使用
目前只能通过pipeline的形式
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(df) for column in list(set(df.columns)-set(['date'])) ]
pipeline = Pipeline(stages=indexers)
df_r = pipeline.fit(df).transform(df)
df_r.show()