Kafka

Hadoop生态架构之kafka

2020-01-11  本文已影响0人  勇于自信

1.kafka基本理论知识

1、定位:分布式的消息队列系统,同时提供数据分布式缓存功能(默认7天)
2、消息持久化到磁盘,达到O(1)访问速度,预读和后写,对磁盘的顺序访问(比内存访问还要快)
3、Storm(分布式的实时计算框架)
Kafka目标成为队列平台
4、基本组件:
Broker:每一台机器是一个Broker
Producer:日志消息生产者,主要写数据
Consumer:日志消息消费者,主要读数据
Topic:是虚拟概念,不同的consumer去指定的topic去读数据,不同producer可以往不同的topic去写
Partition:是实际概念,文件夹,是在Topic的基础上做了进一步分层
5、Partition功能:负载均衡,需要保证消息的顺序性
顺序性的保证:订阅消息是从头往后读取的,写消息是尾部追加,所以整体消息是顺序的
如果有多个partiton存在,可能会出现顺序不一致的情况,原因:每个Partition相互独立
6、Topic:逻辑概念
一个或多个Partition组成一个Topic
7、Partition以文件夹的形式存在
8、Partition有两部分组成:
(1)index log:(存储索引信息,快速定位segment文件)
(2)message log:(真实数据的所在)
9、HDFS多副本的方式来完成数据高可用
如果设置一个Topic,假设这个Topic有5个Partition,3个replication
Kafka分配replication的算法:
假设:
将第i个Partition分配到(i % N)个Broker上
将第i个Partition的第j个replication分配到( (i+j) % N)个Broker上
虽然Partition里面有多个replication
如果里面有M个replication,其中有一个是Leader,其他M-1个follower
10、zookeeper包系统的可用性,zk中会保存一些meta信息(topic)
11、物理上,不同的topic的消息肯定是分开存储的
12、偏移量——offset:用来定位数据读取的位置
13、kafka内部最基本的消息单位——message
14、传输最大消息message的size不能超过1M,可以通过配置来修改
15、Consumer Group
16、传输效率:zero-copy
0拷贝:减少Kernel和User模式上下文的切换
直接把disk上的data传输给socket,而不是通过应用程序来传输
17、Kafka的消息是无状态的,消费者必须自己维护已消费的状态信息(offset)
减轻Kafka的实现难度
18、Kafka内部有一个时间策略:SLA——消息保留策略(消息超过一定时间后,会自动删除)
19、交付保证:
at least once:至少一次(会有重复、但不丢失)
at most once:最多发送一次(不重复、但可能丢失)
exactly once:只有一次(最理想),目前不支持,只能靠客户端维护
20、Kafka集群里面,topic内部由多个partition(包含多个replication),达到高可用的目的:
日志副本:保证可靠性
角色:主、从
ISR:是一个集合,只有在集合中的follower,才有机会被选为leader
如何让leader知道follower是否成功接收数据(心跳,ack)
如果心跳正常,代表节点活着
21、怎么算“活着”
(1)心跳
(2)如果follower能够紧随leader的更新,不至于被落的太远
如果一旦挂掉,从ISR集合把该节点删除掉

2.实践

1.kafka基本操作命令

前提:需要把zookeeper提前启动好
一、单机版
1、启动进程:
]# ./bin/kafka-server-start.sh config/server.properties
2、查看topic列表:
]# ./bin/kafka-topics.sh --list --zookeeper localhost:2181
3、创建topic:
]# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic newyear_test
4、查看topic描述:
]# ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic newyear_test
5、producer发送数据:
]# ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic newyear_test
6、consumer接收数据:
]# ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic newyear_test --from-beginning
7、删除topic:
]# ./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic newyear_test
二、集群版
在slave1和slave2上的broker.id一定设置不同
分别在slave1和slave2上开启进程:
./bin/kafka-server-start.sh config/server.properties

创建topic:
]# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 5 --topic newyear_many_test

