基于Scala实现Kafka生产者API
2019-06-12 本文已影响0人
写scala的老刘
Kafka 生产者发送数据一共有3种方式
(1) 发送并忘记(fire-and-forget),把消息发送给服务器,但并不关心它是否正常到达。大多数情况下,消息会正常到达,因为 Kafka 是高可用的,而且生产者会自动尝试重发。不过,使用这种方式有时候也会丢失一些消息
(2)使用 send() 方法发送消息,它会返回一个 Future 对象,调用 get() 方法进行等待(会返回元数据或者抛出异常), 就可以知道消息是否发送成功
(3)异步发送, 大多数时候,我们并不需要等待响应——尽管 Kafka会把目标主题、分区信息和消息的偏移量发送回来,但对于发送端的应用程序来说不是必需的。 不过在遇到消息发送失败时,我们需要抛出异常、记录错误日志等,这样的情况下可以使用异步发送消息的方式,调用 send() 方法,并指定一个回调函数,服务器在返回响应时调用该函数。这也是生产环境下最常用的一种方式,相关Scala代码如下:
package cn.com.bonc.Face
import java.util.Properties
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.StringSerializer
import scala.io.Source
/**
* Create by liuyancheng 2019-06-12 11:13
*/
object ScalaKafkaProducer {
def main(args: Array[String]): Unit = {
val kafkaProp = new Properties()
kafkaProp.put("bootstrap.servers", "kafkahost:port")
kafkaProp.put("acks", "1")
kafkaProp.put("retries", "3")
//kafkaProp.put("batch.size", 16384)//16k
kafkaProp.put("key.serializer", classOf[StringSerializer].getName)
kafkaProp.put("value.serializer", classOf[StringSerializer].getName)
val producer = new KafkaProducer[String, String](kafkaProp)
val lines = Source.fromFile("E:\\CodeProject\\IdeaProjects\\HeNan\\FactRecognition\\src\\main\\resources\\data.txt").getLines()
while (lines.hasNext) {
val line = lines.next()
val record = new ProducerRecord[String, String]("facetest", line)
producer.send(record, new Callback {
override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
if (metadata != null) {
println("发送成功")
}
if (exception != null) {
println("消息发送失败")
}
}
})
}
producer.close()
}
}