
Kafka in Practice (1)

2017-03-02  MisterCH

Yesterday I set up a ZooKeeper + Kafka stack (one node each) in the test environment and started my hands-on journey with Kafka. Right up until I left work I kept hitting a problem where messages could neither be sent nor received; today I finally got it sorted out.

Installing ZooKeeper

Just download the binary package from the official site and extract it:

tar -zxvf zookeeper-3.4.9.tar.gz

The configuration changes needed are:

  1. Rename conf/zoo_sample.cfg to zoo.cfg (and adjust dataDir; see the sketch after this list)
  2. Set ZOO_LOG_DIR and ZOO_LOG4J_PROP in the bin/zkEnv.sh script
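For a single standalone node, a minimal zoo.cfg only needs a few entries. The dataDir path below is an assumed example; point it at a persistent directory of your own:

tickTime=2000
clientPort=2181
dataDir=/data/zookeeper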

Start ZooKeeper

bin/zkServer.sh start
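You can confirm it came up with the status subcommand:

bin/zkServer.sh status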

Installing Kafka

Since only one broker is being used, just extract the package:

tar -zxvf kafka_2.11-0.10.2.0.tgz

The configuration to modify lives in the config/server.properties file; the main items are log.dirs and listeners.

listeners=PLAINTEXT://localhost:9092

There's a gotcha here: server.properties must have host.name or listeners configured, otherwise you get exactly the can't-send, can't-receive symptom described above.
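For reference, a minimal sketch of the relevant server.properties lines; the log.dirs path is an assumption for illustration. Note that if clients connect from other machines, listeners should use an address they can reach (such as the broker's IP, as the client code below does) rather than localhost:

broker.id=0
listeners=PLAINTEXT://122.20.109.68:9092
log.dirs=/data/kafka-logs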
Then just start it:

bin/kafka-server-start.sh config/server.properties &
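Before wiring up the Java clients, it's worth creating the topic the examples below use and smoke-testing the broker with the console tools shipped in bin/ (on 0.10.2 the broker auto-creates topics by default, but being explicit avoids surprises):

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testTopic

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testTopic

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic --from-beginning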

Clients

With installation done, it's time to write a producer and a consumer, using the simplest possible approach.

Producer

package producer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class Producer {
  public static void main(String[] args) throws InterruptedException, ExecutionException {
    Properties props = new Properties();
    props.put("bootstrap.servers","122.20.109.68:9092");
    props.put("acks","1");
    props.put("retries","0");
    props.put("batch.size","16384");
    // props.put("linger.ms","1");
    // props.put("buffer.memory","33554432");
    props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
    // create the producer
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);

    for (int i = 0; i < 100; i++) {
      System.out.println("sending message " + i);
      ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", String.valueOf(i), "this is message " + i);
      producer.send(record, new Callback() {
        public void onCompletion (RecordMetadata metadata, Exception e) {
          if (null != e) {
            e.printStackTrace();
          } else {
            System.out.println(metadata.offset());
          }
        }
      });
    }
    Thread.sleep(100000);
    producer.close();
  }
} 

Here's another gotcha: if I just called producer.send(ProducerRecord) and then producer.close() right after the 100 sends, Kafka never received the messages. I suspect the asynchronous send is to blame: the records need to actually reach Kafka before the Producer is stopped, so I added a sleep at the end, and with that in place everything sends fine.
Sending with a callback is asynchronous. You can also send synchronously: chain a get() onto the send() call and it blocks until the broker acknowledges receipt of the message.

producer.send(record).get();
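Rather than sleeping for a fixed interval, the 0.10 client also provides producer.flush(), which blocks until every buffered record has actually been sent; calling it before close() should achieve the same effect without the arbitrary wait:

producer.flush();
producer.close();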

The Producer has a few commonly used properties (the ones set in the code above):

  1. bootstrap.servers: broker address list used for the initial connection
  2. acks: required acknowledgements; 0 = none, 1 = leader only, all = all in-sync replicas
  3. retries: number of times a failed send is retried
  4. batch.size: upper bound, in bytes, on a batch of records per partition
  5. linger.ms: how long to wait for more records before sending a batch
  6. buffer.memory: total memory available for buffering records awaiting send
  7. key.serializer / value.serializer: classes that turn keys and values into bytes

Consumer

package consumer;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;


public class Consumer {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers","122.20.109.68:9092");
    props.put("group.id","test");
    props.put("enable.auto.commit","true");
    props.put("auto.commit.interval.ms","1000");
    props.put("session.timeout.ms","30000");
    props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("testTopic"));
    while (true) {
      // poll blocks for up to 1000 ms waiting for new records
      ConsumerRecords<String, String> records = consumer.poll(1000);
      for (ConsumerRecord<String, String> record : records) {
        System.out.println("offset " + record.offset() + " Message: " + record.value());
      }
    }
  }
}

The Consumer's commonly used properties include:

  1. group.id: the consumer group this consumer belongs to; a topic's partitions are distributed across the group's members
  2. enable.auto.commit: whether offsets are committed automatically in the background
  3. auto.commit.interval.ms: interval between automatic offset commits
  4. session.timeout.ms: how long the broker waits without a heartbeat before declaring the consumer dead and rebalancing
  5. key.deserializer / value.deserializer: classes that turn bytes back into keys and values
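If enable.auto.commit is set to false, offsets have to be committed by hand instead. A minimal sketch of the poll loop with a synchronous commit, reusing the consumer built above:

while (true) {
  ConsumerRecords<String, String> records = consumer.poll(1000);
  for (ConsumerRecord<String, String> record : records) {
    System.out.println("offset " + record.offset() + " Message: " + record.value());
  }
  // block until the broker confirms the offsets of the records just processed
  consumer.commitSync();
}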

Reference: http://www.cnblogs.com/edison2012/p/5774207.html
