
Kafka Operations: Pitfalls and Fixes

2017-10-09  扫帚的影子

Replica cannot sync messages from the leader
[2017-09-20 19:37:05,265] ERROR Found invalid messages during fetch for partition [xxxx,87] offset 1503297 error Message is corrupt (stored crc = 286782282, computed crc = 400317671) (kafka.server.ReplicaFetcherThread)
[2017-09-20 19:37:05,458] ERROR Found invalid messages during fetch for partition [xxxx,75] offset 1501373 error Message found with corrupt size (0) in shallow iterator (kafka.server.ReplicaFetcherThread)
[2017-09-20 19:37:07,455] ERROR [ReplicaFetcherThread-0-5], Error due to  (kafka.server.ReplicaFetcherThread)
kafka.common.KafkaException: error processing data for partition [xxxx,87] offset 1503346
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:147)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:122)
        at scala.Option.foreach(Option.scala:257)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:122)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:120)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:120)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:120)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:120)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:93)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Caused by: java.lang.RuntimeException: Offset mismatch: fetched offset = 1503346, log end offset = 1503297.
        at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:110)
        at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:42)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:138)
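The final RuntimeException is the follower refusing to append a fetched batch that does not start at its own log end offset (LEO). Read together with the first log line, the follower appears to have stopped at the corrupt message at offset 1503297, so a later response starting at 1503346 no longer lines up. The sketch below is a hypothetical model of that invariant only; FollowerLog and append are made-up names, not Kafka classes.

```scala
// Hypothetical model of the follower-side invariant behind
// "Offset mismatch: fetched offset = X, log end offset = Y".
// FollowerLog and append are illustrative names, not Kafka classes.
final case class FollowerLog(logEndOffset: Long) {
  // A fetched batch is appended only if it starts exactly at this follower's
  // LEO; anything else would leave a gap or an overlap in the log.
  def append(fetchOffset: Long, messageCount: Int): Either[String, FollowerLog] =
    if (fetchOffset != logEndOffset)
      Left(s"Offset mismatch: fetched offset = $fetchOffset, log end offset = $logEndOffset.")
    else
      Right(copy(logEndOffset = logEndOffset + messageCount))
}

object OffsetMismatchDemo {
  def main(args: Array[String]): Unit = {
    // LEO stuck at the corrupt message's offset, as in the first log line.
    val log = FollowerLog(logEndOffset = 1503297L)
    // A later response starting at 1503346 no longer lines up with the LEO.
    println(log.append(fetchOffset = 1503346L, messageCount = 10))
    // A batch starting at the LEO is accepted and advances it.
    println(log.append(fetchOffset = 1503297L, messageCount = 10))
  }
}
```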
The broker's connection to the ZooKeeper cluster drops and reconnects from time to time
Broker restarts take a very long time
Disallowing unclean leader election causes the broker to be forcibly shut down
  if (leaderEndOffset < replica.logEndOffset.messageOffset) {
    // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
    // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
    // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
    if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchEntityConfig(replicaMgr.zkUtils,
      ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
      // Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur.
      fatal("...")
      Runtime.getRuntime.halt(1)
    }
    // ... otherwise the follower's log is truncated to leaderEndOffset
  }

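It calls Runtime.getRuntime.halt(1), which terminates the JVM immediately without running any shutdown hooks, so the broker is killed abruptly.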
See the Kafka issue: Unclean leader election and "Halting because log truncation is not allowed".
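Put differently, the broker halts when the current leader's log end offset is behind this follower's and the topic's effective unclean.leader.election.enable is false, because truncating would throw away messages the follower already has. Below is a minimal, hypothetical model of just that branch; OutOfRangeAction, onOffsetOutOfRange and the case names are illustrative, not Kafka APIs.

```scala
// Hypothetical model of the branch in the excerpt above; the names are
// illustrative, not Kafka APIs.
sealed trait OutOfRangeAction
final case class TruncateTo(offset: Long) extends OutOfRangeAction
case object HaltBroker extends OutOfRangeAction // Runtime.getRuntime.halt(1) in the excerpt
case object NotModeled extends OutOfRangeAction // leader not behind: handled by code not shown

object UncleanElectionCheck {
  def onOffsetOutOfRange(leaderEndOffset: Long,
                         followerLogEndOffset: Long,
                         uncleanLeaderElectionEnable: Boolean): OutOfRangeAction =
    if (leaderEndOffset < followerLogEndOffset) {
      // Truncating to the leader's LEO would discard messages this follower already has.
      if (!uncleanLeaderElectionEnable) HaltBroker
      else TruncateTo(leaderEndOffset)
    } else NotModeled

  def main(args: Array[String]): Unit = {
    // Leader behind the follower, unclean election disallowed: the broker halts.
    println(onOffsetOutOfRange(1000L, 1200L, uncleanLeaderElectionEnable = false))
    // Same situation with unclean election allowed: the follower truncates instead.
    println(onOffsetOutOfRange(1000L, 1200L, uncleanLeaderElectionEnable = true))
  }
}
```

In this model, enabling unclean leader election for the topic turns the halt into a truncation, at the cost of dropping the messages between the two offsets.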

Replica fetches data from the wrong partition leader
WARN [Replica Manager on Broker 3]: While recording the replica LEO, the partition [orderservice.production,0] hasn't been created. (kafka.server.ReplicaManager)

ERROR [ReplicaFetcherThread-0-58], Error for partition [reptest,0] to broker 58:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
__consumer_offsets logs cannot be cleaned
[2017-09-30 10:49:36,126] ERROR [kafka-log-cleaner-thread-0], Error due to  (kafka.log.LogCleaner)
java.lang.IllegalArgumentException: requirement failed: 138296566648 messages in segment __consumer_offsets-5/00000000000000000000.log but offset map can fit only 5033164. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads
        at scala.Predef$.require(Predef.scala:219)
        at kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:584)
        at kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:580)
        at scala.collection.immutable.Stream$StreamWithFilter.foreach(Stream.scala:570)
        at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:580)
        at kafka.log.Cleaner.clean(LogCleaner.scala:322)
        at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:230)
        at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:208)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
      val segmentSize = segment.nextOffset() - segment.baseOffset
      require(segmentSize <= maxDesiredMapSize, "%d messages in segment %s/%s but offset map can fit only %d. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads".format(segmentSize, log.name, segment.log.file.getName, maxDesiredMapSize))
      if (map.size + segmentSize <= maxDesiredMapSize)
        offset = buildOffsetMapForSegment(log.topicAndPartition, segment, map)
      else
        full = true
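The 5033164 in the error can be reproduced from the defaults, assuming the 0.10.x sizing rules: each cleaner thread gets log.cleaner.dedupe.buffer.size / log.cleaner.threads bytes (capped at Int.MaxValue), each offset-map entry takes 24 bytes (a 16-byte MD5 hash plus an 8-byte offset), and only log.cleaner.io.buffer.load.factor (default 0.9) of the slots are used. With the default 128 MB buffer and one thread that gives 5,592,405 slots and 5,033,164 usable entries. The helper below is a sketch of that arithmetic, not Kafka code.

```scala
// Sketch of the offset-map sizing arithmetic, assuming the 0.10.x rules:
// per-thread memory = dedupe buffer / threads (capped at Int.MaxValue),
// 24 bytes per entry (16-byte MD5 hash + 8-byte offset), load factor 0.9.
object OffsetMapCapacity {
  val BytesPerEntry: Int = 16 + 8

  def maxDesiredMapSize(dedupeBufferSize: Long,
                        cleanerThreads: Int,
                        loadFactor: Double = 0.9): Int = {
    val memoryPerThread = math.min(dedupeBufferSize / cleanerThreads, Int.MaxValue.toLong).toInt
    val slots = memoryPerThread / BytesPerEntry
    (slots * loadFactor).toInt
  }

  def main(args: Array[String]): Unit = {
    val defaultBuffer = 128L * 1024 * 1024 // log.cleaner.dedupe.buffer.size default
    println(maxDesiredMapSize(defaultBuffer, cleanerThreads = 1)) // the figure from the log
    println(maxDesiredMapSize(Long.MaxValue, cleanerThreads = 1)) // per-thread ceiling
    val segmentSize = 138296566648L // nextOffset - baseOffset reported in the log
    println(segmentSize <= maxDesiredMapSize(Long.MaxValue, cleanerThreads = 1))
  }
}
```

Under the same assumptions, even the per-thread cap of Int.MaxValue bytes yields only about 80.5 million entries, far below the 138,296,566,648 reported for this segment. Since segmentSize is nextOffset - baseOffset (the segment's offset span, not a literal message count), raising the buffer or lowering the thread count alone would not let this segment pass the check.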
GC issues
ZooKeeper and Kafka deployment
Monitoring is essential
A flood of exceptions: Attempted to decrease connection count for address with no connections
[2016-10-13 00:00:00,495] ERROR Processor got uncaught exception. (kafka.network.Processor)
java.lang.IllegalArgumentException: Attempted to decrease connection count for address with no connections, address: /xxx.xxx.xxx.xxx
        at kafka.network.ConnectionQuotas$$anonfun$9.apply(SocketServer.scala:565)
        at kafka.network.ConnectionQuotas$$anonfun$9.apply(SocketServer.scala:565)
        at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
        at scala.collection.AbstractMap.getOrElse(Map.scala:59)
        at kafka.network.ConnectionQuotas.dec(SocketServer.scala:564)
        at kafka.network.Processor$$anonfun$run$13.apply(SocketServer.scala:450)
        at kafka.network.Processor$$anonfun$run$13.apply(SocketServer.scala:445)
        at scala.collection.Iterator$class.foreach(Iterator.scala:742)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at kafka.network.Processor.run(SocketServer.scala:445)
        at java.lang.Thread.run(Thread.java:745)
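The frames show the exception coming from ConnectionQuotas.dec, called from Processor.run, through a getOrElse on a per-address count map: the broker is asked to decrement the count for an address it holds no connections for. One way this can happen is a single connection being released twice. Below is a simplified, hypothetical counter with those semantics (ConnectionCounter is a made-up name, and addresses are plain strings instead of InetAddress) showing how a second dec for the same address fails.

```scala
import scala.collection.mutable

// Simplified, hypothetical per-address connection counter; not Kafka code.
final class ConnectionCounter {
  private val counts = mutable.Map.empty[String, Int]

  def inc(address: String): Unit = synchronized {
    counts.update(address, counts.getOrElse(address, 0) + 1)
  }

  // Decrementing an address with no recorded connections is treated as a bug
  // and raises the same message seen in the log.
  def dec(address: String): Unit = synchronized {
    val count = counts.getOrElse(address,
      throw new IllegalArgumentException(
        s"Attempted to decrease connection count for address with no connections, address: $address"))
    if (count == 1) counts.remove(address) else counts.update(address, count - 1)
  }

  def count(address: String): Int = synchronized(counts.getOrElse(address, 0))
}

object ConnectionCounterDemo {
  def main(args: Array[String]): Unit = {
    val counter = new ConnectionCounter
    counter.inc("10.0.0.1")
    counter.dec("10.0.0.1") // first release: count drops to 0 and the entry is removed
    try counter.dec("10.0.0.1") // second release of the same connection
    catch { case e: IllegalArgumentException => println(e.getMessage) }
  }
}
```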
A newer client SDK talking to an older Kafka broker sends requests the broker does not support
[2017-10-12 16:52:38,141] ERROR Processor got uncaught exception. (kafka.network.Processor)
java.lang.ArrayIndexOutOfBoundsException: 18
        at org.apache.kafka.common.protocol.ApiKeys.forId(ApiKeys.java:68)
        at org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:39)
        at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:79)
        at kafka.network.Processor$$anonfun$run$11.apply(SocketServer.scala:426)
        at kafka.network.Processor$$anonfun$run$11.apply(SocketServer.scala:421)
        at scala.collection.Iterator$class.foreach(Iterator.scala:742)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at kafka.network.Processor.run(SocketServer.scala:421)
        at java.lang.Thread.run(Thread.java:745)
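The trace ends in ApiKeys.forId with index 18. API key 18 is ApiVersions, which newer Java clients send when they connect to find out what the broker supports, and an older broker's id-to-key table has no slot for it. Assuming forId is a plain array lookup, as the exception type suggests, the hypothetical sketch below contrasts that unchecked lookup with a bounds-checked one that turns an unknown key into None. The table size and placeholder names are illustrative only.

```scala
// Hypothetical model of an id-indexed API key table; the size (18) and the
// placeholder names are illustrative, not the real ApiKeys contents.
object ApiKeyLookup {
  private val idToName: Array[String] = Array.tabulate(18)(id => s"api-$id")

  // Unchecked lookup: an id with no slot in the table throws
  // ArrayIndexOutOfBoundsException, as in the trace.
  def forIdUnchecked(id: Int): String = idToName(id)

  // Bounds-checked lookup: unknown ids become None so the caller can reject
  // the request and close the connection instead of failing the whole loop.
  def forIdChecked(id: Int): Option[String] =
    if (id >= 0 && id < idToName.length) Some(idToName(id)) else None

  def main(args: Array[String]): Unit = {
    println(forIdChecked(18))
    try forIdUnchecked(18)
    catch { case e: ArrayIndexOutOfBoundsException => println(s"unchecked lookup failed: $e") }
  }
}
```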
          try {
            val channel = selector.channel(receive.source)
            val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
              channel.socketAddress)
            val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
            requestChannel.sendRequest(req)
          } catch {
            case e @ (_: InvalidRequestException | _: SchemaException) =>
              // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
              error("Closing socket for " + receive.source + " because of error", e)
              isClose = true
              close(selector, receive.source)
          }

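This exception is not handled where the request is built, so it escapes to the enclosing try...catch and the processor moves straight on to the next selector.poll(300). That poll clears every receive collected so far, so in this situation some requests can be silently dropped, which looks like a fairly serious problem. Our fix is to also catch ArrayIndexOutOfBoundsException here and close the offending connection: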

        selector.completedReceives.asScala.foreach { receive =>
          var isClose = false

          try {
            val channel = selector.channel(receive.source)
            val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
              channel.socketAddress)
            val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
            requestChannel.sendRequest(req)
          } catch {
            case e @ (_: InvalidRequestException | _: SchemaException) =>
              // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
              error("Closing socket for " + receive.source + " because of error", e)
              isClose = true
              close(selector, receive.source)
            case e : ArrayIndexOutOfBoundsException =>
              error("NotSupport Request | Closing socket for " + receive.source + " because of error", e)
              isClose = true
              close(selector, receive.source)
          }
          if (!isClose) {
            selector.mute(receive.source)
          }
        }
  1. Kafka also has a related issue, "Broker does not disconnect client on unknown request"; its fix is considerably more extensive.
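To make the lost-request concern concrete, here is a hypothetical model of one pass over a batch of completed receives. Without a per-receive catch, the exception from the unknown key escapes the loop, so receives queued after it in the same batch are never handed off; with a per-receive catch like the one in the patch, only the offending connection is closed. Receive, toRequest and the known-key range are illustrative, not Kafka code.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical model of one Processor pass over completed receives; Receive,
// toRequest and the known-key range are illustrative, not Kafka code.
object BatchHandling {
  final case class Receive(source: String, apiKey: Int)

  private val knownKeys = 0 until 18

  // Stand-in for building a request: unknown keys throw, like the forId lookup.
  private def toRequest(r: Receive): String =
    if (knownKeys.contains(r.apiKey)) s"${r.source}:${r.apiKey}"
    else throw new ArrayIndexOutOfBoundsException(r.apiKey)

  // Original behaviour: the exception escapes the loop and is caught only by an
  // outer handler, so receives after the bad one in this batch are skipped.
  def processUnguarded(batch: Seq[Receive]): Seq[String] = {
    val handled = ArrayBuffer.empty[String]
    try batch.foreach(r => handled += toRequest(r))
    catch { case _: ArrayIndexOutOfBoundsException => () } // plays the outer catch
    handled.toList
  }

  // Patched behaviour: catch per receive and close only the offending connection.
  def processGuarded(batch: Seq[Receive]): (Seq[String], Seq[String]) = {
    val handled = ArrayBuffer.empty[String]
    val closed = ArrayBuffer.empty[String]
    batch.foreach { r =>
      try handled += toRequest(r)
      catch { case _: ArrayIndexOutOfBoundsException => closed += r.source }
    }
    (handled.toList, closed.toList)
  }

  def main(args: Array[String]): Unit = {
    val batch = Seq(Receive("a", 0), Receive("b", 18), Receive("c", 1))
    println(processUnguarded(batch))
    println(processGuarded(batch))
  }
}
```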

Kafka source code analysis: series index
