通过SeaTunnel集成平台将数据写入OSS-HDFS服务

SeaTunnel是一个开源、易用的超高性能分布式数据集成平台,支持海量数据的实时同步。本文介绍如何通过SeaTunnel集成平台将数据写入OSS-HDFS服务。

背景信息

SeaTunnel可稳定高效地同步百亿级数据,专注于数据集成和数据同步,主要解决数据集成领域的以下问题。

  • 数据源多样

    常用的数据源有数百种,版本不兼容。随着新技术的出现,可能出现更多的数据源。用户很难找到能够全面快速支持这些数据源的工具。

  • 复杂同步场景

    数据同步需要支持离线-全量同步、离线-增量同步、CDC、实时同步、全库同步等多种同步场景。

  • 资源需求高

    现有的数据集成和数据同步工具往往需要大量的计算资源或JDBC连接资源来完成海量小文件的实时同步,这在一定程度上加重了企业的负担。

  • 缺乏数据监控

    数据集成和同步过程经常会丢失或重复数据。同步过程缺乏监控,无法直观了解任务过程中数据的真实情况。

  • 技术栈复杂

    企业使用的技术组件各不相同,您需要针对不同的组件开发相应的同步程序来完成数据集成。

  • 管理和维护困难

    受限于不同的底层技术组件(Flink或者Spark),通常单独开发和管理离线同步和实时同步,增加了管理和维护的难度。

更多信息,请参见SeaTunnel

前提条件

已开通OSS-HDFS服务。具体步骤,请参见开通并授权访问OSS-HDFS服务

使用限制

仅允许通过专有网络VPC的方式访问OSS-HDFS服务。创建专有网络VPC时,需确保创建的VPC与待开启OSS-HDFS服务的Bucket位于相同的地域。

步骤一:部署SeaTunnel

本地部署

重要

执行以下步骤前,您需要确保已安装Java 8或者Java 11并设置JAVA_HOME。

  1. 在终端通过以下命令下载并解压SeaTunnel。

    以下以下载并解压2.3.0版本的SeaTunnel为例。

    export version="2.3.0" && wget "https://www.apache.org/dyn/closer.lua/incubator/seatunnel/2.3.0/apache-seatunnel-incubating-2.3.0-bin.tar.gz" && tar -xzvf "apache-seatunnel-incubating-2.3.0-bin.tar.gz"

    关于SeaTunnel的版本信息,请参见Apache SeaTunnel

  2. 安装connector。

    自2.2.0-beta版本开始,二进制包默认不提供connector依赖。以2.3.0版本为例,您在首次使用SeaTunnel时,需要执行以下命令安装connector。

    sh bin/install_plugin.sh 2.3.0
    说明

    ${SEATUNNEL_HOME}/config/plugin.properties配置文件默认下载全部connector插件,您可以结合业务需求适当增减connector。

    connector插件列表如下:

       --connectors-v2--
       connector-amazondynamodb
       connector-assert
       connector-cassandra
       connector-cdc-mysql
       connector-cdc-sqlserver
       connector-clickhouse
       connector-datahub
       connector-dingtalk
       connector-doris
       connector-elasticsearch
       connector-email
       connector-file-ftp
       connector-file-hadoop
       connector-file-local
       connector-file-oss
       connector-file-oss-jindo
       connector-file-s3
       connector-file-sftp
       connector-google-sheets
       connector-hive
       connector-http-base
       connector-http-feishu
       connector-http-gitlab
       connector-http-github
       connector-http-jira
       connector-http-klaviyo
       connector-http-lemlist
       connector-http-myhours
       connector-http-notion
       connector-http-onesignal
       connector-http-wechat
       connector-hudi
       connector-iceberg
       connector-influxdb
       connector-iotdb
       connector-jdbc
       connector-kafka
       connector-kudu
       connector-maxcompute
       connector-mongodb
       connector-neo4j
       connector-openmldb
       connector-pulsar
       connector-rabbitmq
       connector-redis
       connector-s3-redshift
       connector-sentry
       connector-slack
       connector-socket
       connector-starrocks
       connector-tablestore
       connector-selectdb-cloud
       connector-hbase
       --end--