2.自主写producer、consumer

1、实现一个consumer group
首先在不同的终端分别开启consumer,保证groupid一致
]# python consumer_kafka.py

执行一次producer:
]# python producer_kafka.py

2、指定partition发送数据
]# python producer_kafka_2.py

3、指定partition读出数据
]# python consumer_kafka_2.py
consumer_kafka.py:

from kafka import KafkaConsumer
def main():
    consumer = KafkaConsumer(b"newyear_many_test", group_id=b"my_group_id",
                             bootstrap_servers=["master:9092", "slave1:9092", "slave2:9092"],
            auto_offset_reset='earliest')
            #auto_offset_reset='latest')
    for message in consumer:
        # This will wait and print messages as they become available
        print(message)


if __name__ == "__main__":
    main()

producer_kafka.py:

import time

from kafka import SimpleProducer, KafkaClient
from kafka.common import LeaderNotAvailableError

def print_response(response=None):
    if response:
        print('Error: {0}'.format(response[0].error))
        print('Offset: {0}'.format(response[0].offset))


def main():
    kafka = KafkaClient("localhost:9092")
    producer = SimpleProducer(kafka)

    topic = b'newyear_many_test'
    msg = b'Hello World from Badou!'

    try:
        print_response(producer.send_messages(topic, msg))
    except LeaderNotAvailableError:
        time.sleep(1)
        print_response(producer.send_messages(topic, msg))

    kafka.close()

if __name__ == "__main__":
    main()

consumer_kafka_2.py:

#encoding=utf8
from kafka import KafkaConsumer
from kafka import TopicPartition
from kafka.structs import OffsetAndMetadata
from kafka.structs import TopicPartition

def main():
    consumer = KafkaConsumer('newyear_many_test', bootstrap_servers=['master:9092'])

    print consumer.partitions_for_topic("newyear_many_test")
    print consumer.topics()  #获取主题列表
    print consumer.subscription()  #获取当前消费者订阅的主题
    print consumer.assignment()  #获取当前消费者topic、分区信息
    print consumer.beginning_offsets(consumer.assignment()) #获取当前消费者可消费的偏移量

    consumer.seek(TopicPartition(topic=u'newyear_many_test', partition=2), 5)  #重置偏移量
    for message in consumer:
        print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
            message.offset, message.key,
            message.value))


if __name__ == "__main__":
    main()

producer_kafka_2.py:

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')

# # block until all pending messages are sent
# for _ in range(10):
#     producer.send('test_m_brokers', b'are you ok!!!')
#
# producer.flush()


# key for hashed partitioning
producer.send('newyear_many_test', key=b'1', value=b'aaa')
producer.flush()
3.kafka打通flume

1.新建./conf/kafka_test/flume_kafka.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /home/dong/flume_test/1.log

#a1.sources.r1.type = http
#a1.sources.r1.host = master
#a1.sources.r1.port = 52020

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
a1.sources.r1.interceptors.i1.headerName = key
a1.sources.r1.interceptors.i1.preserveExisting = false

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList  = master:9092
#a1.sinks.k1.topic = badou_flume_kafka_test
#a1.sinks.k1.topic = badou_storm_kafka_test
a1.sinks.k1.topic = topic_1013

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2.启动flume:
]# flume-ng agent -c conf -f ./conf/kafka_test/flume_kafka.conf -n a1 -Dflume.root.logger=INFO,console
启动成功,如下图:


3.测试:
1.1flume监控产生的数据:
]# for i in seq 1 100; do echo '====> '$i >> 1.log ; done
1.2kafka消费数据:
]# ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic_1013 --from-beginning
消费结果如下图:


至此,消费成功!
补充,另外一种消费方式可以使用下面命令来产生测试数据:
]# curl -X POST -d '[{"headers":{"flume":"flume is very easy!"}, "body":"111"}]' http://master:52020
上一篇下一篇

猜你喜欢

热点阅读