Spark Streaming with Flume and Kafka: Building a Real-Time Stream Processing Pipeline

2019-06-22  机灵鬼鬼

This article walks through the core workflow of enterprise-grade real-time stream processing, an important milestone on the big-data learning path. Please read it carefully and work through the hands-on steps yourself.

Before starting the integration work, it helps to have the overall architecture in mind: log files are collected by Flume, fanned out to both HDFS and Kafka, and the Kafka topic is then consumed by Spark Streaming.

Step 1: Integrate log output into Flume

Writing to Flume directly through a log4j appender couples the application too tightly to Flume, so instead I have Flume actively monitor a log directory to collect the logs.

I covered Flume's directory-monitoring setup in detail in a separate article, "How Flume monitors multiple dynamically changing log files".
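As a minimal sketch of what that monitoring setup looks like, Flume 1.7+ ships a TAILDIR source that tails every file matching a pattern and records read offsets in a position file, so it survives agent restarts. The agent name, file-group name, and paths below are hypothetical, not taken from the original article:

```properties
# Hypothetical agent "tail-agent" tailing all .log files in one directory
tail-agent.sources=taildir-source
tail-agent.sources.taildir-source.type=TAILDIR
tail-agent.sources.taildir-source.filegroups=f1
tail-agent.sources.taildir-source.filegroups.f1=/var/log/app/.*\.log
# Offsets are checkpointed here so a restarted agent resumes where it left off
tail-agent.sources.taildir-source.positionFile=/var/flume/taildir_position.json
```

The source would then be paired with an avro sink pointing at the aggregating agent configured in Step 2.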

Step 2: Integrate Flume with Kafka

# Agent test-avro-memory-hdfsAndkafka: the same data is written to two
# services, which in Flume only requires configuring multiple sinks.
# The full configuration follows.

## Configure the source
test-avro-memory-hdfsAndkafka.sources=avro-source

## The same data goes to two targets: HDFS and Kafka
test-avro-memory-hdfsAndkafka.sinks=hdfs-sink kafka-sink
test-avro-memory-hdfsAndkafka.channels=memory-channel

test-avro-memory-hdfsAndkafka.sources.avro-source.type=avro
test-avro-memory-hdfsAndkafka.sources.avro-source.bind=10.101.3.3
test-avro-memory-hdfsAndkafka.sources.avro-source.port=44444

# Describe the hdfs-sink
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.type=hdfs
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.path=hdfs://10.101.3.3:9000/xcx/test/%y-%m-%d/%H%M/
# File name prefix and suffix
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.filePrefix=xcx
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.fileSuffix=.txt
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.writeFormat=Text
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.fileType=DataStream
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.round=true
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.roundValue=30
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.roundUnit=minute
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.rollSize=1048576
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.rollCount=0
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.minBlockReplicas=1
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.useLocalTimeStamp=true

# Describe the kafka-sink
test-avro-memory-hdfsAndkafka.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
test-avro-memory-hdfsAndkafka.sinks.kafka-sink.brokerList=10.101.3.3:9092
### Create the topic flume_kafka_streaming in Kafka beforehand;
### it is used by both Flume and Spark Streaming
test-avro-memory-hdfsAndkafka.sinks.kafka-sink.topic=flume_kafka_streaming
test-avro-memory-hdfsAndkafka.sinks.kafka-sink.requiredAcks=1
test-avro-memory-hdfsAndkafka.sinks.kafka-sink.batchSize=5

# Use a channel which buffers events in memory
test-avro-memory-hdfsAndkafka.channels.memory-channel.type=memory

# Bind the source and sinks to the channel
test-avro-memory-hdfsAndkafka.sources.avro-source.channels=memory-channel
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.channel=memory-channel
test-avro-memory-hdfsAndkafka.sinks.kafka-sink.channel=memory-channel
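To bring this up, the topic has to exist before the sink writes to it, and the agent is started by name. A sketch of the two commands, assuming a pre-2.2 Kafka with ZooKeeper co-located on 10.101.3.3 and a standard Flume install; the config file path and the single-partition, single-replica settings are illustrative:

```shell
# Create the topic used by both the Flume sink and Spark Streaming
kafka-topics.sh --create --zookeeper 10.101.3.3:2181 \
  --replication-factor 1 --partitions 1 --topic flume_kafka_streaming

# Start the Flume agent with the configuration above
flume-ng agent \
  --name test-avro-memory-hdfsAndkafka \
  --conf $FLUME_HOME/conf \
  --conf-file $FLUME_HOME/conf/test-avro-memory-hdfsAndkafka.conf \
  -Dflume.root.logger=INFO,console
```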

Step 3: Integrate Kafka with Spark Streaming

See my article "Spark Streaming integration with Kafka" for details.

Step 4: Process the received data with Spark Streaming

This is also covered in "Spark Streaming integration with Kafka".
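The linked article covers the Kafka-to-Streaming wiring itself; the per-batch processing is the classic flatMap, map, reduceByKey word count over the log lines arriving from the topic. As a cluster-free sketch of that same per-batch logic in plain Python (the function name and sample batch are hypothetical, not from the original article):

```python
from collections import Counter

def process_batch(lines):
    """Mimic one micro-batch of the DStream pipeline:
    flatMap(split) -> map(word -> (word, 1)) -> reduceByKey(+)."""
    words = (word for line in lines for word in line.split())
    return dict(Counter(words))

# One simulated micro-batch of log lines pulled from flume_kafka_streaming
batch = [
    "INFO user login success",
    "ERROR user login failed",
]
print(process_batch(batch))
```

In the real job the same function body would run inside the DStream transformation on each RDD of Kafka message values.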
