Flume: Personal Study Notes

2017-03-10  来往穿梭

WHAT

A distributed, reliable, highly available system for collecting massive volumes of log data.

Overall architecture

Flume's components:
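As a rough mental model, a Flume agent is usually described as three cooperating parts: a source that receives events, a channel that buffers them, and a sink that delivers them onward. The sketch below is a toy illustration of that flow using only the Java standard library. It is not Flume's API; the class `ToyAgent` and its methods are hypothetical names made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Toy model of one agent (hypothetical, not Flume code): source -> channel -> sink. */
public class ToyAgent {

    // The channel is a bounded buffer that decouples the source from the sink.
    private final BlockingQueue<String> channel;

    // Stand-in for the sink's destination (a real sink might write to HDFS or a log).
    private final List<String> delivered = new ArrayList<>();

    public ToyAgent(int capacity) {
        this.channel = new ArrayBlockingQueue<>(capacity);
    }

    /** Source side: accept an event if the channel has room; returns false when it is full. */
    public boolean receive(String event) {
        return channel.offer(event);
    }

    /** Sink side: take every buffered event out of the channel and deliver it. */
    public int drain() {
        int count = 0;
        String event;
        while ((event = channel.poll()) != null) {
            delivered.add(event);
            count++;
        }
        return count;
    }

    public List<String> delivered() {
        return delivered;
    }

    public static void main(String[] args) {
        ToyAgent agent = new ToyAgent(2);
        System.out.println("accepted: " + agent.receive("log line 1"));
        System.out.println("accepted: " + agent.receive("log line 2"));
        // The channel holds only two events, so a third is rejected until the sink drains.
        System.out.println("accepted: " + agent.receive("log line 3"));
        System.out.println("drained: " + agent.drain());
    }
}
```

The bounded queue is the point of the sketch: the channel lets the source and the sink run at different speeds, and a full channel is what pushes back on the source.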


WHY

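Hadoop provides a centralized storage system, which makes centralized data analysis and data sharing easier. The data itself, however, is scattered across many separate devices and kept in traditional storage devices and systems. Flume is the main tool for collecting this unstructured data (logs). Structured data (synchronizing traditional databases with Hadoop) can be imported in full with Sqoop, and later imported incrementally with DataBus or Camus.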

HOW

[xc2046@master apache-flume-1.6.0-bin]$ bin/flume-ng agent --conf /home/xc2046/apache-flume-1.6.0-bin/conf --conf-file /home/xc2046/apache-flume-1.6.0-bin/conf/avro.properties --name logAgent -Dflume.root.logger=DEBUG,console -Dflume.monitoring.type=http -Dflume.monitoring.port=34545
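The command starts an agent named `logAgent` from `conf/avro.properties`, but the contents of that file are not shown. The sketch below holds one plausible version of it as a string and checks that it declares an Avro source on port 41414, the port the client code connects to. The component names `r1`, `c1`, `k1`, the memory channel, and the logger sink are assumptions chosen for illustration; only the agent name and the port come from this post.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class AgentConfigCheck {

    // Hypothetical contents for conf/avro.properties; the original file is not shown.
    static final String AVRO_PROPERTIES = String.join("\n",
        "logAgent.sources = r1",
        "logAgent.channels = c1",
        "logAgent.sinks = k1",
        "logAgent.sources.r1.type = avro",
        "logAgent.sources.r1.bind = 0.0.0.0",
        "logAgent.sources.r1.port = 41414",
        "logAgent.sources.r1.channels = c1",
        "logAgent.channels.c1.type = memory",
        "logAgent.sinks.k1.type = logger",
        "logAgent.sinks.k1.channel = c1");

    /** Parses properties-format text into a Properties object. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** True when the named agent declares an Avro source listening on the given port. */
    static boolean hasAvroSourceOn(Properties props, String agent, int port) {
        String sources = props.getProperty(agent + ".sources", "");
        for (String name : sources.trim().split("\\s+")) {
            if (name.isEmpty()) {
                continue;
            }
            String prefix = agent + ".sources." + name + ".";
            if ("avro".equals(props.getProperty(prefix + "type"))
                    && String.valueOf(port).equals(props.getProperty(prefix + "port"))) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Properties props = parse(AVRO_PROPERTIES);
        System.out.println("Avro source on 41414: " + hasAvroSourceOn(props, "logAgent", 41414));
    }
}
```

Note that 34545 in the command is the HTTP monitoring port, not the port the Avro source listens on, so a client pointed at 34545 would not reach the source.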
package com.example.flume;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import java.nio.charset.Charset;

public class FlumeExample {

    public static void main(String[] args){
        MyRpcClientFacade clientFacade = new MyRpcClientFacade();
        clientFacade.init("192.168.99.130", 41414);
        // Send 10 events to the remote Flume agent. The agent must be configured with an Avro source listening on this port.
        String msg = "Hello Flume!";
        for (int i = 0; i < 10; i++) {
            clientFacade.sendDataToFlume(msg);
            System.out.println("sent");
        }
        clientFacade.cleanUp();
    }

}

class MyRpcClientFacade{

    private RpcClient client;

    private String hostname;

    private int port;

    public void init(String hostname,int port){
        this.hostname = hostname;
        this.port = port;
        this.client = RpcClientFactory.getDefaultInstance(hostname,port);
        //this.client = RpcClientFactory.getThriftInstance(hostname,port);
    }

    public void sendDataToFlume(String data){
        // Create a Flume Event object that encapsulates the data
        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
        // Send the event
        try {
            client.append(event);
        } catch (EventDeliveryException e) {
            // Delivery failed: log the error, then discard the broken client and
            // build a new one for later calls. The failed event is not resent.
            e.printStackTrace();
            client.close();
            client = RpcClientFactory.getDefaultInstance(hostname,port);
            // Thrift client alternative:
            //client = RpcClientFactory.getThriftInstance(hostname,port);
        }
    }

    public void cleanUp(){
        //Close the RPC connection
        client.close();
    }

}
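This covers only the most basic introduction to Flume and a few simple operations; I hope to understand it more deeply over time. I also recommend the article "Flume浅析" by the author 心_的方向.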