人工智能

2.Spark机器学习基础——无监督学习

2021-02-01  本文已影响0人  许志辉Albert

Spark机器学习基础——无监督学习

1.1 K-means

from __future__ import print_function
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.sql import SparkSession
!head -5 data/mllib/sapmle_kmeans_data.txt
1
spark = SparkSession\
        .builder\
        .appName("KMeansExample")\
        .getOrCreate()

dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

#训练K-means聚类模型
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)

#预测(即分配聚类中心)
predictions = model.transform(dataset)

#根据Silhousette得分评估
evaluator = ClusteringEvaluator()
silhousette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

#输出预测结果
print("predicted Center: ")
for center in predictions[['prediction']].collect():
    print(center.asDict())

# 聚类中心
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

spark.stop()
2

1.2GMM模型

from __future__ import print_function
from pyspark.ml.clustering import GaussianMixture
from pyspark.sql import SparkSession
spark = SparkSession\
        .builder\
        .appName("GaussianMixtureExample")\
        .getOrCreate()

dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

gmm = GaussianMixture().setK(2).setSeed(0)
model = gmm.fit(dataset)

print("Gaussians shown as a DataFrame: ")
model.gaussiansDF.show(truncate=False)

spark.stop()
3

1.3关联规则

!head -5 data/mllib/sample_fpgrowth.txt
4
from pyspark.mllib.fpm import FPGrowth
from pyspark.sql import SparkSession
spark = SparkSession\
        .builder\
        .appName("FPGrowthExample")\
        .getOrCreate()

data = spark.sparkContext.textFile("data/mllib/sample_fpgrowth.txt")
transactions = data.map(lambda line: line.strip().split(' '))
model = FPGrowth.train(transactions, minSupport=0.2, numPartitions=10)
result = model.freqItemsets().collect()
for fi in result:
    print(fi)

spark.stop()
5
from pyspark.ml.fpm import FPGrowth
spark = SparkSession\
        .builder\
        .appName("FPGrowthExample")\
        .getOrCreate()

df = spark.createDataFrame([
    (0, [1, 2, 5]),
    (1, [1, 2, 3, 5]),
    (2, [1, 2])
], ["id", "items"])

fpGrowth = FPGrowth(itemsCol="items", minSupport=0.5, minConfidence=0.6)
model = fpGrowth.fit(df)

# Display frequent itemsets.
model.freqItemsets.show()

# Display generated association rules.
model.associationRules.show()

# transform examines the input items against all the association rules and summarize the
# consequents as prediction
model.transform(df).show()

spark.stop()
6

1.4LDA主体模型

from __future__ import print_function
from pyspark.ml.clustering import LDA
from pyspark.sql import SparkSession

! head -5 data/mllib/sample_lda_libsvm_data.txt
7
spark = SparkSession \
        .builder \
        .appName("LDAExample") \
        .getOrCreate()

# 加载数据
dataset = spark.read.format("libsvm").load("data/mllib/sample_lda_libsvm_data.txt")

# 训练LDA模型
lda = LDA(k=10, maxIter=10)
model = lda.fit(dataset)

ll = model.logLikelihood(dataset)
lp = model.logPerplexity(dataset)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp)+"\n")

# 输出主题
topics = model.describeTopics(3)
print("The topics described by their top-weighted terms:")
topics.show(truncate=False)

# 数据集解析
print("transform dataset:\n")
transformed = model.transform(dataset)
transformed.show(truncate=False)

spark.stop()
8 9

1.5PCA降维

from __future__ import print_function
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession
spark = SparkSession\
        .builder\
        .appName("PCAExample")\
        .getOrCreate()

# 构建一份fake data
data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
        (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
        (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = spark.createDataFrame(data, ["features"])

# PCA降维
pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)

result = model.transform(df).select("pcaFeatures")
result.show(truncate=False)

spark.stop()
10

1.6 word2vec词嵌入

from __future__ import print_function
from pyspark.ml.feature import Word2Vec
from pyspark.sql import SparkSession
spark = SparkSession\
        .builder\
        .appName("Word2VecExample")\
        .getOrCreate()

# 输入是bag of words形式
documentDF = spark.createDataFrame([
    ("Hi I heard about Spark".split(" "), ),
    ("I wish Java could use case classes".split(" "), ),
    ("Logistic regression models are neat".split(" "), )
], ["text"])

# 设置窗口长度等参数,词嵌入学习
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)

# 输出词和词向量
model.getVectors().show()

result = model.transform(documentDF)
for row in result.collect():
    text, vector = row
    print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))

spark.stop()
11
上一篇下一篇

猜你喜欢

热点阅读