Two Ways to Convert a JavaRDD to a Dataset

2019-04-10  Phoebe_Liu

Approach 1: convert the JavaRDD to a Dataset<Row> via reflection

Use an entity class (a serializable JavaBean) as the schema definition; Spark infers the column names and types from its getters by reflection.

The Student.java entity class:

import java.io.Serializable;

@SuppressWarnings("serial")
public class Student implements Serializable {
    private String sid;
    private String sname;
    private int sage;

    public String getSid() {
        return sid;
    }

    public void setSid(String sid) {
        this.sid = sid;
    }

    public String getSname() {
        return sname;
    }

    public void setSname(String sname) {
        this.sname = sname;
    }

    public int getSage() {
        return sage;
    }

    public void setSage(int sage) {
        this.sage = sage;
    }

    @Override
    public String toString() {
        return "Student [sid=" + sid + ", sname=" + sname + ", sage=" + sage + "]";
    }
}

Implementation of Approach 1:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().master("local[*]").appName("Spark").getOrCreate();
JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();

// Parse each comma-separated line into a Student bean.
JavaRDD<Student> stuRDD = source.map(new Function<String, Student>() {
    public Student call(String line) throws Exception {
        String[] parts = line.split(",");
        Student stu = new Student();
        stu.setSid(parts[0]);
        stu.setSname(parts[1]);
        stu.setSage(Integer.parseInt(parts[2]));
        return stu;
    }
});

// createDataFrame infers the schema from the Student bean by reflection.
Dataset<Row> df = spark.createDataFrame(stuRDD, Student.class);
df.select("sid", "sname", "sage").coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res");
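
The same reflection machinery can also produce a typed Dataset<Student> instead of an untyped Dataset<Row>. A minimal sketch, assuming the stuRDD built above and Spark 2.x bean encoders:

import org.apache.spark.sql.Encoders;

// Encoders.bean derives an encoder from Student's getters/setters,
// yielding a typed Dataset<Student> rather than a Dataset<Row>.
Dataset<Student> ds = spark.createDataset(stuRDD.rdd(), Encoders.bean(Student.class));
ds.show();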

Implementation of Approach 2 (build the schema programmatically with a StructType; no entity class is needed, since the column names and types are specified by hand):

import java.util.ArrayList;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

SparkSession spark = SparkSession.builder().master("local[*]").appName("Spark").getOrCreate();
JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();

// Parse each comma-separated line into a generic Row; the field order
// must match the schema defined below.
JavaRDD<Row> rowRDD = source.map(new Function<String, Row>() {
    public Row call(String line) throws Exception {
        String[] parts = line.split(",");
        String sid = parts[0];
        String sname = parts[1];
        int sage = Integer.parseInt(parts[2]);
        return RowFactory.create(sid, sname, sage);
    }
});

// Build the schema by hand: one StructField per column (name, type, nullable).
ArrayList<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("sid", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("sname", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("sage", DataTypes.IntegerType, true));
StructType schema = DataTypes.createStructType(fields);

Dataset<Row> df = spark.createDataFrame(rowRDD, schema);
df.coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res1");
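
With either approach the result is a Dataset<Row> that can be queried like any other DataFrame. A small usage sketch (the view name student is arbitrary):

df.createOrReplaceTempView("student");
Dataset<Row> adults = spark.sql("SELECT sname, sage FROM student WHERE sage >= 18");
adults.show();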

Three ways (that I could find) to create a Scala ClassTag from Java. The third one can be used anywhere a ClassTag is required, and it needs no class argument at all; see the sketch after this list.

1. ClassManifestFactory.classType(String.class)
2. ClassTag$.MODULE$.apply(String.class)
3. JavaSparkContext$.MODULE$.fakeClassTag()
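
A minimal sketch of all three, each acquiring a ClassTag<String> (the type parameter is inferred from the assignment target; the $.MODULE$ forms are how Java reaches a Scala companion object):

import org.apache.spark.api.java.JavaSparkContext$;
import scala.reflect.ClassManifestFactory;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;

// 1. Via the (deprecated) ClassManifestFactory; ClassManifest is an alias for ClassTag.
ClassTag<String> tag1 = ClassManifestFactory.classType(String.class);
// 2. Via the ClassTag companion object.
ClassTag<String> tag2 = ClassTag$.MODULE$.apply(String.class);
// 3. Via Spark's helper: an AnyRef ClassTag cast to whatever T the call site needs.
ClassTag<String> tag3 = JavaSparkContext$.MODULE$.fakeClassTag();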

  1. List to Seq:

import scala.collection.JavaConverters;
import scala.collection.Seq;

List<String> tmpList = new ArrayList<>();
tmpList.add("abc");
Seq<String> tmpSeq = JavaConverters.asScalaIteratorConverter(tmpList.iterator()).asScala().toSeq();

  2. Seq to List:

List<String> tmpList = scala.collection.JavaConversions.seqAsJavaList(tmpSeq);
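
Note that scala.collection.JavaConversions is deprecated in newer Scala versions; the same JavaConverters style used above works in this direction too:

List<String> tmpList = JavaConverters.seqAsJavaListConverter(tmpSeq).asJava();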
