Java连接kafka + kerberos

2021-11-30  本文已影响0人  halfempty

当 Kafka 集群开启 Kerberos 认证后, Java 客户端连接 Kafka 时需要额外指定 JAAS 配置文件、krb5.conf, 并设置 security.protocol 等 SASL 相关参数, 详见下面的样例

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.*;

import java.util.Arrays;
import java.util.Properties;

public class App {

    public void produce() {
        System.setProperty("java.security.auth.login.config", "D:/kafka_client_jaas.conf");
        System.setProperty("java.security.krb5.conf", "D:/krb5.conf");

        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xn141:6667,xn142:6667,xn143:6667");
        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        TestCallback callback = new TestCallback();

        String topic = "xn-test";
        for (int i = 0; i < 5; i++) {
            System.out.println("--------------" + i);
            producer.send(new ProducerRecord<String, String>(topic, Integer.toString(i), "MSG-" + i), callback);
        }
        producer.close();
    }

    public void consume() {
        System.setProperty("java.security.auth.login.config", "D:/kafka_client_jaas.conf");
        System.setProperty("java.security.krb5.conf", "D:/krb5.conf");

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "xn141:6667,xn142:6667,xn143:6667");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "cc1");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.kerberos.service.name", "kafka");
        props.put("sasl.mechanism", "GSSAPI");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("xn-test"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(10);
            for(ConsumerRecord<String, String> record: records) {
                System.out.printf("%s-%s-%s\n", record.partition(), record.offset(), record.value());
            }
        }
    }

    public static void main(String[] args) {
        App app = new App();
        app.produce();
//        app.consume();
    }

    public static class TestCallback implements Callback {
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e != null) {
                System.out.println("Error...");
                e.printStackTrace();
            } else {
                String message = String.format("sent message to topic:%s partition:%s  offset:%s",
                        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
                System.out.println(message);
            }
        }
    }
}
上一篇下一篇

猜你喜欢

热点阅读