03 hadoop 日志收集
2020-09-07 本文已影响0人
格林哈
0 目标
- 收集系统中 docker 部署的 137 个服务日志里的错误日志,并按服务名和日期展示
1 flume 收集日志到hdfs
- Flume 是一个分布式、高可靠、高可用的用来收集、聚合、转移不同来源的大量日志数据到中央数据仓库的工具
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# taildir 监控指定的一些文件,并在检测到新的一行数据产生的时候几乎实时地读取它们
agent1.sources.source1.type = TAILDIR
agent1.sources.source1.positionFile = /usr/local/servers/tails/taildir_position.json
agent1.sources.source1.filegroups = f1 f2 f3 f4 f5 f6 f7 f8 f9 f10 f11 f12 f13 f14 f15 f16 f17 f18 f19 f20 f21 f22 f23 f24 f25 f26 f27 f28 f29 f30 f31 f32 f33 f34 f35 f36 f37 f38 f39 f40 f41 f42 f43 f44 f45 f46 f47 f48 f49 f50 f51 f52 f53 f54 f55 f56 f57 f58 f59 f60 f61 f62 f63 f64 f65 f66 f67 f68 f69 f70 f71 f72 f73 f74 f75 f76 f77 f78 f79 f80 f81 f82 f83 f84 f85 f86 f87 f88 f89 f90 f91 f92 f93 f94 f95 f96 f97 f98 f99 f100 f101 f102 f103 f104 f105 f106 f107 f108 f109 f110 f111 f112 f113 f114 f115 f116 f117 f118 f119 f120 f121 f122 f123 f124 f125 f126 f127 f128 f129 f130 f131 f132 f133 f134 f135 f136 f137 f138 f139 f140 f141 f142 f143 f144 f145 f146 f147 f148 f149 f150 f151 f152 f153 f154 f155 f156 f157 f158 f159 f160 f161 f162 f163 f164 f165 f166 f167 f168 f169 f170 f171 f172 f173
agent1.sources.source1.filegroups.f1 = /root/logs/faceservice/provider-face/[^.]+.log
agent1.sources.source1.filegroups.f2 = /root/logs/faceservice/portal-face/[^.]+.log
agent1.sources.source1.filegroups.f3 = /root/logs/dubbo/dev_in_1438/weepal-service-account/[^.]+.log
# 省略
agent1.sources.source1.filegroups.f172 = /root/logs/performattend/performattend_dev_in_1433/performattend/[^.]+.log
agent1.sources.source1.filegroups.f173 = /root/logs/cmixsearch/cmix_dev_out_62/cmixsearch/[^.]+.log
agent1.sources.source1.fileHeader=true
agent1.sources.source1.fileHeaderKey=filepath
#agent1.sources.source1.interceptors = i1
#agent1.sources.source1.interceptors.i1.type = host
#agent1.sources.source1.interceptors.i1.hostHeader = hostname
#配置sink组件为hdfs
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path=hdfs://192.168.1.22:9000/flume-collection/log94/%Y-%m-%d/input%{filepath}
#指定文件名前缀
agent1.sinks.sink1.hdfs.filePrefix = service_log
#指定每批下沉数据的记录条数
agent1.sinks.sink1.hdfs.batchSize= 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat =Text
#指定下沉文件按1G大小滚动
agent1.sinks.sink1.hdfs.rollSize = 1073741824
#指定下沉文件按1000000条数滚动
agent1.sinks.sink1.hdfs.rollCount = 1000000
#rollInterval 单位为秒,0 表示不按时间滚动(若要按30分钟滚动,应设置为 1800)
agent1.sinks.sink1.hdfs.rollInterval = 0
#agent1.sinks.sink1.hdfs.round = true
#agent1.sinks.sink1.hdfs.roundValue = 10
#agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
#使用memory类型channel
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600
# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
2 在azkaban 建立任务
2.1 新建任务 map:从 flume 收集的日志中提取我们需要的内容,处理后存入 hdfs
- 命名 map.job
- log942-1.0-SNAPSHOT.jar com.weepal.preresult.LogProcess 就是我们写的收集程序
# map.job: Azkaban command job that launches the LogProcess driver class from the packaged jar via `hadoop jar`
type=command
command=hadoop jar log942-1.0-SNAPSHOT.jar com.weepal.preresult.LogProcess
- com.weepal.preresult.LogProcess 内容 例如统计每日异常数量
/**
 * MapReduce driver that counts, per service, the log lines containing "ERROR"
 * under yesterday's input directory on HDFS and writes one summary line per
 * service to yesterday's output directory.
 *
 * <p>Each output line has the form {@code <service> U+0001 <count> U+0001 <date>},
 * i.e. three fields separated by the control character U+0001.
 */
public class LogProcess extends Configured implements Tool {

    /** HDFS endpoint used both for the job paths and for the output cleanup. */
    private static final String HDFS_URI = "hdfs://node01:8020";

    /** Root directory that holds one sub-directory per day. */
    private static final String LOG_ROOT = HDFS_URI + "/flume-collection/log94/";

