Integrating Spark Streaming with Flume and Kafka for Real-Time Stream Processing
This article walks through the core workflow of enterprise-grade real-time stream processing and is an important milestone in our big data learning path. It is one of the most important topics, so please read it carefully and work through the hands-on steps yourself.
Before starting the integration work, an overall architecture diagram helps to lay out the approach:
Step 1: Integrating log output into Flume
Writing logs to Flume directly through a log4j appender couples the application too tightly to Flume, so I instead have Flume actively monitor the log directory and collect the log files from there.
The directory-monitoring approach is described in detail in my earlier article on how Flume monitors multiple dynamically changing log files.
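As a rough sketch of what that collecting agent could look like, the configuration below uses a Taildir source to follow the log files and an avro sink that forwards events to the avro source configured in Step 2 (10.101.3.3:44444). The agent name log-agent, the log path /var/log/app, and the position file location are placeholders for illustration, not values from the original setup.

## A minimal collecting agent (placeholder names and paths)
log-agent.sources=taildir-source
log-agent.channels=memory-channel
log-agent.sinks=avro-sink
## Taildir source follows log files as they grow and remembers its read position
log-agent.sources.taildir-source.type=TAILDIR
log-agent.sources.taildir-source.positionFile=/var/flume/taildir_position.json
log-agent.sources.taildir-source.filegroups=f1
log-agent.sources.taildir-source.filegroups.f1=/var/log/app/.*\.log
log-agent.sources.taildir-source.channels=memory-channel
## Buffer events in memory
log-agent.channels.memory-channel.type=memory
## Forward the events to the avro source of the agent configured in Step 2
log-agent.sinks.avro-sink.type=avro
log-agent.sinks.avro-sink.hostname=10.101.3.3
log-agent.sinks.avro-sink.port=44444
log-agent.sinks.avro-sink.channel=memory-channel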
Step 2: Integrating Flume with Kafka
# The test-avro-memory-hdfsAndkafka agent writes the same data to two services; in Flume this only requires configuring multiple sinks. The full configuration is as follows.
## Configure the source
test-avro-memory-hdfsAndkafka.sources=avro-source
## The same data is written to two target services: hdfs-sink goes to HDFS, kafka-sink goes to Kafka
test-avro-memory-hdfsAndkafka.sinks=hdfs-sink kafka-sink
## Kafka-only alternative, kept commented out
#test-avro-memory-hdfsAndkafka.sinks=kafka-sink
test-avro-memory-hdfsAndkafka.channels=memory-channel
test-avro-memory-hdfsAndkafka.sources.avro-source.type=avro
test-avro-memory-hdfsAndkafka.sources.avro-source.bind=10.101.3.3
test-avro-memory-hdfsAndkafka.sources.avro-source.port=44444
# Describe the hdfs-sink
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.type=hdfs
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.path = hdfs://10.101.3.3:9000/xcx/test/%y-%m-%d/%H%M/
# Prefix for the files written to HDFS
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.filePrefix = xcx
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.fileSuffix =.txt
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.writeFormat=Text
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.fileType=DataStream
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.round = true
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.roundValue =30
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.roundUnit = minute
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.rollSize=1048576
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.rollCount=0
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.minBlockReplicas=1
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true
# Describe the kafka-sink
test-avro-memory-hdfsAndkafka.sinks.kafka-sink.type =org.apache.flume.sink.kafka.KafkaSink
test-avro-memory-hdfsAndkafka.sinks.kafka-sink.brokerList=10.101.3.3:9092
### A topic named flume_kafka_streaming must be created in Kafka beforehand; it is used by both Flume and Spark Streaming
test-avro-memory-hdfsAndkafka.sinks.kafka-sink.topic=flume_kafka_streaming
test-avro-memory-hdfsAndkafka.sinks.kafka-sink.requiredAcks=1
test-avro-memory-hdfsAndkafka.sinks.kafka-sink.batchSize=5
# Use a channel which buffers events in memory
test-avro-memory-hdfsAndkafka.channels.memory-channel.type = memory
# Bind the source and sink to the channel
test-avro-memory-hdfsAndkafka.sources.avro-source.channels=memory-channel
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.channel=memory-channel
test-avro-memory-hdfsAndkafka.sinks.kafka-sink.channel=memory-channel
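To try this out, the flume_kafka_streaming topic referenced above has to exist before the agent is started. Below is a sketch of the commands, assuming a single-broker Kafka whose ZooKeeper also runs on 10.101.3.3 (port 2181) and that the configuration above is saved as test-avro-memory-hdfsAndkafka.conf under $FLUME_HOME/conf; adjust the paths, partition count and replication factor to your own environment.

# Create the topic used by kafka-sink (the ZooKeeper address is an assumption)
bin/kafka-topics.sh --create --zookeeper 10.101.3.3:2181 \
  --replication-factor 1 --partitions 1 --topic flume_kafka_streaming

# Start the agent; --name must match the prefix used in the configuration file
$FLUME_HOME/bin/flume-ng agent \
  --name test-avro-memory-hdfsAndkafka \
  --conf $FLUME_HOME/conf \
  --conf-file $FLUME_HOME/conf/test-avro-memory-hdfsAndkafka.conf \
  -Dflume.root.logger=INFO,console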