8. Kafka Connector

2019-05-21  木戎

Flink-kafka-connector

Configuration

nohup zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &
nohup kafka-server-start /usr/local/etc/kafka/server.properties &
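
If the topic used below does not exist yet, create it first. This assumes a Homebrew-style install where the Kafka scripts are on the PATH, matching the commands above; partitions and replication factor are illustrative:

kafka-topics --zookeeper localhost:2181 --create --topic test --partitions 1 --replication-factor 1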

Notes on consuming Kafka from Flink

By default, the Flink Kafka consumer resumes from the offsets it last committed. If the application is starting for the first time and no committed offsets can be found, it falls back to the auto.offset.reset parameter to decide where to begin consuming. A sketch of where that parameter fits follows below.
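
A minimal consumer sketch, assuming the same flink-connector-kafka-0.9 dependency used in the example below; the topic name test and the group id demo-group are made up for illustration:

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaConsumerDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "demo-group"); // hypothetical consumer group
        // Consulted only when the group has no committed offsets yet
        props.setProperty("auto.offset.reset", "earliest");

        FlinkKafkaConsumer09<String> consumer =
                new FlinkKafkaConsumer09<>("test", new SimpleStringSchema(), props);
        consumer.setStartFromGroupOffsets(); // default: resume from committed offsets
        // consumer.setStartFromEarliest();  // explicit start positions ignore auto.offset.reset
        // consumer.setStartFromLatest();

        DataStream<String> stream = env.addSource(consumer);
        stream.print();
        env.execute("kafka-consumer-demo");
    }
}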

SerializationSchema & DeserializationSchema

Converting the byte stream fetched from Kafka into a Java object is done through a DeserializationSchema. SimpleStringSchema converts the bytes from Kafka into a String, while KeyedDeserializationSchema supports deserializing both the key and the value.
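
As a sketch, a custom DeserializationSchema only needs three methods. The class below upper-cases every record; the name UpperCaseSchema is made up for illustration:

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class UpperCaseSchema implements DeserializationSchema<String> {
    @Override
    public String deserialize(byte[] message) throws IOException {
        // Turn the raw Kafka bytes into the produced Java object
        return new String(message, StandardCharsets.UTF_8).toUpperCase();
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false; // an unbounded stream never ends
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
}

An instance of it can then be passed to the consumer constructor in place of SimpleStringSchema.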

Example

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.9_2.12</artifactId>
    <version>1.8.0</version>
</dependency>

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Random;

public class KafkaDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> text = env.addSource(new MyNoParalleSource()).setParallelism(1);

        Properties properties = new Properties(); // Kafka/ZooKeeper configuration
        properties.setProperty("bootstrap.servers", "localhost:9092");
        // Exactly-once alternative with the universal connector (see the note after this class):
        //new FlinkKafkaProducer("topn", new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
        FlinkKafkaProducer09<String> producer = new FlinkKafkaProducer09<>("test", new SimpleStringSchema(), properties);
/*
        // also write each record's event timestamp into Kafka
        producer.setWriteTimestampToKafka(true);
*/
        text.addSink(producer);
        env.execute();
    }

    public static class MyNoParalleSource implements SourceFunction<String> { // non-parallel source (parallelism 1)

        // cancel() is called from another thread, so the flag should be volatile
        private volatile boolean isRunning = true;

        /**
         * The main method of the source.
         * In most cases run contains a loop so that data is produced continuously.
         *
         * @param ctx context used to emit elements
         * @throws Exception
         */
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            // a mock book bestseller list, built once outside the loop
            List<String> books = new ArrayList<>();
            books.add("Python: From Beginner to Giving Up");
            books.add("Java: From Beginner to Giving Up");
            books.add("PHP: From Beginner to Giving Up");
            books.add("C++: From Beginner to Giving Up");
            books.add("Scala: From Beginner to Giving Up");
            Random random = new Random();
            while (isRunning) {
                int i = random.nextInt(5); // random index into the five titles
                // comparable to Storm's emit operation
                ctx.collect(books.get(i));

                // emit one record every 2 seconds
                Thread.sleep(2000);
            }
        }
        // called when the source is cancelled
        @Override
        public void cancel() {
            isRunning = false;
        }
    }
}
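
The commented-out line in main refers to the universal connector (artifact flink-connector-kafka_2.12 rather than the 0.9-specific one), whose producer supports transactional exactly-once writes against Kafka 0.11+ brokers. A minimal sketch, assuming that dependency:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
        "test",
        new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
        properties,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // one Kafka transaction per checkpoint
text.addSink(producer);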

kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
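
With the job and the console consumer both running, a random book title should be printed roughly every two seconds.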