每天都有新的收获

MapReduce算法模式-倒排索引模式

2018-02-05  本文已影响22人  24格的世界
夜深了我还没有休息,临近过春节了,可是加班还是那么的多,心累了,好想找个地方休息休息,放松下自己 、、、
可是路还要走,生活还要继续,洗把脸,我还要战斗,生活就那点事、、、

一、倒排索引

倒排索引,是一种为了提高搜索效率而创建的索引,是一种数据结构。在搜索索引中输入关键词,然后让搜索引擎取互联网采集包括关键词的信息网络列表返还给你,但是这个样会耗费比较长的时间,但是如果在查询之前,就知道关键词已经存在哪些网页中,这样的话那查询的效率会更加的快速。

在本文章中通过mapreduce实现倒排索引的:
数据例子如下:
Facebook is very good and very nice
Google is very good too
Tencent is very good too

所要实现的效果:
Facebook 第1行数据,索引位置0 :1;
Google 第2行数据,索引位置0 :1;
Tencent 第3行数据,索引位置0 :1;
and 第1行数据,索引位置4 :1;
good 第1行数据,索引位置3 :1;第2行数据,索引位置3 :1;第3行数据,索引位置3 :1;
is 第1行数据,索引位置1 :1;第2行数据,索引位置1 :1;第3行数据,索引位置1 :1;
nice 第1行数据,索引位置6 :1;
too 第2行数据,索引位置4 :1;第3行数据,索引位置4 :1;
very 第1行数据,索引位置2 :1;第1行数据,索引位置5 :1;第2行数据,索引位置2 :1;第3行数据,索引位置2 :1;

整个项目的结构如下:

InvertedIndexKeyValue : 存放map和reduce中间输出变量的函数(这里只实现了map的中间比变量)
InvertedIndexMain:整个项目的入口函数,实现配置map和reduce类的入口
InvertedIndexMapper:项目的map过程
InvertedIndexReducer:项目的reduce过程

项目目录结构.png

分开介绍各个函数的结构:

InvertedIndexKeyValue:主要是定义带有中文描述的数据结构,为map的中间结果使用,当然这里面也可以添加reduce的中间结果,依据需求添加;

package InvertedIndex;

public class InvertedIndexKeyValue {
    //切分单词,形成相应的倒排索引的模式数据
    private String keywords;  //定义map中间结果的key
    private String describe;  //定义map中间结果的 value
    public void setKeywords(String keywords) {
        this.keywords = keywords;
    }
    public void setDescribe(String describe) {
        this.describe = describe;
    }
    public String getKeywords() {
        return keywords;
    }
    public String getDescribe() {
        return describe;
    }
}

InvertedIndexMapper:实现map的过程,主要数统计文件中每一行的数据中单词出现的词频,形成<key,value>的键值对;其中key的构成带有两部分:单词+单词位于第几行中出现;

package InvertedIndex;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.net.URI;
import java.util.StringTokenizer;

public class InvertedIndexMapper {
    private static Text text= new Text();

    public static InvertedIndexKeyValue AnalysisKeyWord(String keyword,String i,String j){
        //分开英文单词的函数,以空格进行分词的操作
        InvertedIndexKeyValue invertedIndexKeyValue=new InvertedIndexKeyValue();
        invertedIndexKeyValue.setKeywords(keyword+":"+"第"+i+"行数据,索引位置"+j+" ");
        invertedIndexKeyValue.setDescribe("1");
        return invertedIndexKeyValue;
    }

    public static void delfile(Configuration conf, Path path) throws Exception{
        //每次重复执行之前对相应目录进行清除的操作,确保程序正常执行
        FileSystem fs =FileSystem.get(new URI(path.toString()),conf);
        Path fspath = path;
        if(fs.exists(fspath)){
            fs.delete(fspath,true);
        }
    }

