
Flink custom source: parsing __consumer_offsets

2019-01-14  岳过山丘

0. Main function

env.addSource(new KafkaLagSource(parameterTool)) // add the custom source
        .assignTimestampsAndWatermarks(new MetricEventLagWatermarkExtractor()) // assign timestamps and watermarks
        .keyBy(x -> x.getTags().get("group") + "," + x.getTags().get("topic"))
        // deduplicate: within each window keep only the latest commit per (group, topic)
        .timeWindow(Time.seconds(2 * 60), Time.seconds(2 * 60))
        .reduce(new ReduceFunction<MetricEvent>() {
            @Override
            public MetricEvent reduce(MetricEvent value1, MetricEvent value2) throws Exception {
                return value1.getTimestamp() > value2.getTimestamp() ? value1 : value2;
            }
        })
        .flatMap(new ComputeLagFunction())
        .print();
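
Two pieces referenced above are not shown in the original post. First, ComputeLagFunction (section 2) reads the ParameterTool back through getGlobalJobParameters(), so the job setup presumably registers it first; a minimal sketch of the assumed surrounding setup:

// Sketch of the assumed job setup (not shown in the original).
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ParameterTool parameterTool = ParameterTool.fromArgs(args);
// Required: ComputeLagFunction.open() reads these back via getGlobalJobParameters().
env.getConfig().setGlobalJobParameters(parameterTool);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Second, a minimal sketch of MetricEventLagWatermarkExtractor, assuming event time is the offset commit timestamp with a few seconds of allowed out-of-orderness:

// Hypothetical sketch; the original extractor is not shown.
public class MetricEventLagWatermarkExtractor
        extends BoundedOutOfOrdernessTimestampExtractor<MetricEvent> {

    public MetricEventLagWatermarkExtractor() {
        super(Time.seconds(10)); // assumed out-of-orderness bound
    }

    @Override
    public long extractTimestamp(MetricEvent element) {
        return element.getTimestamp(); // commit timestamp set by KafkaLagSource
    }
}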
1. Custom Flink source: read and parse __consumer_offsets

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import kafka.common.OffsetAndMetadata;
import kafka.coordinator.group.BaseKey;
import kafka.coordinator.group.GroupMetadataManager;
import kafka.coordinator.group.OffsetKey;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Properties;

// MetricEvent is a custom type with name, timestamp, tags, and fields (see the sketch after this class).

public class KafkaLagSource extends RichSourceFunction<MetricEvent> {
    public static final String KAFKA_LAG_NAME = "kafka_lag_name";
    private transient Consumer<byte[], byte[]> consumer;
    private volatile boolean running = true; // flipped by cancel() to stop the poll loop
    private final ParameterTool parameterTool;

    public KafkaLagSource(ParameterTool parameterTool) {
        this.parameterTool = parameterTool;
    }

    // Poll Kafka continuously; each parsed record carries the offset a consumer group
    // has committed for one partition of a topic, emitted downstream via ctx.collect.
    @Override
    public void run(SourceContext<MetricEvent> ctx) throws Exception {
        while (running) {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(100); // 100 ms poll timeout
            Iterator<ConsumerRecord<byte[], byte[]>> iterator = records.iterator();
            while (iterator.hasNext()) {
                ConsumerRecord<byte[], byte[]> record = iterator.next();
                if (record.key() == null) {
                    continue;
                }
                BaseKey baseKey = GroupMetadataManager.readMessageKey(ByteBuffer.wrap(record.key()));
                byte[] value = record.value();
                if (value == null) {
                    // a null value is a tombstone (the offset entry was deleted or expired); skip it
                    continue;
                }
                if (baseKey instanceof OffsetKey) {
                    OffsetKey newKey = (OffsetKey) baseKey;
                    MetricEvent event = buildEvent(newKey, value);
                    ctx.collect(event);
                }
            }
        }
    }

    private MetricEvent buildEvent(OffsetKey newKey, byte[] value) {
        OffsetAndMetadata offsetMeta = GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value));
        TopicPartition tp = newKey.key().topicPartition();
        String group = newKey.key().group();

        MetricEvent event = new MetricEvent();
        event.setName(KAFKA_LAG_NAME);
        event.setTimestamp(offsetMeta.commitTimestamp());
        HashMap<String, String> tags = Maps.newHashMap();
        tags.put(KafkaLagConstants.GROUP, group);
        tags.put(KafkaLagConstants.TOPIC, tp.topic());
        tags.put(KafkaLagConstants.PARTITION, tp.partition() + "");
        event.setTags(tags);
        HashMap<String, Object> fields = Maps.newHashMap();
        fields.put(KafkaLagConstants.OFFSET, offsetMeta.offset());
        event.setFields(fields);
        return event;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        consumer = createKafkaConsumer();
        consumer.subscribe(Lists.newArrayList("__consumer_offsets"));
    }

    @Override
    public void cancel() {
        running = false; // stop the poll loop in run()
    }

    Consumer<byte[], byte[]> createKafkaConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", parameterTool.get("bootstrap.servers"));
        props.put("group.id", parameterTool.get("group.id"));
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return new KafkaConsumer<byte[], byte[]>(props);
    }
}
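
Neither MetricEvent nor KafkaLagConstants is shown in the original; below are minimal sketches consistent with how they are used above (each would live in its own file):

// Hypothetical sketches; the original project defines these elsewhere.
import java.util.Map;

public class MetricEvent {
    private String name;                // metric name, e.g. "kafka_lag_name"
    private long timestamp;             // offset commit timestamp in ms
    private Map<String, String> tags;   // group / topic / partition / path
    private Map<String, Object> fields; // offset / lag values

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
    public Map<String, String> getTags() { return tags; }
    public void setTags(Map<String, String> tags) { this.tags = tags; }
    public Map<String, Object> getFields() { return fields; }
    public void setFields(Map<String, Object> fields) { this.fields = fields; }
}

public class KafkaLagConstants {
    // Tag/field keys; GROUP and TOPIC must match the keyBy in the main job.
    public static final String GROUP = "group";
    public static final String TOPIC = "topic";
    public static final String PARTITION = "partition";
    public static final String OFFSET = "offset";
    public static final String LAG = "lag";
    public static final String PATH = "path";
}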

2. Computing the lag


import com.google.common.base.Throwables;
import io.terminus.spot.analyzer.alerting.utils.KafkaLagConstants;
import io.terminus.spot.analyzer.base.models.MetricEvent;
import io.terminus.spot.analyzer.base.utils.GsonUtil;
import io.terminus.spot.analyzer.base.utils.HttpUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;

import java.util.HashMap;
import java.util.Map;

@Slf4j
public class ComputeLagFunction extends RichFlatMapFunction<MetricEvent, MetricEvent> {
    private String kafkaManagerIp;
    private int lagLimit;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ParameterTool tools = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        kafkaManagerIp = tools.get("kafka.manager.ip");
        lagLimit = tools.getInt("kafka.lag.limit", 100000);
        log.info("kafkaManagerIp {}", kafkaManagerIp);
    }

    @Override
    public void flatMap(MetricEvent value, Collector<MetricEvent> out) throws Exception {
        Map<String, String> tags = value.getTags();
        String topic = tags.get(KafkaLagConstants.TOPIC);
        String group = tags.get(KafkaLagConstants.GROUP);
        try {
            // Query kafka-manager's consumer page for this group/topic ("spot" is the hardcoded cluster name).
            String url = "http://" + kafkaManagerIp + ":9000/clusters/spot/consumers/" + group + "/topic/" + topic + "/type/KF";
            String httpResult = HttpUtils.doGet(url);
            // The total lag is the second cell of the first row of the first table on the page.
            Document doc = Jsoup.parse(httpResult);
            String lagString = doc.body().selectFirst("table.table").selectFirst("tr").select("td").eachText().get(1);
            Long lag = Long.valueOf(lagString);
            value.getFields().put(KafkaLagConstants.LAG, lag > 0 ? lag : 0L);
            value.getTags().put(KafkaLagConstants.PATH, url);
            if (lag > lagLimit) { // lag too large: emit downstream as an alert
                out.collect(value);
            }
        } catch (Exception e) {
            log.error(Throwables.getStackTraceAsString(e));
        }
    }
}
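
HttpUtils.doGet above is a project utility that is not shown; a minimal sketch with java.net.HttpURLConnection, assuming it simply returns the response body as a string:

// Hypothetical sketch of HttpUtils; the original implementation is not shown.
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class HttpUtils {
    public static String doGet(String urlString) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(urlString).openConnection();
        conn.setRequestMethod("GET");
        conn.setConnectTimeout(5000);
        conn.setReadTimeout(5000);
        StringBuilder sb = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                sb.append(line).append('\n');
            }
        } finally {
            conn.disconnect();
        }
        return sb.toString();
    }
}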

Build the kafka-manager URL from the topic and group, issue the HTTP GET, parse the returned HTML, and extract the total lag.
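
The selector chain grabs the second cell of the first row of the first table.table on the kafka-manager consumer page. A standalone illustration with a made-up HTML fragment:

// Standalone illustration of the Jsoup extraction used above; the HTML is made up.
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;

public class JsoupLagParseDemo {
    public static void main(String[] args) {
        String html = "<table class=\"table\"><tr><td>Total Lag</td><td>12345</td></tr></table>";
        Document doc = Jsoup.parse(html);
        String lag = doc.body().selectFirst("table.table")
                .selectFirst("tr").select("td").eachText().get(1);
        System.out.println(lag); // prints 12345
    }
}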



3. pom

<dependency>
    <groupId>com.jayway.jsonpath</groupId>
    <artifactId>json-path</artifactId>
    <version>2.4.0</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.7</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.11.0.2</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.jsoup</groupId>
    <artifactId>jsoup</artifactId>
    <version>1.11.3</version>
</dependency>
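
The excerpt above omits the Flink artifacts themselves; the project presumably also declares something like the following (the version property and Scala suffix are assumptions, not stated in the original):

<!-- Assumed Flink dependency; version and Scala suffix are not given in the original post. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>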