Write data to the OSS-HDFS service using the SeaTunnel integration platform

SeaTunnel is an open-source, easy-to-use, and high-performance distributed data integration platform. It supports real-time synchronization of large amounts of data. This topic describes how to use the SeaTunnel integration platform to write data to the OSS-HDFS service.

Background information

SeaTunnel can synchronize tens of billions of data records stably and efficiently. It focuses on data integration and data synchronization and solves the following common problems:

  • Diverse data sources

    Hundreds of common data sources exist, and their versions are often incompatible with each other. As new technologies emerge, the number of data sources continues to grow. It is difficult for users to find a single tool that can fully and quickly support all of these data sources.

  • Complex synchronization scenarios

    Data synchronization must support multiple scenarios, such as offline full synchronization, offline incremental synchronization, Change Data Capture (CDC), real-time synchronization, and full database synchronization.

  • High resource requirements

    Existing data integration and synchronization tools often require a large amount of computing resources or Java Database Connectivity (JDBC) connections to synchronize a massive number of small tables in real time. This increases the operational burden on enterprises.

  • Lack of data monitoring

    Data is often lost or duplicated during data integration and synchronization. The synchronization process often lacks monitoring, which makes it impossible to view the real-time status of data during a task.

  • Complex technology stacks

    Enterprises use various technology components. To achieve data integration, you must develop different synchronization programs for each component.

  • Difficult management and maintenance

    Due to the limitations of different underlying technology components, such as Flink or Spark, offline and real-time synchronization are usually developed and managed separately. This increases management and maintenance complexity.

For more information, see SeaTunnel.

Prerequisites

The OSS-HDFS service is enabled. For more information, see Enable the OSS-HDFS service.

Limits

You can access the OSS-HDFS service only through a virtual private cloud (VPC). When you create the VPC, make sure that it is in the same region as the bucket for which you enabled the OSS-HDFS service.
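As a quick sanity check for the region requirement, the following shell sketch extracts the region ID from an OSS-HDFS endpoint and compares it with the region of your VPC. The function names are hypothetical, and the sketch assumes endpoints of the form <region>.oss-dls.aliyuncs.com, which is the form used in the configuration examples in this topic.

```shell
# Hypothetical helper: print the region ID of an OSS-HDFS endpoint such as
# "cn-hangzhou.oss-dls.aliyuncs.com" (the form assumed from this topic).
endpoint_region() {
  echo "${1%%.oss-dls.aliyuncs.com}"
}

# Succeeds only if the endpoint and the VPC are in the same region.
same_region() {
  [ "$(endpoint_region "$1")" = "$2" ]
}

if same_region "cn-hangzhou.oss-dls.aliyuncs.com" "cn-hangzhou"; then
  echo "Endpoint and VPC are in the same region."
fi
```

Replace the second argument with the region ID of the VPC in which SeaTunnel runs.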

Step 1: Deploy SeaTunnel

Local deployment

Important

Before you begin, make sure that Java 8 or Java 11 is installed and the JAVA_HOME environment variable is set.
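The Java prerequisite can be checked with a small script before you download anything. The following is a minimal sketch, not part of SeaTunnel: the function names are hypothetical, and it assumes that the `java -version` banner contains the version in double quotes, as common JDK distributions print it.

```shell
# Hypothetical helper: extract the major version from a Java version string.
java_major_version() {
  local v="$1"
  case "$v" in
    1.*) v="${v#1.}"; echo "${v%%.*}" ;;   # legacy scheme: 1.8.0_362 -> 8
    *)   echo "${v%%[.+-]*}" ;;            # modern scheme: 11.0.19 -> 11
  esac
}

# Hypothetical helper: succeed only if JAVA_HOME points to Java 8 or Java 11.
check_java() {
  [ -n "${JAVA_HOME:-}" ] || { echo "JAVA_HOME is not set" >&2; return 1; }
  local raw major
  # The version banner is written to stderr; take the first quoted string.
  raw="$("$JAVA_HOME/bin/java" -version 2>&1 | awk -F'"' '/version/ {print $2; exit}')"
  major="$(java_major_version "$raw")"
  case "$major" in
    8|11) echo "Java $major found at $JAVA_HOME" ;;
    *)    echo "Unsupported Java version: ${raw:-unknown}" >&2; return 1 ;;
  esac
}
```

Call check_java in the terminal in which you plan to run SeaTunnel. A non-zero exit status means that the prerequisite is not met.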

  1. Download and decompress SeaTunnel.

    The following command downloads and decompresses SeaTunnel 2.3.0. It also exports the version variable, which the run command in Step 3 reuses.

    export version="2.3.0" && wget "https://archive.apache.org/dist/incubator/seatunnel/${version}/apache-seatunnel-incubating-${version}-bin.tar.gz" && tar -xzvf "apache-seatunnel-incubating-${version}-bin.tar.gz"

    For more information about SeaTunnel versions, see Apache SeaTunnel.

  2. Install the connectors.

    Starting from version 2.2.0-beta, binary packages do not provide connector dependencies by default. For version 2.3.0, you can run the following command to install the connectors when you first use SeaTunnel.

    bash bin/install-plugin.sh 2.3.0
    Note

    By default, the script downloads every connector plug-in listed in the ${SEATUNNEL_HOME}/config/plugin.properties configuration file. You can add or remove entries in this file as needed.

    The following list shows the available connector plug-ins.

       --connectors-v2--
       connector-amazondynamodb
       connector-assert
       connector-cassandra
       connector-cdc-mysql
       connector-cdc-sqlserver
       connector-clickhouse
       connector-datahub
       connector-dingtalk
       connector-doris
       connector-elasticsearch
       connector-email
       connector-file-ftp
       connector-file-hadoop
       connector-file-local
       connector-file-oss
       connector-file-oss-jindo
       connector-file-s3
       connector-file-sftp
       connector-google-sheets
       connector-hive
       connector-http-base
       connector-http-feishu
       connector-http-gitlab
       connector-http-github
       connector-http-jira
       connector-http-klaviyo
       connector-http-lemlist
       connector-http-myhours
       connector-http-notion
       connector-http-onesignal
       connector-http-wechat
       connector-hudi
       connector-iceberg
       connector-influxdb
       connector-iotdb
       connector-jdbc
       connector-kafka
       connector-kudu
       connector-maxcompute
       connector-mongodb
       connector-neo4j
       connector-openmldb
       connector-pulsar
       connector-rabbitmq
       connector-redis
       connector-s3-redshift
       connector-sentry
       connector-slack
       connector-socket
       connector-starrocks
       connector-tablestore
       connector-selectdb-cloud
       connector-hbase
       --end--
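This topic uses only the LocalFile source and the OssJindoFile sink, so you may want to limit the download to those two connectors. The following sketch rewrites the plug-in list before you run install-plugin.sh. The helper name is hypothetical, and the sketch assumes that the file consists only of the marker lines and connector names shown in the preceding list. Back up the original file first, because your copy may contain additional lines that the helper overwrites.

```shell
# Hypothetical helper: write a plug-in list that contains only the given connectors.
write_plugin_list() {
  local target="$1"
  shift
  {
    echo "--connectors-v2--"
    printf '%s\n' "$@"
    echo "--end--"
  } > "$target"
}

# Usage (run from the SeaTunnel home directory):
#   cp config/plugin.properties config/plugin.properties.bak
#   write_plugin_list config/plugin.properties connector-file-local connector-file-oss-jindo
#   bash bin/install-plugin.sh 2.3.0
```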

Kubernetes (Beta) deployment

Important
  • The Kubernetes (Beta) deployment for SeaTunnel is in a trial phase. The following example uses the Flink engine. This method is not recommended for production environments.

  • Make sure that Docker, Kubernetes, and Helm are installed locally.

  1. Start the cluster.

    The following command starts a cluster that runs Kubernetes 1.23.3.

    minikube start --kubernetes-version=v1.23.3
  2. Create a Dockerfile for the SeaTunnel runtime image.

    # Base image: the official Flink image is assumed here, matching the flink-1.13 tag used in the build step.
    FROM flink:1.13

    ENV SEATUNNEL_VERSION="2.3.0-beta"
    ENV SEATUNNEL_HOME="/opt/seatunnel"

    RUN mkdir -p $SEATUNNEL_HOME

    RUN wget https://archive.apache.org/dist/incubator/seatunnel/${SEATUNNEL_VERSION}/apache-seatunnel-incubating-${SEATUNNEL_VERSION}-bin.tar.gz
    RUN tar -xzvf apache-seatunnel-incubating-${SEATUNNEL_VERSION}-bin.tar.gz

    RUN cp -r apache-seatunnel-incubating-${SEATUNNEL_VERSION}/* $SEATUNNEL_HOME/
    RUN rm -rf apache-seatunnel-incubating-${SEATUNNEL_VERSION}*
    RUN rm -rf $SEATUNNEL_HOME/connectors/seatunnel
  3. Build the image.

    docker build -t seatunnel:2.3.0-beta-flink-1.13 -f Dockerfile .
  4. Load the image to Minikube.

    minikube image load seatunnel:2.3.0-beta-flink-1.13
  5. Install the certificate manager on the Kubernetes cluster.

    To enable the Webhook component, you must install the certificate manager on the Kubernetes cluster. You only need to install the certificate manager once for each Kubernetes cluster.

    kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.7.1/cert-manager.yaml
  6. Deploy the latest version of the Flink Kubernetes Operator using a Helm chart.

    1. Add the Flink Kubernetes Operator Helm repository.

      helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.13.0/
    2. Deploy the Flink Kubernetes Operator.

      helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
  7. Verify that the Flink Kubernetes Operator is running.

    kubectl get pods

    The following output indicates that the operator pod is running.

    NAME                                                   READY   STATUS    RESTARTS      AGE
    flink-kubernetes-operator-5f466b8549-mgchb             1/1     Running   3 (23h ago)   1
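If you script the deployment, you can check the pod status automatically instead of reading the table. The following sketch parses the default table output of kubectl get pods. The function name is hypothetical, and the sketch assumes the column order shown in the preceding output (NAME, READY, STATUS).

```shell
# Hypothetical helper: read `kubectl get pods` table output on stdin and
# succeed only if at least one pod is listed and every pod is Running with
# all of its containers ready (READY column of the form n/n).
all_pods_ready() {
  awk 'NR > 1 {
         seen = 1
         split($2, ready, "/")
         if (ready[1] != ready[2] || $3 != "Running") bad = 1
       }
       END { exit (seen && !bad) ? 0 : 1 }'
}

# Usage:
#   kubectl get pods | all_pods_ready && echo "Operator is ready."
```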

Step 2: Set the configuration file

You need to add a configuration file to define how SeaTunnel ingests, processes, and outputs data after it starts. The following is a sample configuration.

env {
  # You can set the SeaTunnel environment configuration here
  execution.parallelism = 10
  job.mode = "BATCH"
  checkpoint.interval = 10000
  #execution.checkpoint.interval = 10000
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}

source {
  LocalFile {
    path = "/data/seatunnel-2.3.0/testfile/source"
    type = "csv"
    delimiter = "#"
    schema {
      fields {
        name = string
        age = int
        gender = string
      }
    }
  }
}

transform {
  # This example does not transform the data, so the block is left empty.
}

sink {
  OssJindoFile {
    path = "/seatunnel/oss03"
    bucket = "oss://examplebucket.cn-hangzhou.oss-dls.aliyuncs.com"
    access_key = "yourAccessKeyID"
    access_secret = "yourAccessKeySecret"
    endpoint = "cn-hangzhou.oss-dls.aliyuncs.com"
  }

  # For more information about how to configure SeaTunnel and to see a full list of sink plug-ins,
  # go to https://seatunnel.apache.org/docs/category/sink-v2
}
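To try the sample configuration, the source directory must contain CSV data that matches the schema (name, age, gender) and uses # as the delimiter. The following sketch generates a small test file. The helper name, the file name sample.csv, and the generated values are assumptions made for illustration.

```shell
# Hypothetical helper: create DIR/sample.csv with ROWS records of the form
# name#age#gender, matching the schema and delimiter of the sample configuration.
make_sample_source() {
  local dir="$1" rows="${2:-5}"
  mkdir -p "$dir"
  awk -v n="$rows" 'BEGIN {
    for (i = 1; i <= n; i++)
      printf "user%d#%d#%s\n", i, 20 + i % 30, (i % 2 ? "male" : "female")
  }' > "$dir/sample.csv"
}

# Usage, with the source path from the sample configuration:
#   make_sample_source /data/seatunnel-2.3.0/testfile/source 100
```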

The preceding configuration example includes four modules. The following table describes each module.

| Module | Required | Description |
| --- | --- | --- |
| env | Yes | Configures environment variables for the engine. For more information about the env module, see env. |
| source | Yes | Defines the data source from which SeaTunnel retrieves data. The retrieved data is used by the next module, transform. You can define multiple data sources at the same time. For a list of supported data sources, see source. |
| transform | No | Defines the data processing module. After you define a data source, you may need to process the data further. If you do not need to process the data, you can ignore the transform module. Data is written directly from the source to the sink. For more information about the transform module, see transform. |
| sink | Yes | Defines the destination where SeaTunnel writes data. This tutorial uses the OSS-HDFS service as an example. |
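Before you start a job, you can check that a configuration file declares the required modules. The following sketch is a simple text check, not a parser for the configuration format. The function name is hypothetical, and it assumes that each module starts on its own line in the form name {, as in the sample configuration.

```shell
# Hypothetical helper: print the name of each required module (env, source,
# sink) that FILE does not declare. Prints nothing if all are present.
missing_modules() {
  local file="$1" module
  for module in env source sink; do
    grep -Eq "^[[:space:]]*${module}[[:space:]]*\{" "$file" || echo "$module"
  done
}

# Usage:
#   missing_modules ./config/my-job.conf
```

An empty result means that all required modules are present. The transform module is optional, so the check ignores it.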

Step 3: Run SeaTunnel

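Run SeaTunnel with the following command. The example passes the bundled template file. To run the job that you defined in Step 2, set --config to the path of your own configuration file.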

cd "apache-seatunnel-incubating-${version}" && ./bin/seatunnel.sh --config ./config/seatunnel.streaming.conf.template -e local

After the data synchronization is complete, you can view the output in the SeaTunnel console. The following output is an example.

***********************************************                                                                                                                                
           Job Statistic Information                                                                                                                                           
***********************************************                                                                                                                                
Start Time                : 2023-02-22 17:12:19                                                                                                                                
End Time                  : 2023-02-22 17:12:37                                                                                                                                
Total Time(s)             :                  18                                                                                                                                
Total Read Count          :            10000000                                                                                                                                
Total Write Count         :            10000000                                                                                                                                
Total Failed Count        :                   0                                                                                                                                
***********************************************
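In an automated pipeline, you can parse these statistics to decide whether the job succeeded. The following sketch treats a job as successful when the read and write counts match and no record failed. The function name is hypothetical, and the sketch assumes the label text shown in the preceding output.

```shell
# Hypothetical helper: read the job statistics on stdin and succeed only if
# the statistics are present, read count equals write count, and nothing failed.
job_succeeded() {
  awk -F':' '
    /Total Read Count/   { r = $2 + 0 }
    /Total Write Count/  { w = $2 + 0 }
    /Total Failed Count/ { f = $2 + 0; seen = 1 }
    END { exit (seen && r == w && f == 0) ? 0 : 1 }'
}

# Usage, assuming that the console output was saved to job.log:
#   job_succeeded < job.log && echo "Synchronization completed without loss."
```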

OSS-HDFS configuration details

You can use OssJindoFile to output data to the OSS-HDFS file system.

Configuration parameters

| Name | Type | Required | Description | Default value |
| --- | --- | --- | --- | --- |
| path | string | Yes | The path to which the file is written. | None |
| bucket | string | Yes | The bucket in the OSS-HDFS service. | None |
| access_key | string | Yes | The AccessKey ID used to access the OSS-HDFS service. | None |
| access_secret | string | Yes | The AccessKey secret used to access the OSS-HDFS service. | None |
| endpoint | string | Yes | The endpoint used to access the OSS-HDFS service. | None |
| file_format | string | No | The file type. Supported types are parquet, orc, json, csv, and text. | "csv" |
| field_delimiter | string | No | The field separator. The default is the same as that of Hive. This parameter takes effect only when you write data to a text file. | '\001' |
| row_delimiter | string | No | The row delimiter. This parameter takes effect only when you write data to a text file. | "\n" |
| partition_by | array | No | The partition fields. If the input data contains partition fields, data is written to directories based on these fields. The behavior is the same as that of Hive. | None |
| is_partition_field_write_in_file | boolean | No | Specifies whether to write the partition fields to the file. | false |
| sink_columns | array | No | The fields to write to the file. | All source columns from Transform |
| common-options | object | No | Common parameters. For more information, see common-options. | None |
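To illustrate what Hive-style partitioning means for partition_by, the following sketch computes the directory into which a record would fall, using the key=value directory naming that Hive uses. It is an illustration only and does not call SeaTunnel. The function name and the sample record are hypothetical, and the names of the files that SeaTunnel creates inside each directory are not shown.

```shell
# Hypothetical helper: partition_dir BASE HEADER RECORD FIELD...
# HEADER and RECORD are #-separated. Prints BASE followed by one
# key=value directory level for each partition field.
partition_dir() {
  local base="$1" header="$2" record="$3"
  shift 3
  awk -v base="$base" -v header="$header" -v record="$record" -v fields="$*" 'BEGIN {
    n = split(header, names, "#")
    split(record, values, "#")
    for (i = 1; i <= n; i++) value_of[names[i]] = values[i]
    m = split(fields, wanted, " ")
    path = base
    for (j = 1; j <= m; j++) path = path "/" wanted[j] "=" value_of[wanted[j]]
    print path
  }'
}

partition_dir /seatunnel/sink "name#age#gender" "alice#30#female" gender
# prints /seatunnel/sink/gender=female
```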

Configuration example

The following example shows the configuration for the text format. To write files in a different format, change the value of the file_format parameter. For example, set file_format = "csv".

OssJindoFile {
    path="/seatunnel/sink"
    bucket = "oss://examplebucket.cn-hangzhou.oss-dls.aliyuncs.com"
    access_key = "yourAccessKeyID"
    access_secret = "yourAccessKeySecret"
    endpoint = "cn-hangzhou.oss-dls.aliyuncs.com"
    file_format = "text"
  }
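To keep the AccessKey pair out of files that you commit or share, you can generate the sink block from environment variables when you start a job. The following is a minimal sketch. The function name and the variable names OSS_ACCESS_KEY_ID and OSS_ACCESS_KEY_SECRET are assumptions, and the bucket and endpoint follow the <bucket>.<region>.oss-dls.aliyuncs.com pattern used in the preceding example.

```shell
# Hypothetical helper: render_sink BUCKET REGION PATH [FORMAT]
# Prints a sink module with an OssJindoFile block. The script stops with an
# error message if either credential variable is unset or empty.
render_sink() {
  local bucket="$1" region="$2" path="$3" format="${4:-text}"
  : "${OSS_ACCESS_KEY_ID:?set OSS_ACCESS_KEY_ID}"
  : "${OSS_ACCESS_KEY_SECRET:?set OSS_ACCESS_KEY_SECRET}"
  cat <<EOF
sink {
  OssJindoFile {
    path = "${path}"
    bucket = "oss://${bucket}.${region}.oss-dls.aliyuncs.com"
    access_key = "${OSS_ACCESS_KEY_ID}"
    access_secret = "${OSS_ACCESS_KEY_SECRET}"
    endpoint = "${region}.oss-dls.aliyuncs.com"
    file_format = "${format}"
  }
}
EOF
}

# Usage:
#   render_sink examplebucket cn-hangzhou /seatunnel/sink text >> my-job.conf
```

The generated file still contains the secret in plain text, so restrict its permissions and delete it after the job finishes.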

References