【7】安装Flink
2018-10-30 本文已影响12人
07b287742148
Standalone Cluster
环境
1. JDK1.8
2. 集群节点vm01,vm02,vm03
主节点vm01,从节点vm02,vm03
安装步骤
- 官网下载 flink 1.6.1
- 在主节点上解压flink,进入配置目录.
flink/conf
- 配置主节点地址. flink.yaml
jobmanager.rpc.address:vm01
- 配置从节点文件. slaves
vm02
vm03
- 将flink文件夹发送到各个从节点
- 启动集群,主节点进入flink的bin目录
flink/bin
./start-cluster.sh
启动成功后可在各个节点敲jps查看flink相关进程
主节点 StandaloneSessionClusterEntrypoint
从节点 TaskManagerRunner
也可在web页面访问:vm01:8081
- 运行流处理的wordcount示例代码,在主节点flink根目录下
$ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --hostname vm01 --port 9999
显示以下信息表示启动成功
Starting execution of program
监听端口,输入信息
$ nc -l 9999
hello
word
查看结果,注意是到从节点,也就是具体执行任务的节点上看
$ tail -f flink/log/flink-*-taskexecutor-*-.out
hello 1
word 1
- 关闭flink,
flink/bin
$ ./stop-cluster.sh
本地IDEA编程-Scala版本
- pom依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.6.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.6.1</version>
<scope>compile</scope>
</dependency>
- scala代码
package com.pein.example
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object WindowCount {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.socketTextStream("vm01", 9999)
val counts = text.flatMap{_.toLowerCase.split("\\W+")filter{_.nonEmpty}}
.map{(_, 1)}
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
counts.print()
env.execute("Window Stream WordCount")
}
}
- 主节点输入信息在idea上可看到对应的输出
$ nc -l vm01 9999
123
123
123
1233
1233
1233
1233
#本地调试的时候在主节点nc的时候要加上hostname否则连接不上
#对应输出信息
2> (123,2)
2> (123,1)
4> (1233,1)
4> (1233,3)