pyspark分解机(Factorization Machine

2021-08-04  本文已影响0人  米斯特芳

FM算法主要分三类

  1. SGD(随机梯度下降)
  2. ALS(交替最小二乘法)
  3. MCMC(马尔科夫链蒙特卡罗法)

ALS已经介绍过,pyspark中求解使用的是AdamW(默认)和梯度下降法

模型优点

可用于高度稀疏数据场景;具有线性的计算复杂度

算法介绍

模型公式

\begin{align} y = \sigma(w_0 + \sum\limits^n_{i-1} w_i x_i + \sum\limits^n_{i=1} \sum\limits^n_{j=i+1} \langle v_i, v_j \rangle x_i x_j )\end{align}
前2项为全局偏差和线性项,第三项为交互项

分类

from pyspark.ml import Pipeline
from pyspark.ml.classification import FMClassifier
from pyspark.ml.feature import MinMaxScaler, StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("FMClassifierExample") \
    .getOrCreate()

data = spark.read.format("libsvm").load("sample_libsvm_data.txt")

labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
featureScaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures").fit(data)

(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a FM model.
# stepSize:每次迭代优化的步进
# factorSize:默认8.表示获取成对交互项使用的因子向量维度。个人认为是每个样本转为8维向量,基于此计算成对交互项(看公式应该是内积)
fm = FMClassifier(labelCol="indexedLabel", featuresCol="scaledFeatures", stepSize=0.001)

# Create a Pipeline.
pipeline = Pipeline(stages=[labelIndexer, featureScaler, fm])

# Train model.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)

# 多类分类评估器
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
# 对predictions中设定列执行评估
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = %g" % accuracy)

fmModel = model.stages[2]# 注意一下pipeline的模型阶段使用方式
print("Factors: " + str(fmModel.factors))# 表示计算交互项时各样本使用的因子向量
print("Linear: " + str(fmModel.linear))# 表示公式中线性项的系数w
print("Intercept: " + str(fmModel.intercept))# 表示全局偏差

回归

from pyspark.ml import Pipeline
from pyspark.ml.regression import FMRegressor
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("FMRegressorExample") \
    .getOrCreate()

data = spark.read.format("libsvm").load("sample_libsvm_data.txt")

# Scale features.
featureScaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures").fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a FM model.
fm = FMRegressor(featuresCol="scaledFeatures", stepSize=0.001)

# Create a Pipeline.
pipeline = Pipeline(stages=[featureScaler, fm])

# Train model.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

fmModel = model.stages[1]
print("Factors: " + str(fmModel.factors))
print("Linear: " + str(fmModel.linear))
print("Intercept: " + str(fmModel.intercept))
上一篇下一篇

猜你喜欢

热点阅读