Flume个人学习总结
2017-03-10 本文已影响332人
来往穿梭
WHAT
一个分布式、可靠、高可用的海量日志采集系统
- 可靠性:保证数据不丢失
- 可扩展性:各组件数目可扩展
- 高性能:吞吐率很高,能满足海量数据收集需求
- 可管理性:可动态增加和删除组件
整体架构
Flume的组件:
- Event:Flume数据传输的基本单元
- Client: 是一个将原始log包装成Event并发送它们到一个或多个Agent的实体
-
Agent:一个Agent包含source、channel、sink和其他组件,利用这些组件将events从一个节点传输到另一个节点或最终目的
- Source:负责接收event,或通过特殊机制产生event,并将其批量的放入一个或多个channel source必须至少和一个channel关联!
- Channel:位于source和sink之间,用于缓存event
- Sink:负责将event传输到下一跳或者最终目的,成功完成后将event从channel中删除
WHY
Hadoop提供了一个中央化的存储系统,有利于集中式的数据分析和数据共享。但数据是分散在各个离散的设备上,数据保存在传统的存储设备和系统中。对于这些非结构数据(日志)的收集,主要用到Flume。而结构化数据收集(传统数据库与Hadoop同步)可以通过Sqoop全量导入。而后续,可以通过DataBus或Camus增量导入。
HOW
- 说明:四个参数分别为source(avro)方式,0.0.0.0为本地模式,端口设置为41414,最终路径为 logAgent.sinks.hdfsSink.hdfs.path = hdfs://192.168.99.130:9000/flume/record3/%Y-%m-%d/%H%M 的临时文件
- 去flume的bin目录下运行以下语句
[xc2046@master apache-flume-1.6.0-bin]$ bin/flume-ng agent --conf /home/xc2046/apache-flume-1.6.0-bin/conf --conf-file /home/xc2046/apache-flume-1.6.0-bin/conf/avro.properties --name logAgent -Dflume.root.logger=DEBUG,console -Dflume.monitoring.type=http -Dflume.monitoring.port=34545
- 通过一段java代码,链接设置的Flume,向HDFS上的文件写入数据
package com.example.flume;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import java.nio.charset.Charset;
public class FlumeExample {
public static void main(String[] args){
MyRpcClientFacade clientFacade = new MyRpcClientFacade();
clientFacade.init("192.168.99.130", 41414);
//Send 10 event to the remote Flume agent.That agent should be configured to listen with AvroSource
String msg = "Hello Flume!";
for(int i=0;i<10;i++){
clientFacade.sendDataToFlume(msg);
System.out.println("sended");
}
clientFacade.cleanUp();
}
}
class MyRpcClientFacade{
private RpcClient client;
private String hostname;
private int port;
public void init(String hostname,int port){
this.hostname = hostname;
this.port = port;
this.client = RpcClientFactory.getDefaultInstance(hostname,port);
//this.client = RpcClientFactory.getThriftInstance(hostname,port);
}
public void sendDataToFlume(String data){
//create a Flume Event Object that encapsulates the data
Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
//send the event
try {
client.append(event);
} catch (EventDeliveryException e) {
client.close();
client = null;
client = RpcClientFactory.getDefaultInstance(hostname,port);
// Thrift Client
//client = RpcClientFactory.getThriftInstance(hostname,port);
e.printStackTrace();
}
}
public void cleanUp(){
//Close the RPC connection
client.close();
}
}
- 去HDFS上cat刚刚Flume生成的临时文件