数据算法 Hadoop/Spark大数据处理---第十二章

2018-07-08  本文已影响26人  _Kantin

本章为K-均值聚类

K-均值聚类的算法思想

==(其中K是希望输入数据生成多少个组合簇,输入一般为:N个d维的点和一个目标K,输出为:找到这些簇的位置ui,使得数据点到簇质心的距离最小)==

//一个求曼哈顿距离的java实现
public class EuclideanDistance{
    public static double calculateDistance(Vector center,Vector data){
        double sum = 0.0;
        int length = center.length;
        for(int i=0;i<length;i++){
            sum+=Math.pow((center[i]-data[i]),2)
        }    
        return Math.sqrt(sum)
    }
}

本章实现方式


++基于传统spark来实现++

1. K-均值聚类算法

//k = 期望的簇数 delta=可接受的收敛误差 data=输入的数据
kmeans(k,delta,data){
    //初始化簇质心
    initial_cretroids = pick(k,data);
    //向映射器广播中心
    writeToHDFS(initial_cretroids);
    current_cretroids=  initial_cretroids;
    while(true){
        //1.map()中使用current_cretroids
        //2.reduct() 创建new_cretroids并写至HDFS
        theMapReduceJob();
        new_cretroids = readFromHDFS();
        //当那个簇心的差距很小时,停止求值
        if change(new_cretroids,current_cretroids)<=delta{
            break;
        }else{
            current_cretroids=new_cretroids;
        }
        return =  readFromHDFS();
        return result;
    }
}

2. map端求值

//输入的value为D维的向量
map(Object key,Vector value){
    Vector nearest = null;
    double nearestDistance = Double.MAX_VALUE
    //对每个簇点进行求值
    for(Vector center : centers){
        double distance = EuclideanDistance.calculateDistance(center,value)
        //第一次初始时的赋值
        if(nearest = null){
            nearest = center;
            nearestDistance = distance;
        }else{
            if()nearestDistance>distance){
                 nearest = center;
                 nearestDistance = distance;
            }
        }
    }
    //输入为最近簇nearest,value是一个距离(多维的向量)
    //为那个点发出一个最近簇和距离,所以要combine()
    emit(nearest,value);
}

3. combine对value进行累加求值,在后续的reduce中可用

combine(Vertor key ,Iterable<Vector> values){
    Vector sum = new Vector();
    for(Vector value :values ){
        for(int i=0;i<value.length;i++){
            sum[i]+=value[i];
        }
    }
    //将相同簇的向量的每一维度相加
    emit(key,sum)
}

4. reduce函数计算新的质心

//combine结果不等于reduce结果,combine只发生在买个小map阶段
reduct(Vertor key , Iterable<Vertor> values){
    Vertor newCenter = new Vertor();
    int count=0;
    for(Vertor value : values){
        count++;
        for(int i=0;i<value.length;i++){
            newCenter+=value[i]
        }
    }
    for(int i=0;i<key.length;i++){
        newCenter[i] = newCenter[i]/count
    }
    emit(key.ID ,newCenter);
}

++基于传统scala来实现++

def main(args: Array[String]): Unit = {
    if (args.size < 3) {
      println("Usage: ScalaKMeans <input-path> <k> <max-iterations> [<runs>]")
      sys.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("KMeans")
    val sc = new SparkContext(sparkConf)
    //输入
    val input = args(0)
    //簇的个数
    val k  = args(1).toInt
    //迭代的次数iterations
    val iterations = args(2).toInt
    //读取输入的文件
    val lines = sc.textFile(input)

    //创建向量点
    val points = lines.map(line =>{
      val tokens = line.split("\\s+")
      Vectors.dense(tokens.map(_.toDouble))
    })
    //用k均值算法进行训练
    val model = KMeans.train(points,k,iterations,KMeans.K_MEANS_PARALLEL)
    //多次迭代之后的k均值模式model,计算它的损失值
    val cost = model.computeCost(points)
  }

上一篇下一篇

猜你喜欢

热点阅读