基于Scala实现Kafka生产者API

2019-06-12  本文已影响0人  写scala的老刘

Kafka 生产者发送数据一共有3种方式

(1) 发送并忘记(fire-and-forget),把消息发送给服务器,但并不关心它是否正常到达。大多数情况下,消息会正常到达,因为 Kafka 是高可用的,而且生产者会自动尝试重发。不过,使用这种方式有时候也会丢失一些消息

(2)使用 send() 方法发送消息,它会返回一个 Future 对象,调用 get() 方法进行等待(会返回元数据或者抛出异常), 就可以知道消息是否发送成功

(3)异步发送, 大多数时候,我们并不需要等待响应——尽管 Kafka会把目标主题、分区信息和消息的偏移量发送回来,但对于发送端的应用程序来说不是必需的。 不过在遇到消息发送失败时,我们需要抛出异常、记录错误日志等,这样的情况下可以使用异步发送消息的方式,调用 send() 方法,并指定一个回调函数,服务器在返回响应时调用该函数。这也是生产环境下最常用的一种方式,相关Scala代码如下:

package cn.com.bonc.Face
import java.util.Properties
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.StringSerializer
import scala.io.Source
/**
  * Create by liuyancheng 2019-06-12 11:13
  */
object ScalaKafkaProducer {
  def main(args: Array[String]): Unit = {
    val kafkaProp = new Properties()
    kafkaProp.put("bootstrap.servers", "kafkahost:port")
    kafkaProp.put("acks", "1")
    kafkaProp.put("retries", "3")
    //kafkaProp.put("batch.size", 16384)//16k
    kafkaProp.put("key.serializer", classOf[StringSerializer].getName)
    kafkaProp.put("value.serializer", classOf[StringSerializer].getName)
    val producer = new KafkaProducer[String, String](kafkaProp)
    val lines = Source.fromFile("E:\\CodeProject\\IdeaProjects\\HeNan\\FactRecognition\\src\\main\\resources\\data.txt").getLines()
    while (lines.hasNext) {
      val line = lines.next()
      val record = new ProducerRecord[String, String]("facetest", line)
      producer.send(record, new Callback {
        override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
          if (metadata != null) {
            println("发送成功")
          }
          if (exception != null) {
            println("消息发送失败")
          }
        }
      })
    }
    producer.close()
  }
}

上一篇 下一篇

猜你喜欢

热点阅读