Writing to and Reading from Kafka with Alink

2021-12-13 · 越陌先生

Python version:

pyalink version: 1.5.1

The advantage of using pyalink is that it makes processing, transforming, and visualizing data faster; in particular, Python offers many convenient data-processing libraries, such as pandas, that make it easy to manipulate the data.
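
For example, a batch source's result can be pulled straight into a pandas DataFrame for quick exploration. A minimal sketch, assuming pyalink 1.5.1, where batch operators expose collectToDataframe():

from pyalink.alink import *

useLocalEnv(1, flinkHome=None, config=None)

SCHEMA_STR = "sepal_length double, sepal_width double, " \
             "petal_length double, petal_width double, category string"

# Read the CSV as a batch source and collect it into a pandas DataFrame
batch = CsvSourceBatchOp() \
    .setFilePath("https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/iris.csv") \
    .setSchemaStr(SCHEMA_STR)
df = batch.collectToDataframe()
print(df.describe())  # quick summary statistics via pandas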

Writing

The sink component is KafkaSinkStreamOp:

from pyalink.alink import *

# Use the local execution environment
useLocalEnv(1, flinkHome=None, config=None)

# Kafka sink: broker address, serialization format, and target topic
sink = KafkaSinkStreamOp() \
    .setBootstrapServers("xxx:xxx:xxx:xxx:9092") \
    .setDataFormat("json") \
    .setTopic("iris")

filePath = '/Users/zhangyanqiang/Documents/资料/学习/data/alink/iris.csv'
# Schema of the CSV data; the last column is named "category"
# so that it matches the fields parsed on the read side
SCHEMA_STR = "sepal_length double, sepal_width double, " \
             "petal_length double, petal_width double, category string"
# Read the data from the CSV file
data = CsvSourceStreamOp().setFilePath(filePath).setSchemaStr(SCHEMA_STR)
# Link the source to the sink
data.link(sink)
# Run the job
StreamOperator.execute()
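
To double-check that the messages actually landed in the topic, you can read a few of them back with the kafka-python client. An optional sketch, assuming the kafka-python package is installed and the same broker address:

from kafka import KafkaConsumer

# Read the first few JSON records back from the "iris" topic
consumer = KafkaConsumer(
    "iris",
    bootstrap_servers="xxx:xxx:xxx:xxx:9092",
    auto_offset_reset="earliest",
    consumer_timeout_ms=5000,  # stop iterating when no more messages arrive
)
for i, record in enumerate(consumer):
    print(record.value.decode("utf-8"))
    if i >= 4:
        break
consumer.close()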

Reading

The source component is KafkaSourceStreamOp:

from pyalink.alink import *

useLocalEnv(1, flinkHome=None, config=None)

# Kafka source: start reading from the earliest offset;
# "alink_group" is the consumer group id
source = KafkaSourceStreamOp() \
    .setBootstrapServers("xxx:xxx:xxx:xxx:9092") \
    .setTopic("iris") \
    .setStartupMode("EARLIEST") \
    .setGroupId("alink_group")

# link() attaches the parsing steps to the source
data = source.link(
    # Parse the JSON payload in the "message" column
    JsonValueStreamOp()
    .setSelectedCol("message")
    .setReservedCols([])
    .setOutputCols(["sepal_length", "sepal_width", "petal_length",
                    "petal_width", "category"])
    .setJsonPath(["$.sepal_length", "$.sepal_width",
                  "$.petal_length", "$.petal_width", "$.category"])
).link(
    # Cast the columns with a SQL-style SELECT clause
    SelectStreamOp()
    .setClause("CAST(sepal_length AS DOUBLE) AS sepal_length, "
               "CAST(sepal_width AS DOUBLE) AS sepal_width, "
               "CAST(petal_length AS DOUBLE) AS petal_length, "
               "CAST(petal_width AS DOUBLE) AS petal_width, category")
)

data.print()
StreamOperator.execute()
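
Conceptually, the JsonValueStreamOp step extracts each JSON path from the message column as a string, which is why the SELECT clause casts the numeric fields back to DOUBLE. A plain-Python sketch of what happens per record (the payload is illustrative):

import json

# Illustrative payload, matching the schema written by the sink above
message = ('{"sepal_length":5.1,"sepal_width":3.5,'
           '"petal_length":1.4,"petal_width":0.2,"category":"Iris-setosa"}')

record = json.loads(message)
# The extracted values arrive as strings, hence str() here
# and the CAST ... AS DOUBLE step in the pipeline above
row = [str(record[k]) for k in
       ("sepal_length", "sepal_width", "petal_length", "petal_width", "category")]
print(row)  # ['5.1', '3.5', '1.4', '0.2', 'Iris-setosa']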

The source data file is
https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/iris.csv
The final output of data.print() looks like this:

(screenshot of the data.print() output)

Java version

Since Alink itself is written in Java, calling it from Java is somewhat more convenient.
For example, in older Alink releases the Kafka sink component class was Kafka011SinkStreamOp. After upgrading, the old code kept failing with "Kafka011SinkStreamOp not found". Figuring the class must have been renamed, I searched the Alink source code, and sure enough, it is now KafkaSinkStreamOp.

Alink version: 1.13_2.11

The code:

package com.alibaba.alink.kafka;

import com.alibaba.alink.operator.stream.StreamOperator;
import com.alibaba.alink.operator.stream.dataproc.JsonValueStreamOp;
import com.alibaba.alink.operator.stream.sink.KafkaSinkStreamOp;
import com.alibaba.alink.operator.stream.source.CsvSourceStreamOp;
import com.alibaba.alink.operator.stream.source.KafkaSourceStreamOp;

/**
 * @author zhangyanqiang
 * @date 2021/12/13
 **/
public class KafkaSink {

    private static void writeKafka() throws Exception {
        String URL = "https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/iris.csv";
        String SCHEMA_STR
                = "sepal_length double, sepal_width double, "
                + "petal_length double, petal_width double, category string";
        CsvSourceStreamOp data = new CsvSourceStreamOp()
                .setFilePath(URL)
                .setSchemaStr(SCHEMA_STR);

        // Kafka sink: broker address, serialization format, and target topic
        KafkaSinkStreamOp sink = new KafkaSinkStreamOp()
                .setBootstrapServers("xxx:xxx:xxx:xxx:9092")
                .setDataFormat("json")
                .setTopic("iris");

        data.link(sink);

        StreamOperator.execute();
    }

    private static void readKafka() throws Exception {
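        // Kafka source: read from the earliest offset as consumer group "alink_group"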
        KafkaSourceStreamOp source = new KafkaSourceStreamOp()
                .setBootstrapServers("xxx:xxx:xxx:xxx:9092")
                .setTopic("iris")
                .setStartupMode("EARLIEST")
                .setGroupId("alink_group");

        StreamOperator<?> data = source
                .link(
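                        // Parse the JSON payload carried in the "message" column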
                        new JsonValueStreamOp()
                                .setSelectedCol("message")
                                .setReservedCols(new String[] {})
                                .setOutputCols(
                                        new String[] {"sepal_length", "sepal_width", "petal_length", "petal_width", "category"})
                                .setJsonPath(new String[] {"$.sepal_length", "$.sepal_width", "$.petal_length", "$.petal_width",
                                        "$.category"})
                );

        // Inspect the parsed schema before printing the rows
        System.out.println(data.getSchema());
        data.print();
        StreamOperator.execute();
    }

    public static void main(String[] args) throws Exception {
        writeKafka();
        readKafka();
    }
}
