Word Count with Flink's DataSet API

2020-05-31  喵星人ZC

Overall project structure in IDEA


(Screenshot of the project structure; image not included.)

pom.xml


  <properties>
    <flink.version>1.10.0</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
    <scala.version>2.11.8</scala.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <!-- Logging dependencies -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.7</version>
      <scope>runtime</scope>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
      <scope>runtime</scope>
    </dependency>

  </dependencies>

data/wc.txt

hadoop  hadoop  spark
spark   flink   MR
MR  MR  storm   flink

log4j.properties (to hide unrelated log output, set the root logger to print only error-level messages)

log4j.rootLogger=error,console
        
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=[%p]%d{yyyy-MM-dd HH:mm:ss} %F - %m%n

BatchJob

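package com.zc.bigdata

// The wildcard import brings in ExecutionEnvironment, DataSet, and the implicit
// TypeInformation support that the Scala DataSet API requires.
import org.apache.flink.api.scala._

object BatchJob {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // The path is relative to the working directory (typically the project root when run from IDEA).
    val lines: DataSet[String] = env.readTextFile("data/wc.txt")

    val counts = lines
      // Lowercase each line, split on non-word characters, and drop empty tokens.
      .flatMap { _.toLowerCase.split("\\W+").filter(_.nonEmpty) }
      .map(WC(_, 1))
      // Group by the case class field "word" and sum the "cnt" field.
      .groupBy("word")
      .sum("cnt")

    // print() triggers execution of the batch job and writes the result to stdout.
    counts.print()
  }
}

case class WC(word: String, cnt: Int)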
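
To sanity-check what the job should print, the same logic can be sketched with plain Scala collections and no Flink dependency. This is only an illustration: the names LocalWordCount, tokenize, count, and sample are hypothetical and not part of the original project, and the sample lines are copied from the input file above.

```scala
object LocalWordCount {
  // Same tokenization as the Flink job: lowercase, split on non-word
  // characters, and drop the empty tokens that a leading separator produces.
  def tokenize(line: String): Seq[String] =
    line.toLowerCase.split("\\W+").filter(_.nonEmpty).toSeq

  // Plain-collections counterpart of groupBy("word").sum("cnt").
  def count(lines: Seq[String]): Map[String, Int] =
    lines.flatMap(tokenize).groupBy(identity).map { case (w, ws) => (w, ws.size) }

  // The three lines of the sample input file.
  val sample: Seq[String] = Seq(
    "hadoop  hadoop  spark",
    "spark   flink   MR",
    "MR  MR  storm   flink"
  )

  def main(args: Array[String]): Unit =
    // Sort by word so the output order is stable.
    count(sample).toSeq.sortBy(_._1).foreach { case (w, c) => println(s"$w $c") }
}
```

Because of toLowerCase, "MR" is counted as "mr". For the sample input the counts are flink 2, hadoop 2, mr 3, spark 2, storm 1; the Flink job should report the same totals, although the order of its output rows is not guaranteed.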

(Screenshot; image not included.)