程序员

KAFKA架构

2019-11-21  本文已影响0人  93张先生
method.png
节点.png

kafka是一个分布式的流式数据的处理平台;

流式数据平台的关键能力
发布和订阅消息,像一个消息队列或者企业的消息系统;
以一个容错和持久化的方式存储流式数据;
实时的处理流式数据;
kafka应用两大场景
系统和系统间实时可靠的流式数据管道
转化和响应流式数据的实时数据应用
四个核心API
Producer API
Consumer API
Streams API
Connector API
客户端和服务端通过简单、高性能、语言无关的TCP协议完成;
Topics and Logs

topic 是分布式的保留默认7天消息记录; 分 parition、parition-1,parition-2,partition-3;每一个partition文件中包含offset.log、offset.index、offset.timeindex;offse.log存储的是原始数据,没有进行存储处理;

topic分布

日志分布在kafka集群中;每个服务器处理数据和请求共享的partitions;leader partition 处理read write 请求;followers被动的从leader复制;leader partition 失败,followers partition 自动变为新的leader;

地域复制

Kafka MirrorMaker 提供了集群地域复制功能;数据可以跨越多个数据中心和云存储;在一个active / passive 场景用于数据备份和恢复;

Producers

发送数据可以指定分区;轮询或根据record中的key进行函数划分partition;为了负载均衡。

Consumers
细节.png

消费者组;同一个组被负载均衡到每一个Consumer;不同组,record将被广播到每一个组中;

topic中的partition根据消费者组中的Consumer来进行消费;一个partition最多由一个消费者消费;消费者组成员消数据是根据kafka协议动态处理;新加入一个Consumer,会接管一部分其他成员的数据;一个Consumer挂了,partitions将会被分配到或者的consumers;

保证

同一个procducer,先发送的数据,先被存储;

consumer看到顺序是数据存储的顺序;

topic拥有 replication factor N,允许 N -1个server 失败;而不丢失已被提交的数据;

kafka作为一个消息系统

消费者组概念兼容了传统消息系统的排队和发布订阅模式;传统消费模式,通过排他性,一个消费者消费一个队列,保证了消息的顺序消费;并不能并行化处理数据;kafka利用了分区的概念,实现了并行处理,消费顺序保证、负载均衡;利用一个partition只能被一个确切的consumer消费;保证了消费顺序(一个消费者,消费一个partition,记录只能被顺序的由指定的consmer消费,是offset确定在不同的partition,从而由不同的consumer,实现了负载均衡);

kafka作为一个存储系统

数据被kafka写入硬盘,副本是为了冗余;produceer等待确认,一条消息只有被副本了和保证持久化了,才认为完成了,才会给producer确认回答;

kafka硬盘结构提供了持久化的数据是50KB或者 50T,kafka的性能是一样的;

kafka允许控制客户端控制读取位置,kafka可以被视为一个专用于高性能、低延迟的提交日志存储系统,副本和传播的文件系统。

kafka for stream processing

不止read write storage 数据,还可以处理实时数据;

a kafka stream processor 是一个实时处理一个input topics,在输入流上执行一些操作,然后持续的写到output topics;

比如,数据输入流有价格和商品数量,输出流有订单和这些计算价格的调整;

streams APIs提供给应用 计算聚合操作、流的汇总操作;

kafka streams

他是编写关键的实时应用和微服务的最简单方法

它是构建一个应用和微服务的客户端库;其中输入和输出存储在kafka集群中;它结合了编写和部署Java和Scala客户端应用的简单性 、以及kafka服务器端集群技术的优势;

弹性 高度扩展 容错
部署到容器 VM 裸机 云
实用小型 中型 大型应用 与kafka安全行完全集成
一次处理语义
无需单独的处理集群
public class WordCountDemo {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
        // Note: To re-run the demo, you need to use the offset reset tool:
        // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        final StreamsBuilder builder = new StreamsBuilder();

        final KStream<String, String> source = builder.stream("streams-plaintext-input");

        final KTable<String, Long> counts = source
                .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
                .groupBy((key, value) -> value)
                .count();

        // need to override value serde to Long type
        counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (final Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }

}
stream.png
解说:

The two diagrams below illustrate what is essentially happening behind the scenes. The first column shows the evolution of the current state of the KTable<String, Long>that is counting word occurrences for count. The second column shows the change records that result from state updates to the KTable and that are being sent to the output Kafka topic streams-wordcount-output.
1.第一列显示了KTable<String,Long>,显示了字符串和计数,第二列,显示了更新的状态,并将要发送给topic streams-wordcount-output;

  1. streams-plaintext-input 第一次输入 all streams lead to kafka
    streams-wordcount-output 显示
    all 1
    streams 1
    lead 1
    to 1
    kafka 1
    3.streams-plaintext-input 第二次输入 hello kafka streams
    streams-wordcount-output 显示
    all 1
    streams 1
    lead 1
    to 1
    kafka 1

hello 1
kafka 2
streams 2
新增显示了变化的字符串
hello 1
kafka 2
streams 2
streams-wordcount-output 只输出有了变化的字符串


Looking beyond the scope of this concrete example, what Kafka Streams is doing here is to leverage the duality between a table and a changelog stream (here: table = the KTable, changelog stream = the downstream KStream): you can publish every change of the table to a stream, and if you consume the entire changelog stream from beginning to end, you can reconstruct the contents of the table.
kafka stream 主要利用了table和changlog stream 之间的差别; 你可以发送任何的ktable的变化到

上一篇 下一篇

猜你喜欢

热点阅读