
spark streaming + kafka

2018-01-03  Jerry_Hao

Download spark-2.0.1-bin-hadoop2.7 from the official Spark website.

java test code

package douzi.risk;

import java.util.HashMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;

import com.google.common.collect.Lists;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.ConsumerStrategy;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;

public class MainStreaming {

    public static void main(String[] args) throws Exception {
        // Create a streaming context with a 10-second batch interval
        SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount").setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));

        ArrayList<String> topicsSet = Lists.newArrayList("test");
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("group.id", "test_kafka_spark_stream_1");
        kafkaParams.put("auto.offset.reset", "earliest");
        kafkaParams.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

        // Create direct kafka stream with brokers and topics
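        // PreferConsistent distributes partitions evenly across the available executors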
        ConsumerStrategy<String, String> cs = ConsumerStrategies.Subscribe(topicsSet, kafkaParams);
        JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(jssc,
                LocationStrategies.PreferConsistent(),
                cs
        );

        // Extract the message value from each Kafka record
        JavaDStream<String> lines = messages.map(new Function<ConsumerRecord<String, String>, String>() {
            @Override
            public String call(ConsumerRecord<String, String> record) {
                return record.value();
            }
        });

        // Split each line into words so the job counts words, not whole messages
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });

        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<>(s, 1);
                    }
                }).reduceByKey(
                new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer i1, Integer i2) {
                        return i1 + i2;
                    }
                });
        wordCounts.print();

        // Start the computation
        jssc.start();
        jssc.awaitTermination();
    }
}
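
To see the word counts print, something has to write into the test topic. Below is a minimal producer sketch for that purpose, assuming a broker on localhost:9092 and the kafka-clients jar on the classpath; the class name TestProducer and the sample messages are illustrative only.

package douzi.risk;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TestProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // same broker the streaming job reads from
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Send a few space-separated lines; the streaming job splits and counts the words
            producer.send(new ProducerRecord<>("test", "hello spark streaming"));
            producer.send(new ProducerRecord<>("test", "hello kafka"));
            producer.flush();
        }
    }
}

Alternatively, Kafka's bundled kafka-console-producer.sh tool can be used to type messages into the topic by hand.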

spark deploy

Run spark-submit from the command line to submit the jar: bin/spark-submit --class douzi.risk.MainStreaming /Users/Dev/IdeaProjects/kafka-spark-streaming/target/risk-1.0-SNAPSHOT.jar
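
If the jar does not bundle the Kafka integration classes (e.g. via maven-shade-plugin), spark-submit can fetch them from Maven Central instead; a likely invocation (the artifact version here is an assumption, chosen to match spark-2.0.1 built against Scala 2.11) is bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.0.1 --class douzi.risk.MainStreaming /Users/Dev/IdeaProjects/kafka-spark-streaming/target/risk-1.0-SNAPSHOT.jar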
