SeaTunnel是一个开源、易用的超高性能分布式数据集成平台,支持海量数据的实时同步。本文介绍如何通过SeaTunnel集成平台将数据写入OSS-HDFS服务。
背景信息
SeaTunnel可稳定高效地同步百亿级数据,专注于数据集成和数据同步,主要解决数据集成领域的以下问题。
数据源多样
常用的数据源有数百种,版本不兼容。随着新技术的出现,可能出现更多的数据源。用户很难找到能够全面快速支持这些数据源的工具。
复杂同步场景
数据同步需要支持离线-全量同步、离线-增量同步、CDC、实时同步、全库同步等多种同步场景。
资源需求高
现有的数据集成和数据同步工具往往需要大量的计算资源或JDBC连接资源来完成海量小文件的实时同步,这在一定程度上加重了企业的负担。
缺乏数据监控
数据集成和同步过程经常会丢失或重复数据。同步过程缺乏监控,无法直观了解任务过程中数据的真实情况。
技术栈复杂
企业使用的技术组件各不相同,您需要针对不同的组件开发相应的同步程序来完成数据集成。
管理和维护困难
受限于不同的底层技术组件(Flink或者Spark),通常单独开发和管理离线同步和实时同步,增加了管理和维护的难度。
更多信息,请参见SeaTunnel。
前提条件
已开通OSS-HDFS服务。具体步骤,请参见开通并授权访问OSS-HDFS服务。
使用限制
仅允许通过专有网络VPC的方式访问OSS-HDFS服务。创建专有网络VPC时,需确保创建的VPC与待开启OSS-HDFS服务的Bucket位于相同的地域。
步骤一:部署SeaTunnel
本地部署
执行以下步骤前,您需要确保已安装Java 8或者Java 11并设置JAVA_HOME。
在终端通过以下命令下载并解压SeaTunnel。
以下以下载并解压2.3.0版本的SeaTunnel为例。
export version="2.3.0" && wget "https://www.apache.org/dyn/closer.lua/incubator/seatunnel/2.3.0/apache-seatunnel-incubating-2.3.0-bin.tar.gz" && tar -xzvf "apache-seatunnel-incubating-2.3.0-bin.tar.gz"
关于SeaTunnel的版本信息,请参见Apache SeaTunnel。
安装connector。
自2.2.0-beta版本开始,二进制包默认不提供connector依赖。以2.3.0版本为例,您在首次使用SeaTunnel时,需要执行以下命令安装connector。
sh bin/install_plugin.sh 2.3.0
说明${SEATUNNEL_HOME}/config/plugin.properties配置文件默认下载全部connector插件,您可以结合业务需求适当增减connector。
connector插件列表如下:
--connectors-v2-- connector-amazondynamodb connector-assert connector-cassandra connector-cdc-mysql connector-cdc-sqlserver connector-clickhouse connector-datahub connector-dingtalk connector-doris connector-elasticsearch connector-email connector-file-ftp connector-file-hadoop connector-file-local connector-file-oss connector-file-oss-jindo connector-file-s3 connector-file-sftp connector-google-sheets connector-hive connector-http-base connector-http-feishu connector-http-gitlab connector-http-github connector-http-jira connector-http-klaviyo connector-http-lemlist connector-http-myhours connector-http-notion connector-http-onesignal connector-http-wechat connector-hudi connector-iceberg connector-influxdb connector-iotdb connector-jdbc connector-kafka connector-kudu connector-maxcompute connector-mongodb connector-neo4j connector-openmldb connector-pulsar connector-rabbitmq connector-redis connector-s3-redshift connector-sentry connector-slack connector-socket connector-starrocks connector-tablestore connector-selectdb-cloud connector-hbase --end--
Kubernetes(Beta)部署
通过Kubernetes(Beta)部署SeaTunnel目前处于试运行阶段。以下以Flink引擎为例,不推荐在生产环境中使用。
确保已在本地安装Docker,Kubernetes以及Helm。
启动集群。
以Kubernetes 1.23.3版本为例,您可以使用以下命令启动集群。
minikube start --kubernetes-version=v1.23.3
使用SeaTunnel运行镜像。
ENV SEATUNNEL_VERSION="2.3.0-beta" ENV SEATUNNEL_HOME = "/opt/seatunnel" RUN mkdir -p $SEATUNNEL_HOME RUN wget https://archive.apache.org/dist/incubator/seatunnel/${SEATUNNEL_VERSION}/apache-seatunnel-incubating-${SEATUNNEL_VERSION}-bin.tar.gz RUN tar -xzvf apache-seatunnel-incubating-${SEATUNNEL_VERSION}-bin.tar.gz RUN cp -r apache-seatunnel-incubating-${SEATUNNEL_VERSION}/* $SEATUNNEL_HOME/ RUN rm -rf apache-seatunnel-incubating-${SEATUNNEL_VERSION}* RUN rm -rf $SEATUNNEL_HOME/connectors/seatunnel
构建镜像。
docker build -t seatunnel:2.3.0-beta-flink-1.13 -f Dockerfile .
将图像加载至minikube。
minikube image load seatunnel:2.3.0-beta-flink-1.13
在Kubernetes集群上安装证书管理器。
在Kubernetes集群上安装证书管理器以启用Webhook组件,每个Kubernetes集群只需要安装一次证书管理器。
kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.7.1/cert-manager.yaml
使用Helm图表部署最新的Flink Kubernetes Operator版本。
下载Flink Kubernetes Operator。
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-0.1.0/
部署Flink Kubernetes Operator。
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
验证kubectl是否已成功安装。
kubectl get pods
返回以下结果说明已成功安装kubectl。
NAME READY STATUS RESTARTS AGE flink-kubernetes-operator-5f466b8549-mgchb 1/1 Running 3 (23h ago) 1
步骤二:设置配置文件
通过添加配置文件,确定SeaTunnel启动后数据输入、处理和输出的方式和逻辑。配置示例如下:
env {
# You can set SeaTunnel environment configuration here
execution.parallelism = 10
job.mode = "BATCH"
checkpoint.interval = 10000
#execution.checkpoint.interval = 10000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
source {
LocalFile {
path = "/data/seatunnel-2.3.0/testfile/source"
type = "csv"
delimiter = "#"
schema {
fields {
name = string
age = int
gender = string
}
}
}
}
transform {
}
# In this case we don't need this function. If you would like to get more information about how to configure Seatunnel and see full list of sink plugins,
# please go to https://seatunnel.apache.org/docs/category/sink-v2
sink {
OssJindoFile {
path="/seatunnel/oss03"
bucket = "oss://examplebucket.cn-hangzhou.oss-dls.aliyuncs.com"
access_key = "LTAI5t7h6SgiLSganP2m****"
access_secret = "KZo149BD9GLPNiDIEmdQ7d****"
endpoint = "cn-hangzhou.oss-dls.aliyuncs.com"
}
# If you would like to get more information about how to configure Seatunnel and see full list of sink plugins,
# please go to https://seatunnel.apache.org/docs/category/sink-v2
}
以上配置示例包含四个模块,详细说明如下。
模块 | 是否必选 | 说明 |
env | 是 | 用于配置引擎的环境变量。 关于env的更多信息,请参见env。 |
source | 是 | 用于定义SeaTunnel需要获取的数据源,并将获取的数据用于下一个模块transform,支持同时定义多个数据源。 关于支持的数据源列表,请参见source。 |
transform | 否 | 用于定于数据处理模块。当定义了数据源后,可能还需要对数据做进一步的处理。如果您不需要做数据处理,可以直接忽略transform模块,数据将直接从source写入sink。 关于transform的更多信息,请参见transform。 |
sink | 是 | 用于定义SeaTunnel将数据写入的目标端,本教程以写入OSS-HDFS服务为例。
|
步骤三:运行SeaTunnel
通过以下命令运行SeaTunnel。
cd "apache-seatunnel-incubating-${version}" ./bin/seatunnel.sh --config ./config/seatunnel.streaming.conf.template -e local
数据同步结束后,您可以通过SeaTunnel控制台查看输出结果,示例如下。
***********************************************
Job Statistic Information
***********************************************
Start Time : 2023-02-22 17:12:19
End Time : 2023-02-22 17:12:37
Total Time(s) : 18
Total Read Count : 10000000
Total Write Count : 10000000
Total Failed Count : 0
***********************************************
OSS-HDFS配置说明
使用OssJindoFile将数据输出到OSS-HDFS文件系统。