玩转大数据

Flink 使用之数据源

2019-12-19  本文已影响0人  AlienPaul

Flink 使用介绍相关文档目录

Flink 使用介绍相关文档目录

Flink内置数据源

Text file

读取磁盘或者HDFS中的文件作为数据源。
唯一的参数file path可以指定:

注意:

  1. 如果不填写前缀file://或者hdfs://,默认为file://
  2. 使用Flink读取HDFS文件系统,需要去官网下载对应Pre-bundled Hadoop包。这里给出的链接是适用于Hadoop 2.8.3。之后将这个jar复制到flink安装位置的lib目录中。
val stream = env.readTextFile("/path/to/file.txt")

socketTextStream

使用socket作为数据源。但不推荐socket在生产环境中作为数据源。原因如下:

SocketTextStream适合用于debug或者是测试用途。

val stream = env.socketTextStream("localhost", 9000)

fromElements

将一系列元素作为数据源。

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.fromElements(1, 2, 3);

fromCollection

和fromElements方法类似,不同的是该方法接收一个集合对象,而不是可变参数。如下所示:

val stream = env.fromCollection(Array(1, 2, 3))

Kafka 数据源

该数据源用于接收Kafka的数据。
使用Kafka数据源之前需要先确定Kafka的版本,引入对应的Kafka Connector以来。对应关系如下所示。

Kafka 版本 Maven 依赖
0.8.x flink-connector-kafka-0.8_2.11
0.9.x flink-connector-kafka-0.9_2.11
0.10.x flink-connector-kafka-0.10_2.11
0.11.x flink-connector-kafka-0.11_2.11
1.0 以上 flink-connector-kafka_2.11

引入Maven依赖。以flink-connector-kafka_2.11为例,添加以下依赖到pom.xml文件:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.9.1</version>
</dependency>

在集群中运行时,为了减少提交jar包的大小,需要将该依赖设置为provided。然后把此依赖包复制到Flink各个节点安装位置的lib目录中。

一个简单的使用例子如下:

// 设置Kafka属性
val properties = new Properties()
properties.setProperty("bootstrap.servers", "192.168.100.128:9092")
properties.setProperty("group.id", "test")

// 创建Kafka数据源,其中test为topic名称
val consumer = new FlinkKafkaConsumer[String]("test", new SimpleStringSchema(), properties)

DeserializationSchema

DeserializationSchema用于将接收到的二进制数据转换为Java或Scala对象。Kafka Connector提供了如下4种DeserializationSchema:

自定义DeserializationSchema

所有的Schema需要实现DeserializationSchema。该接口源码如下所示:

@Public
public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

    /**
     * Deserializes the byte message.
     *
     * @param message The message, as a byte array.
     *
     * @return The deserialized message as an object (null if the message cannot be deserialized).
     */
    T deserialize(byte[] message) throws IOException;

    /**
     * Method to decide whether the element signals the end of the stream. If
     * true is returned the element won't be emitted.
     *
     * @param nextElement The element to test for the end-of-stream signal.
     * @return True, if the element signals end of stream, false otherwise.
     */
    boolean isEndOfStream(T nextElement);
}

方法解释:

以SimpleStringSchema为例展示下怎么编写自定义的DeserializationSchema。
相关代码如下:

@PublicEvolving
public class SimpleStringSchema implements DeserializationSchema<String>, SerializationSchema<String> {
    // SerializationSchema接口的方法省略
    @Override
    public String deserialize(byte[] message) {
        return new String(message, charset);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }
    // ...
}

起始位置属性配置

使用示例:

myConsumer.setStartFromEarliest()      // start from the earliest record possible
myConsumer.setStartFromLatest()        // start from the latest record
myConsumer.setStartFromTimestamp(...)  // start from specified epoch timestamp (milliseconds)
myConsumer.setStartFromGroupOffsets()  // the default behaviour

方法解释:

Topic和分区感知

Topic感知

可以使用如下构造函数创建FlinkKafkaConsumer:

FlinkKafkaConsumer(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props) 

和指定topic名称不同的是,这里传入的是一个正则表达式。所有名称匹配该正则表达式的topic都会被订阅。如果配置了分区感知(配置flink.partition-discovery.interval-millis为非负数),Job启动之后kafka新创建的topic如果匹配该正则,也会被订阅到。

分区感知

在Job运行过程中如果kafka新创建了partition,Flink可以动态感知到,然后对其中数据进行消费。整个过程仍然可以保证exactly once语义。

默认情况分区感知是禁用的。如果要开启分区感知,可以设置flink.partition-discovery.interval-millis,即分区感知触发时间间隔。

实现自定义数据源

自定义数据源需要实现Flink提供的SourceFunction接口。

SourceFunction接口的定义如下:

@Public
public interface SourceFunction<T> extends Function, Serializable {
    void run(SourceContext<T> ctx) throws Exception;
    void cancel();
}

run方法

run方法为数据源向下游发送数据的主要逻辑。编写套路为:

cancel方法:

cancel方法在数据源停止的时候调用。cancel方法必须能够控制run方法中的循环,停止循环的运行。并做一些状态清理操作。

SourceContext类

SourceContext在SourceFunction中使用,用于向下游发送数据,或者是发送watermark。
SourceContext的方法包括:

CheckpointedFunction

如果数据源需要保存状态,那么就需要实现CheckpointedFunction中的相关方法。
CheckpointedFunction包含如下方法:

样例

Flink官方给出的样板Source。这个数据源会发送0-999到下游系统。代码如下所示:

public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
    private long count = 0L;
    // 使用一个volatile类型变量控制run方法内循环的运行
    private volatile boolean isRunning = true;

    // 保存数据源状态的变量
    private transient ListState<Long> checkpointedCount;

    public void run(SourceContext<T> ctx) {
        while (isRunning && count < 1000) {
            // this synchronized block ensures that state checkpointing,
            // internal state updates and emission of elements are an atomic operation
            // 此处必须要加锁,防止在checkpoint过程中,仍然发送数据
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(count);
                count++;
            }
        }
    }

    public void cancel() {
        // 设置isRunning为false,终止run方法内循环的运行
        isRunning = false;
    }

    public void initializeState(FunctionInitializationContext context) {
        // 获取存储状态
        this.checkpointedCount = context
            .getOperatorStateStore()
            .getListState(new ListStateDescriptor<>("count", Long.class));

        // 如果数据源是从失败中恢复,则读取count的值,恢复数据源count状态
        if (context.isRestored()) {
            for (Long count : this.checkpointedCount.get()) {
                this.count = count;
            }
        }
    }

    public void snapshotState(FunctionSnapshotContext context) {
        // 保存数据到状态变量
        this.checkpointedCount.clear();
        this.checkpointedCount.add(count);
    }
}
上一篇下一篇

猜你喜欢

热点阅读