103. Collecting Kafka Data with Flume and Writing It to HBase


103.1 Demo Environment

As the artifact versions below indicate, this walkthrough targets CDH 5.12.1 (Flume 1.6.0-cdh5.12.1, HBase client 1.2.0-cdh5.12.1) with a three-broker Kafka cluster (cdh01/cdh02/cdh03.fayson.com).

103.2 Walkthrough


1. HBaseSink Development Example

Add the HBase Sink and HBase client dependencies to the project pom.xml:
<!-- HBase Sink dependency -->
<dependency>
    <groupId>org.apache.flume.flume-ng-sinks</groupId>
    <artifactId>flume-ng-hbase-sink</artifactId>
    <version>1.6.0-cdh5.12.1</version>
</dependency>
<!-- HBase Client dependency -->
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.2.0-cdh5.12.1</version>
</dependency>
With the dependencies in place, implement the custom sink:
/**
 * package: com.cloudera.hbase
 * describe: Custom HBaseSink supporting a user-defined rowkey and JSON string parsing
 * create_user: Fayson
 * WeChat official account: 碧茂大数据
 */
public class FaysonHBaseSink extends AbstractSink implements Configurable {
    private String tableName;
    private byte[] columnFamily;
    //Custom rowkey field: one or more columns may be combined, separated by ","
    ...
    private static final Logger logger = LoggerFactory.getLogger(com.cloudera.hbase.FaysonHBaseSink.class);
    private FaysonHBaseEventSerializer serializer;
    @Override
    public Status process() throws EventDeliveryException {
        Status status = Status.READY;
        Channel channel = getChannel();
        Transaction txn = channel.getTransaction();
        List<Row> actions = new LinkedList<Row>();
        List<Increment> incs = new LinkedList<Increment>();
        try {
            txn.begin();
            if (serializer instanceof BatchAware) {
                ((BatchAware)serializer).onBatchStart();
            }
            long i = 0;
            for (; i < batchSize; i++) {
                Event event = channel.take();
                if (event == null) {
                    if (i == 0) {
                        status = Status.BACKOFF;
                        sinkCounter.incrementBatchEmptyCount();
                    } else {
                        sinkCounter.incrementBatchUnderflowCount();
                    }
                    break;
                } else {
                    if(rowKeys != null && rowKeys.length() > 0) {
                        serializer.initialize(event, columnFamily, rowKeys);
                    } else {
                        serializer.initialize(event, columnFamily);
                    }
                    actions.addAll(serializer.getActions());
                    incs.addAll(serializer.getIncrements());
                }
            }
            if (i == batchSize) {
                sinkCounter.incrementBatchCompleteCount();
            }
            sinkCounter.addToEventDrainAttemptCount(i);
            putEventsAndCommit(actions, incs, txn);
        } catch (Throwable e) {
            try{
                txn.rollback();
            } catch (Exception e2) {
                logger.error("Exception in rollback. Rollback might not have been " +
                        "successful." , e2);
            }
            logger.error("Failed to commit transaction." +
                    "Transaction rolled back.", e);
            if(e instanceof Error || e instanceof RuntimeException){
                logger.error("Failed to commit transaction." +
                        "Transaction rolled back.", e);
                Throwables.propagate(e);
            } else {
                logger.error("Failed to commit transaction." +
                        "Transaction rolled back.", e);
                throw new EventDeliveryException("Failed to commit transaction." +
                        "Transaction rolled back.", e);
            }
        } finally {
            txn.close();
        }
        return status;
    }
}
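process() hands the batched Puts and Increments to putEventsAndCommit(), which is elided above. A minimal sketch of what that helper must do, assuming a table field (org.apache.hadoop.hbase.client.HTable) and the elided sinkCounter, mirroring the community HBaseSink (not the author's exact code):

    // Sketch only: `table` and `sinkCounter` are elided fields of the sink.
    private void putEventsAndCommit(List<Row> actions, List<Increment> incs,
                                    Transaction txn) throws Exception {
        // Apply all Puts in a single batch call
        table.batch(actions, new Object[actions.size()]);
        // Increments are applied individually
        for (Increment inc : incs) {
            table.increment(inc);
        }
        // Commit the channel transaction only after HBase accepted the writes,
        // so a failure leaves the events in the channel for redelivery
        txn.commit();
        sinkCounter.addToEventDrainSuccessCount(actions.size());
    }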
package com.cloudera.hbase;
import org.apache.flume.Event;
import org.apache.flume.sink.hbase.HbaseEventSerializer;
/**
 * package: com.cloudera.hbase
 * describe: Extends HBaseSink's HbaseEventSerializer interface, adding
 * initialize(Event var1, byte[] var2, String var3) to support a user-specified rowkey
 * create_user: Fayson
 * WeChat official account: 碧茂大数据
 */
public interface FaysonHBaseEventSerializer extends HbaseEventSerializer {
    void initialize(Event var1, byte[] var2, String var3);
}
package com.cloudera.hbase;
/**
 * package: com.cloudera.hbase
 * describe: Constants for the custom HBaseSink
 * WeChat official account: 碧茂大数据
 */
public class FaysonHBaseSinkConstants {
    public static final String CONFIG_ROWKEYS = "rowkeys";
}
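CONFIG_ROWKEYS is the property name the sink reads from the agent configuration. A minimal sketch of how FaysonHBaseSink.configure() might consume it, reusing the community HBaseSink's configuration constants (an assumption, not the author's full code):

    // Sketch only: standard HBaseSink properties plus the custom "rowkeys" one.
    @Override
    public void configure(Context context) {
        tableName = context.getString(HBaseSinkConfigurationConstants.CONFIG_TABLE);
        String cf = context.getString(HBaseSinkConfigurationConstants.CONFIG_COLUMN_FAMILY);
        Preconditions.checkNotNull(tableName, "Table name cannot be empty");
        Preconditions.checkNotNull(cf, "Column family cannot be empty");
        columnFamily = cf.getBytes(Charsets.UTF_8);
        // Custom property: comma-separated JSON fields that compose the rowkey
        rowKeys = context.getString(FaysonHBaseSinkConstants.CONFIG_ROWKEYS);
        batchSize = context.getLong(HBaseSinkConfigurationConstants.CONFIG_BATCHSIZE, 100L);
    }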
public class JsonHBaseEventSerializer implements FaysonHBaseEventSerializer {
    private static final Logger logger = LoggerFactory.getLogger(com.cloudera.hbase.JsonHBaseEventSerializer.class);
    private LinkedHashSet<String> rowkeySet;
    @Override
    public void initialize(Event event, byte[] columnFamily, String rowkeys) {
        this.headers = event.getHeaders();
        this.payload = event.getBody();
        this.cf = columnFamily;
        rowkeySet = new LinkedHashSet<>();
        logger.info("rowkeys:" + rowkeys);
        for(String rowkey : rowkeys.split(",")) {
            rowkeySet.add(rowkey);
        }
    }
    @Override
    public List<Row> getActions() throws FlumeException {
        List<Row> actions = Lists.newArrayList();
        //Convert the JSON message into a Map
        Map<String, String> resultMap = JsonStr2Map.jsonStr2Map(new String(payload, charset));
        try {
            byte[] rowKey;
            if (!rowkeySet.isEmpty()) {  //A non-empty rowkeySet means a custom rowkey was configured
                StringBuilder rowkeyBuff = new StringBuilder();
                for (String rowkey : rowkeySet) {
                    rowkeyBuff.append(resultMap.get(rowkey)).append("-");
                }
                rowKey = rowkeyBuff.substring(0, rowkeyBuff.length() - 1).getBytes();
                //Remove rowkey fields from the map so they are not also written as columns
                for (String rowkey : rowkeySet) {
                    resultMap.remove(rowkey);
                }
            } else {
                if (rowKeyIndex < 0) {
                    rowKey = getRowKey();
                } else {
                    //resultMap is keyed by String, so the numeric index must be stringified
                    rowKey = resultMap.get(String.valueOf(rowKeyIndex + 1)).getBytes(Charsets.UTF_8);
                }
            }
            Put put = new Put(rowKey);
            for(Map.Entry<String, String> entry : resultMap.entrySet()) {
                put.add(cf, entry.getKey().getBytes(), entry.getValue().getBytes(Charsets.UTF_8));
            }
            if (depositHeaders) {
                for (Map.Entry<String, String> entry : headers.entrySet()) {
                    put.add(cf, entry.getKey().getBytes(charset), entry.getValue().getBytes(charset));
                }
            }
            actions.add(put);
        } catch (Exception e) {
            throw new FlumeException("Could not get row key!", e);
        }
        return actions;
    }
}
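getActions() depends on a JsonStr2Map helper that is not shown above. Here is a minimal sketch using Jackson (an assumption; the original may use a different JSON library and package) that flattens a one-level JSON object into an insertion-ordered map of column name to value:

package com.cloudera.hbase;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.LinkedHashMap;
import java.util.Map;

public class JsonStr2Map {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Parse a flat JSON string into an insertion-ordered column-name -> value map.
    public static Map<String, String> jsonStr2Map(String jsonStr) {
        try {
            @SuppressWarnings("unchecked")
            Map<String, Object> raw = MAPPER.readValue(jsonStr, LinkedHashMap.class);
            Map<String, String> result = new LinkedHashMap<>();
            for (Map.Entry<String, Object> entry : raw.entrySet()) {
                result.put(entry.getKey(), String.valueOf(entry.getValue()));
            }
            return result;
        } catch (Exception e) {
            throw new RuntimeException("Failed to parse JSON: " + jsonStr, e);
        }
    }
}

Package the project and distribute the jar to the Flume lib directory on every node: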
mvn clean package
[root@cdh01 shell]# sh bk_cp.sh node.list /root/flume-sink-1.0-SNAPSHOT.jar /opt/cloudera/parcels/CDH/lib/flume-ng/lib/

2. Flume Agent Configuration

The agent (named kafka) wires a Kafka source to the custom sink through a memory channel:

kafka.sources  = source1
kafka.channels = channel1
kafka.sinks = sink1
kafka.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
kafka.sources.source1.kafka.bootstrap.servers = cdh01.fayson.com:9092,cdh02.fayson.com:9092,cdh03.fayson.com:9092
kafka.sources.source1.kafka.topics = kafka_sparkstreaming_kudu_topic
kafka.sources.source1.kafka.consumer.group.id = flume-consumer
kafka.sources.source1.channels = channel1
kafka.channels.channel1.type = memory
kafka.channels.channel1.capacity = 10000
kafka.channels.channel1.transactionCapacity = 1000
kafka.sinks.sink1.channel = channel1
kafka.sinks.sink1.type = com.cloudera.hbase.FaysonHBaseSink
kafka.sinks.sink1.table = fayson_ods_deal_daily
kafka.sinks.sink1.columnFamily = info
kafka.sinks.sink1.rowkeys = id,mobile_phone_num
kafka.sinks.sink1.serializer = com.cloudera.hbase.JsonHBaseEventSerializer
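With rowkeys = id,mobile_phone_num, a message body such as {"id":"0001","mobile_phone_num":"13800138000"} (hypothetical values) is stored under rowkey 0001-13800138000; both fields are removed from the map, so only the remaining JSON fields become columns in the info family.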

3. Verification
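The original verification steps are not reproduced here. A minimal check, assuming the topic and table from the configuration above (field values other than id and mobile_phone_num are hypothetical), is to produce a test JSON message:

kafka-console-producer --broker-list cdh01.fayson.com:9092 --topic kafka_sparkstreaming_kudu_topic
{"id":"0001","mobile_phone_num":"13800138000","city":"shenzhen"}

Then, in hbase shell, confirm the row keyed by id-mobile_phone_num:

scan 'fayson_ods_deal_daily', {LIMIT => 10}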

4. Summary

The custom FaysonHBaseSink extends the community HBaseSink so that the HBase rowkey can be composed from one or more fields of the JSON message, configured through the sink's rowkeys property and joined with "-". Fields consumed by the rowkey are removed before the remaining JSON fields are written to the configured column family, and the sink's serializer property selects JsonHBaseEventSerializer to parse each Kafka message body.
