Apache Pulsar[5] API demo

2020-06-29  本文已影响0人  QuinnSun

生产者

public class PulsarProducer {
    private static String localClusterUrl = "pulsar://localhost:6650";
    public static void main(String[] args) {
        try {
            Producer<byte[]> producer = getProducer();
            String msg = "test send";

            Long start = System.currentTimeMillis();
            MessageId msgId = producer.send(msg.getBytes());
            System.out.println("spend=" + (System.currentTimeMillis() - start) + ";send a message msgId = " + msgId.toString());
        } catch (Exception e) {
            System.err.println(e);
        }
    }

    public static Producer<byte[]> getProducer() throws Exception {
        PulsarClient client;
        client = PulsarClient.builder().serviceUrl(localClusterUrl).build();
        Producer<byte[]> producer = client.newProducer()
                .topic("persistent://my-tenant/my-namespace/my-topic")
                .producerName("test-producer")
                .create();
        return producer;
    }
}

消费者

public class PulsarConsumerDemo {
    private static String localClusterUrl = "pulsar://localhost:6650";

    public static void main(String[] args) {
        try {
            //将订阅消费者指定的主题和订阅
            Consumer<byte[]> consumer = getClient().newConsumer()
                    .topic("persistent://my-tenant/my-namespace/my-topic")
                    .subscriptionName("my-subscription")
                    .subscriptionType(SubscriptionType.Failover)
                    .subscribe();
            while (true) {
                Message msg = consumer.receive();
                System.out.printf("consumer-Message received: %s. \n", new String(msg.getData()));
                // 确认消息,以便broker删除消息
                consumer.acknowledge(msg);
            }
        } catch (Exception e) {
            System.out.println(e);
        }
    }

    public static PulsarClient getClient() throws Exception {
        PulsarClient client;
        client = PulsarClient.builder()
                .serviceUrl(localClusterUrl).build();
        return client;
    }
}
上一篇下一篇

猜你喜欢

热点阅读