Flink的DataSet API求WC
2020-05-31 本文已影响0人
喵星人ZC
IDEA整体结构
image.png
pom
<properties>
<flink.version>1.10.0</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<scala.version>2.11.8</scala.version>
</properties>
<dependencies>
<!-- Core Flink Scala API (DataSet) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Streaming Scala API (not used by the batch WC job below, but commonly kept for streaming examples) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Kafka 0.10 connector — NOTE(review): not referenced by the word-count job in this post; can be removed if only the DataSet WC is needed -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Logging dependencies (slf4j binding + log4j 1.x, runtime only) -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
</dependencies>
wc.txt(放在项目的 data/ 目录下;代码中以 data/wc.txt 读取,注意文件名大小写要与代码一致)
hadoop hadoop spark
spark flink MR
MR MR storm flink
log4j.properties:如果不想看无关日志,就设置只打印 error 级别的日志
# Only print ERROR-level logs, to the console appender.
log4j.rootLogger=error,console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
# Fixed date pattern: "yyy" was a typo for the conventional four-digit year "yyyy"
# (SimpleDateFormat pattern letters used by log4j's %d{...}).
log4j.appender.console.layout.ConversionPattern=[%p]%d{yyyy-MM-dd hh:mm:ss} %F - %m%n
BatchJob
package com.zc.bigdata
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.scala._
object BatchJob {
  /** Batch word count over a local text file using the DataSet API. */
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // NOTE(review): the sample file above is shown as "WC.txt" but this path is
    // lowercase — on a case-sensitive file system (Linux) the names must match.
    val lines: DataSet[String] = env.readTextFile("data/wc.txt")

    val wordCounts = lines
      .flatMap(line => line.toLowerCase.split("\\W+")) // tokenize on non-word chars
      .filter(token => token.nonEmpty)                 // drop empty splits
      .map(token => WC(token, 1))
      .groupBy("word") // group by case-class field name
      .sum("cnt")      // sum the per-word counters

    wordCounts.print()
  }
}
// Word-count record: `word` is the token, `cnt` its count.
// The field names are referenced as strings in groupBy("word") / sum("cnt") above.
case class WC(word: String, cnt: Int)
image.png