    /**
     * Configures and runs the job for yesterday's logs.
     *
     * @param args command-line arguments (unused)
     * @return 0 when the job completes successfully, 1 otherwise
     * @throws Exception if HDFS access or job submission fails
     */
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = super.getConf();

        // Resolve the date once so the input and output paths always refer to the
        // same day, even if the job happens to start right around midnight.
        String day = DateUtil.getYestDate();
        Path inputDir = new Path(LOG_ROOT + day + "/input/root/logs");
        Path outputDir = new Path(LOG_ROOT + day + "/output");

        // Remove a previous result so the job can be re-run for the same day.
        FileSystem fileSystem = FileSystem.get(new URI(HDFS_URI), conf);
        if (fileSystem.exists(outputDir)) {
            fileSystem.delete(outputDir, true);
        }

        Job job = Job.getInstance(conf);
        job.setJarByClass(LogProcess.class);

        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.setInputPaths(job, inputDir);
        // Log files sit several directory levels below the input root.
        FileInputFormat.setInputDirRecursive(job, true);

        job.setMapperClass(LogProcessMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setReducerClass(LogProcessReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileOutputFormat.setOutputPath(job, outputDir);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Entry point: runs the tool through {@link ToolRunner} and exits with its status.
     *
     * @param args command-line arguments, forwarded to {@link #run(String[])}
     * @throws Exception if the job fails to run
     */
    public static void main(String[] args) throws Exception {
        int status = ToolRunner.run(new Configuration(), new LogProcess(), args);
        System.exit(status);
    }

    /**
     * Emits {@code (serviceName, 1)} for every input line that contains "ERROR".
     *
     * <p>The service name is built from the input file's location: the name of its
     * grandparent directory, a dash, then the name of the directory above that.
     * NOTE(review): this presumably relies on the collector writing each log file
     * into a directory named after the original log file - confirm against the
     * actual HDFS layout.
     */
    static class LogProcessMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        /** Constant count value, reused for every emitted record. */
        private static final LongWritable ONE = new LongWritable(1);

        /** Reusable output key; filled in when the first ERROR line of the split is seen. */
        private final Text k = new Text();

        /** Whether {@link #k} already holds the service name of this task's split. */
        private boolean keyReady;

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Skip the path handling entirely for lines that are not emitted.
            if (!value.toString().contains("ERROR")) {
                return;
            }
            if (!keyReady) {
                // The split is fixed for the lifetime of this mapper, so the name is derived once.
                Path serviceDir = ((FileSplit) context.getInputSplit()).getPath().getParent().getParent();
                k.set(serviceDir.getName() + "-" + serviceDir.getParent().getName());
                keyReady = true;
            }
            context.write(k, ONE);
        }
    }

    /**
     * Sums the counts of one service and writes a single delimited summary line
     * as the key, with a {@link NullWritable} value.
     */
    static class LogProcessReduce extends Reducer<Text, LongWritable, Text, NullWritable> {

        private final NullWritable v = NullWritable.get();
        private final Text k = new Text();

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            // Add up all partial counts for this service.
            long count = 0;
            for (LongWritable value : values) {
                count += value.get();
            }
            // NOTE(review): the record is stamped with DateUtil.getDate() while the driver
            // reads the directory chosen by DateUtil.getYestDate() - confirm this is intended.
            k.set(key.toString() + "\u0001" + count + "\u0001" + DateUtil.getDate());
            context.write(k, v);
        }
    }
}
2.2 新建任务 tomysql 把我们分析的数据,通过hive进行收集,然后把最终要显示的数据,导入到mysql
- 需要借助 sqoop 这个工具
- sqoop 是apache旗下一款“Hadoop和关系数据库服务器之间传送数据”的工具
- 命名 tomysql.job
# tomysql.job 内容
#! /bin/sh
# Exports yesterday's job output directory from HDFS into the MySQL table log94 using sqoop.

# '.' is the portable spelling; 'source' does not exist in every /bin/sh implementation.
. /etc/profile
echo 'sqoop start'

# Yesterday's date in YYYY-MM-DD form (relies on the -d option of GNU date).
day_01=$(date -d'-1 day' +%Y-%m-%d)
oneday_url="/flume-collection/log94/$day_01/output"
echo "$oneday_url"

# NOTE(review): the database password is given in clear text on the command line;
# consider sqoop's --password-file or -P option instead.
# Input fields are split on the \001 control character; a single map task is used.
if ! sqoop export --connect jdbc:mysql://192.168.1.94:3306/share --username root --password 123456 --table log94 --input-fields-terminated-by '\001' --m 1 --export-dir "$oneday_url"; then
    # Propagate the failure so the scheduler does not record a failed export as success.
    echo 'sqoop failed' >&2
    exit 1
fi
echo 'sqoop end'
2.3 把几个任务文件打包一起,在azkaban 新建定时任务,每天2点执行。
-
任务打包图片,
image.png
-
mysql log94表日志
image.png