
Flink - Getting Started

2020-06-29  红薯爱帅

1. Introduction

Flink is a unified batch and stream data processing framework with excellent performance, and many large companies use it in production.
Due to time constraints, this article is only a quick first look; deeper analysis and classic case studies will follow in later posts.

2. Installing pyflink

$ conda create -n py36 python=3.6
$ conda activate py36
$ conda install Cython
$ python -m pip install apache-flink==1.10.1
export JAVA_HOME=/home/xxx/java/jdk-10.0.2
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
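pyflink needs JAVA_HOME to point at a JDK, so it is worth sanity-checking before running anything. A minimal sketch (the `java_binary_from_env` helper is my own, not part of pyflink):

```python
import os

def java_binary_from_env(env=os.environ):
    """Return the expected path to the java binary derived from JAVA_HOME,
    or None if JAVA_HOME is not set."""
    java_home = env.get("JAVA_HOME")
    if not java_home:
        return None
    return os.path.join(java_home, "bin", "java")

# Passing an explicit mapping, so the example does not depend on your shell setup:
print(java_binary_from_env({"JAVA_HOME": "/home/xxx/java/jdk-10.0.2"}))
# /home/xxx/java/jdk-10.0.2/bin/java
```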

3. Running a Flink Cluster with Docker

$ cat docker-compose.yml 
version: "2.1"
services:
  jobmanager:
    image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

  taskmanager:
    image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
$ docker-compose up -d
$ docker-compose scale taskmanager=3

$ docker-compose ps
       Name                      Command               State                   Ports                 
-----------------------------------------------------------------------------------------------------
flink_jobmanager_1    /docker-entrypoint.sh jobm ...   Up      6123/tcp, 0.0.0.0:8081->8081/tcp      
flink_taskmanager_1   /docker-entrypoint.sh task ...   Up      6121/tcp, 6122/tcp, 6123/tcp, 8081/tcp
flink_taskmanager_2   /docker-entrypoint.sh task ...   Up      6121/tcp, 6122/tcp, 6123/tcp, 8081/tcp
flink_taskmanager_3   /docker-entrypoint.sh task ...   Up      6121/tcp, 6122/tcp, 6123/tcp, 8081/tcp
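The JobManager also exposes a REST API on port 8081 (the same port as the web UI), so you can confirm that all three TaskManagers registered via `GET /overview`. The payload below is a hand-written sample of that response's shape (actual values depend on your cluster), parsed with the stdlib:

```python
import json

# Sample /overview payload; field names follow Flink's REST API,
# the values here are illustrative only.
sample = '''{"taskmanagers": 3, "slots-total": 3, "slots-available": 3,
             "jobs-running": 0, "jobs-finished": 0,
             "jobs-cancelled": 0, "jobs-failed": 0}'''

overview = json.loads(sample)
print("taskmanagers registered:", overview["taskmanagers"])
# taskmanagers registered: 3
```

Against a live cluster, `urllib.request.urlopen("http://localhost:8081/overview")` would return JSON of this shape.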

4. Job Development and Testing

Test example (here pyflink starts a local mini cluster; it does not use the Flink cluster we started above)

cd into the pyflink installation directory, e.g.: ~/soft/miniconda3/envs/py36/lib/python3.6/site-packages/pyflink
Run word_count.py (the big-data equivalent of hello-world)

$ python examples/python/table/batch/word_count.py
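For intuition, the computation that word_count.py distributes across the cluster boils down to this plain-Python equivalent (not pyflink code; the sample text is made up):

```python
from collections import Counter

# Count word frequencies, which is exactly what the Flink job does in parallel.
text = "flink hello flink world hello flink"
counts = Counter(text.split())
for word, n in sorted(counts.items()):
    print(word, n)
# flink 3
# hello 2
# world 1
```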

pyflink shell (streaming API)

$ ./bin/pyflink-shell.sh remote localhost 8081

# enter the streaming demo code in the shell
import tempfile
import os
import shutil

sink_path = tempfile.gettempdir() + '/streaming.csv'
if os.path.exists(sink_path):
    if os.path.isfile(sink_path):
        os.remove(sink_path)
    else:
        shutil.rmtree(sink_path)


s_env.set_parallelism(1)
t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
st_env.connect(FileSystem().path(sink_path)) \
    .with_format(OldCsv()
                 .field_delimiter(',')
                 .field("a", DataTypes.BIGINT())
                 .field("b", DataTypes.STRING())
                 .field("c", DataTypes.STRING())) \
    .with_schema(Schema()
                 .field("a", DataTypes.BIGINT())
                 .field("b", DataTypes.STRING())
                 .field("c", DataTypes.STRING())) \
    .register_table_sink("stream_sink")


t.select("a + 1, b, c").insert_into("stream_sink")

st_env.execute("stream_job")
# cat /tmp/streaming.csv 
2,hi,hello
3,hi,hello
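The `a + 1, b, c` projection above is easy to sanity-check in plain Python. The Table API compiles it into a distributed job, but the row-level logic is just this:

```python
# Same input rows as in the demo above.
elements = [(1, 'hi', 'hello'), (2, 'hi', 'hello')]

# Apply the projection "a + 1, b, c" to each row.
rows = [(a + 1, b, c) for a, b, c in elements]
print(rows)
# [(2, 'hi', 'hello'), (3, 'hi', 'hello')]
```

This matches the contents of /tmp/streaming.csv shown above.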

Other ways to submit a job

Submit a Python script via the flink CLI, or submit Flink SQL directly:

$ ./bin/flink run -py examples/python/table/batch/word_count.py