Kafka入门系列

Kafka入门系列—6. Kafka 常用命令及Java API

2018-12-06  本文已影响41人  ted005

常用命令

./zkServer.sh start-foreground

可选参数:

./zkServer.sh {start|start-foreground|stop|restart|status|upgrade|print-cmd}
java -jar zookeeper-dev-ZooInspector.jar
屏幕快照 2018-12-06 上午10.56.40.png
./kafka-server-start.sh ../config/server.properties
./kafka-topics.sh --create -topic testtopic -partitions 3 -replication-factor 1  -zookeeper localhost:2181
./kafka-topics.sh --list -zookeeper localhost:2181
./kafka-topics.sh --delete -topic testtopic -zookeeper localhost:2181
./kafka-console-producer.sh -topic testtopic  --broker-list localhost:9092

注意:--from-beginning会从初始offset位置开始接收消息;不加该参数从当前offset位置开始。

./kafka-console-consumer.sh --bootstrap-server localhost:9092 -topic testtopic --from-beginning

Java API使用

public class SampleProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(SampleProducer.class);

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("client.id", "DemoProducer");
        //序列化器
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        ProducerRecord<String, String> record = new ProducerRecord<>("testtopic", "hello world");

        Future<RecordMetadata> future = producer.send(record);
        RecordMetadata recordMetadata = null;
        try {
            recordMetadata = future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println(recordMetadata);

    }
}
public class SampleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        //指定消费者组
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "testgroup");
        //关闭自动位移提交
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        //反序列化器
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        //订阅topic
        consumer.subscribe(Arrays.asList("testtopic"));

        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
        records.forEach((ConsumerRecord<String, String> record) -> {
            System.out.println(record.value());
        });

        //手动提交位移
        consumer.commitAsync();

    }
}

上一篇 下一篇

猜你喜欢

热点阅读