温故知新:初识Seatunnel

2024-07-29  本文已影响0人  灿烂的GL

本文为学习笔记,会随着学习深入持续更新,仅供参考

1、Seatunnel是干什么的
简化企业中多源、异构数据的集成过程。它能够每天稳定地同步数十亿条数据,支持多种数据格式和存储系统之间的高效数据集成。主要用于处理实时流式和离线批处理任务。


2、实现原理和基本模块
SeaTunnel的数据处理流水线由Source、Sink以及多个Transform构成。Source负责从各种数据源获取数据,Sink则负责将数据写入目标系统。在Source和Sink之间,可以通过多个Transform对数据进行清洗、转换和聚合等操作。


3、在springboot中如何实现数据转换集成
a)直接转换(数据源1 —》数据源2)
参考官方文档
b)需要进行数据处理后再同步(数据源1—》数据处理—》数据源2)
简单的数据转换官方文档有提供配置参数(如:映射、过滤、替换等),复杂的数据转换需要重写transform模块(如涉及业务逻辑或官方没有的转换)

官方转换方法.png
重写transform模块方法有以下几种:
方式一:工具和代码模块独立,可以直接将转换代码写在配置文件里
参考:DynamicCompile
方式二:集成在springboot项目中进行引用
参考代码逻辑如下
### 1. **配置文件,放在resources文件夹下**
env:
  execution.parallelism: 4

source:
  Kafka:
    consumer.bootstrap.servers: "localhost:9092"
    consumer.group.id: "sea-group"
    topic: "input-topic"
    schema:
      field:
        - name: "value"
          type: "string"
                  
transform:
  - class_name: com.example.transform.TLVToJsonTransform
    row_type: 
      - name: "value"
        type: "string"

sink:
  HDFS:
    path: "hdfs://namenode:8020/user/data/output"
    file_format: "json"
    partition_by: ["date"]
    save_mode: "append"
    

### 2. **实现自定义Transform**
创建一个自定义的Transform组件,用于解析TLV格式数据并转换为JSON格式。在SeaTunnel中,Transform是用来对数据进行中间处理的组件。

```java
import org.apache.seatunnel.api.common.SeaTunnelTransform;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.json.JSONObject;

public class TLVToJsonTransform implements SeaTunnelTransform<SeaTunnelRow> {

    private final SeaTunnelRowType rowType;

    public TLVToJsonTransform(SeaTunnelRowType rowType) {
        this.rowType = rowType;
    }

    @Override
    public SeaTunnelRow map(SeaTunnelRow input) {
        String tlvData = (String) input.getField(0); // 假设TLV数据在第一列
        JSONObject json = parseTLVToJson(tlvData);
        return new SeaTunnelRow(new Object[]{json.toString()});
    }

    private JSONObject parseTLVToJson(String tlvData) {
        JSONObject json = new JSONObject();
        for (int i = 0; i < tlvData.length(); ) {
            String tag = tlvData.substring(i, i + 2);
            int length = Integer.parseInt(tlvData.substring(i + 2, i + 4));
            String value = tlvData.substring(i + 4, i + 4 + length * 2);
            json.put(tag, value);
            i += 4 + length * 2;
        }
        return json;
    }
    
    @Override
    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
        return rowType;
    }
}

### 3. **在Spring Boot应用中启动SeaTunnel任务:**
.
import org.apache.seatunnel.core.SeaTunnelCore;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
 
@SpringBootApplication
public class SeaTunnelApplication {
 
    public static void main(String[] args) {
        SpringApplication.run(SeaTunnelApplication.class, args);
        SeaTunnelCore.start(args);
或者SeaTunnel.start("seatunnel-config.conf");
    }
}

备注:配置文件编写可以参考官网,如果有多个topic,可以配置多个文件


参考文件
1、3分钟搞懂 SeaTunnel CDC 数据同步
2、官方例子
3、使用 Apache SeaTunnel 实现 Kafka Source 解析复杂Json 案例

上一篇下一篇

猜你喜欢

热点阅读