2022-01-19

2022-01-19  本文已影响0人  MIN_Frank

```/**

* kafka produce 单例模式只初始化一个生产者

*/publicclassKafkaProducer{    privatestaticKafkaTemplate kafkaTemplate =newKafkaTemplate<>(producerFactory());;    publicstaticvoidsend(Stringtopic,Stringkey,Stringdata){        ListenableFuture> future = kafkaTemplate.send(topic, key, data);        future.addCallback(newCallBackSuccess(),newFailCallBack(topic, key, data));    }    publicstaticvoidsend(Stringtopic,Stringdata){        ListenableFuture> future = kafkaTemplate.send(topic, data);        future.addCallback(newCallBackSuccess(),newFailCallBack(topic,"",data));    }    privatestaticvoidsend(Stringtopic, Integer parti, Long time,Objectkey,Stringvalue){        kafkaTemplate.send(topic,parti,time,key,value);    }/**

    * 

    * Description:获取配置

    * Date:        2017年7月11日

    * @author      shaqf

    */privatestaticMap producerConfigs() {Map props = Maps.newHashMap();Stringlist = Properties.appProps.getValue("kafka.broker");        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, list);        props.put(ProducerConfig.RETRIES_CONFIG,0);          props.put(ProducerConfig.BATCH_SIZE_CONFIG,4096);          props.put(ProducerConfig.LINGER_MS_CONFIG,1);          props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,40960);          props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);          props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);returnprops;      }/** 获取工厂 */privatestaticProducerFactory producerFactory() {returnnewDefaultKafkaProducerFactory<>(producerConfigs());      }/**

    * 发送消息后的成功回调

    */staticclassCallBackSuccessimplementsSuccessCallback{        @Override        publicvoidonSuccess(Objecto) {            System.out.println("成功");        }    }/**

    * 发送消息后的失败回调

    */staticclassFailCallBackimplementsFailureCallback{Stringtopic;Stringkey;Stringdata;      FailCallBack(Stringtopic,Stringkey,Stringdata){this.data = data;this.key = key;this.topic = topic;      }        @Override        publicvoidonFailure(Throwable throwable) {            System.out.println("失败 topid:"+topic+",key:"+key+",data:"+data);            throwable.printStackTrace();        }    }    publicstaticvoidmain(String[] args) throws Exception{        KafkaTemplate hh = kafkaTemplate;        System.out.print(hh);for(int i=0; i<500;i++){            ListenableFuture> r =  hh.send("yyy7","key2",""+i);            r.addCallback(newCallBackSuccess(),newFailCallBack("","",""));            hh.flush();            Thread.sleep(1000);        }    }}消费者

//通过注解监听topic进行消费 @Configuration

@EnableKafka public class KafkaConsumer {

finalstaticStringlist = Properties.appProps.getValue("kafka.broker");/**

* Description:获取配置

* Date:        2017年7月11日

* @author      shaqf

*/privateMap consumerConfigs() {Map props = Maps.newHashMap();    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, list);    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);      props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"100");      props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"15000");      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);      props.put(ConsumerConfig.GROUP_ID_CONFIG,"group1");    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");    System.out.println("KafkaConsumer consumerConfigs "+ JsonUtil.object2Json(props));returnprops;  }/** 获取工厂 */private ConsumerFactory consumerFactory() {returnnewDefaultKafkaConsumerFactory<>(consumerConfigs());}/** 获取实例 */@Bean  public KafkaListenerContainerFactory> kafkaListenerContainerFactory() {    ConcurrentKafkaListenerContainerFactory factory1 =newConcurrentKafkaListenerContainerFactory<>();    factory1.setConsumerFactory(consumerFactory());    factory1.setConcurrency(2);    factory1.getContainerProperties().setPollTimeout(3000);    System.out.println("KafkaConsumer kafkaListenerContainerFactory factory"+ JsonUtil.object2Json(factory1));returnfactory1;}/**

* topic的消费者组1监听

* @return

*/@Beanpublic Group1Listener listener1() {returnnewGroup1Listener();}/**

* topic的消费者组2监听

* @return

*/@Beanpublic Group2Listener listener2() {returnnewGroup2Listener();}

}

消费者Group 1

消费者组1publicclassGroup1Listener{    @KafkaListener(topics = {"test-topic"})publicvoidlisten(ConsumerRecord<?, ?> record){        Optional kafkaMessage = Optional.ofNullable(record.value());if(kafkaMessage.isPresent()) {            Object message = kafkaMessage.get();            System.out.println("listen1 "+ message);        }    }    @KafkaListener(topics = {"task1"},groupId ="group1")publicvoidtask1(ConsumerRecord<?, ?> record){        System.out.println("这是"+" task1 的消费者");        System.out.println("这是group1 topic task1 KafkaConsumer ---------->>>>>>>>:"+ JsonUtil.object2Json(record));        Object message = record.value();        System.out.println("group1 topic task1 "+record.topic());        System.out.println(message);        System.out.println(record.key());        System.out.println(record);    }    @KafkaListener(topics = {"gift"},groupId ="group1")publicvoidgift(ConsumerRecord<String, String> record){        String key = record.key();        Stringvalue= record.value();        System.out.println("groupId1 kafka gift Consumer value:"+value);    }}消费者组2

public class Group2Listener {

@KafkaListener(topics = {"taskCmd"})publicvoidtaskCmd(ConsumerRecord<?, ?> record){    System.out.println("  KafkaConsumer ---------->>>>>>>>:"+ JsonUtil.object2Json(record));    Object message = record.value();    System.out.println(" 这是group2 topic taskCmd "+record.topic());    System.out.println(message);    System.out.println(record.key());    System.out.println(record);}@KafkaListener(topics = {"task"})publicvoidtask(ConsumerRecord<?, ?> record){    System.out.println("这是group2 topic task KafkaConsumer ---------->>>>>>>>:"+ JsonUtil.object2Json(record));    Object message = record.value();    System.out.println("这是group2 topic task "+record.topic());    System.out.println(message);    System.out.println(record.key());    System.out.println(record);}@KafkaListener(topics = {"task1"},groupId ="group2")publicvoidtask1(ConsumerRecord<?, ?> record){    System.out.println("这是group2"+" task1 的消费者");    System.out.println("这是group2 topic task1 KafkaConsumer ---------->>>>>>>>:"+ JsonUtil.object2Json(record));    Object message = record.value();    System.out.println("group2 topic task1 "+record.topic());    System.out.println(message);    System.out.println(record.key());    System.out.println(record);}@KafkaListener(topics = {"gift"},groupId ="group2")publicvoidgift(ConsumerRecord<String, String> record){    String key = record.key();    Stringvalue= record.value();    System.out.println("groupId2 kafka gift Consumer value:"+value);}

}

上一篇 下一篇

猜你喜欢

热点阅读