【7】安装Flink

2018-10-30  本文已影响12人  07b287742148

Standalone Cluster

环境

1. JDK1.8
2. 集群节点vm01,vm02,vm03
   主节点vm01,从节点vm02,vm03

安装步骤

  1. 官网下载 flink 1.6.1
  2. 在主节点上解压flink,进入配置目录. flink/conf
  3. 配置主节点地址. flink.yaml
   jobmanager.rpc.address:vm01
  1. 配置从节点文件. slaves
    vm02
    vm03
  1. 将flink文件夹发送到各个从节点
  2. 启动集群,主节点进入flink的bin目录 flink/bin
    ./start-cluster.sh
    
    启动成功后可在各个节点敲jps查看flink相关进程
    主节点 StandaloneSessionClusterEntrypoint
    从节点 TaskManagerRunner
    
    也可在web页面访问:vm01:8081
  1. 运行流处理的wordcount示例代码,在主节点flink根目录下
$ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --hostname vm01 --port 9999

显示以下信息表示启动成功
Starting execution of program

监听端口,输入信息
$ nc -l 9999
hello
word


查看结果,注意是到从节点,也就是具体执行任务的节点上看
$ tail -f flink/log/flink-*-taskexecutor-*-.out
hello 1
word 1
  1. 关闭flink,flink/bin
$ ./stop-cluster.sh

本地IDEA编程-Scala版本

  1. pom依赖
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>1.6.1</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.6.1</version>
    <scope>compile</scope>
</dependency>
  1. scala代码
package com.pein.example

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowCount {
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("vm01", 9999)

    val counts = text.flatMap{_.toLowerCase.split("\\W+")filter{_.nonEmpty}}
      .map{(_, 1)}
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)

    counts.print()

    env.execute("Window Stream WordCount")

  }
}

  1. 主节点输入信息在idea上可看到对应的输出
$ nc -l vm01 9999
123
123
123
1233
1233
1233
1233

#本地调试的时候在主节点nc的时候要加上hostname否则连接不上

#对应输出信息
2> (123,2)
2> (123,1)
4> (1233,1)
4> (1233,3)
上一篇下一篇

猜你喜欢

热点阅读