idea编写kafka生产者

2023-05-10  本文已影响0人  压缩干粮

kafka的基础demo

import java.util.Properties;
import org.apache.kafka.clients.producer.*;

/**
 * Minimal Kafka producer demo: configures a String/String producer,
 * sends a single record to {@code test-topic}, and shuts down cleanly.
 */
public class KafkaProducerExample {

   public static void main(String[] args) {

      // Producer configuration.
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      // acks=all: the leader waits until the full in-sync replica set has
      // acknowledged the record — strongest durability, highest latency.
      props.put("acks", "all");
      props.put("retries", 0); // number of automatic retries on transient send failures
      props.put("batch.size", 16384); // max bytes collected into one batch before sending
      props.put("linger.ms", 1); // small artificial delay to allow batching (default 0: send immediately)
      // buffer.memory bounds the total local memory the producer may use to
      // buffer records into batches before they are sent to the broker.
      // Batching in memory is what makes the producer fast; default is 32 MB.
      props.put("buffer.memory", 33554432);
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

      // KafkaProducer is AutoCloseable: try-with-resources guarantees close()
      // runs (flushing any buffered records) even if send() throws.
      // The original leaked the producer on any exception before close().
      try (Producer<String, String> producer = new KafkaProducer<>(props)) {
         String topic = "test-topic";
         String key = "key1";
         String value = "value1";
         producer.send(new ProducerRecord<>(topic, key, value));
      }
   }
}

acks参数对消息持久化的影响?

kafka 的 buffer.memory 与batch.size参数的区别

kafka入门demo

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

/**
 * Kafka producer demo: sends 100 keyed records to topic {@code first} with an
 * asynchronous callback reporting per-record delivery results.
 */
public class xync {
    public static void main(String[] args) {
        // Producer configuration.
        Properties properties = new Properties();
        properties.put("bootstrap.servers","hadoop102:9092,hadoop103:9092,hadoop104:9092");
        properties.put("acks","all");               // wait for the full ISR — strongest durability
        properties.put("retries",1);                // one automatic retry on transient failures
        properties.put("batch.size",16384);         // max bytes per batch
        properties.put("linger.ms",1);              // small delay to improve batching
        properties.put("buffer.memory",33554432);   // 32 MB local send buffer
        properties.put("enable.idempotence","true"); // dedupe broker-side so retries can't duplicate
        properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources guarantees the producer is flushed and closed
        // even if a send throws (KafkaProducer is AutoCloseable).
        try (Producer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 100; i++) {
                // Parameterized ProducerRecord<String, String>; the original
                // used a raw type, losing compile-time type checking.
                kafkaProducer.send(new ProducerRecord<>("first", Integer.toString(i), "hello" + i),
                        new Callback() {
                            @Override
                            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                                // Bug fix: success must be detected via the exception
                                // argument (e == null), not via recordMetadata != null —
                                // and failures must not be silently swallowed.
                                if (e == null) {
                                    System.out.println("发送成功了,发到了"+recordMetadata.topic()+"第"+
                                            recordMetadata.partition()+"分区第"+recordMetadata.offset()+"消息");
                                } else {
                                    e.printStackTrace(); // demo code; a real app would log and/or retry
                                }
                            }
                        });
            }
        }
    }
}
上一篇 下一篇

猜你喜欢

热点阅读