Spring-Kafka简记-基础使用
2018-09-27 本文已影响0人
78240024406c
new無语 转载请注明原创出处,谢谢!
依赖
MAVEN
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.1.10.RELEASE</version>
</dependency>
Gradle
compile 'org.springframework.kafka:spring-kafka:2.1.10.RELEASE'
兼容性
- Apache Kafka客户端1.0.x或更高版本
- Spring Framework 5.0.x
- 最低Java版本:8
基础实现
/**
 * Minimal end-to-end example of Spring for Apache Kafka wired by hand: a
 * {@link KafkaMessageListenerContainer} consumes from a topic while a {@link KafkaTemplate}
 * publishes to the same topic, and a {@link CountDownLatch} verifies that every published
 * record reached the listener.
 *
 * <p>Requires a broker reachable at {@code localhost:9092}.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class DemoTest {

    private static final Logger logger = LoggerFactory.getLogger(DemoTest.class);

    /** Topic used by both the listener container and the template. */
    private static final String TOPIC = "test";

    /** Number of records published by {@link #testAutoCommit()}; the latch waits for this many. */
    private static final int MESSAGE_COUNT = 4;

    /**
     * Starts a listener container, publishes {@link #MESSAGE_COUNT} records and asserts that the
     * listener is invoked that many times within 60 seconds.
     *
     * @throws Exception if the thread is interrupted while sleeping or waiting on the latch
     */
    @Test
    public void testAutoCommit() throws Exception {
        logger.info("Start auto");
        ContainerProperties containerProps = new ContainerProperties(TOPIC);
        // The latch count must equal the number of records sent below, otherwise await()
        // can never succeed on the records this test produces.
        final CountDownLatch latch = new CountDownLatch(MESSAGE_COUNT);
        // Key type is Integer to agree with the container's generic types and the
        // IntegerDeserializer configured in consumerProps().
        containerProps.setMessageListener(new MessageListener<Integer, String>() {
            @Override
            public void onMessage(ConsumerRecord<Integer, String> message) {
                logger.info("received: " + message);
                latch.countDown();
            }
        });
        KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
        container.setBeanName("testAuto");
        container.start();
        try {
            Thread.sleep(1000); // wait a bit for the container to start
            KafkaTemplate<Integer, String> template = createTemplate();
            // Publish to the same topic the container is subscribed to.
            template.setDefaultTopic(TOPIC);
            template.sendDefault(0, "foo");
            template.sendDefault(2, "bar");
            template.sendDefault(0, "baz");
            template.sendDefault(2, "qux");
            template.flush();
            assertTrue(latch.await(60, TimeUnit.SECONDS));
        } finally {
            // Stop the container even when the assertion fails so it is not left running.
            container.stop();
        }
        logger.info("Stop auto");
    }

    /**
     * Builds a listener container backed by a consumer factory configured from
     * {@link #consumerProps()}.
     *
     * @param containerProps topics and message listener for the container
     * @return a new, not yet started container
     */
    private KafkaMessageListenerContainer<Integer, String> createContainer(
            ContainerProperties containerProps) {
        Map<String, Object> props = consumerProps();
        DefaultKafkaConsumerFactory<Integer, String> cf =
                new DefaultKafkaConsumerFactory<>(props);
        return new KafkaMessageListenerContainer<>(cf, containerProps);
    }

    /**
     * Builds a template backed by a producer factory configured from {@link #senderProps()}.
     *
     * @return a new template with no default topic set
     */
    private KafkaTemplate<Integer, String> createTemplate() {
        Map<String, Object> senderProps = senderProps();
        ProducerFactory<Integer, String> pf =
                new DefaultKafkaProducerFactory<>(senderProps);
        return new KafkaTemplate<>(pf);
    }

    /**
     * Consumer configuration: local broker, group id {@code "0"}, auto-commit every 100 ms,
     * Integer keys and String values.
     *
     * @return a mutable map of consumer properties
     */
    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "0");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    /**
     * Producer configuration: local broker, no retries, Integer keys and String values.
     *
     * @return a mutable map of producer properties
     */
    private Map<String, Object> senderProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
}
Spring Boot 配置实现
application.yml
spring:
  kafka:
    consumer:
      group-id: foo
      auto-offset-reset: earliest
/**
 * Spring Boot variant of the demo: an injected {@link KafkaTemplate} publishes three records
 * to the {@code test} topic and a {@link KafkaListener} method counts them down on a latch.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class DemoSpringBootTest {

    private static final Logger logger = LoggerFactory.getLogger(DemoSpringBootTest.class);

    @Autowired
    private KafkaTemplate<String, String> template;

    /** Counts down once per record passed to {@link #listen(ConsumerRecord)}. */
    private final CountDownLatch latch = new CountDownLatch(3);

    /**
     * Sends three records and fails unless the listener has been invoked three times within
     * 60 seconds.
     *
     * <p>Exceptions are allowed to propagate and the result of {@code await} is checked, so a
     * send failure or a timeout fails the test instead of being logged and ignored.
     *
     * @throws Exception if sending fails or the thread is interrupted while waiting
     */
    @Test
    public void test() throws Exception {
        this.template.send("test", "foo1");
        this.template.send("test", "foo2");
        this.template.send("test", "foo3");
        // NOTE(review): the latch counts any record delivered to listen(), so records already
        // present in the topic may satisfy it as well. Confirm against the consumer offset
        // configuration in use.
        if (!latch.await(60, TimeUnit.SECONDS)) {
            throw new AssertionError("Timed out waiting for 3 records on topic 'test'");
        }
        logger.info("All received");
    }

    /**
     * Listener for the {@code test} topic: logs the record and counts the latch down.
     *
     * @param cr the consumed record
     * @throws Exception declared for compatibility; the current body throws none
     */
    @KafkaListener(topics = "test")
    public void listen(ConsumerRecord<?, ?> cr) throws Exception {
        logger.info(cr.toString());
        latch.countDown();
    }
}