Spark中文文本分类

2021-04-25  本文已影响0人  郭彦超

文本分类是指将一篇文章归到事先定义好的某一类或者某几类,互联网时代到来,数据以指数级增长,自媒体的兴起,让文本的增长更是突飞猛进,文档作为一种非结构化的数据(MySQL 中存放的是结构化数据),对于它的分析本来就存在一定的难度,再加上数据量的猛增,让原本 Python 的单机机器学习也压力倍增,显得力不从心。。
本文介绍使用Spark MLlib提供的朴素贝叶斯(Naive Bayes)及随机森林算法,完成对中文文本的分类过程。主要包括中文分词、文本向量化表示(TF-IDF、word2vec)、模型训练、分类预测等。

中文分词

对于中文文本分类而言,需要先对文章进行分词,我使用的是Hanlp中文分析工具

<dependency>
    <groupId>com.hankcs</groupId>
    <artifactId>hanlp</artifactId>
    <version>portable-1.8.1</version>
</dependency>

中文特征向量化处理

对文本特征处理,即文本向量化的过程。常用的特征处理方法有:


TF-IDF 从字面意思来看分为 TF 和 IDF,TF 的意思是 Term Frequency,也就是词在文章中出现的频率,可以简单的认为是:一个词在文章中出现的频率越高,代表这个词越重要。比如:“坦克”这个词在军事类文章中出现了很多次,那么这个词对这类文章就会很重要,可能经济类的文章也会偶尔出现“坦克”,但肯定不会出现很多,那么这个词对经济类文章相对而言就不是那么重要。

IDF 的意思是 Inverse Document Frequency,也就是逆文本频率,可以认为是:一些词在一类文章中出现很多,如“坦克”,但在其他经济、政治类文章中很少出现,那么这个词就具有很好的分类能力,但相反,一些词在很多文章中都出现,如“有的”、“我们”等,它们虽然在很多文章中都出现了,但并没有很好的分类的能力,这个时候逆词频就发挥作用了,你出现的越多,你的比重反而下降了。

分好词后,每一个词都作为一个特征,但需要将中文词语转换成Double型来表示,通常使用该词语的TF-IDF值作为特征值,Spark提供了全面的特征抽取及转换的API,非常方便,详见:TF-IDF的API

0,苹果 官网 苹果 宣布
1,苹果 梨 香蕉

举个例子,“苹果”在 1 篇文章共 1000 个词中总共出现了 10 次,那么“苹果”的 TF 就是 10/1000 = 0.01,“苹果”在 10000 篇文章中只在 10 篇里面出现过,那么“苹果”的 IDF 就是lg(10000/10) = 3,那么“苹果”的 TF-IDF 值就是 0.01*3 = 0.03。
TFIDF特征处理如下:

case class RawDataRecord(category: String, text: String)
 
//将原始数据映射到DataFrame中,字段category为分类编号,字段text为分好的词,以空格分隔
srcDF.select("category", "text").take(2).foreach(println)
[0,苹果 官网 苹果 宣布]
[1,苹果 梨 香蕉]
//将分好的词转换为数组
var tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
var wordsData = tokenizer.transform(srcDF)
 
wordsData.select($"category",$"text",$"words").take(2).foreach(println)
[0,苹果 官网 苹果 宣布,WrappedArray(苹果, 官网, 苹果, 宣布)]
[1,苹果 梨 香蕉,WrappedArray(苹果, 梨, 香蕉)]
 
//将每个词转换成Int型,并计算其在文档中的词频(TF)
var hashingTF = 
new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(100)
var featurizedData = hashingTF.transform(wordsData)

这里将中文词语转换成INT型的Hashing算法,类似于Bloomfilter,上面的setNumFeatures(100)表示将Hash分桶的数量设置为100个,这个值默认为2的20次方,即1048576,可以根据你的词语数量来调整,一般来说,这个值越大,不同的词被计算为一个Hash值的概率就越小,数据也更准确,但需要消耗更大的内存,和Bloomfilter是一个道理。

featurizedData.select($"category", $"words", $"rawFeatures").take(2).foreach(println)
[0,WrappedArray(苹果, 官网, 苹果, 宣布),(100,[23,81,96],[2.0,1.0,1.0])]
[1,WrappedArray(苹果, 梨, 香蕉),(100,[23,72,92],[1.0,1.0,1.0])]

结果中,“苹果”用23来表示,第一个文档中,词频为2,第二个文档中词频为1.

//计算TF-IDF值
var idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
var idfModel = idf.fit(featurizedData)
var rescaledData = idfModel.transform(featurizedData)
rescaledData.select($"category", $"words", $"features").take(2).foreach(println)
 
[0,WrappedArray(苹果, 官网, 苹果, 宣布),(100,[23,81,96],[0.0,0.4054651081081644,0.4054651081081644])]
[1,WrappedArray(苹果, 梨, 香蕉),(100,[23,72,92],[0.0,0.4054651081081644,0.4054651081081644])]
 
