PySpark研究机器学习与数据挖掘程序猿日记

利用PySpark 数据预处理(特征化)实战

2017-10-29  本文已影响766人  祝威廉

前言

之前说要自己维护一个spark deep learning的分支,加快SDL的进度,这次终于提供了一些组件和实践,可以很大简化数据的预处理。

模型

这次实际情况是,我手头已经有个现成的模型,基于TF开发,并且算法工程师也提供了一些表给我,有用户信息表,用户行为表。行为表已经关联了内容的文本。现在我需要通过SDL来完成两个工作:

  1. 根据已有的表获取数据,处理成四个向量。
  2. 把数据喂给模型,进行训练

思路整理

四个向量又分成两个部分:

  1. 用户向量部分
  2. 内容向量部分

用户向量部分由2部分组成:

  1. 根据几个用户的基础属性,他们有数值也有字符串,我们需要将他们分别表示成二进制后拼接成一个数组。
  2. 根据用户访问的内容,通过词向量把每篇内容转化为一个向量,再把某个用户看过的所有内容转化为一个向量(都是简单采用加权平均)

内容向量部分组成:

对于文章,我们需要把他表示为一个数字序列(每个词汇由一个数字表示),同时需要放回词向量表,给RNN/CNN使用。

所以处理流程也是比较直观的:

  1. 通过用户信息表,可以得到用户基础属性向量
  2. 通过行为表,可以得到每篇涉及到的内容的数字序列表表示,同时也可以为每个用户算出行为向量。

最后的算法的输入其实是行为表,但是这个时候的行为表已经包含基础信息,内容序列,以及用户的内容行为向量。

实现

现在我们看看利用SDL里提供的组件,如何完成这些数据处理的工作以及衔接模型。

第一个是pyspark的套路,import SDL的一些组件,构建一个spark session:

# -*- coding: UTF-8 -*-
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, ArrayType, StringType, FloatType
from pyspark.sql.functions import *
import numpy as np

from sparkdl.transformers.tf_text import CategoricalBinaryTransformer, CombineBinaryColumnTransformer, \
    TextAnalysisTransformer, TextEmbeddingSequenceTransformer
from sparkdl.estimators.text_estimator import TextEstimator

session = SparkSession.builder.master("local[*]").appName("test").getOrCreate()

读取用户基础信息表,这里我是直接读了一个CSV文件,现实中应该是Hive表。同时罗列有哪些字段是这次要用的,罗列一下:

person_basic_info_df = session.read.csv("/Users/allwefantasy/Downloads/query-impala-72329.csv", encoding="utf-8",
                                        header=True)
person_basic_info_df.registerTempTable("person_basic_info_df")

# 把所有基础属性罗列出来
person_basic_properties_str = "education,jobtitle..."

person_basic_properties_group = [item for item in
                                 person_basic_properties_str.split(",")]
# 每个属性我们会表示为一个12位的二进制字符串。
person_basic_info_vector_size = len(person_basic_properties_group) * 12

接着我们就可以利用SDL提供的CategoricalBinaryTransformer把这些字段批量转化为二进制。

# 基础信息中字符串字段需要转化为数字
binary_columns = [item + "_binary" for item in person_basic_properties_group]

binary_trans = CategoricalBinaryTransformer(inputCols=person_basic_properties_group,
                                            outputCols=binary_columns,
                                            embeddingSize=12)
combin_trans = CombineBinaryColumnTransformer(inputCols=binary_columns, outputCol="person_info_vector")

person_basic_info_with_all_binary_df = combin_trans.transform(binary_trans.transform(person_basic_info_df)). \
    groupBy("id").agg(first("person_info_vector").alias("person_info_vector"))

CategoricalBinaryTransformer接受inputCols参数, 传递一个数组字段,告诉他哪些字段是需要转化为二进制数值表示的。outputCols指定输出的名字,embeddingSize指定用多少个二进制数字。 所有的CategoricalBinaryTransformer会添加outputCols指定的字段。