Kubernetes(Beta)部署

重要
  • 通过Kubernetes(Beta)部署SeaTunnel目前处于试运行阶段。以下以Flink引擎为例,不推荐在生产环境中使用。

  • 确保已在本地安装Docker,Kubernetes以及Helm。

  1. 启动集群。

    以Kubernetes 1.23.3版本为例,您可以使用以下命令启动集群。

    minikube start --kubernetes-version=v1.23.3
  2. 使用SeaTunnel运行镜像。

    ENV SEATUNNEL_VERSION="2.3.0-beta"
    ENV SEATUNNEL_HOME = "/opt/seatunnel"
    
    RUN mkdir -p $SEATUNNEL_HOME
    
    RUN wget https://archive.apache.org/dist/incubator/seatunnel/${SEATUNNEL_VERSION}/apache-seatunnel-incubating-${SEATUNNEL_VERSION}-bin.tar.gz
    RUN tar -xzvf apache-seatunnel-incubating-${SEATUNNEL_VERSION}-bin.tar.gz
    
    RUN cp -r apache-seatunnel-incubating-${SEATUNNEL_VERSION}/* $SEATUNNEL_HOME/
    RUN rm -rf apache-seatunnel-incubating-${SEATUNNEL_VERSION}*
    RUN rm -rf $SEATUNNEL_HOME/connectors/seatunnel
  3. 构建镜像。

    docker build -t seatunnel:2.3.0-beta-flink-1.13 -f Dockerfile .
  4. 将图像加载至minikube。

    minikube image load seatunnel:2.3.0-beta-flink-1.13
  5. 在Kubernetes集群上安装证书管理器。

    在Kubernetes集群上安装证书管理器以启用Webhook组件,每个Kubernetes集群只需要安装一次证书管理器。

    kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.7.1/cert-manager.yaml
  6. 使用Helm图表部署最新的Flink Kubernetes Operator版本。

    1. 下载Flink Kubernetes Operator。

      helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-0.1.0/
    2. 部署Flink Kubernetes Operator。

      helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
  7. 验证kubectl是否已成功安装。

    kubectl get pods

    返回以下结果说明已成功安装kubectl。

    NAME                                                   READY   STATUS    RESTARTS      AGE
    flink-kubernetes-operator-5f466b8549-mgchb             1/1     Running   3 (23h ago)   1

步骤二:设置配置文件

通过添加配置文件,确定SeaTunnel启动后数据输入、处理和输出的方式和逻辑。配置示例如下:

env {                                                                                                                                                                          
  # You can set SeaTunnel environment configuration here                                                                                                                       
  execution.parallelism = 10                                                                                                                                                   
    job.mode = "BATCH"                                                                                                                                                           
    checkpoint.interval = 10000                                                                                                                                                  
    #execution.checkpoint.interval = 10000                                                                                                                                       
    #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"                                                                                                          
    }                                                                                                                                                                              


source {                                                                                                                                                                       
  LocalFile {                                                                                                                                                                    
  			path = "/data/seatunnel-2.3.0/testfile/source"                                                                                                                               
  			type = "csv"                                                                                                                                                                 
			delimiter = "#"                                                                                                                                                                
    		schema {                                                                                                                                                                       
    			fields {                                                                                                                                                                   
      				name = string                                                                                                                                                          
      				age = int                                                                                                                                                              
      				gender = string                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                
						} 
           			}
     		}                                                                                             
}   

      
transform {
    
}
  # In this case we don't need this function. If you would like to get more information about how to configure Seatunnel and see full list of sink plugins,                                                              
  # please go to https://seatunnel.apache.org/docs/category/sink-v2

      
sink {                                                                                                                                                                         
	OssJindoFile {                                                                                                                                                               
      path="/seatunnel/oss03"                                                                                                                                                                                                                                                                                        
      bucket = "oss://examplebucket.cn-hangzhou.oss-dls.aliyuncs.com"                                                                                                       
      access_key = "LTAI5t7h6SgiLSganP2m****"
      access_secret = "KZo149BD9GLPNiDIEmdQ7d****"                                                                                                                                                                                                                                                          
      endpoint = "cn-hangzhou.oss-dls.aliyuncs.com"                                                                                                                              
      }

  # If you would like to get more information about how to configure Seatunnel and see full list of sink plugins,                                                              
  # please go to https://seatunnel.apache.org/docs/category/sink-v2                                                                                                            
} 

以上配置示例包含四个模块,详细说明如下。

模块

是否必选

说明

env

用于配置引擎的环境变量。

关于env的更多信息,请参见env

source

用于定义SeaTunnel需要获取的数据源,并将获取的数据用于下一个模块transform,支持同时定义多个数据源。

关于支持的数据源列表,请参见source

transform

用于定于数据处理模块。当定义了数据源后,可能还需要对数据做进一步的处理。如果您不需要做数据处理,可以直接忽略transform模块,数据将直接从source写入sink。

关于transform的更多信息,请参见transform

sink

用于定义SeaTunnel将数据写入的目标端,本教程以写入OSS-HDFS服务为例。

  • 关于支持的数据写入目标端列表,请参见sink

  • 关于sink的配置示例,请参见OSS-HDFS配置说明

步骤三:运行SeaTunnel

通过以下命令运行SeaTunnel。

cd "apache-seatunnel-incubating-${version}" ./bin/seatunnel.sh --config ./config/seatunnel.streaming.conf.template -e local

数据同步结束后,您可以通过SeaTunnel控制台查看输出结果,示例如下。

***********************************************                                                                                                                                
           Job Statistic Information                                                                                                                                           
***********************************************                                                                                                                                
Start Time                : 2023-02-22 17:12:19                                                                                                                                
End Time                  : 2023-02-22 17:12:37                                                                                                                                
Total Time(s)             :                  18                                                                                                                                
Total Read Count          :            10000000                                                                                                                                
Total Write Count         :            10000000                                                                                                                                
Total Failed Count        :                   0                                                                                                                                
***********************************************

OSS-HDFS配置说明

使用OssJindoFile将数据输出到OSS-HDFS文件系统。

配置参数

名称

类型

是否必选

说明

默认值

path

string

文件写入路径。

bucket

string

指定OSS-HDFS服务中的Bucket。

access_key

string

用于访问OSS-HDFS服务的AccessKey ID。

access_secret

string

用于访问OSS-HDFS服务的AccessKey Secret。

endpoint

string

用于访问OSS-HDFS服务的Endpoint。

file_format

string

文件类型。支持parquet,orc, json,csv和text。

"csv"

field_delimiter

string

文件数据分隔符,默认与hive一致,仅在写入类型为text文件时生效。

'\001'

row_delimiter

string

文件行分隔符。仅在写入类型为text文件时生效。

"\n"

partition_by

array

分区字段。如果上游数据中具有分区字段,则该配置支持数据根据分区字段目录写入,行为与hive一致。

is_partition_field_write_in_file

boolean

是否将分区字段写入文件中。

false

sink_columns

array

写入文件的字段 。

从Transform中获取的所有列Source

common-options

object

公共参数。更多信息,请参见common-options

配置示例

以下是text格式的配置示例。如果您需要配置为其他格式的配置文件,仅需相应替换以下示例中file_format的值,例如file_format = "csv"

OssJindoFile {
    path="/seatunnel/sink"
    bucket = "oss://examplebucket.cn-hangzhou.oss-dls.aliyuncs.com"
    access_key = "LTAI********"
    access_secret = "KZo1********"
    endpoint = "cn-hangzhou.oss-dls.aliyuncs.com"
    file_format = "text"
  }

相关文档