Kafka 三种需求问题和方案

2018-11-22  本文已影响0人  枫叶_huazhe

本文将从三个需求点进行切入,并提供方案选择

关于消息拉取偏移量的重置重新消费

关于消息的重新消费问题之前,我们先看现有的 核心重试代码

while (isRunning) {
      try {
            ConsumerRecords<KEY, VALUE> records = consumer.poll(100);
            if (records != null && records.count() > 0) {
                  for (ConsumerRecord<KEY, VALUE> record : records) {
                        try {
                            for (ENDPOINT bizConsumer : bizConsumers) {
                                //处理业务消息
                                dealMessage(bizConsumer, record.value(), record.key());
                            }
                        } catch (Exception e) {
                            logger.error(/*省略...*/);
                            //放入BlockingQueue 重试队列
                            retryMsgQueue.put(record);
                        }

                        try {
                            //record每消费一条就提交一次(性能会低点)
                            consumer.commitSync();
                        } catch (CommitFailedException e) {
                            break;
                        }
                    }
                }
        } catch (Exception e) {
                logger.error("[KafkaConsumer][{}][run] " + e.getMessage(), groupId + ":" + topic, e);
        }
}

重试有多种策略:

private void beginRetryMessage() {
        executor.execute(() -> {
            while (true) {
                try {
                    ConsumerRecord<KEY, VALUE> record = retryMsgQueue.take();
                    logger.error("[Retry]: 消息偏移量:[{}],进行重试 ", record.offset());
                    for (ENDPOINT endpoint : bizConsumers) {
                        /**
                         * 将每一条重试逻辑放入新的线程中
                         */
                        executor.execute(() -> retryStrategy.execute(() -> dealMessage(endpoint, record.value(), record.key())));
                    }
                    logger.info("retry result {} \r\n", record);
                } catch (InterruptedException e) {
                    logger.error("InterruptedException error", e);
                }
            }
        });
}

重试消费例子日志:

image.png

Seek 重新消费方案

kafka 提供的理论基础,订阅 topic 时,加入 ConsumerRebalanceListener 监听器,当分区分配好之后,可以将拉取偏移量设置到想要的为止。
我们可以通过这个方案,做到重新开始拉取之前已经消费过的消息的逻辑。

实现条件

public class MsgConsumerRebalanceListener implements ConsumerRebalanceListener {
    public MsgConsumerRebalanceListener(Consumer consumer) {
        this.consumer = consumer;
    }
    
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        logger.info("[RebalanceListener-Assigned]:reblance assigned 触发, partition重新分配");
        partitions.forEach(partition -> {
            //获取消费偏移量,实现原理是向协调者发送获取请求
            OffsetAndMetadata offset = consumer.committed(partition);
            logger.info("onPartitionsAssigned: partition:{}, offset:{}", partition, offset);
            if (offset == null) {
                logger.info("assigned offset is null ,do nothing for it !");
            } else {
                //设置本地拉取分量,下次拉取消息以这个偏移量为准
                consumer.seek(partition, offset.offset());
            }
        });
    }
}

2. 提供根据业务 bizTag 将相同消息 hash 后发入同一个分区

业务需求,例如根据同一个门店号 storeId,将消息发入同一个分区。
由于目前 dp-event-bus 生产者为保证 消息至少一次和事务保障,采用的消息发送逻辑如下:

step1

biz 业务逻辑事务一起,业务在业务上下文中发送事件,事件会存入到 mysql 事件表中,和业务上下文一起提交或者回滚。做到事务消息前提。
注意:此时消息内容被编码了,以 binary 的形式存入数据库。

id eventType 内容
事件唯一id 事件类型 事件具体内容(binary)

step2

定时器,每一个有生产者的服务都会有一个事件轮询定时器,默认 100ms 轮询一次数据库,可指定。轮询到事件表中的内容后将消息发送到 kafka 集群。

增加根据 bizTag hash

业务在 EventBus.fireEvent 时, 需要新增一个 bizTag 标示,此标志会一起存入到数据中,数据库中 dp_common_event 表需要加一个标示 key_hash 的字

id eventType hash_key 内容
事件唯一id 事件类型 bizTag 事件具体内容(binary)

定时器在发送消息时,可以获取到该 biztag
kafka 给我们提供了分区 Partitioner 策略,只需要我们实现这个接口即可,下面是一个例子。

public class SimplePartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        /**
         *由于我们按key分区,在这里我们规定:key值不允许为null。在实际项目中,key为null的消息*,可以发送到同一个分区。
         */
        if (keyBytes == null) {
            throw new InvalidRecordException("key cannot be null");
        }
        if (((String) key).equals("1")) {
            return 1;
        }
        //如果消息的key值不为1,那么使用hash值取模,确定分区。
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

这样即可以满足需求

3. 将当前消息的 offsetpartition 推送给业务。

方法:用户可以定义如下监听方法。

@KafkaConsumer(groupId = "test", topic = "order_test")
@Transactional(rollbackFor = Array(classOf[Throwable]))
class OrderEventConsumer {
  
  @KafkaListener(serializer = classOf[StockEventSerializer])
  def processStockEventByOrder(event: StockEvent,offset:Long,partition:Long): Unit = {
    logger.info(s"your partition $partition, offset: $offset")
    logger.info(s"开始处理Order库存消息")
    new ProcessStockAction(event.stockList).execute
  }
}

我会新增一种策略,将当前消息的分区信息和 offset 信息赋值给这两个变量。

上一篇下一篇

猜你喜欢

热点阅读