Kafka Best Practices

2019-10-11  wyh1791

This article covers optimizations on the producer side and the consumer side separately.

1. Producer-side optimization

On the producer side, the configuration below raises throughput through batching and compression (linger.ms=1000, batch.size=200000, compression-type=gzip) and improves reliability through a high retry count (retries=1000).

2. Consumer-side optimization (for image processing)

On the consumer side (image processing, where a single message can take a long time), the configuration below pulls one record per poll (max-poll-records=1), allows up to 30 minutes of processing per poll (max.poll.interval.ms=1800000), and commits offsets manually (MANUAL_IMMEDIATE) only after a record has been handled.


# Kafka base configuration; do not change
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.acks=1
spring.kafka.producer.retries=1000
spring.kafka.producer.compression-type=gzip
spring.kafka.producer.properties.linger.ms=1000
spring.kafka.producer.properties.batch.size=200000
spring.kafka.producer.properties.max.block.ms=600000
spring.kafka.producer.properties.buffer.memory=100554432
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.max-poll-records=1
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.properties.max.poll.interval.ms=1800000
spring.kafka.consumer.properties.rebalance.timeout.ms=300000
spring.kafka.consumer.properties.session.timeout.ms=300000
spring.kafka.consumer.properties.heartbeat.interval.ms=3000
spring.kafka.properties.security.protocol=SASL_PLAINTEXT
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.request.timeout.ms=301000
spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE
spring.kafka.listener.concurrency=5


# Kafka credentials
spring.kafka.bootstrap-servers=172.16.97.161:2093
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";

# Kafka topics
spring.topics.imageTransformMessage.topic=image_transform_message_prod
spring.topics.imageTransformMessage.group=image_transform_message_prod
spring.topics.imageTransformResult.topic=image_transform_result_prod
spring.topics.imageTransformResult.group=image_transform_result_prod

3. Code examples

3.1 Producer code


import java.util.Arrays;
import java.util.UUID;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSONObject;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${spring.topics.imageTransformResult.topic}")
    private String topic;

    /**
     * Sends a message to an explicitly chosen partition.
     * <p>
     * Kafka's default partitioner assigns hash(key) % partitionCount, which can spread
     * load unevenly, so a pseudo-random partition (current time modulo the partition
     * count) is chosen here instead. Note that this gives up the usual "same key, same
     * partition" ordering guarantee.
     *
     * @param key  record key
     * @param data payload, serialized to JSON
     * @return true if the record was handed to the producer without error
     */
    public <T> boolean sendMessage(String key, T data) {
        String jsonData = JSONObject.toJSONString(data);
        UUID uuid = UUID.randomUUID();
        String suuid = StringUtils.remove(uuid.toString(), "-");
        try {
            int partitionSize = kafkaTemplate.partitionsFor(topic).size();
            int randomPartition = (int) (System.currentTimeMillis() % partitionSize);
            // Attach a UUID header so the message can be traced end to end
            Header header = new RecordHeader("UUID", suuid.getBytes());
            ProducerRecord<String, String> producerRecord =
                    new ProducerRecord<>(topic, randomPartition, key, jsonData, Arrays.asList(header));
            log.info("begin send key {}, data {}, uuid {}", key, data, suuid);
            // send() is asynchronous; this try/catch only covers errors raised while enqueuing
            kafkaTemplate.send(producerRecord);
            log.info("after send uuid {}", suuid);
            return true;
        } catch (Exception e) {
            log.error("sendMessage error, suuid {}, key {}, data {}", suuid, key, jsonData, e);
            // Alert text (unused in the original; wire it up to monitoring as needed)
            String message = "product sync: Kafka message send failed: " + suuid;
            return false;
        }
    }
}
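For completeness, a minimal usage sketch of the producer above; ImageResultPublisher and the ImageResult payload type are illustrative names, not from the original post:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class ImageResultPublisher {

    // Hypothetical payload type, for illustration only
    public static class ImageResult {
        public String imageId;
        public String url;
    }

    @Autowired
    private KafkaProducer kafkaProducer;

    public void publish(ImageResult result) {
        // The key groups related messages in the logs; the partition itself is
        // still chosen pseudo-randomly inside sendMessage
        if (!kafkaProducer.sendMessage(result.imageId, result)) {
            // sendMessage has already logged the failure; retry or alert here as needed
        }
    }
}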

3.2 Consumer code


import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import com.google.common.base.Stopwatch;

import lombok.extern.slf4j.Slf4j;

// ImageBiz and BaseUtil are project-internal classes; their imports are omitted here
@Slf4j
@Component
public class KafkaConsumer {

    @Autowired
    private ImageBiz imageBiz;

    @Autowired
    private KafkaProducer kafkaProducer;

    @KafkaListener(topics = "#{'${spring.topics.imageTransformMessage.topic}'}", groupId = "#{'${spring.topics.imageTransformMessage.group}'}")
    public void processMessage(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        Stopwatch stopwatch = Stopwatch.createStarted();
        try {
            String key = record.key();
            String data = record.value();
            log.info("kafka receive message, key {}, data {}", key, data);
            if (BaseUtil.isEmpty(data)) {
                // Empty payload: acknowledge and skip
                acknowledgment.acknowledge();
                return;
            }
            // ... image-processing business logic (imageBiz / kafkaProducer calls) elided in the original ...
            // Commit the offset only after the record has been handled
            acknowledgment.acknowledge();
        } catch (Exception e) {
            // No acknowledge on failure, so the record is redelivered after a rebalance or restart
            String suuid = MDC.get("UUID") == null ? "" : MDC.get("UUID");
            log.error("failed to consume message, investigate in production: {}", suuid, e);
        } finally {
            log.info("processMessage cost {} ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
        }
    }
}
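One gap worth noting: the catch block above reads a UUID from the MDC, but nothing in the listener puts it there, while the producer does attach a UUID record header. A minimal sketch of propagating that header into the MDC, assuming it is called at the top of processMessage (MdcUuid is an illustrative name, not from the original post):

import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.slf4j.MDC;

public final class MdcUuid {

    private MdcUuid() {
    }

    // Copy the producer's "UUID" record header into the MDC; call this at the top of
    // processMessage, and call MDC.remove("UUID") in a finally block so the value
    // does not leak across listener threads
    public static void put(ConsumerRecord<String, String> record) {
        Header uuidHeader = record.headers().lastHeader("UUID");
        if (uuidHeader != null) {
            MDC.put("UUID", new String(uuidHeader.value(), StandardCharsets.UTF_8));
        }
    }
}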

4. Consumer-side scenarios

There are two main scenarios on the Kafka consumer side:

  1. Few messages, but each one takes a long time to process

  2. Many messages, each processed very quickly

Scenario 1

Like the image-processing setup above: pull only a few records per poll and leave enough time for each message to be processed.

Scenario 2

Scenario 2 can be reduced to scenario 1: pack what used to be, say, 1000 separate messages into a single message and process them as one batch, as in the sketch below.
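A minimal sketch of this packing idea, reusing the sendMessage method from section 3.1; BatchingSketch, sendBatch, and the ImageMessage payload type are illustrative names, not from the original post:

import java.util.List;

import com.alibaba.fastjson.JSONArray;

public class BatchingSketch {

    // Hypothetical payload type, for illustration only
    public static class ImageMessage {
        public String url;
    }

    // Producer side: JSONObject.toJSONString inside sendMessage serializes the whole
    // list as one JSON array, so e.g. 1000 payloads travel as a single Kafka record
    public <T> boolean sendBatch(KafkaProducer producer, String key, List<T> items) {
        return producer.sendMessage(key, items);
    }

    // Consumer side: unpack the JSON array back into individual payloads
    public List<ImageMessage> unpack(String data) {
        return JSONArray.parseArray(data, ImageMessage.class);
    }
}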

If the senders are scattered and can only emit messages one at a time, the packing can happen on the consumer instead, by listening to messages in batches.

Configuration changes


# pull up to 1000 records per poll
spring.kafka.consumer.max-poll-records=1000
# batch consumption mode
spring.kafka.listener.type=batch

Consumer code


// imports as in section 3.2, plus java.util.List
@Slf4j
@Component
public class KafkaConsumer {

    @Autowired
    private ImageBiz imageBiz;

    @Autowired
    private KafkaProducer kafkaProducer;

    // With spring.kafka.listener.type=batch the listener receives the whole poll
    // result (up to max-poll-records entries) as a list
    @KafkaListener(topics = "#{'${spring.topics.imageTransformMessage.topic}'}", groupId = "#{'${spring.topics.imageTransformMessage.group}'}")
    public void processMessage(List<ConsumerRecord<String, String>> records, Acknowledgment acknowledgment) {
        Stopwatch stopwatch = Stopwatch.createStarted();
        try {
            for (ConsumerRecord<String, String> record : records) {
                // per-record business logic elided in the original
            }
            // One acknowledge commits the offsets of the whole batch
            acknowledgment.acknowledge();
        } catch (Exception e) {
            String suuid = MDC.get("UUID") == null ? "" : MDC.get("UUID");
            log.error("failed to consume message batch, investigate in production: {}", suuid, e);
        } finally {
            log.info("processMessage cost {} ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
        }
    }
}
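Note that with ack-mode MANUAL_IMMEDIATE a single acknowledge covers the whole batch: if processing fails partway through and the batch is never acknowledged, every record in it is redelivered later, so batch handlers should be idempotent.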

