Spark broadcast variables

2018-03-14  君子慎独焉
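The listing below creates a broadcast variable from the Java API. Because SparkSession.sparkContext() returns the Scala SparkContext, its broadcast() method needs an explicit scala.reflect.ClassTag; the broadcast value (the first command-line argument) is then read back on the executors inside a foreach over a parquet-backed Dataset.
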
package com.everdata.spark;


import java.util.Date;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import scala.reflect.ClassTag;

public class AppearOne {

    //private static CanReportRequireConfigBean configBean;
    private static Long lastTimeMiles = new Date().getTime() - 10 * 60 * 1000;

    private static String[] global_args;
    
    public static void main(String[] args) {
        // read the JSON config
        //generateConfig(args[0]);
        global_args = args;
        
        /*if(StringUtils.isNotEmpty(args[1])) {
            lastTimeMiles = DateUtil.parseDate(args[0].trim(), DateUtil.COMPACT_MINUTE_FORMAT).getTime();
            System.out.println(lastTimeMiles +":"+ args[0]);
        }*/
        
        SparkSession spark = SparkSession
                  .builder()
                  .appName("AppearOne")
                  .master("local[1]")
                  .config("spark.sql.parquet.binaryAsString", "true")
                  .getOrCreate();
        System.out.println("=========="+args[0]);
        ClassTag<String> tag = scala.reflect.ClassTag$.MODULE$.apply(String.class);
        Broadcast<String> s=spark.sparkContext().broadcast(args[0], tag);
        Dataset<Row> parquetFileDF = spark.read().parquet("d://xxx.parquet");
        // map each row to the string form of its first (long) column
        final Dataset<String> baseDS = parquetFileDF.map(new MapFunction<Row, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public String call(Row value) throws Exception {
                return value.getLong(0) + "";
            }
        }, Encoders.STRING());
        System.out.println("==================="+baseDS.count());
        baseDS.javaRDD().foreach(new VoidFunction<String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(String line) throws Exception {
                System.out.println(line);
                System.out.println(s.getValue());
            }
        });
    }
}
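
For comparison, the ClassTag boilerplate can be avoided by going through JavaSparkContext, whose broadcast(T) overload builds the ClassTag internally. A minimal sketch under the same setup (the class name and the parallelized sample list are illustrative, not from the original):

import java.util.Arrays;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;

public class AppearOneJavaApi {

    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .appName("AppearOneJavaApi")
                .master("local[1]")
                .getOrCreate();

        // Wrap the Scala SparkContext; JavaSparkContext.broadcast(T) supplies the ClassTag itself
        JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
        final Broadcast<String> s = jsc.broadcast(args[0]);

        // The value is shipped once per executor and read back with getValue()
        jsc.parallelize(Arrays.asList("a", "b", "c"))
                .foreach(line -> System.out.println(line + " -> " + s.getValue()));

        spark.stop();
    }
}

Both versions broadcast the same value; the Java wrapper is simply the idiomatic entry point when the driver code is written in Java.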

