尚硅谷大数据技术之电信客服

2018-12-19  本文已影响59人  尚硅谷教育

|

public static void main(String[] args) {

if(args == null || args.length <= 0) {

System.out.println("no arguments");

System.exit(1);

}

ProductLog productLog = new ProductLog();

productLog.initContacts();

productLog.writeLog(args[0], productLog);

}

|

3.1.3 打包测试

分别在Windows****上和Linux****中进行测试:

|

java -cp producer.jar com.atguigu.producer.ProductLog /本地目录/callLog.csv

|

为日志生成任务编写bash****脚本:productlog.sh

|

!/bin/bash

java -cp /home/atguigu/call/ producer.jar com.atguigu.producer.ProductLog /home/atguigu/call/calllog.csv

|

3.2 数据采集/消费(存储)

欢迎来到数据采集模块(消费),在企业中你要清楚流式数据采集框架flume和kafka的定位是什么。我们在此需要将实时数据通过flume采集到kafka然后供给给hbase消费。

flume****:cloudera****公司研发

适合下游数据消费者不多的情况;

适合数据安全性要求不高的操作;

适合与Hadoop生态圈对接的操作。

kafka****:linkedin****公司研发

适合数据下游消费众多的情况;

适合数据安全性要求较高的操作(支持replication);

因此我们常用的一种模型是:

线上数据 --> flume --> kafka --> flume(根据情景增删该流程) --> HDFS

消费存储模块流程如图2****所示:

image.png

3.2.1 数据采集

思路:

a) 配置kafka,启动zookeeper和kafka集群;

b) 创建kafka主题;

c) 启动kafka控制台消费者(此消费者只用于测试使用);

d) 配置flume,监控日志文件;

e) 启动flume监控任务;

f) 运行日志生产脚本;

g) 观察测试。

1)启动zookeeper,kafka集群

|

$/opt/module/kafka/bin/kafka-server-start.sh /opt/module/kafka/config/server.properties

|

2)创建kafka主题

|

$ /opt/module/kafka/bin/kafka-topics.sh --zookeeper hadoop102:2181 --topic calllog --create --replication-factor 1 --partitions 3

|

检查一下是否创建主题成功:

|

$ /opt/module/kafka/bin/kafka-topics.sh --zookeeper hadoop102:2181 --list

|

3)启动kafka控制台消费者,等待flume信息的输入

|

$ /opt/module/kafka/bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 -topic calllog --from-beginning

|

4)配置flume(flume-kafka.conf)

|

define

a1.sources = r1

a1.sinks = k1

a1.channels = c1

source

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F -c +0 /home/atguigu/call/calllog.csv

a1.sources.r1.shell = /bin/bash -c

sink

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092

a1.sinks.k1.kafka.topic = calllog

a1.sinks.k1.kafka.flumeBatchSize = 20

a1.sinks.k1.kafka.producer.acks = 1

a1.sinks.k1.kafka.producer.linger.ms = 1

channel

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

bind

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

|

5)启动flume

|

$ /opt/module/flume/bin/flume-ng agent --conf /opt/module/flume/conf/ --name a1 --conf-file /home/atguigu/calllog/flume2kafka.conf

|

6)运行生产日志的任务脚本,观察kafka控制台消费者是否成功显示产生的数据

|

$ sh /home/atguigu/calllog/productlog.sh

|

本教程由尚硅谷教育大数据研究院出品,如需转载请注明来源,欢迎大家关注尚硅谷公众号(atguigu)了解更多。

上一篇 下一篇

猜你喜欢

热点阅读