//因为一共只有两个文档,且都出现了“苹果”,因此该词的TF-IDF值为0.

特征转换

最后将上述数据转换为Bayes输入格式

var trainDataRdd = rescaledData.select($"category",$"features").map {
    case Row(label: String, features: Vector) =>
    LabeledPoint(label.toDouble, Vectors.dense(features.toArray))
}

每一个LabeledPoint中,特征数组的长度为100(setNumFeatures(100)),”官网”和”宣布”对应的特征索引号分别为81和96,因此,在特征数组中,第81位和第96位分别为它们的TF-IDF值。

模型训练

数据准备好了,接下来进行模型训练及分类预测,代码:

%spark
import org.apache.spark.ml.attribute.Attribute
import org.apache.spark.ml.feature.{IndexToString, StringIndexer}
import com.hankcs.hanlp.HanLP;
import scala.collection.JavaConversions._
import com.hankcs.hanlp.seg.common.Term;
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
import org.apache.spark.ml.classification.NaiveBayes
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.tuning.{TrainValidationSplit, CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.Word2Vec
import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
import org.apache.spark.ml.classification.{RandomForestClassifier, GBTClassifier}
// import spark.implicits._

val df = spark.read.option("header",true).csv("/data/stat/recommend/ireg2/2021-05-12.csv")


case class wordFearture(category:String, wordsFearture:String)


val wordsSet = df.where("category_id is not null and name<>'会员免费'").rdd.map(row=>{
//   try {
    //  print(row)
     var words = ""
    
     val ls = HanLP.segment(row.getAs("title").toString())
     for(item <- ls){
        if(item.word.length>=1 && !item.word.startsWith("%")){
            words = words + item.word + " "
        }
     }
     wordFearture(row.getAs[String]("name") , words)
//   }catch {
//         //如果解析报错赋予空值
//      case e:Exception=> print(e)
//   }
 })

val wordsDF = spark.createDataFrame(wordsSet)
val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("label")
  .fit(wordsDF)
//val indexed = indexer.transform(wordsDF)

val tokenizer = new Tokenizer().setInputCol("wordsFearture").setOutputCol("words")
 
//TF IDF

val hashingTF = new HashingTF()
   .setInputCol("words").setOutputCol("rawFeatures")

//将上一步的 TF 计算 IDF
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")

val VECTOR_SIZE = 512
//word2vec
val word2Vec = new Word2Vec()
  .setInputCol("words")
  .setOutputCol("features")
  .setVectorSize(VECTOR_SIZE)
  .setMinCount(1)

val nb = new NaiveBayes()
val layers = Array[Int](VECTOR_SIZE,6,5,indexer.labels.size)
// val md = new MultilayerPerceptronClassifier().setLayers(layers).setBlockSize(512).setSeed(1234L).setMaxIter(128).setFeaturesCol("features").setPredictionCol("prediction")

val md = new RandomForestClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setNumTrees(20)
  .setMaxDepth(5)

val converter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictionName") 
  .setLabels(indexer.labels) 

//贝叶斯分类  
//val pipeline = new Pipeline().setStages(Array(indexer, tokenizer, hashingTF, idf, nb, converter))
//随机森林分类树
val pipeline = new Pipeline().setStages(Array(indexer, tokenizer, word2Vec, md, converter))

//网格参数使得超参数调优更加的方便,只需要在网格中加入可能的参数
val paramGrid = new ParamGridBuilder()
  .addGrid(nb.smoothing, Array(0.5, 1,1.5))
  .build()

//将所有的步骤加入到 TrainValidationSplit 中,包括 训练器、评估方法、模型的网格参数、并行度等
// val cv = new TrainValidationSplit()
//   .setEstimator(pipeline)
//   .setEvaluator(new MulticlassClassificationEvaluator)
//   .setEstimatorParamMaps(paramGrid)
//   .setTrainRatio(0.7)
//   .setParallelism(2)
val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new MulticlassClassificationEvaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(5)  // Use 3+ in practice
  .setParallelism(2)  // Evaluate up to 2 parameter settings in parallel
  
val Array(training, test) = wordsDF.randomSplit(Array(0.8, 0.2), seed = 12345)

// val model = cv.fit(training)
val model = pipeline.fit(training)

val predictions = model.transform(test)

//评估模型
val evaluator = new MulticlassClassificationEvaluator()
   .setLabelCol("label")
   .setPredictionCol("prediction")
   .setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println(s"Test set accuracy = $accuracy")
 
import org.apache.spark.ml.functions.vector_to_array
import org.apache.spark.sql.functions._
predictions.select($"category", $"predictionName",round(element_at(vector_to_array($"probability"),1),4)).show

上一篇下一篇

猜你喜欢

热点阅读