一 Kafka介绍及常用命令行

2021-03-20  本文已影响0人  overflowedstack

kafka是什么

Kafka本质上是一个MQ,它是分布式的,基于发布/订阅模式,主要应用于大数据实时处理领域。

应用场景

异步处理(MQ的传统应用场景)

使用消息队列的好处:

  1. 解耦
    允许你独立的扩展或者修改两边的处理过程,只要确保它们遵守同样的接口约束。
  2. 可恢复性
    系统的一部分组件失效时,不会影响到整个系统。即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
  3. 缓冲
    可以解决两边速度不一致的问题,更多的是处理生产大于消费的问题。
  4. 峰值处理能力
    在访问量剧增的情况下,使用消息队列能够使关键组件顶住突发的访问压力。

消息队列的两种模式

  1. 点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)
  2. 发布/订阅模式(一对多,消费者消费数据之后不会清除消息)
    生产者发布消息
    消费者订阅消息,通过长轮询拉取消息,kafka采用这种形式 (还有另一种订阅方式,就是队列推送消息给消费者,但是这种方式容易造成消费者资源浪费或者处理跟不上)

基础架构

  1. 生产者

  2. Kafka集群 (多个broker)
    topic主题:消息的分类
    partition分区:提高某个topic的负载均衡,提高并发度
    分区的leader:leader和follower一定不在同一个broker上
    分区的follower
    replication副本

  3. 消费者
    消费者组:多个消费者可以放在一个消费者组里。同一个分区只能被同消费者组里的一个消费者消费。所以消费者个数跟topic的分区数个数相等时,消费速度最高。
    消费者组可以提高消费能力。

  4. Zookeeper
    Kafka集群正常工作依赖于zk。
    4.1. zk帮助kafka集群存储一些信息。
    要想把多台broker设置为一个kafka集群,只要让它们连同一个zk就可以了。
    4.2. 消费者也会存储一些数据在zk。
    消费的位移offset保存在zk,为了防止消费者挂掉,起来后继续消费。不过在新版本kafka(0.9版本及之后)中,这个信息由kafka集群自己存储了。
    为什么改成存到kafka呢?因为消费者频繁拉取kafka消息的同时,如果还需要频繁的和zk通信,会影响到消费者和zk的效率。

常用命令行操作

本机MacOS已安装好Kafka。

  1. 查看Kafka的配置文件,/usr/local/etc/kafka/, 这些配置主要是给命令行用的:
kafka $ ls -l
total 136
-rw-r--r--  1 a123  admin   906  2 19 16:12 connect-console-sink.properties
-rw-r--r--  1 a123  admin   909  2 19 16:12 connect-console-source.properties
-rw-r--r--  1 a123  admin  5321  2 19 16:12 connect-distributed.properties
-rw-r--r--  1 a123  admin   883  2 19 16:12 connect-file-sink.properties
-rw-r--r--  1 a123  admin   881  2 19 16:12 connect-file-source.properties
-rw-r--r--  1 a123  admin  1111  2 19 16:12 connect-log4j.properties
-rw-r--r--  1 a123  admin  2262  2 19 16:12 connect-standalone.properties
-rw-r--r--  1 a123  admin  1221  2 19 16:12 consumer.properties
-rw-r--r--  1 a123  admin  4727  2 19 16:12 log4j.properties
-rw-r--r--  1 a123  admin  1925  2 19 16:12 producer.properties
-rw-r--r--  1 a123  admin  6865  2 19 16:12 server.properties
-rw-r--r--  1 a123  admin  1032  2 19 16:12 tools-log4j.properties
-rw-r--r--  1 a123  admin  1169  2 19 16:12 trogdor.conf
-rw-r--r--  1 a123  admin  1037  2 19 16:12 zookeeper.properties

这里的server.properties里面配置了server的一些属性,目前主要看下面几个:

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

# A comma separated list of directories under which to store log files
# 这个目录存的是kafka的暂存数据,并不是log日志
log.dirs=/usr/local/var/lib/kafka-logs

# The minimum age of a log file to be eligible for deletion due to age
# kafka数据的暂存时间
log.retention.hours=168

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
#kafka数据文件的大小
log.segment.bytes=1073741824

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
  1. 启动Kafka集群
#启动kafka server
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties
  1. 创建/查看/删除topic
    3.1 创建主题
#创建主题
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

#副本数不能大于broker的数目
$ kafka-topics --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic article
Error while executing topic command : Replication factor: 2 larger than available brokers: 1.
[2021-03-20 13:57:51,129] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 2 larger than available brokers: 1.
 (kafka.admin.TopicCommand$)

3.2 查看主题

#查看所有主题
kafka-topics --describe --zookeeper localhost:2181

#查看某个主题 -topic
$ kafka-topics --describe --zookeeper localhost:2181 -topic test
Topic:test  PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: test Partition: 0    Leader: 0   Replicas: 0 Isr: 0

3.3 删除主题

#删除主题
$ kafka-topics --delete --zookeeper localhost:2181 -topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
#由于kafka中delete.topic.enable 默认为false,所以这个topic并没有真的被删除,只是在zk上被标成删除。
$ kafka-topics --describe --zookeeper localhost:2181 -topic test
Topic:test  PartitionCount:1    ReplicationFactor:1 Configs:    MarkedForDeletion:true
    Topic: test Partition: 0    Leader: 0   Replicas: 0 Isr: 0

[zk: localhost:2181(CONNECTED) 14] ls /admin/delete_topics
[test]


#更改server.properties,设置delete.topic.enable=true,重启kafka server.
#从kafka server启动日志里可以看到,启动时在处理删除test topic的相关工作
[2021-03-20 12:49:20,174] INFO [GroupCoordinator 0]: Removed 0 offsets associated with deleted partitions: test-0. (kafka.coordinator.group.GroupCoordinator)
......
[2021-03-20 12:49:20,180] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions Set(test-0) (kafka.server.ReplicaFetcherManager)
[2021-03-20 12:49:20,180] INFO [ReplicaAlterLogDirsManager on broker 0] Removed fetcher for partitions Set(test-0) (kafka.server.ReplicaAlterLogDirsManager)
......
[2021-03-20 12:49:20,247] INFO Log for partition test-0 is renamed to /usr/local/var/lib/kafka-logs/test-0.a70d1b38100b44d385f17d23b3b673d0-delete and is scheduled for deletion (kafka.log.LogManager)

#再次用命令行,及在zk上 查看test topic,已经看不到了
$ kafka-topics --describe --zookeeper localhost:2181 -topic test
$
[zk: localhost:2181(CONNECTED) 3] ls /admin/delete_topics
[]
[zk: localhost:2181(CONNECTED) 5] ls /brokers/topics 
[__consumer_offsets]
  1. 启动生产者,消费者,互相通信
#启动生产者
$ kafka-console-producer --broker-list localhost:9092 --topic test
>hello world

#启动消费者
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
hello world
  1. 消费者组
# 查看所有消费者组
$ kafka-consumer-groups --bootstrap-server localhost:9092 --list
console-consumer-33462

#查看某个消费者组的详细信息
$ kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group console-consumer-33462

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
test            0          -               3               -               consumer-1-b79c3b8d-8efe-4e0f-a95a-e7482007295a /127.0.0.1      consumer-1

上一篇 下一篇

猜你喜欢

热点阅读