因为我们需要把这些字段都拼接成一个字段,这个时候可以利用CombineBinaryColumnTransformer 。方式和CategoricalBinaryTransformer一样,但是输出只有一个字段。这样我们就得到了一个长度为person_basic_info_vector_size 的字段,格式大致这个样子:

[1,0,1,0,0,....]

CategoricalBinaryTransformer 内部的机制是,会将字段所有的值枚举出来,并且给每一个值递增的编号,然后给这个编号设置一个二进制字符串。

现在第一个特征就构造好了。接着,有一些NLP特有的操作了,我们需要对某些内容进行分词
,同时将他们转化为数字序列(比如RNN就需要这种),并且把数字和词还有向量的对应关系给出。分词现在默认采用的是jieba。

person_behavior_df = session.read.csv("/Users/allwefantasy/Downloads/query-impala-72321.csv", encoding="utf-8",
                                      header=True).sample(True, 0.01).where(col("title").isNotNull()).where(
    col("text_body").isNotNull())

# 通过TextAnalysisTransformer我们对所有需要分词/抽词的字段进行分词
text_columns = ["title", "text_body"]
text_cut_columns = [item + "_cut" for item in text_columns]
tat_trans = TextAnalysisTransformer(inputCols=text_columns, outputCols=text_cut_columns)
tat_df = tat_trans.transform(person_behavior_df)
tat_df.show()

# 通过TextEmbeddingSequenceTransformer把分完词的字段里面的词汇全部替换成数字,这一步分会作为文章的输出
text_sequence_columns = [item + "_seq" for item in text_columns]
test_trans = TextEmbeddingSequenceTransformer(inputCols=text_cut_columns, outputCols=text_sequence_columns)
test_df = test_trans.transform(tat_df)
test_df.show()

# TextEmbeddingSequenceTransformer 有几个属性可以获取词向量相关信息
word_embedding = test_trans.getWordEmbedding()
word2vec_model = test_trans.getW2vModel()
embedding_size = test_trans.getEmbeddingSize()

# 广播出去,方便在自定义函数里使用
word_index2v_mapping_br = session.sparkContext.broadcast(
    dict([(item["word_index"], item["vector"]) for item in word_embedding]))

# 把标题和正文拼接
person_behavior_vector_seq_cctf = CombineBinaryColumnTransformer(inputCols=text_sequence_columns,
                                                                 outputCol="person_behavior_vector_seq")
person_behavior_vector_seq_df = person_behavior_vector_seq_cctf.transform(test_df)

这样就完成了文本到数字序列的转化了,并且通过TextEmbeddingSequenceTransformer获取词向量表数据。接下来,我们看看如何做一个复杂的自定义操作,这个操作主要是在行为表,把数字序列转化词向量,然后做加权平均。这个时候,每篇文章已经可以用一个向量表示了。

# 定义一个函数,接受的是一个数字序列,然后把数字转化为vector,然后做
# 加权平均
def avg_word_embbeding(word_seq):
    result = np.zeros(embedding_size)
    for item in word_seq:
        if item in word_index2v_mapping_br.value:
            result = result + np.array(word_index2v_mapping_br.value[item])
    return (result / len(word_seq)).tolist()

# 注册成udf函数
avg_word_embbeding_udf = udf(avg_word_embbeding, ArrayType(FloatType()))
# 添加一个person_behavior_article_vector新列
person_behavior_vector_df = person_behavior_vector_seq_df.withColumn(
    "person_behavior_article_vector",
    avg_word_embbeding_udf(
        "person_behavior_vector_seq"))

现在根据用户id做groupby 然后把多篇文章的文章向量合并成一个,然后把数字转换为向量,做加权平均。这个时候,每个用户终于有一个行为向量了。

# 我们根据用户名groupby ,把用户看过的所有文章聚合然后计算一个向量

def avg_word_embbeding_2(word_seq):
    result = np.zeros(embedding_size)
    for item in word_seq:
        result = result + np.array(item)
    return (result / len(word_seq)).tolist()


avg_word_embbeding_2_udf = udf(avg_word_embbeding_2, ArrayType(FloatType()))

