数据算法 Hadoop/Spark大数据处理---第十二章
2018-07-08 本文已影响26人
_Kantin
本章为K-均值聚类
K-均值聚类的算法思想
==(其中K是希望输入数据生成多少个组合簇,输入一般为:N个d维的点和一个目标K,输出为:找到这些簇的位置ui,使得数据点到簇质心的距离最小)==
- 1.随机选择的初始质心,非质心点指派给最近的质心而形成新的簇
- 2.得到簇之后,重写计算质心:假设k-means聚类过程中,得到某一个簇的集合Ci={p(x1,y1), p(x2,y2), …,p(xn,yn)},则簇Ci的质心,质心x坐标为(x1+x2+ …+xn)/n,质心y坐标为(y1+y2+ …+yn)/n。
-
3.算法的终止条件:质心在每一轮迭代中会发生变化,然后需要重新将非质心点指派给最近的质心而形成新的簇,如果只有==很少的一部分==点在迭代过程中,还在改变簇(如:更新一次质心,有些点从一个簇移动到另一个簇),那么满足这样一个收敛条件,可以提前结束迭代过程。
image -
4.使用的距离函数是曼哈顿距离求值函数
image
//一个求曼哈顿距离的java实现
public class EuclideanDistance{
public static double calculateDistance(Vector center,Vector data){
double sum = 0.0;
int length = center.length;
for(int i=0;i<length;i++){
sum+=Math.pow((center[i]-data[i]),2)
}
return Math.sqrt(sum)
}
}
本章实现方式
- 1.基于Mapreduce的伪代码实现
- 2.基于传统Scala来实现
++基于传统spark来实现++
1. K-均值聚类算法
//k = 期望的簇数 delta=可接受的收敛误差 data=输入的数据
kmeans(k,delta,data){
//初始化簇质心
initial_cretroids = pick(k,data);
//向映射器广播中心
writeToHDFS(initial_cretroids);
current_cretroids= initial_cretroids;
while(true){
//1.map()中使用current_cretroids
//2.reduct() 创建new_cretroids并写至HDFS
theMapReduceJob();
new_cretroids = readFromHDFS();
//当那个簇心的差距很小时,停止求值
if change(new_cretroids,current_cretroids)<=delta{
break;
}else{
current_cretroids=new_cretroids;
}
return = readFromHDFS();
return result;
}
}
2. map端求值
//输入的value为D维的向量
map(Object key,Vector value){
Vector nearest = null;
double nearestDistance = Double.MAX_VALUE
//对每个簇点进行求值
for(Vector center : centers){
double distance = EuclideanDistance.calculateDistance(center,value)
//第一次初始时的赋值
if(nearest = null){
nearest = center;
nearestDistance = distance;
}else{
if()nearestDistance>distance){
nearest = center;
nearestDistance = distance;
}
}
}
//输入为最近簇nearest,value是一个距离(多维的向量)
//为那个点发出一个最近簇和距离,所以要combine()
emit(nearest,value);
}
3. combine对value进行累加求值,在后续的reduce中可用
combine(Vertor key ,Iterable<Vector> values){
Vector sum = new Vector();
for(Vector value :values ){
for(int i=0;i<value.length;i++){
sum[i]+=value[i];
}
}
//将相同簇的向量的每一维度相加
emit(key,sum)
}
4. reduce函数计算新的质心
//combine结果不等于reduce结果,combine只发生在买个小map阶段
reduct(Vertor key , Iterable<Vertor> values){
Vertor newCenter = new Vertor();
int count=0;
for(Vertor value : values){
count++;
for(int i=0;i<value.length;i++){
newCenter+=value[i]
}
}
for(int i=0;i<key.length;i++){
newCenter[i] = newCenter[i]/count
}
emit(key.ID ,newCenter);
}
++基于传统scala来实现++
def main(args: Array[String]): Unit = {
if (args.size < 3) {
println("Usage: ScalaKMeans <input-path> <k> <max-iterations> [<runs>]")
sys.exit(1)
}
val sparkConf = new SparkConf().setAppName("KMeans")
val sc = new SparkContext(sparkConf)
//输入
val input = args(0)
//簇的个数
val k = args(1).toInt
//迭代的次数iterations
val iterations = args(2).toInt
//读取输入的文件
val lines = sc.textFile(input)
//创建向量点
val points = lines.map(line =>{
val tokens = line.split("\\s+")
Vectors.dense(tokens.map(_.toDouble))
})
//用k均值算法进行训练
val model = KMeans.train(points,k,iterations,KMeans.K_MEANS_PARALLEL)
//多次迭代之后的k均值模式model,计算它的损失值
val cost = model.computeCost(points)
}