kafka streams 总结

2019-03-01  本文已影响0人  把爱放下会走更远

阅读了kafka官网说明,总结了一下,和demo的个性实现;用幕布软件写的,直接贴过来了

下面是自己对kafka streams的一些应用和封装,用其做一些时间窗口内的数据统计

入口类

public class OtherApplication {
    static KafkaStreams streams;
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "other-application2");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, MySerdes.eventSerde().getClass());
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Event> kStream = builder.stream("inputTopic",
                Consumed.
                        with(Serdes.String(), MySerdes.eventSerde()).
                        withTimestampExtractor(
                                new MyEventTimeExtractor()
                        )
        );
        CountProcessor countProcessor =
                new CountProcessor(new BasicMetrics(new BasicMetrics.WindowFragment(60 * 1000L, 0, 1), "money", "account", "countStore"));
        countProcessor.streamIn(kStream);


        streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}

次数统计类

public class CountProcessor extends StreamInWindowProcessor<Long> {

    public CountProcessor(BasicMetrics basicMetrics) {
        super(basicMetrics);
    }

    @Override
    public void streamIn(TimeWindowedKStream<String, Event> kStream) {
        kStream
                .count(
                        Materialized
                                .as(
                                        Stores.persistentWindowStore(
                                                basicMetrics.storeName, basicMetrics.windowFrag.windowSizeMs * 100, 2, basicMetrics.windowFrag.windowSizeMs, false
                                        )
                                )
                )
                .toStream()
                .process(() -> this, basicMetrics.storeName);
    }

    @Override
    public void process(WindowStoreIterator<Long> iterator) {
        long total = 0L;
        while (iterator.hasNext()) {
            KeyValue<Long, Long> next = iterator.next();
            total += next.value;
        }
//todo total
    }
}

窗口统计父类

public abstract class StreamInWindowProcessor<T> implements Processor<Windowed<String>, T> {
    ReadOnlyWindowStore<String, T> store;
    public BasicMetrics basicMetrics;
    public Object key;
    private static Map<Object, Object> resultMap = new ConcurrentHashMap<>();

    public static Object getData(Object key) {
        return resultMap.get(key);
    }

    public static void setData(Object key, Object data) {
        resultMap.put(key, data);
    }

    public StreamInWindowProcessor(BasicMetrics basicMetrics) {
        this.basicMetrics = basicMetrics;
    }

    public void streamIn(KStream<String, Event> stream) {
        try {
            TimeWindowedKStream<String, Event> kStream = stream
                    .groupBy((key, value) -> basicMetrics.groupByField + "$$" + value.get(basicMetrics.groupByField))
                    .windowedBy(TimeWindows.of(basicMetrics.windowFrag.windowSizeMs));
            streamIn(kStream);
        } catch (Throwable t) {
            t.printStackTrace();
        }

    }

    public abstract void streamIn(TimeWindowedKStream<String, Event> kStream);


    @Override
    public void init(ProcessorContext context) {
        if (!StringUtils.isEmpty(basicMetrics.storeName)) {
            store = (ReadOnlyWindowStore<String, T>) context.getStateStore(basicMetrics.storeName);
        }
    }

    @Override
    public void process(Windowed<String> key, T value) {
        long currentStart = key.window().start();
        this.key = key.key();
        WindowStoreIterator<T> fetch = store.fetch(key.key(), basicMetrics.windowFrag.from(currentStart), basicMetrics.windowFrag.to(currentStart));
        process(fetch);
    }

    public abstract void process(WindowStoreIterator<T> iterator);

    @Override
    public void punctuate(long timestamp) {

    }

    @Override
    public void close() {

    }
}

辅助类

public class BasicMetrics {
    public String storeName;

    public BasicMetrics(WindowFragment windowFrag, String secondField, String groupByField, String storeName) {
        this.windowFrag = windowFrag;
        this.secondField = secondField;
        this.groupByField = groupByField;
        this.storeName = storeName;
    }


    public WindowFragment windowFrag;
    public String groupByField;
    public String secondField;

    public static class WindowFragment {
        public WindowFragment(Long windowSizeMs, Integer preFrom, Integer preEnd) {
            this.windowSizeMs = windowSizeMs;
            this.preFrom = preFrom;
            this.preEnd = preEnd;
        }

        public Long from(long currentEventStart) {
            return currentEventStart - preEnd * windowSizeMs;
        }

        public Long to(long currentEventStart) {
            return currentEventStart - preFrom * windowSizeMs;
        }

        public Long windowSizeMs;
        public Integer preFrom = 0;//e.g:当前窗口
        public Integer preEnd = 1;//e.g:上一窗口,windowSizeMs为1分钟和preFrom:0和preEnd:1表示"最近两分钟"
    }
}

上一篇 下一篇

猜你喜欢

热点阅读