person_behavior_vector_all_df = person_behavior_vector_df.groupBy("id").agg(
    avg_word_embbeding_2_udf(collect_list("person_behavior_article_vector")).alias("person_behavior_vector"))

现在,我们拿到了用户基础信息向量,访问内容向量。 当然还有之前计算出来的访问内容的数字序列,但是分在不同的表里(dataframe),我们把他们拼接成一个:

pv_df = person_basic_info_with_all_binary_df.select("id", "person_info_vector").alias("pv")
cv_df = person_behavior_vector_all_df.select("id", "person_behavior_vector").alias(
    "cv")
person_vector_df = cv_df.join(
    pv_df,
    col("pv.id") == col("cv.id"), "left"
)

person_df = person_vector_df.select("pv.id", "pv.person_info_vector", "cv.person_behavior_vector").where(
    col("id").isNotNull())

这里是标准的spark dataframe的join操作。

我们假设做的是一个二分类问题,到目前为止,我们还没有分类字段,为了简单起见我随机填充了分类,利用前面的办法,自定义一个UDF函数,添加了一个like_or_not_like 列。最后返回df的时候,过滤掉去胳膊少腿的行。

def like_or_not_like():
    return [0, 1] if np.random.uniform() < 0.5 else [1, 0]

like_or_not_like_udf = udf(like_or_not_like, ArrayType(IntegerType()))
result_df = person_behavior_vector_df.join(person_df, person_behavior_vector_df["id"] == person_df["id"],
                                           "left").withColumn("like_or_not_like", like_or_not_like_udf()).drop(
    person_df["id"]).where(
    col("person_info_vector").isNotNull()).where(
    col("person_behavior_vector").isNotNull()).where(
    col("person_behavior_vector_seq").isNotNull())

word2v_mapping_br = session.sparkContext.broadcast(
    dict([(item["word"], item["vector"]) for item in word_embedding]))

现在我们获得了所有的向量,我们可以把数据喂给算法了,这个主要通过TextEstimator来完成。

estimator = TextEstimator(kafkaParam={"bootstrap_servers": ["127.0.0.1"], "topic": "test",
                                      "group_id": "sdl_1", "test_mode": False},
                          runningMode="Normal",
                          fitParam=[{"epochs": 5, "batch_size": 64, "word_embedding_bs": word2v_mapping_br.value}],
                          mapFnParam=map_fun)
estimator.fit(result_df).collect()

word embbeding表,我们通过fitParam参数传递给tf程序,然后tf所有的代码都在map_fun里,我们简单看看tf怎么拿到数据:

def map_fun(args={}, ctx=None, _read_data=None):
    import tensorflow as tf
    import numpy as np
    import datetime
    import os
    import time
    from sklearn.utils import Bunch
    FLAGS = Bunch(**args["params"]["fitParam"])
    embedded_vec = FLAGS.word_embedding_bs

    def config_default_value(name, value, desc):
        FLAGS.setdefault(name, value)

    # 产生数据
    def training_batch_generator(batch_size):
        for items in _read_data(max_records=batch_size):
            x_basic_info = [item["person_info_vector"] for item in items]
            x_subs = [item["person_subs"] for item in items]
            x_personas = [item["person_behavior_vector"] for item in items]
            x_contents = [item["person_behavior_vector_seq"] for item in items]
            y = [item["like_or_not_like"] for item in items]
            yield np.array(x_basic_info), np.array(x_subs), np.array(x_personas), np.array(x_contents), np.array(y)

现在通过training_batch_generator你已经可以拿到训练数据了。

如何执行

虽然已经简化了处理,但是代码还是不少,为了方便调试,建议使用pyspark shell。运行指令如下:

export PYTHONIOENCODING=utf8;./bin/pyspark --py-files spark-deep-learning-assembly-0.1.0-spark2.1.jar --jars spark-deep-learning-assembly-0.1.0-spark2.1.jar --master "local[*]"

然后把代码黏贴进去就可以了。

上一篇下一篇

猜你喜欢

热点阅读