3-flink api概述

2020-07-26  本文已影响0人  无暇的风笛

1、抽象分层

3.1-api抽象分层.png
  1. ProcessFunction:提供对时间、事件、状态的细粒度控制,用于处理一些复杂事件的逻辑上,易用性较低
  2. DataStreamApi&DataSet:核心api,提供对流/批数据的操作处理,基于函数式的,简单易用
  3. SQL&TableApi:flink sql的集成基于apache calcite,使用比其他api更灵活方便

2、datastream api

datastream api主要包含以下3块内容

1、datasource

数据的输入来源,来源方式主要有以下几种

  1. 来自文件:读取文本文件,将符合TextInputFormat规范的文件,将字符串返回

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<String> text = env.readTextFile("file:///filePath");
    
  2. 来自集合:fromCollection(Collection),fromElements(T ...)等

  3. 来自socket

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);
    
  4. 自定义输入

    自定义输入源有两种方式:

    • 实现SourceFunction接口来自定义无并行度的数据源

      demo:每一秒产生一条数据的source

      package streaming.source;
      
      import org.apache.flink.streaming.api.functions.source.SourceFunction;
      
      
      /**
       * @author xiaolong
       */
      public class InputSource implements SourceFunction<Long> {
      
          private boolean isRunning = true;
      
          private Long counter = 1L;
      
      
          @Override
          public void cancel() {
              isRunning = false;
          }
      
      
          @Override
          public void run(SourceContext<Long> context) throws Exception {
              while (isRunning) {
                  context.collect(counter);
                  counter++;
                  Thread.sleep(1000);
              }
      
          }
      }
      

2、transform

flink提供了很多算子,经常使用的有以下这些:

3.2-合并流输出结果.png

测试结果:


3.4-合并不同流输出结果.png 3.3-reduce输出.png

3、sink

flink有如下几种sink操作:

  1. 标准输出:print()/printToErr()

  2. 输出到文档或socket:writeAsCsv,writeAsText,writeToSocket

  3. 写入到flink第三方存储:ElasticSearch,Redis,kafkaProducer等

    测试从socket读取数据,写入到kafka

    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
    import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
    
    import java.util.Properties;
    
    
    /**
     * @author xiaolong
     */
    public class TestSource {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> strSource = env.socketTextStream("localhost", 9000, "\n");
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "xxxxxx"); // brokers地址
            properties.put("transaction.timeout.ms", 15 * 60 * 1000); // 设置FlinkKafkaProducer011的超时时间,默认是1h, kafka服务默认事务超时时间是15min,如果不设置会报错
            FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
                    "kafkaDruid",  // kafka topic
                    new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()),  // 序列化
                    properties,      // properties
                    FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);  // kafka语义
    
            // 0.10+ 版本的 Kafka 允许在将记录写入 Kafka 时附加记录的事件时间戳;
            // 此方法不适用于早期版本的 Kafka
            myProducer.setWriteTimestampToKafka(true);
            strSource.addSink(myProducer);
            strSource.print();
            env.execute("testFlinkJob");
    
        }
    }
    

    socket输入:

3.7-socket输入.png

测试结果,到kafka平台上可查看到最新的消息:

3.6-数据写入到kafka.png
  1. 自定义输出,实现SinkFunction或RichSInkFunction接口
上一篇 下一篇

猜你喜欢

热点阅读