/*
 * spark广播变量 (Spark broadcast variables)
 * 2018-03-14 本文已影响0人
 * 君子慎独焉
 */
package com.everdata.spark;
import java.io.IOException;
import java.sql.Array;
import java.sql.Connection;
import java.sql.Statement;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import scala.reflect.ClassTag;
/**
 * Minimal Spark example demonstrating a broadcast variable.
 *
 * <p>The first command-line argument is broadcast as a {@code String}. A Parquet file is
 * then read, the first column of every row is converted from {@code long} to
 * {@code String}, the row count is printed, and finally each converted value is printed
 * together with the broadcast value.
 */
public class AppearOne {

    /** Master URL used when the {@code spark.master} system property is not set. */
    private static final String DEFAULT_MASTER = "local[1]";

    /** Location of the Parquet file that is read by {@link #main(String[])}. */
    private static final String PARQUET_PATH = "d://xxx.parquet";

    /**
     * Runs the example.
     *
     * @param args command-line arguments; {@code args[0]} is the value to broadcast
     * @throws IllegalArgumentException if no argument is supplied
     */
    public static void main(String[] args) {
        // Fail fast, before any Spark resources are allocated, instead of hitting an
        // ArrayIndexOutOfBoundsException after the session has been started.
        if (args == null || args.length == 0) {
            throw new IllegalArgumentException("Usage: AppearOne <value-to-broadcast>");
        }
        final String broadcastValue = args[0];

        // Honour a master supplied through the spark.master system property and only
        // fall back to the local single-threaded master when none is given.
        SparkSession spark = SparkSession
                .builder()
                .appName("AppearOne")
                .master(System.getProperty("spark.master", DEFAULT_MASTER))
                .config("spark.sql.parquet.binaryAsString", "true")
                .getOrCreate();
        try {
            System.out.println("==========" + broadcastValue);

            // The Scala SparkContext API needs an explicit ClassTag for the broadcast type.
            ClassTag<String> tag = scala.reflect.ClassTag$.MODULE$.apply(String.class);
            final Broadcast<String> s = spark.sparkContext().broadcast(broadcastValue, tag);

            Dataset<Row> parquetFileDF = spark.read().parquet(PARQUET_PATH);

            // NOTE(review): assumes column 0 is a non-null long; getLong fails otherwise.
            final Dataset<String> baseDS = parquetFileDF.map(new MapFunction<Row, String>() {
                private static final long serialVersionUID = 1L;

                @Override
                public String call(Row value) throws Exception {
                    return String.valueOf(value.getLong(0));
                }
            }, Encoders.STRING());

            System.out.println("===================" + baseDS.count());

            // Print every value next to the broadcast value read inside the task.
            baseDS.javaRDD().foreach(new VoidFunction<String>() {
                private static final long serialVersionUID = 1L;

                @Override
                public void call(String line) throws Exception {
                    System.out.println(line);
                    System.out.println(s.getValue());
                }
            });
        } finally {
            // Always release the session, even when reading or processing fails.
            spark.stop();
        }
    }
}