spark streaming + kafka
2018-01-03
Jerry_Hao
Download Spark from the official site: spark-2.0.1-bin-hadoop2.7
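Besides Spark itself, the example below assumes a Kafka 0.10 broker running locally on localhost:9092 with a topic named test; the kafka010 classes it imports live in the separate spark-streaming-kafka-0-10_2.11 Maven artifact, which has to be on the classpath (or packaged into the jar). With a stock Kafka distribution of that era, the topic can be created roughly like this (the ZooKeeper address assumes a default local setup): bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test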
Testing with Java code
package douzi.risk;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

import com.google.common.collect.Lists;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.ConsumerStrategy;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;

public class MainStreaming {
    public static void main(String[] args) throws Exception {
        // Create a streaming context with a 10-second batch interval
        SparkConf sparkConf = new SparkConf()
                .setAppName("JavaDirectKafkaWordCount")
                .setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));

        // Kafka consumer configuration: broker address, consumer group,
        // and String deserializers for both key and value
        ArrayList<String> topicsSet = Lists.newArrayList("test");
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("group.id", "test_kafka_spark_stream_1");
        kafkaParams.put("auto.offset.reset", "earliest");
        kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Create a direct Kafka stream subscribed to the given topics
        ConsumerStrategy<String, String> cs = ConsumerStrategies.Subscribe(topicsSet, kafkaParams);
        JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                cs);

        // Extract the message value from each Kafka record
        JavaDStream<String> lines = messages.map(new Function<ConsumerRecord<String, String>, String>() {
            @Override
            public String call(ConsumerRecord<String, String> record) {
                return record.value();
            }
        });

        // Count occurrences; note that each whole message is used as the key,
        // so identical messages are counted rather than individual words
        JavaPairDStream<String, Integer> wordCounts = lines.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<>(s, 1);
                    }
                }).reduceByKey(
                new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer i1, Integer i2) {
                        return i1 + i2;
                    }
                });
        wordCounts.print();

        // Start the computation and block until it is terminated
        jssc.start();
        jssc.awaitTermination();
    }
}
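Note that despite the JavaDirectKafkaWordCount app name, the pairs above are keyed on whole messages, so identical messages are counted rather than individual words. To count words, a flatMap step could be inserted after lines is created — a minimal sketch, assuming whitespace-separated input (it additionally needs java.util.Arrays, java.util.Iterator and org.apache.spark.api.java.function.FlatMapFunction):

// Split each message into words; in Spark 2.x, FlatMapFunction.call returns an Iterator
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterator<String> call(String line) {
        return Arrays.asList(line.split(" ")).iterator();
    }
});
// ...and wordCounts would then be built from words instead of lines.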
spark deploy
Run spark-submit from the command line to submit the jar: bin/spark-submit --class douzi.risk.MainStreaming /Users/Dev/IdeaProjects/kafka-spark-streaming/target/risk-1.0-SNAPSHOT.jar
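As written, this assumes the jar bundles the Kafka integration classes (e.g. via a Maven shade/assembly build). An alternative, assuming Spark 2.0.1 built against Scala 2.11, is to let spark-submit pull the artifact at launch: bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.0.1 --class douzi.risk.MainStreaming /Users/Dev/IdeaProjects/kafka-spark-streaming/target/risk-1.0-SNAPSHOT.jar
Once the job is running, test messages can be fed to the test topic with the stock console producer: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test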