Integrating Spark Streaming with Kafka
2020-07-03
xiaogp
Introduction to Spark Streaming
Spark Streaming is Spark's component for stream processing of real-time data. It is a real-time stream processing system with high throughput and strong fault tolerance, and it supports sources such as Kafka and Flume. After the data is ingested, high-level functions such as map(), reduce(), join(), and window() can be applied for complex processing, and the results can be written to file systems or databases, or displayed on real-time dashboards.
[Figure: Spark Streaming data processing flow]
Spark Streaming uses a discretized stream (Discretized Stream), called a DStream, as its abstraction. A DStream is the sequence of data received over time: the data collected in each time interval exists as an RDD, and the DStream is the sequence made up of these RDDs.
A DStream supports two types of operations: transformations (Transformation), which produce a new DStream, and output operations (Output Operation), which persist data to external systems.
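To make the two kinds of operations concrete, here is a minimal sketch (not from the original article): it reads from a socket source, applies a transformation (filter()) that yields a new DStream, and uses an output operation (saveAsTextFiles()) to persist each batch. The output path is a hypothetical example.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamOperationsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("DStreamOperationsSketch")
    val ssc = new StreamingContext(conf, Seconds(1))

    val lines = ssc.socketTextStream("localhost", 9999)

    // Transformation: produces a new DStream containing only non-empty lines
    val nonEmpty = lines.filter(_.nonEmpty)

    // Output operation: writes each batch to files whose names include the batch time
    // ("/tmp/dstream-output" is a hypothetical path used only for illustration)
    nonEmpty.saveAsTextFiles("/tmp/dstream-output")

    ssc.start()
    ssc.awaitTermination()
  }
}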
Spark Streaming WordCount
Add the Spark Streaming dependency
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.3.1</version>
    <scope>compile</scope>
</dependency>
WordCount implementation
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    // Batch interval of 1 second
    val ssc = new StreamingContext(conf, Seconds(1))

    // Read lines from a socket, split them into words, and count each word per batch
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
Start the netcat tool in a shell and type some input
root@amarsoft-H81M-S1:~/# nc -lk 9999
aa a aa bbb c
Program output
-------------------------------------------
Time: 1593756750000 ms
-------------------------------------------
(aa,2)
(a,1)
(bbb,1)
(c,1)
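Building on the introduction's mention of window(), the same counts could also be computed over a sliding window. The sketch below is illustrative only: it assumes the pairs DStream from StreamingWordCount above and uses arbitrary values for the window length and slide interval.

// Hedged sketch: count the words seen in the last 10 seconds, recomputed every 2 seconds.
// `pairs` is the (word, 1) DStream from the WordCount example above; this snippet would
// replace the reduceByKey(...) line there.
val windowedCounts = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b, // combine counts inside the window
  Seconds(10),               // window length (illustrative)
  Seconds(2)                 // slide interval (illustrative)
)
windowedCounts.print()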
Integrating Kafka with Spark Streaming
Add the dependencies
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.3.1</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>
The producer below sends a random digit to Kafka every second; Spark Streaming then consumes it
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.Random;
import java.util.concurrent.TimeUnit;

public class ProducerFastStart {
    public static final String brokerList = "192.168.61.97:9092";
    public static final String topic = "test_gp";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("bootstrap.servers", brokerList);
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        Random random = new Random();
        while (true) {
            try {
                // Send one random digit (0-9) per second, waiting for each send to complete
                String msg = String.valueOf(random.nextInt(10));
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, msg);
                producer.send(record).get();
                TimeUnit.SECONDS.sleep(1);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
Spark Streaming pulls the data for each 2-second interval and sums it
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies._
import org.apache.spark.streaming.kafka010.LocationStrategies._
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingWithKafka {
  private val brokers = "192.168.61.97:9092"
  private val topic = "test_gp"
  private val group = "group.demo"
  private val checkpointDir = "/opt/kafka/checkpoint"

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamingWithKafka")
    // Batch interval of 2 seconds
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint(checkpointDir)

    // Kafka consumer configuration; automatic offset commit is disabled
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG -> group,
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
    )

    // Create a direct stream subscribed to the topic
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](List(topic), kafkaParams))

    // Parse each record's value as an integer and sum the values within each batch
    val value = stream.map(record => {
      val iniVal = Integer.valueOf(record.value())
      println("Current value: " + iniVal.toString)
      iniVal
    }).reduce(_ + _)
    value.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
Program output
-------------------------------------------
Time: 1593759210000 ms
-------------------------------------------
5
Current value: 6
Current value: 7
-------------------------------------------
Time: 1593759212000 ms
-------------------------------------------
13
Current value: 6
-------------------------------------------
Time: 1593759214000 ms
-------------------------------------------
6
Current value: 1
-------------------------------------------
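One closing note: because enable.auto.commit is set to false in kafkaParams, the code above never commits the consumed offsets back to Kafka. As a hedged sketch (not part of the original program), the CanCommitOffsets API provided by spark-streaming-kafka-0-10 could be used to commit them manually after each batch:

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

// Sketch only: `stream` is the DStream returned by createDirectStream above.
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the batch here ...
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}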