kafka相关配置和JavaAPI

2021-12-10  本文已影响0人  傻疯子
配置

server.properties是配置文件

具体文档可以查看https://kafka.apache.org/24/documentation.html

数据清空策略
log.flush.interval.messages分区消息达到设置数后清空到磁盘上
log.flush.interval.ms间隔时间数据清空到磁盘上

数据保存策略,达到条件后就会删除
log.retention.hours数据保存时间
log.retention.bytes数据保存大小
log.retention.check.interval.ms数据保存条件检查间隔时间

生产者数据通讯策略
acks为1,得到leader收到的消息后,发送下一条
acks为all,得到所有节点收到的消息后,发送下一条
acks为0,不需要回复,直接发送
acks为all,可以保存数据不丢

同一个group_id消费者数量如果大于分区数,则多余消费者无法得到消息。少于则一个消费者会消费多个分区的数据。

数据Broker新增后,负载均衡开启和检查时间
auto.leader.rebalance.enable=true
leader.imbalance.check.interval.seconds
手动执行负载均衡命令
bin/kafka-leader-election.sh --bootstrap-server broker_ip:port --election-type preferred --all-topic-partitions

JavaAPI

生产者

      Properties prop = new Properties();
      //指定kafka的broker地址
      prop.put("bootstrap.servers", "broker_ip:port");
      //指定key-value数据的序列化格式
      prop.put("key.serializer", StringSerializer.class.getName());
      prop.put("value.serializer", StringSerializer.class.getName());

      //指定topic
      String topic = "kafka_topic"; 
      
      //创建kafka生产者
      KafkaProducer<String, String> producer = new KafkaProducer<String,String>(prop);

      //向topic中生产数据
      producer.send(new ProducerRecord<String, String>(topic, "hello kafka"));

      //关闭链接
      producer.close();

消费者

      Properties prop = new Properties();
      //指定kafka的broker地址
      prop.put("bootstrap.servers", "broker_ip:port");
      //指定key-value的反序列化类型
      prop.put("key.deserializer", StringDeserializer.class.getName());
      prop.put("value.deserializer", StringDeserializer.class.getName());
      //指定消费者组
      prop.put("group.id", "test");
      //latest表示找不到offset或offset对应的数据就消费最新的,earliest消费最早的,none找不到之前offset抛出异常
      prop.put("auto.offset.reset","latest");

      //创建消费者
      KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(prop);
      Collection<String> topics = new ArrayList<String>();
      topics.add("kafka_topic");
      //订阅指定的topic
      consumer.subscribe(topics);

      while(true) {
         ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(1));
         for (ConsumerRecord<String,String> consumerRecord : poll) {
            System.out.println(consumerRecord);
         }
      }
上一篇 下一篇

猜你喜欢

热点阅读