Big Data

flink示例

2019-06-28  本文已影响1人  盗梦者_56f2

DataStream API

#创建执行环境
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
val env = StreamExecutionEnvironment.createLocalEnvironment() #本地执行环境
createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
var executionConfig = env.getConfig
#添加输入源
#自定义
StreamExecutionEnvironment.addSource(sourceFunction)
DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
#基于socket
val text = env.socketTextStream("localhost", port, '\n')
#基于文件
readTextFile(path)- 逐行读取文本文件,即符合TextInputFormat规范的文件,并将其作为字符串返回。
readFile(fileInputFormat, path) - 按指定的文件输入格式指定读取(一次)文件。
readFile(fileInputFormat, path, watchType, interval, pathFilter) - 这是前两个方法在内部调用的方法。
#基于集合
fromCollection(Seq)——从Java.util.Collection创建数据流。集合中的所有元素必须是相同类型的。
fromCollection(Iterator)——从迭代器创建数据流。该类指定迭代器返回的元素的数据类型。
fromElements(elements:_*)——给定的对象序列中创建数据流。所有对象必须具有相同的类型。
fromParallelCollection(SplittableIterator)——并行地从迭代器创建数据流。该类指定迭代器返回的元素的数据类型。
generateSequence(from,to)——在给定的区间内并行生成数字序列。
#转换
KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
    .keyBy(new KeySelector<WikipediaEditEvent, String>() {
        @Override
        public String getKey(WikipediaEditEvent event) {
            return event.getUser();
        }
    });
DataStream<Tuple2<String, Long>> result = keyedEdits
    .timeWindow(Time.seconds(5))
    .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
            acc.f0 = event.getUser();
            acc.f1 += event.getByteDiff();
            return acc;
        }
    });
result
    .map(new MapFunction<Tuple2<String,Long>, String>() {
        @Override
        public String map(Tuple2<String, Long> tuple) {
            return tuple.toString();
        }
    });
case class WordWithCount(word: String, count: Long)
val windowCounts = text
            .flatMap { w => w.split("\\s") }
            .map { w => WordWithCount(w, 1) }
            .keyBy("word")
            .timeWindow(Time.seconds(5), Time.seconds(1))
            .sum("count")
#输出源
writeAsText()/ TextOutputFormat- 按字符串顺序写入元素。通过调用每个元素的toString()方法获得字符串。
writeAsCsv(...)/ CsvOutputFormat- 将元组写为逗号分隔值文件。行和字段分隔符是可配置的。每个字段的值来自对象的toString()方法。
print()/ printToErr() - 在标准输出/标准错误流上打印每个元素的toString()值。还可以选择在输出之前提供前缀(msg)。这可以帮助区分不同的打印调用。如果并行度大于1,则输出还将加上生成输出的任务的标识符。
writeUsingOutputFormat()/ FileOutputFormat- 自定义文件输出的方法和基类。支持自定义对象到字节的转换。
writeToSocket - 根据SerializationSchema将元素写入Socket
addSink - 调用自定义接收器功能。Flink捆绑了其他系统(如Apache Kafka)的连接器,这些系统实现为接收器功能。

result.print();
windowCounts.print().setParallelism(1)
result.addSink(new FlinkKafkaProducer08<>("localhost:9092", "wiki-result", new SimpleStringSchema()));
#触发程序执行
see.execute();
env.execute("Socket Window WordCount")

DataSet API

#创建执行环境
val env = ExecutionEnvironment.getExecutionEnvironment
val env = ExecutionEnvironment.createLocalEnvironment()
#输入源
#基于文件
readTextFile(path)/ TextInputFormat- 按行读取文件并将其作为字符串返回。
readTextFileWithValue(path)/ TextValueInputFormat- 按行读取文件并将它们作为StringValues返回。StringValues是可变字符串。
readCsvFile(path)/ CsvInputFormat- 解析逗号(或其他字符)分隔字段的文件。返回元组或POJO的DataSet。支持基本的java类型及其Value对应的字段类型。
readFileOfPrimitives(path, delimiter)/ PrimitiveInputFormat- 使用给定的分隔符解析以新行(或其他字符序列)分隔的基本数据类型(例如String或Integer)的文件。
readSequenceFile(Key, Value, path)/ SequenceFileInputFormat- 创建一个JobConf并从类型为SequenceFileInputFormat,Key class和Value class的指定路径中读取文件,并将它们返回为Tuple2 <Key,Value>。
#基于集合
fromCollection(Collection) - 从Java Java.util.Collection创建数据集。集合中的所有元素必须属于同一类型。
fromCollection(Iterator) - 从迭代器创建数据集。该类指定迭代器返回的元素的数据类型。
fromElements(elements: _*) - 根据给定的对象序列创建数据集。所有对象必须属于同一类型。
fromParallelCollection(SplittableIterator) - 并行地从迭代器创建数据集。该类指定迭代器返回的元素的数据类型。
generateSequence(from, to) - 并行生成给定间隔中的数字序列。
#通用
readFile(inputFormat, path)/ FileInputFormat- 接受文件输入格式。
createInput(inputFormat)/ InputFormat- 接受通用输入格式。

