Data Algorithms: Big Data Processing with Hadoop/Spark --- Chapter 14
2018-07-08
_Kantin
This chapter covers the Naive Bayes algorithm.
The idea behind the Naive Bayes algorithm
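Naive Bayes assigns a record with attribute values $a_1, \dots, a_n$ to the class $c$ with the largest posterior probability, under the "naive" assumption that the attributes are conditionally independent given the class:

$$P(c \mid a_1, \dots, a_n) \propto P(c) \prod_{i=1}^{n} P(a_i \mid c)$$

Training therefore reduces to counting: $P(c)$ is estimated as the fraction of training records labeled $c$, and $P(a_i \mid c)$ as the fraction of class-$c$ records containing attribute value $a_i$. Classification multiplies these estimates for each candidate class and keeps the argmax, which is exactly what both implementations below do.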
Implementation approaches in this chapter:
- 1. Based on Spark (Java API)
- 2. Based on traditional Scala
++Spark-based implementation++
1. Build the classifier
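The excerpt below assumes an existing `JavaSparkContext` named `ctx` and a `training` RDD of comma-separated records whose last field is the class label; neither is defined in the snippet, so the following driver setup is a minimal sketch with an assumed input path and record format:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

// Hypothetical driver setup; the path and record format are assumptions.
// Each record: attribute values followed by the class label,
// e.g. "sunny,hot,high,weak,no".
SparkConf conf = new SparkConf().setAppName("NaiveBayesTrainer");
JavaSparkContext ctx = new JavaSparkContext(conf);
JavaRDD<String> training = ctx.textFile("/naivebayes/training.csv");
```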
```java
// get the number of records in the training set
long trainingDataSize = training.count();
JavaPairRDD<Tuple2<String, String>, Integer> pairs = training.flatMapToPair(new PairFlatMapFunction<String, Tuple2<String, String>, Integer>() {
@Override
public Iterator<Tuple2<Tuple2<String, String>, Integer>> call(String rec) throws Exception {
List<Tuple2<Tuple2<String, String>, Integer>> result =
new ArrayList<Tuple2<Tuple2<String, String>, Integer>>();
String[] tokens = rec.split(",");
// the last token of each record is the class label
int classification = tokens.length - 1;
String theClassification = tokens[classification];
// emit ((attribute, class), 1) for every attribute value;
// indices 0..classification-1 are the attributes
for (int i = 0; i < classification; i++) {
Tuple2<String, String> K = new Tuple2<>(tokens[i], theClassification);
result.add(new Tuple2<Tuple2<String, String>, Integer>(K, 1));
}
// emit one (("CLASS", class), 1) pair per record to count class frequencies
Tuple2<String, String> V = new Tuple2<>("CLASS", theClassification);
result.add(new Tuple2<>(V, 1));
return result.iterator();
}
});
JavaPairRDD<Tuple2<String, String>, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) throws Exception {
return i1 + i2;
}
});
Map<Tuple2<String, String>, Integer> countsMap = counts.collectAsMap();
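// note: collectAsMap() materializes the entire count table on the driver;
// this assumes the number of distinct (attribute, class) pairs is small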
// build the probability table PT and the list of all classes CLASSIFICATIONS;
// both will be stored in HDFS for later use
HashMap<Tuple2<String, String>, Double> PT = new HashMap<>();
List<String> CLASSIFICATIONS = new ArrayList<>();
for(Map.Entry<Tuple2<String, String>, Integer> entry : countsMap.entrySet()){
Tuple2<String, String> k = entry.getKey();
// the second element of the key is the class label
String classification = k._2;
if(k._1.equals("CLASS")){
// P(class) = count(class) / trainingDataSize; cast before dividing to avoid integer division
PT.put(k, ((double) entry.getValue()) / trainingDataSize);
// record the class label so it can be saved to HDFS below
CLASSIFICATIONS.add(classification);
}else{
Tuple2<String, String> k2 = new Tuple2<>("CLASS", classification);
Integer count = countsMap.get(k2);
if(count == null){
PT.put(k,0.0);
}else{
// count is the total number of records belonging to this class
PT.put(k, ((double) entry.getValue()) / count);
}
}
}
// save PT and CLASSIFICATIONS to HDFS
List<Tuple2<PairOfStrings, DoubleWritable>> list = toWritableList(PT);
JavaRDD<Tuple2<PairOfStrings, DoubleWritable>> ptRDD = ctx.parallelize(list);
ptRDD.saveAsHadoopFile("/naivebayes/pt", // name of path
PairOfStrings.class, // key class
DoubleWritable.class, // value class
SequenceFileOutputFormat.class // output format class
);
JavaRDD<String> classificationsRDD = ctx.parallelize(CLASSIFICATIONS);
classificationsRDD.saveAsTextFile("/naivebayes/classes"); // name of path
// done
ctx.close();
System.exit(0);
}
// store PT as (PairOfStrings key, DoubleWritable value) tuples
static List<Tuple2<PairOfStrings, DoubleWritable>> toWritableList(HashMap<Tuple2<String, String>, Double> pt) {
List<Tuple2<PairOfStrings, DoubleWritable>> list =
new ArrayList<Tuple2<PairOfStrings, DoubleWritable>>();
for(Map.Entry<Tuple2<String,String>,Double> entry : pt.entrySet()){
list.add(new Tuple2<PairOfStrings, DoubleWritable>(
new PairOfStrings(entry.getKey()._1, entry.getKey()._2),
new DoubleWritable(entry.getValue())));
}
return list;
}
```
2. Test the trained classifier
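This step assumes the same `ctx` from step 1, plus a `newdata` RDD of unlabeled records and an `nbProbTablePath` string; neither is defined in the excerpt, so the following setup is a sketch with assumed paths:

```java
import org.apache.spark.api.java.JavaRDD;

// Hypothetical setup; both paths are assumptions matching step 1 above.
String nbProbTablePath = "/naivebayes/pt";
JavaRDD<String> newdata = ctx.textFile("/naivebayes/newdata.csv");
```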
```java
// read back the ptRDD that was just saved to HDFS
JavaPairRDD<PairOfStrings, DoubleWritable> ptRDD = ctx.hadoopFile(nbProbTablePath, SequenceFileInputFormat.class, PairOfStrings.class, DoubleWritable.class);
// convert ptRDD back into the regular probability-table key type
JavaPairRDD<Tuple2<String,String>, Double> classifierRDD = ptRDD.mapToPair(
new PairFunction<
Tuple2<PairOfStrings,DoubleWritable>, // T
Tuple2<String,String>, // K2,
Double // V2
>() {
@Override
public Tuple2<Tuple2<String,String>,Double> call(Tuple2<PairOfStrings,DoubleWritable> rec) {
PairOfStrings pair = rec._1;
Tuple2<String,String> K2 = new Tuple2<String,String>(pair.getLeftElement(), pair.getRightElement());
Double V2 = rec._2.get();
return new Tuple2<Tuple2<String,String>,Double>(K2, V2);
}
});
// collect classifierRDD into a Map on the driver
Map<Tuple2<String, String>, Double> classifier = classifierRDD.collectAsMap();
Broadcast<Map<Tuple2<String, String>, Double>> broadcastClassifier = ctx.broadcast(classifier);
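// a broadcast variable ships the read-only classifier map to each executor once,
// rather than serializing it with every task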
// also load the class labels saved during training
JavaRDD<String> classesRDD = ctx.textFile("/naivebayes/classes", 1);
List<String> CLASSES = classesRDD.collect();
final Broadcast<List<String>> broadcastClasses = ctx.broadcast(CLASSES);
JavaPairRDD<String,String> classified = newdata.mapToPair(new PairFunction<String, String, String>() {
@Override
public Tuple2<String, String> call(String rec) throws Exception {
// the broadcast probability table
Map<Tuple2<String,String>, Double> CLASSIFIER = broadcastClassifier.value();
// the broadcast list of class labels
List<String> CLASSES = broadcastClasses.value();
String[] attributes = rec.split(",");
String selectedClass = null;
double maxPosterior = 0.0;
for (String aClass : CLASSES) {
// start from the prior P(class)
double posterior = CLASSIFIER.get(new Tuple2<>("CLASS", aClass));
for (int i = 0; i < attributes.length; i++) {
Double probability = CLASSIFIER.get(new Tuple2<>(attributes[i], aClass));
if (probability == null) {
// unseen (attribute, class) pair: the whole product is zero
posterior = 0.0;
break;
} else {
posterior *= probability;
}
}
// keep the class with the highest posterior seen so far
if (selectedClass == null) {
maxPosterior = posterior;
selectedClass = aClass;
} else {
if (posterior > maxPosterior) {
maxPosterior = posterior;
selectedClass = aClass;
}
}
}
return new Tuple2<String,String>(rec, selectedClass);
}
});
```
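The `classified` RDD pairs every input record with its predicted class. As a usage sketch (the output path is an assumption), the results could be persisted and the context shut down:

```java
// Hypothetical follow-up: persist (record, predictedClass) pairs to HDFS.
classified.saveAsTextFile("/naivebayes/classified");
ctx.close();
```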
++Implementation based on traditional Scala++
1. Build the classifier
```scala
val training = sc.textFile(input)
// number of training records
val trainingDataSize = training.count()
// split each record into one (("CLASS", class), 1) pair plus ((attribute, class), 1) pairs
val pairs = training.flatMap(line =>{
val tokens = line.split(",")
val theClassification = tokens.last
(("CLASS", theClassification), 1) :: tokens.init.map(token => ((token, theClassification), 1)).toList
})
val counts = pairs.reduceByKey(_ + _)
val countsAsMap = counts.collectAsMap()
val pt = countsAsMap.map(tuple => {
// P(class) = count(class) / number of training records
if (tuple._1._1 == "CLASS") (tuple._1, tuple._2 / trainingDataSize.toDouble) else {
val count = countsAsMap.getOrElse(("CLASS", tuple._1._2), 0)
// P(attribute | class) = count(attribute, class) / count(class)
if (count == 0) (tuple._1, 0d) else (tuple._1, tuple._2 / count.toDouble)
}
})
val ptRDD = sc.parallelize(pt.toList)
pt.foreach(f => println(s"${f._1._1},${f._1._2},${f._2}"))
ptRDD.saveAsObjectFile(output + "/naivebayes/pt")
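// NOTE: the classification step below reads a list of class labels from a
// classes path that this excerpt never writes. The two lines below are an
// assumed companion step, deriving the labels from countsAsMap:
val classes = countsAsMap.keys.filter(_._1 == "CLASS").map(_._2).toList
sc.parallelize(classes).saveAsTextFile(output + "/naivebayes/classes")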
}
```
2. Classify the data
```scala
val newdata = sc.textFile(input)
// load the classification probabilities from HDFS
val classifierRDD = sc.objectFile[Tuple2[Tuple2[String, String], Double]](nbProbabilityTablePath)
// restore the saved probability table and broadcast it
val classifier = classifierRDD.collectAsMap()
val broadcastClassifier = sc.broadcast(classifier)
val classesRDD = sc.textFile(classesPath)
val broadcastClasses = sc.broadcast(classesRDD.collect())
val classified = newdata.map(rec =>{
val classifier = broadcastClassifier.value
val classes = broadcastClasses.value
val attributes = rec.split(",")
val class_score = classes.map(aClass =>{
val posterior = classifier.getOrElse(("CLASS", aClass), 1d)
// probability of each attribute value given this class
val probabilities = attributes.map(attribute => {
classifier.getOrElse((attribute, aClass), 0d)
})
// multiply the attribute probabilities with the class prior
(aClass, probabilities.product * posterior)
})
// pick the class with the highest score
val maxClass = class_score.maxBy(_._2)
// pair the record with its predicted class
(rec, maxClass._1)
})
```