    public static class MapIndex extends Mapper<LongWritable,Text,Text,Text>{
        //先执行计数的过程
        private Text valueof = new Text();
        private Text keyof = new Text();
        private int i=1;
        protected void map(LongWritable key, Text text, Context context) throws IOException, InterruptedException{
            StringTokenizer stringTokenizer = new StringTokenizer(text.toString());
            int index =0; //每个单词位于每一行的数据的索引位置
            while(stringTokenizer.hasMoreTokens()){
                //对每一行进行处理的过程
                InvertedIndexKeyValue invertedIndexKeyValue=InvertedIndexMapper.AnalysisKeyWord(stringTokenizer.nextToken(),String.valueOf(i),String.valueOf(index));
                keyof.set(invertedIndexKeyValue.getKeywords().toString());
                valueof.set(invertedIndexKeyValue.getDescribe().toString());
                context.write(keyof,valueof);
                index++;
            }
            i+=1;
        }
    }
}

InvertedIndexReducer:此函数由两部分构成,Combine和Reduce两个类构成,分别进行reduce的过程;

Combine处理的过程是:map的输出结果的部分,key的形式为:Facebook:第1行数据,索引位置0,而value的形式为1,combine的过程为拆分出“:”之前的单词进行reduce的过程统计值啊每一行上单词出现的词频;
Reduce处理的过程是:把Combine的处理结果按照单词相同和相关描述进行归类,把相关描述按照串联的情况写到一起

package InvertedIndex;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class InvertedIndexReducer {
    // 完成词频统计的工作
    public static class ReduceIndex extends Reducer<Text,Text,Text,Text>{
        private Text keyindex = new Text();
        private Text valueindex = new Text();
        protected void reduce(Text key,Iterable<Text> value,Context context) throws IOException,InterruptedException{
            StringBuilder stringBuilder = new StringBuilder();
            for (Text va :value){
                stringBuilder.append(va.toString()+";");
            }
            keyindex.set(key);
            valueindex.set(stringBuilder.toString());
            context.write(keyindex,valueindex);
        }
    }

    public static class Combine extends Reducer<Text,Text,Text,Text>{
        private Text keyinfo = new Text();
        private Text valueinfo = new Text();
        protected void reduce(Text key,Iterable<Text> value,Context context) throws IOException,InterruptedException{
            //开始统计词频的过程
            int sum = 0 ;
            for (Text text:value){
                sum += Integer.parseInt(text.toString()); //对value的值进行统计处理
            }
            //重新设计key和value的值,sum座位value的值,对key值进行拆分处理,一部分取出作为value的值进行处理
            int spiltindex = key.toString().indexOf(":");
            keyinfo.set(key.toString().substring(0,spiltindex));
            valueinfo.set(key.toString().substring(spiltindex+1)+":"+sum);
            context.write(keyinfo,valueinfo);
        }
    }
}

InvertedIndexMain:负责的相应的类的调用的过程

package InvertedIndex;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class InvertedIndexMain {
    //倒排索引的主函数
    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        Path path1 = new Path("InvertedIndex/InvertedIndexFile");
        //Path path2 = new Path(args[1]);
        Path path2 = new Path("outputInvertedIndex");
        InvertedIndexMapper.delfile(conf,path2);
        Job job= Job.getInstance(conf,"InvertedIndex");

        FileInputFormat.setInputPaths(job,path1);
        FileOutputFormat.setOutputPath(job,path2);

        job.setJarByClass(InvertedIndexMain.class);

        job.setMapperClass(InvertedIndexMapper.MapIndex.class);
        job.setCombinerClass(InvertedIndexReducer.Combine.class);
        job.setReducerClass(InvertedIndexReducer.ReduceIndex.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        //map函数搞定
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.waitForCompletion(true);
    }
}

执行结果.png

PS:hadoop的本地环境的配置参考我上一篇文章!!!

加班过后,熬夜写了今天的学习内容,虽然累,但是觉得还是值得的,为自己加油!!!
上一篇下一篇

猜你喜欢

热点阅读