Kafka Deployment Guide

2021-03-15  明训

Getting the Installation Package

Open http://kafka.apache.org/downloads in a browser and download kafka_2.10-0.10.1.0.tgz, or fetch it directly from the Apache archive with wget:

[root@rtp-fk-51 ~]# wget https://archive.apache.org/dist/kafka/0.10.1.0/kafka_2.10-0.10.1.0.tgz

About kafka_2.10-0.10.1.0.tgz

The Scala version is 2.10.

The Kafka version is 0.10.1.0.

In a file name of the form kafka_A-B.tgz, A is the Scala version and B is the Kafka version.
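The naming rule can be checked mechanically. The following is a minimal sketch, assuming bash and an archive name that follows the kafka_A-B.tgz pattern exactly; the function name parse_kafka_archive is made up for this illustration.

```shell
#!/usr/bin/env bash
# Split an archive name of the form kafka_A-B.tgz into its two versions.
# parse_kafka_archive is a hypothetical helper, not part of Kafka.
parse_kafka_archive() {
    local name="${1##*/}"       # drop any leading directory
    name="${name%.tgz}"         # drop the .tgz extension
    name="${name#kafka_}"       # drop the kafka_ prefix, leaving A-B
    local scala="${name%%-*}"   # text before the first hyphen: Scala version
    local kafka="${name#*-}"    # text after the first hyphen: Kafka version
    echo "scala=${scala} kafka=${kafka}"
}

parse_kafka_archive kafka_2.10-0.10.1.0.tgz
```

For the archive used in this guide it prints scala=2.10 kafka=0.10.1.0.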

Extracting the Package

[root@rtp-fk-51 ~]# cd /iflytek/server/
[root@rtp-fk-51 server]# ls
kafka_2.10-0.10.1.0.tgz
[root@rtp-fk-51 server]# tar -zxvf kafka_2.10-0.10.1.0.tgz
[root@rtp-fk-51 server]# ls
kafka_2.10-0.10.1.0
kafka_2.10-0.10.1.0.tgz
[root@rtp-fk-51 server]# rm -rf kafka_2.10-0.10.1.0.tgz 
[root@hotax-139 server]# mv kafka_2.10-0.10.1.0 kafka

Creating the Data Directory

[root@hotax-139 ~]# mkdir -p  /iflytek/data/kafka-logs

Adjusting the Configuration

[root@rtp-fk-51 config]# cd /iflytek/server/kafka/config
[root@rtp-fk-51 config]# ls
connect-console-sink.properties    connect-file-source.properties  log4j.properties        zookeeper.properties
connect-console-source.properties  connect-log4j.properties        producer.properties
connect-distributed.properties     connect-standalone.properties   server.properties
connect-file-sink.properties       consumer.properties             tools-log4j.properties
[root@rtp-fk-51 config]# vim server.properties

Edit the following settings:

delete.topic.enable=true
listeners=PLAINTEXT://172.31.234.139:9092
log.dirs=/iflytek/data/kafka-logs
num.partitions=4
zookeeper.connect=172.31.234.139:2181
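When the same settings have to be applied on several hosts, editing by hand in vim gets error-prone. Here is a minimal sketch of a scripted alternative, assuming bash, awk and mktemp are available. The helper set_property is hypothetical, and the demo runs against a scratch file rather than the real server.properties.

```shell
#!/usr/bin/env bash
# Set key=value in a properties file. Any existing line for the key, active
# or commented out with a single '#', is dropped and the new line appended.
# set_property is a hypothetical helper written for this example.
set_property() {
    local file="$1" key="$2" value="$3" tmp
    tmp="$(mktemp)"
    # index(...) == 1 means the line starts with that literal text, so the
    # dots in keys such as num.partitions are not treated as wildcards.
    awk -v k="$key" 'index($0, k "=") != 1 && index($0, "#" k "=") != 1' "$file" > "$tmp"
    echo "${key}=${value}" >> "$tmp"
    cat "$tmp" > "$file"    # overwrite in place so the file keeps its permissions
    rm -f "$tmp"
}

# Demonstrate on a scratch file that mimics two lines of a stock config.
conf="$(mktemp)"
printf '#listeners=PLAINTEXT://:9092\nnum.partitions=1\n' > "$conf"

set_property "$conf" delete.topic.enable true
set_property "$conf" listeners PLAINTEXT://172.31.234.139:9092
set_property "$conf" log.dirs /iflytek/data/kafka-logs
set_property "$conf" num.partitions 4
set_property "$conf" zookeeper.connect 172.31.234.139:2181
cat "$conf"
```

Running the helper twice with the same arguments leaves a single line per key, so it is safe to re-run. Note that it moves the key to the end of the file, which changes the order of the settings but not their meaning.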

Note: to keep all of Kafka's znodes under a single ZooKeeper node (a chroot path), set zookeeper.connect=IP:port/kafka

zookeeper.connect=172.31.234.139:2181/kafka
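With a ZooKeeper ensemble of several servers, the chroot path is written once, after the last host:port pair, not after each one. A small sketch of building such a value, assuming bash; zk_connect is a hypothetical helper, and the second and third addresses are invented for illustration.

```shell
#!/usr/bin/env bash
# Build a zookeeper.connect value: comma-separated host:port pairs followed
# by a single chroot path. zk_connect is a hypothetical helper.
zk_connect() {
    local chroot="$1"; shift
    local IFS=,              # "$*" joins the remaining arguments with commas
    echo "$*${chroot}"
}

# Single server, as in this guide
echo "zookeeper.connect=$(zk_connect /kafka 172.31.234.139:2181)"

# Three servers (the .140 and .141 addresses are made up)
echo "zookeeper.connect=$(zk_connect /kafka 172.31.234.139:2181 172.31.234.140:2181 172.31.234.141:2181)"
```

Every broker in the cluster, and any tool that talks to ZooKeeper directly, needs the same chroot suffix.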

Enabling JMX

Edit kafka-server-start.sh and add one line that exports JMX_PORT.

[root@rtp-fk-51 bin]# cd /iflytek/server/kafka/bin/
[root@rtp-fk-51 bin]# ls
connect-distributed.sh            kafka-preferred-replica-election.sh  kafka-topics.sh
connect-standalone.sh             kafka-producer-perf-test.sh          kafka-verifiable-consumer.sh
kafka-acls.sh                     kafka-reassign-partitions.sh         kafka-verifiable-producer.sh
kafka-configs.sh                  kafka-replay-log-producer.sh         windows
kafka-console-consumer.sh         kafka-replica-verification.sh        zookeeper-security-migration.sh
kafka-console-producer.sh         kafka-run-class.sh                   zookeeper-server-start.sh
kafka-consumer-groups.sh          kafka-server-start.sh                zookeeper-server-stop.sh
kafka-consumer-offset-checker.sh  kafka-server-stop.sh                 zookeeper-shell.sh
kafka-consumer-perf-test.sh       kafka-simple-consumer-shell.sh
kafka-mirror-maker.sh             kafka-streams-application-reset.sh
[root@rtp-fk-51 bin]# vim kafka-server-start.sh 

Original configuration

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

Modified configuration

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    export JMX_PORT="8999"
fi

Increase Kafka's heap size as appropriate, for example -Xmx4G -Xms4G.
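Because the script applies its 1 GB default only when KAFKA_HEAP_OPTS is empty, one alternative to editing the script is to set both variables in the environment before starting the broker. The sketch below assumes bash and reproduces the guard from kafka-server-start.sh to show that an exported value survives it; the 4G and 8999 values are the ones used in this guide.

```shell
#!/usr/bin/env bash
# Set the heap size and JMX port in the environment instead of in the script.
export KAFKA_HEAP_OPTS="-Xmx4G -Xms4G"
export JMX_PORT="8999"

# Same guard as in kafka-server-start.sh: the default is applied only when
# KAFKA_HEAP_OPTS is empty, so the value exported above is left untouched.
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

echo "heap options: $KAFKA_HEAP_OPTS"
echo "jmx port:     $JMX_PORT"
```

One thing to watch: if JMX_PORT stays exported in an interactive shell, other Kafka command-line tools started from that shell will likely try to bind the same port and fail. Setting the variables only for the start command, as in `KAFKA_HEAP_OPTS="-Xmx4G -Xms4G" JMX_PORT=8999 ./bin/kafka-server-start.sh -daemon config/server.properties`, avoids that.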

Starting Kafka

Start Kafka in the foreground

[root@hotax-139 ~]# cd /iflytek/server/kafka/
[root@hotax-139 kafka]# ls
bin  config  libs  LICENSE  NOTICE  site-docs
[root@hotax-139 kafka]# ./bin/kafka-server-start.sh config/server.properties

Start Kafka in the background

[root@hotax-139 ~]# cd /iflytek/server/kafka/
[root@hotax-139 kafka]# ls
bin  config  libs  LICENSE  logs  NOTICE  site-docs
[root@hotax-139 kafka]# ./bin/kafka-server-start.sh -daemon config/server.properties 
[root@hotax-139 kafka]#

Verify that Kafka is running

[root@hotax-139 kafka]# jps -l |grep kafka
27715 kafka.Kafka
[root@hotax-139 kafka]#
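For use in scripts, the same check can be turned into an exit status. This is a rough sketch, assuming bash and that the broker shows up in `jps -l` under the main class kafka.Kafka, as in the listing above. The function has_kafka_broker is hypothetical, and the demo feeds it the sample line from this guide rather than live jps output.

```shell
#!/usr/bin/env bash
# Succeed if the `jps -l` listing on stdin contains the broker main class.
# has_kafka_broker is a hypothetical helper written for this example.
has_kafka_broker() {
    grep -Eq '^[0-9]+ kafka\.Kafka$'
}

# Sample line copied from the session above; on a real host use:
#   jps -l | has_kafka_broker
if echo "27715 kafka.Kafka" | has_kafka_broker; then
    echo "broker running"
else
    echo "broker not running"
fi
```

A running JVM does not guarantee the broker has finished starting, so a stricter check would also confirm that the listener port (9092 in this guide) accepts connections.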

Stop Kafka

[root@hotax-139 kafka]# cd /iflytek/server/kafka/
[root@hotax-139 kafka]# ./bin/kafka-server-stop.sh 
[root@hotax-139 kafka]# jps -lm |grep kafka
[root@hotax-139 kafka]#  

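Cluster Notes

  1. In a cluster deployment, every broker needs its own unique broker.id in server.properties, for example 1 on the first broker and then increasing values on the others.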
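One simple way to keep the ids unique is to number the hosts in a fixed order. The sketch below assumes bash; assign_broker_ids is a hypothetical helper, and the .140 and .141 addresses are invented for illustration.

```shell
#!/usr/bin/env bash
# Print one "host broker.id=N" line per host, numbering from 1.
# assign_broker_ids is a hypothetical helper written for this example.
assign_broker_ids() {
    local id=1 host
    for host in "$@"; do
        echo "${host} broker.id=${id}"
        id=$((id + 1))
    done
}

# Only the first address appears in this guide; the others are made up.
assign_broker_ids 172.31.234.139 172.31.234.140 172.31.234.141
```

Each printed broker.id value then goes into the server.properties of the corresponding host, along with a listeners value that uses that host's own address.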

Troubleshooting

Messages consumed more than once

Solution

1. Go to the Kafka configuration directory

cd /iflytek/server/kafka/config 

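2. Edit the configuration file
Append the following setting to server.properties. It controls how long the broker keeps committed consumer offsets (one day by default in this Kafka version). Once a group's offsets expire, its consumers restart from the position given by auto.offset.reset and can read the same messages again.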

offsets.retention.minutes=129600

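1 day = 60 × 24 × 1 = 1440 minutes
7 days = 60 × 24 × 7 = 10080 minutes
30 days = 60 × 24 × 30 = 43200 minutes
60 days = 60 × 24 × 60 = 86400 minutes
90 days = 60 × 24 × 90 = 129600 minutes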
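The conversion is easy to script for other retention periods. A minimal sketch, assuming bash; days_to_minutes is a hypothetical helper.

```shell
#!/usr/bin/env bash
# Convert a retention period in days to minutes (60 minutes x 24 hours x days).
# days_to_minutes is a hypothetical helper written for this example.
days_to_minutes() {
    echo $(( 60 * 24 * $1 ))
}

for days in 1 7 30 60 90; do
    echo "${days} day(s): offsets.retention.minutes=$(days_to_minutes "$days")"
done
```

The last line printed matches the value used above, offsets.retention.minutes=129600 for 90 days.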

Message send failures

Problem description

03-28 15:11:51.998 [ERROR] [pool-3-thread-3] skynet.boot.stream.kafka.MqProducer4KFK[MqProducer4KFK.java:146] {"tag":"[send result error]:org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.;","context":"VE1Y5LVKMRE7EN41"}
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:65)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:52)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
    at skynet.boot.stream.kafka.MqProducer4KFK.doSend(MqProducer4KFK.java:135)
    at skynet.boot.mq.core.impl.BaseMqProducer.send(BaseMqProducer.java:132)
    at skynet.boot.mq.core.impl.BaseMqProducer.send(BaseMqProducer.java:73)
    at com.iflytek.swwl.clustertask.action.GeneratorWorkerHandler.onProcess(GeneratorWorkerHandler.java:181)
    at com.iflytek.swwl.clustertask.action.GeneratorWorkerHandler.onProcess(GeneratorWorkerHandler.java:1)
    at skynet.boot.mq.MqSvcHandler.process(MqSvcHandler.java:113)
    at skynet.boot.mq.core.impl.MsgProcesserImpl.process(MsgProcesserImpl.java:130)
    at skynet.boot.mq.core.impl.BaseMqConsumer$1.onMessage(BaseMqConsumer.java:96)
    at skynet.boot.stream.kafka.MyConsumer.run(MqConsumer4KFK.java:155)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.

Solution

1. Go to the Kafka configuration directory

cd /iflytek/server/kafka/config 

2. Edit the configuration file
Append the following settings to server.properties

message.max.bytes=10485760
replica.fetch.max.bytes=11534336

Here replica.fetch.max.bytes must be larger than message.max.bytes.

1 MB = 1024 × 1024 × 1 = 1048576 bytes
5 MB = 1024 × 1024 × 5 = 5242880 bytes
6 MB = 1024 × 1024 × 6 = 6291456 bytes
10 MB = 1024 × 1024 × 10 = 10485760 bytes
11 MB = 1024 × 1024 × 11 = 11534336 bytes
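The two values and the ordering rule between them can be generated and checked together. A minimal sketch, assuming bash; mb_to_bytes is a hypothetical helper, and the 10 MB and 11 MB sizes are the ones chosen in this guide.

```shell
#!/usr/bin/env bash
# Convert mebibytes to bytes (1024 x 1024 x n).
# mb_to_bytes is a hypothetical helper written for this example.
mb_to_bytes() {
    echo $(( 1024 * 1024 * $1 ))
}

message_max="$(mb_to_bytes 10)"
replica_fetch_max="$(mb_to_bytes 11)"

# Enforce the rule stated above before printing the settings.
if [ "$replica_fetch_max" -gt "$message_max" ]; then
    echo "message.max.bytes=${message_max}"
    echo "replica.fetch.max.bytes=${replica_fetch_max}"
else
    echo "replica.fetch.max.bytes must be larger than message.max.bytes" >&2
fi
```

The broker has to be restarted for changes in server.properties to take effect.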
