Spark Learning Notes (3) - Loading and Saving Data - JSON

2018-04-26  vision_zhang

JSON is a semi-structured data format. The simplest way to read it is to load the data as a plain text file and then map over the RDD's values with a JSON parser.

Example data
{"name":"上海滩","singer":"叶丽仪","album":"香港电视剧主题歌","path":"mp3/shanghaitan.mp3"}
{"name":"一生何求","singer":"陈百强","album":"香港电视剧主题歌","path":"mp3/shanghaitan.mp3"}
{"name":"红日","singer":"李克勤","album":"怀旧专辑","path":"mp3/shanghaitan.mp3"}
{"name":"爱如潮水","singer":"张信哲","album":"怀旧专辑","path":"mp3/airucaoshun.mp3"}
{"name":"红茶馆","singer":"陈惠嫻","album":"怀旧专辑","path":"mp3/redteabar.mp3"}
package spark_Function;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.codehaus.jackson.map.ObjectMapper;




public class json {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("MyMp3");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // Read each JSON record as one line of plain text.
        JavaRDD<String> input = jsc.textFile("G:/sparkRS/JSON.json");
        // Parse the lines of each partition into Mp3Info beans.
        JavaRDD<Mp3Info> result = input.mapPartitions(new ParseJson());
        result.foreach(x -> System.out.println(x));


        jsc.close();

    }

}



class ParseJson implements FlatMapFunction<Iterator<String>, Mp3Info> {

    private static final long serialVersionUID = 8603650874403773926L;

    @Override
    public Iterator<Mp3Info> call(Iterator<String> lines) throws Exception {
        ArrayList<Mp3Info> mp3 = new ArrayList<Mp3Info>();
        ObjectMapper mapper = new ObjectMapper();
        while (lines.hasNext()) {
            String line = lines.next();
            try {
                mp3.add(mapper.readValue(line, Mp3Info.class));
            } catch (Exception e) {
                // Skip malformed records.
            }
        }
        return mp3.iterator();
    }

}




class Mp3Info implements Serializable{
    private static final long serialVersionUID = -3811808269846588364L;
    private String name;
    private String album;
    private String path;
    private String singer;

    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public String getAlbum() {
        return album;
    }
    public void setAlbum(String album) {
        this.album = album;
    }
    public String getPath() {
        return path;
    }
    public void setPath(String path) {
        this.path = path;
    }
    public String getSinger() {
        return singer;
    }
    public void setSinger(String singer) {
        this.singer = singer;
    }
    @Override
    public String toString() {
        return "Mp3Info [name=" + name + ", album=" 
                 + album + ", path=" + path + ", singer=" + singer + "]";
    }

}

ObjectMapper is the central class of the Jackson library; it provides the methods that map between Java objects and JSON structures.
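As a standalone illustration (a minimal sketch reusing the Mp3Info bean above; readValue and writeValueAsString throw checked exceptions, so this is assumed to run inside a method that declares throws Exception):

ObjectMapper mapper = new ObjectMapper();
// JSON text -> Java bean: fields are matched to the bean's setters by name.
Mp3Info song = mapper.readValue(
        "{\"name\":\"红日\",\"singer\":\"李克勤\",\"album\":\"怀旧专辑\",\"path\":\"mp3/shanghaitan.mp3\"}",
        Mp3Info.class);
// Java bean -> JSON text: the bean's getters are read back into a JSON object.
String json = mapper.writeValueAsString(song);
System.out.println(song.getName() + " / " + song.getSinger());
System.out.println(json);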

Handling malformed records can cause serious problems, especially with semi-structured data such as JSON. In large datasets malformed records are common, so if you choose to skip them, you should use an accumulator to keep track of how many errors occurred.
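The listings in this note simply swallow parse errors inside ParseJson. A hedged sketch of a variant that counts the skipped records with a LongAccumulator is shown below; the class name ParseJsonCounting is my own, and it assumes Spark 2.x, where SparkContext.longAccumulator() is available:

import org.apache.spark.util.LongAccumulator;

// Variant of ParseJson that skips malformed records but counts them.
class ParseJsonCounting implements FlatMapFunction<Iterator<String>, Mp3Info> {
    private static final long serialVersionUID = 1L;
    private final LongAccumulator malformed;

    ParseJsonCounting(LongAccumulator malformed) {
        this.malformed = malformed;
    }

    @Override
    public Iterator<Mp3Info> call(Iterator<String> lines) throws Exception {
        ArrayList<Mp3Info> mp3 = new ArrayList<Mp3Info>();
        ObjectMapper mapper = new ObjectMapper();
        while (lines.hasNext()) {
            String line = lines.next();
            try {
                mp3.add(mapper.readValue(line, Mp3Info.class));
            } catch (Exception e) {
                malformed.add(1);   // remember the failure instead of losing it silently
            }
        }
        return mp3.iterator();
    }
}

// Usage in main():
// LongAccumulator malformed = jsc.sc().longAccumulator("malformed records");
// JavaRDD<Mp3Info> result = input.mapPartitions(new ParseJsonCounting(malformed));
// result.count();   // force evaluation before reading the accumulator
// System.out.println("skipped " + malformed.value() + " malformed records");

The complete program below keeps the simpler ParseJson from above and additionally writes the filtered records back out as JSON text with saveAsTextFile.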

package spark_Function;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.codehaus.jackson.map.ObjectMapper;




public class json {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("MyMp3");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        JavaRDD<String> input = jsc.textFile("G:/sparkRS/JSON.json");
        // Parse the lines, then keep only the songs from the album "怀旧专辑".
        JavaRDD<Mp3Info> result = input.mapPartitions(new ParseJson())
                                       .filter(x -> x.getAlbum().equals("怀旧专辑"));
        // Serialize the filtered beans back to JSON strings.
        JavaRDD<String> formatted = result.mapPartitions(new WriteJson());
        result.foreach(x -> System.out.println(x));
        formatted.saveAsTextFile("G:/sparkRS/wjson");

        jsc.close();

    }

}

class WriteJson implements FlatMapFunction<Iterator<Mp3Info>, String> {

    private static final long serialVersionUID = -6590868830029412793L;

    @Override
    public Iterator<String> call(Iterator<Mp3Info> song) throws Exception {
        ArrayList<String> text = new ArrayList<String>();
        ObjectMapper mapper = new ObjectMapper();
        while (song.hasNext()) {
            Mp3Info mp3 = song.next();
            text.add(mapper.writeValueAsString(mp3));
        }
        return text.iterator();
    }
}



class Mp3Info implements Serializable{
    private static final long serialVersionUID = -3811808269846588364L;
    private String name;
    private String album;
    private String path;
    private String singer;

    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public String getAlbum() {
        return album;
    }
    public void setAlbum(String album) {
        this.album = album;
    }
    public String getPath() {
        return path;
    }
    public void setPath(String path) {
        this.path = path;
    }
    public String getSinger() {
        return singer;
    }
    public void setSinger(String singer) {
        this.singer = singer;
    }
    @Override
    public String toString() {
        return "Mp3Info [name=" + name + ", album=" 
                 + album + ", path=" + path + ", singer=" + singer + "]";
    }

}