0006.Kafka事务处理

2022-04-16  本文已影响0人  笑着字太黑
1.Kafka中事务处理的实现方式

1.1.注解@Transactional

1.1.1.通过设置TransactionIdPrefix来开启生产者工厂的事务功能

KafkaConfiguration.transProducerFactory()

1.1.2.使用以上生产者工厂来创建事务管理类

KafkaConfiguration.transactionManager()

1.1.3.在发送消息的方法上使用注解@Transactional
1.1.4.设置Listener读取消息的事务隔离级别

properties = {"isolation.level:read_committed"}

1.1.5.设置setAllowNonTransactional

开启事务处理后,发送消息时必须在事务中进行;
如果希望允许在事务之外发送消息,可以调用setAllowNonTransactional(true)。
参见KafkaConfiguration.kafkaTemplate()

1.2.发送消息时使用executeInTransaction方法
1.3.直接使用Producer开启事务处理

KafkaTemplate封装了对Producer的使用,暂时还没有调查、测试直接使用Producer的方式。

2.相关代码

2.1.KafkaConfiguration.java

/**
 * Spring configuration that declares the Kafka beans used by the transaction demos:
 * a producer factory with a transaction id prefix, two {@link KafkaTemplate}s built
 * on that factory, and a {@link KafkaTransactionManager}.
 */
@Configuration
@EnableKafka
public class KafkaConfiguration {

    private static final Logger log = LoggerFactory.getLogger(KafkaConfiguration.class);

    @Autowired
    public KafkaProperties kafkaProperties;

    /**
     * Primary template. It is built on the transactional producer factory but has
     * {@code allowNonTransactional} switched on, so it may also send outside a transaction.
     *
     * @return the primary {@code KafkaTemplate}
     */
    @Bean
    @Primary
    public KafkaTemplate<Integer, String> kafkaTemplate() {
        log.info("====kafkaTemplate() start");
        this.logKafkaProperties();

        KafkaTemplate<Integer, String> template = new KafkaTemplate<>(transProducerFactory());
        template.setAllowNonTransactional(true);

        log.info("====kafkaTemplate() end");
        return template;
    }

    /**
     * Template built on the transactional producer factory with default settings
     * ({@code allowNonTransactional} is not changed here).
     *
     * @return the transactional {@code KafkaTemplate}
     */
    @Bean
    public KafkaTemplate<Integer, String> transKafkaTemplate() {
        log.info("====transKafkaTemplate() start");

        KafkaTemplate<Integer, String> template = new KafkaTemplate<>(transProducerFactory());

        log.info("====transKafkaTemplate() end");
        return template;
    }

    /**
     * Producer factory created from the application's producer properties, with the
     * transaction id prefix {@code "tran-"}.
     *
     * @return the producer factory shared by the templates and the transaction manager
     */
    @Bean
    public ProducerFactory<Integer, String> transProducerFactory() {
        Map<String, Object> producerProps = kafkaProperties.buildProducerProperties();
        DefaultKafkaProducerFactory<Integer, String> producerFactory =
                new DefaultKafkaProducerFactory<>(producerProps);

        // Setting the transaction id prefix is what enables transactions on the factory.
        // transactionCapable() only reports that state, so the former bare call to it
        // (with its result discarded) had no effect and has been dropped.
        producerFactory.setTransactionIdPrefix("tran-");

        return producerFactory;
    }

    /**
     * Transaction manager backed by the given producer factory.
     *
     * @param producerFactory the producer factory the manager obtains producers from
     * @return the Kafka transaction manager
     */
    @Bean
    public KafkaTransactionManager<Integer, String> transactionManager(ProducerFactory<Integer, String> producerFactory) {
        log.info("====transactionManager() start");

        // The transaction manager must be created from the producer factory.
        KafkaTransactionManager<Integer, String> manager = new KafkaTransactionManager<>(producerFactory);

        log.info("====transactionManager() end");
        return manager;
    }

    /**
     * Logs the admin, consumer, producer and streams property maps built from
     * {@link #kafkaProperties}.
     */
    private void logKafkaProperties() {
        // NOTE(review): these maps may include credentials (e.g. SSL/SASL settings) when
        // such properties are configured - confirm before keeping this at INFO level.
        log.info("    ====adminProperties:{}", kafkaProperties.buildAdminProperties());
        log.info("    ====consumerProperties:{}", kafkaProperties.buildConsumerProperties());
        log.info("    ====producerProperties:{}", kafkaProperties.buildProducerProperties());
        log.info("    ====streamsProperties:{}", kafkaProperties.buildStreamsProperties());
    }
}

2.2.HelloWoldListener.java

/**
 * Kafka listener for the {@code topic.quick.helloWorld} topic that logs every
 * payload it receives.
 */
@Component
public class HelloWoldListener {

    private static final Logger log = LoggerFactory.getLogger(HelloWoldListener.class);

    /**
     * Logs the received message payload.
     *
     * @param msgData the message payload delivered to the listener
     */
    @KafkaListener(
        id = "helloWorld",
        topics = "topic.quick.helloWorld",
        // Consumer property override: isolation.level = read_committed.
        properties = {"isolation.level:read_committed"}
    )
    public void listen(String msgData) {
        // Parameterized logging instead of string concatenation; the output is unchanged.
        log.info("====HelloWoldListener receive : {}", msgData);
    }
}

2.3.TransHelloWorldTest.java

/**
 * Demo tests that send a message through the transactional template and then
 * throw a {@link RuntimeException}, once under {@code @Transactional} and once
 * inside {@code executeInTransaction}.
 */
@SpringBootTest
@RunWith(SpringRunner.class)
public class TransHelloWorldTest {

    // Resolved by bean name, which is why @Resource is used here.
    @Resource
    private KafkaTemplate<Integer,String> transKafkaTemplate;

    /**
     * Sends one message from a method annotated with {@code @Transactional}, then throws.
     * NOTE(review): the exception looks intentional (to exercise rollback) - confirm.
     */
    @Test
    @Transactional
    public void tesTransAnnotationHelloWorld() throws InterruptedException {
        System.out.println("====tesTransAnnotationHelloWorld run");
        printTemplateState();
        transKafkaTemplate.send(
                "topic.quick.helloWorld",
                "this is my first trans demo, Transaction Annotation Hello World!!!");
        throw new RuntimeException("Exception for trans annotation test.");
    }

    /**
     * Sends one message inside an {@code executeInTransaction} callback, then throws
     * from within that callback.
     */
    @Test
    public void testTransExecuteInTransaction() throws Exception {
        System.out.println("====testTransExecuteInTransaction run");
        printTemplateState();
        transKafkaTemplate.executeInTransaction(operations -> {
            operations.send(
                    "topic.quick.helloWorld",
                    "this is my second trans demo, Transaction Execute Hello World!!!");
            throw new RuntimeException("Exception for trans execute test.");
        });
    }

    /** Prints whether the template's factory is transaction capable and whether non-transactional sends are allowed. */
    private void printTemplateState() {
        System.out.println("    ====transactionCapable:"
                + transKafkaTemplate.getProducerFactory().transactionCapable());
        System.out.println("    ====isAllowNonTransactional:"
                + transKafkaTemplate.isAllowNonTransactional());
    }
}
上一篇 下一篇

猜你喜欢

热点阅读