val localLines = env.readTextFile("file:///path/to/my/textfile")
val hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile")
val csvInput = env.readCsvFile[(Int, String, Double)]("hdfs:///the/CSV/file")
val values = env.fromElements("Foo", "bar", "foobar", "fubar")
val numbers = env.generateSequence(1, 10000000)
val tuples = env.readSequenceFile(classOf[IntWritable], classOf[Text],
 "hdfs://nnHost:nnPort/path/to/file")
#转换
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
  .map { (_, 1) }
  .groupBy(0)
  .sum(1)
#输出源
writeAsText()/ TextOutputFormat- 按字符串顺序写入元素。通过调用每个元素的toString()方法获得字符串。
writeAsCsv(...)/ CsvOutputFormat- 将元组写为逗号分隔值文件。行和字段分隔符是可配置的。每个字段的值来自对象的toString()方法。
print()/ printToErr()- 在标准输出/标准错误流上打印每个元素的toString()值。
write()/ FileOutputFormat- 自定义文件输出的方法和基类。支持自定义对象到字节的转换。
output()/ OutputFormat- 大多数通用输出方法,用于非基于文件的数据接收器(例如将结果存储在数据库中)。

textData.writeAsText("file:///my/result/on/localFS")
textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS")
textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE)
counts.print()
counts.writeAsCsv(outputPath, "\n", " ")

Table API & SQL

val env = StreamExecutionEnvironment.getExecutionEnvironment
val bEnv = ExecutionEnvironment.getExecutionEnvironment
#创建表环境
val tableEnv = TableEnvironment.getTableEnvironment(env)
val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)
#注册表
tableEnv.registerTable("table1", ...)    
tableEnv.registerTableSource("table2", ...) 
tableEnv.registerExternalCatalog("extCat", ...)
tableEnv.registerTableSink("outputTable", ...)

val csvSource: TableSource = new CsvTableSource("/path/to/file", ...)
tableEnv.registerTableSource("CsvTable", csvSource)
val catalog: ExternalCatalog = new InMemoryExternalCatalog
tableEnv.registerExternalCatalog("InMemCatalog", catalog)

val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)
#创建表
val tapiResult = tableEnv.scan("table1")
val sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table2 ...")
#转换
val revenue = orders
  .filter('cCountry === "FRANCE")
  .groupBy('cID, 'cName)
  .select('cID, 'cName, 'revenue.sum AS 'revSum)
val revenue = tableEnv.sqlQuery("""
  |SELECT cID, cName, SUM(revenue) AS revSum
  |FROM Orders
  |WHERE cCountry = 'FRANCE'
  |GROUP BY cID, cName
  """.stripMargin)
#输出
tapiResult.insertInto("outputTable")
#执行
env.execute()
#DataStream / DataSet -> Table 
val stream: DataStream[(Long, String)] = ...
#方式一
tableEnv.registerDataStream("myTable", stream)
#方式二
val table1: Table = tableEnv.fromDataStream(stream)
#方式三
val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
#Table -> DataStream / DataSet
val tableEnv = TableEnvironment.getTableEnvironment(env)
val table: Table = ...
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)
val dsTuple: DataStream[(String, Int)] dsTuple = tableEnv.toAppendStream[(String, Int)](table)
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)
val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)
val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)

数据类型

有六种不同类别的数据类型:

#case class
case class WordCount(word: String, count: Int)
val input = env.fromElements(
    WordCount("hello", 1),
    WordCount("world", 2))
#tuple
val input2 = env.fromElements(("hello", 1), ("world", 2))
#POJO
class WordWithCount(var word: String, var count: Int) {
    def this() {
      this(null, -1)
    }
}
val input = env.fromElements(
    new WordWithCount("hello", 1),
    new WordWithCount("world", 2))
上一篇 下一篇

猜你喜欢

热点阅读