Springboot收发kafka消息
2021-01-04 本文已影响0人
一只菜鸟的笔记
@Component
public class Consumer {
@Autowired
private KafkaTemplate<String, String> template;
@Bean
KafkaListenerContainerFactory<?> batchFactory(ConsumerFactory<String, String> consumerFactory) {
System.setProperty("java.security.auth.login.config", client_cf_file);//加入了安全认证
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setBatchListener(true);
return factory;
}
@KafkaListener(topicPattern = "${receive_topic}", containerFactory = "batchFactory")
public void receiveMessage(List<ConsumerRecord<String, String>> records) {
//接收消息
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
//发送消息
template.send("send_topic", record.value());
}
}
}
application.yml
spring:
kafka:
consumer:
bootstrap-servers: 100.1.0.1:9092
group-id: sce
max-poll-records: 1000
auto-offset-reset: earliest
auto-commit-interval: 1000
enable-auto-commit: true
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
sasl.mechanism: PLAIN //加入了安全认证
security.protocol: SASL_PLAINTEXT //加入了安全认证
producer:
bootstrap-servers: 100.1.0.1:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
acks: 1
properties:
sasl.mechanism: PLAIN //加入了安全认证
security.protocol: SASL_PLAINTEXT //加入了安全认证
receive_topic:receive_topic