Java-Spark系列8-Spark streaming整合K
一. Spark streaming整合Kafka概述
1.1 Maven配置
对于使用SBT/Maven项目定义的Scala/Java应用程序,将您的流应用程序与以下工件链接(参见主编程指南中的链接部分获取更多信息)。
groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-10_2.11
version = 2.4.0
不要手动添加依赖于org.apache.kafka的工件(例如kafka-clients)。spark-streaming-kafka-0-10工件已经有了适当的传递依赖,并且不同的版本可能难以诊断的方式不兼容。
1.2 创建Direct Stream
注意,导入的命名空间包括版本,org.apache.spark.streaming.kafka010
import java.util.*;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import scala.Tuple2;
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("topicA", "topicB");
JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
对于可能的kafkaParams,请参阅Kafka消费者配置文档。如果你的Spark批处理时间大于Kafka默认的心跳会话超时时间(30秒),适当增加heartbeat.interval.ms和session.timeout.ms。对于大于5分钟的批,这将需要在代理上更改group.max.session.timeout.ms。注意,该示例将enable.auto.commit设置为false,有关讨论,请参阅下面的存储偏移量。
1.3 定位策略
新的Kafka消费者API会预取消息到缓冲区。因此,出于性能考虑,Spark集成将缓存的消费者保留在执行器上(而不是为每批重新创建它们),并倾向于在拥有适当消费者的主机位置上调度分区,这一点很重要。
在大多数情况下,你应该使用LocationStrategies。preferred consistent如上所示。这将在可用的执行器之间均匀地分布分区。如果你的执行器和你的Kafka代理在同一个主机上,使用PreferBrokers,它会更喜欢在Kafka的leader上为那个分区调度分区。最后,如果分区之间的负载有显著的倾斜,请使用PreferFixed。这允许您指定分区到主机的显式映射(任何未指定的分区将使用一致的位置)。
消费者的缓存默认最大大小为64。如果你希望处理超过(64 *个执行器)的Kafka分区,你可以通过spark.streaming.kafka.consumer.cache.maxCapacity来改变这个设置。
如果你想要关闭Kafka消费者的缓存,你可以设置spark.streaming.kafka.consumer.cache.enabled为false。为了解决SPARK-19185中描述的问题,可能需要禁用缓存。一旦Spark -19185被解析,这个属性可能会在Spark的后续版本中被删除。
缓存是由topicpartition和group.id决定的,所以使用一个单独的组。每个调用createDirectStream的id。
1.4 消费者的策略
新的Kafka消费者API有许多不同的方式来指定主题,其中一些需要大量的后对象实例化设置。ConsumerStrategies提供了一个抽象,允许Spark在从检查点重新启动后获得正确配置的消费者。
ConsumerStrategies。如上所示,订阅允许您订阅固定的主题集合。SubscribePattern允许您使用正则表达式来指定感兴趣的主题。注意,与0.8集成不同,使用Subscribe或SubscribePattern应该在流运行期间响应添加分区。最后,Assign允许指定一个固定的分区集合。这三种策略都有重载构造函数,允许您指定特定分区的起始偏移量。
如果您有特定的消费者设置需求,而上述选项无法满足这些需求,那么ConsumerStrategy是一个可以扩展的公共类。
1.5 创建RDD
如果您有一个更适合批处理的用例,您可以为定义的偏移量范围创建一个RDD。
// Import dependencies and create kafka params as in Create Direct Stream above
OffsetRange[] offsetRanges = {
// topic, partition, inclusive starting offset, exclusive ending offset
OffsetRange.create("test", 0, 0, 100),
OffsetRange.create("test", 1, 0, 100)
};
JavaRDD<ConsumerRecord<String, String>> rdd = KafkaUtils.createRDD(
sparkContext,
kafkaParams,
offsetRanges,
LocationStrategies.PreferConsistent()
);
注意,您不能使用PreferBrokers,因为如果没有流,就没有驱动端消费者自动为您查找代理元数据。如果有必要,可以将PreferFixed与自己的元数据查找一起使用。
1.6 获得Offsets
stream.foreachRDD(rdd -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
rdd.foreachPartition(consumerRecords -> {
OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
System.out.println(
o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
});
});
注意,对hasoffsetrange的类型转换只有在对createDirectStream的结果调用的第一个方法中才会成功,而不是在之后的方法链中。需要注意的是,RDD分区和Kafka分区之间的一对一映射在任何shuffle或重分区方法之后都不会保留,例如reduceByKey()或window()。
1.7 存储 Offsets
Kafka的传递语义在失败的情况下取决于偏移量如何和何时存储。Spark输出操作至少一次。因此,如果您想要等价于精确一次语义,则必须要么在幂等输出之后存储偏移量,要么在原子事务中与输出一起存储偏移量。通过这个集成,您有3个选项来存储偏移量,以提高可靠性(和代码复杂度)。
1.8 检查点
如果启用Spark检查点,偏移量将存储在检查点中。这很容易启用,但也有缺点。你的输出操作必须是幂等的,因为你会得到重复的输出;事务不是一个选项。此外,如果应用程序代码已更改,则无法从检查点恢复。对于计划的升级,您可以通过在旧代码的同时运行新代码来缓解这个问题(因为输出无论如何都需要是幂等的,它们不应该冲突)。但是对于需要更改代码的计划外故障,除非有其他方法来确定已知的良好起始偏移量,否则您将丢失数据。
1.9 Kafka自身
Kafka有一个偏移量提交API,它将偏移量存储在一个特殊的Kafka主题中。默认情况下,新使用者将定期自动提交偏移量。这几乎肯定不是您想要的,因为由使用者成功轮询的消息可能还没有导致Spark输出操作,从而导致未定义的语义。这就是为什么上面的流例子将" enable.auto.commit "设置为false。然而,你可以在知道你的输出已经被存储后,使用commitAsync API提交偏移量给Kafka。与检查点相比,Kafka的好处是不管你的应用代码发生了什么变化,它都是一个持久的存储。然而,Kafka不是事务性的,所以你的输出必须仍然是幂等的。
stream.foreachRDD(rdd -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
// some time later, after outputs have completed
((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});
1.10 自身数据存储
对于支持事务的数据存储,将偏移量作为结果保存在同一个事务中可以使两者保持同步,即使在失败的情况下也是如此。如果您非常小心地检测重复或跳过的偏移范围,则回滚事务可以防止重复或丢失的消息影响结果。这提供了等价的“一次精确”语义。甚至对于聚集产生的输出也可以使用这种策略,聚集通常很难使其幂等。
// The details depend on your data store, but the general idea looks like this
// begin from the the offsets committed to the database
Map<TopicPartition, Long> fromOffsets = new HashMap<>();
for (resultSet : selectOffsetsFromYourDatabase)
fromOffsets.put(new TopicPartition(resultSet.string("topic"), resultSet.int("partition")), resultSet.long("offset"));
}
JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Assign(fromOffsets.keySet(), kafkaParams, fromOffsets)
);
stream.foreachRDD(rdd -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
Object results = yourCalculation(rdd);
// begin your transaction
// update results
// update offsets where the end of existing offsets matches the beginning of this batch of offsets
// assert that offsets were updated correctly
// end your transaction
});
二.Spark Streaming整合Kafka实战
2.1 Maven配置
下面是我整个项目的Maven配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>SparkStudy</artifactId>
<version>1.0-SNAPSHOT</version>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<scala.version>2.11</scala.version>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>2.4.0</version>
<scope>provided</scope>
</dependency>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>2.4.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.4.0</version>
</dependency>
</dependencies>
<build>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
<!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
<!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
<plugin>
<artifactId>maven-site-plugin</artifactId>
<version>3.7.1</version>
</plugin>
<plugin>
<artifactId>maven-project-info-reports-plugin</artifactId>
<version>3.0.0</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<mainClass>org.zqs.kafka.Producer</mainClass> <!-- 此处为主入口-->
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
2.2 代码
生产者代码:
package org.zqs.kafka;
import java.util.Properties;
import java.util.Random;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
public class Producer {
public static String topic = "test_20210816_2";//定义主题
public static void main(String[] args) throws Exception {
Properties p = new Properties();
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.31.1.124:9092,10.31.1.125:9092,10.31.1.126:9092");//kafka地址,多个地址用逗号分割
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(p);
try {
while (true) {
String msg = "Hello," + new Random().nextInt(100);
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, msg);
kafkaProducer.send(record);
System.out.println("消息发送成功:" + msg);
Thread.sleep(500);
}
} finally {
kafkaProducer.close();
}
}
}
SparkStreaming消费代码
package org.zqs.kafka;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.rdd.RDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;
import scala.Tuple2;
public class SparkStreaming2 {
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamingFromkafka");
JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf , Durations.seconds(1));
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "10.31.1.124:9092,10.31.1.125:9092,10.31.1.126:9092");//多个可用ip可用","隔开
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "sparkStreaming");
Collection<String> topics = Arrays.asList("test_20210816_2");//配置topic,可以是数组
JavaInputDStream<ConsumerRecord<String, String>> javaInputDStream =KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.Subscribe(topics, kafkaParams));
JavaPairDStream<String, String> javaPairDStream = javaInputDStream.mapToPair(new PairFunction<ConsumerRecord<String, String>, String, String>(){
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, String> call(ConsumerRecord<String, String> consumerRecord) throws Exception {
return new Tuple2<>(consumerRecord.key(), consumerRecord.value());
}
});
javaPairDStream.foreachRDD(new VoidFunction<JavaPairRDD<String,String>>() {
@Override
public void call(JavaPairRDD<String, String> javaPairRDD) throws Exception {
// TODO Auto-generated method stub
javaPairRDD.foreach(new VoidFunction<Tuple2<String,String>>() {
@Override
public void call(Tuple2<String, String> tuple2)
throws Exception {
// TODO Auto-generated method stub
System.out.println(tuple2._2);
}
});
}
});
/*JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
stream.foreachRDD(rdd -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
rdd.foreachPartition(consumerRecords -> {
OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
System.out.println(
o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
});
});
*/
streamingContext.start();
streamingContext.awaitTermination();
streamingContext.close();
}
}
2.3 测试
-- 打包
mvn package
-- 上传文件到服务器
-- 提交到spark集群
spark-submit \
--class org.zqs.kafka.SparkStreaming2 \
--master local[2] \
/home/javaspark/SparkStudy-1.0-SNAPSHOT.jar
生产者要提前运行
image.png