Submit data import tasks using Dolphin (public preview)


This topic describes how to submit a data import task using Dolphin (public preview).

Add data sources

Before you use Dolphin, you must add three data sources: a compute engine data source (a Spark data source), a Lindorm wide table data source, and a data source for the data to import (an ODPS data source or an HDFS data source).

Log on to Dolphin

Activate the Dolphin service and log on to Dolphin.

Create a project

  1. In the top navigation bar, click Project Management.

  2. On the Project Management page, click Create Project and specify the required parameters.

  3. Click OK, and then click the project name to open its management page.

Create a workflow and submit the task

  1. In the left-side navigation pane, select Workflow Definition.

  2. Click Create Workflow.

  3. From the Common Components section, drag the SHELL component to the canvas on the right and then edit it.

  4. Enter a node name. In the Script text box, enter the task submission script. The following code is an example:

    #!/bin/bash
    # The LTS endpoint used to submit tasks. Replace the instance ID in this URL with your own LTS instance ID.
    MASTER_URL="http://ld-bp1i13ko5e222****-master3-001.lindorm.rds.aliyuncs.com:12311/pro/proc/"
    # Define the API endpoint for job submission.
    SUBMIT_URL="${MASTER_URL}bulkload/create"
    echo "Submit URL: $SUBMIT_URL"
    # Define the payload for the job submission.
    # The reader configuration. This example uses MaxCompute as the source.
    READERCONFIG='{
        "partition":[
            "pt=20250212"
        ],
        "numPartitions":20,
        "column":[
            "rowkey",
            "stringcol1",
            "longcol1",
            "doublecol1"
        ],
        "table":"test_table_10c"
    }'
    # The writer configuration.
    WRITERCONFIG='{
        "columns":[
            "rowkey",
            "stringcol1",
            "longcol1",
            "doublecol1"
        ],
        "namespace":"default",
        "lindormTable":"test_table_10c_mig",
        "compression":"zstd"
    }'
    # The number of compute engine executors for the task.
    INSTANCE='5'
    # SRC is the name of the ODPS data source, which you must first add on the LTS page. DST is the name of the Lindorm wide table data source.
    SRC='odps'
    DST='ld-bp1i13ko5e222****'
    # The specifications of the compute engine driver and executors.
    DRIVERSPEC='large'
    EXECUTORSPEC='xlarge'
    # The source file type. This parameter is optional if the source is MaxCompute.
    FILETYPE='Parquet'
    # Submit the job. --data-urlencode URL-encodes the value of each field, so characters
    # such as "&", "=", "+", and line breaks in the JSON configurations reach the server intact.
    submit_response=$(curl --silent --show-error -X POST "$SUBMIT_URL" \
        --data-urlencode "src=$SRC" \
        --data-urlencode "dst=$DST" \
        --data-urlencode "readerConfig=$READERCONFIG" \
        --data-urlencode "writerConfig=$WRITERCONFIG" \
        --data-urlencode "instance=$INSTANCE" \
        --data-urlencode "driverSpec=$DRIVERSPEC" \
        --data-urlencode "executorSpec=$EXECUTORSPEC" \
        --data-urlencode "fileType=$FILETYPE")
    echo "Submit response: $submit_response"
    # Parse the submission response to extract the success flag and the message.
    # The success pattern accepts both a quoted value ("success":"true") and a bare boolean ("success":true).
    success=$(printf '%s' "$submit_response" | grep -o '"success": *"\{0,1\}[a-z]*' | sed 's/.*: *"\{0,1\}//')
    message=$(printf '%s' "$submit_response" | grep -o '"message": *"[^"]*' | sed 's/.*"//')
    # Check whether the job was submitted successfully.
    if [[ "$success" != "true" ]]; then
        echo "Job submission failed: $message"
        exit 1
    fi
    # On success, the message field contains the job ID.
    job_id=$message
    echo "JobId: $job_id"
    STATUS_URL="${MASTER_URL}${job_id}/info"
    echo "Status URL: $STATUS_URL"
    # Poll the job status until the job reaches a terminal state.
    while true; do
        # Get the job status.
        status_response=$(curl --silent --show-error --request GET "$STATUS_URL")
        # Join the response into one line, then extract the value of the "state" field.
        state=$(printf '%s' "$status_response" | tr -d '\n' | sed 's/.*"state": *"\([^"]*\)".*/\1/')
        # Print the current state.
        echo "Current job state: $state"
        # Check if the job is complete.
        if [[ $state == "SUCCESS" ]]; then
            echo "Job completed successfully."
            exit 0
        elif [[ $state == "FAILED" ]]; then
            echo "Job failed."
            exit 1
        elif [[ $state == "KILLED" ]]; then
            echo "Job killed."
            exit 1
        fi
        # Check the status again after 60 seconds.
        sleep 60
    done
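In the example, the reader's column list and the writer's columns list name the same four columns in the same order. If the task maps source columns to destination columns by position (an assumption; this topic does not say so), a length mismatch between the two lists is worth catching before you submit. The following is a minimal sketch of such a pre-submission check. The function count_json_array is hypothetical, and it assumes that the array holds simple quoted strings without embedded quotes or square brackets.

```shell
#!/bin/bash
# count_json_array (hypothetical helper): print the number of entries in the
# JSON array field NAME of the JSON text JSON.
# Assumes the array contains only simple quoted strings, for example
# "column":["rowkey","stringcol1"].
count_json_array() {
    local name="$1" json="$2"
    # Join all lines, keep only the text between "NAME":[ and the next ],
    # then print each quoted entry on its own line and count the lines.
    printf '%s' "$json" | tr -d '\n' \
        | sed -n "s/.*\"$name\"[[:space:]]*:[[:space:]]*\[\([^]]*\)].*/\1/p" \
        | grep -o '"[^"]*"' \
        | wc -l | tr -d ' '
}

# Sample configurations in the same shape as READERCONFIG and WRITERCONFIG in the script.
READERCONFIG='{
    "partition":["pt=20250212"],
    "column":["rowkey","stringcol1","longcol1","doublecol1"],
    "table":"test_table_10c"
}'
WRITERCONFIG='{
    "columns":["rowkey","stringcol1","longcol1","doublecol1"],
    "lindormTable":"test_table_10c_mig"
}'

reader_count=$(count_json_array column "$READERCONFIG")
writer_count=$(count_json_array columns "$WRITERCONFIG")
if [ "$reader_count" -ne "$writer_count" ]; then
    echo "Column count mismatch: reader=$reader_count, writer=$writer_count"
    exit 1
fi
echo "Reader and writer both list $reader_count columns."
```

Because the pattern "NAME" includes the closing quote, searching for column does not match the writer's columns field, so each list is counted separately.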
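The script extracts fields from the JSON responses with grep and sed instead of a JSON parser, so it does not depend on a tool such as jq being installed on the machine that runs the SHELL node. The following is a minimal sketch of that extraction as a reusable function. The function json_field is hypothetical, and it assumes flat JSON in which the value of the requested field contains no double quotes, commas, or closing braces. The sample response is illustrative only and is not the documented LTS response format.

```shell
#!/bin/bash
# json_field (hypothetical helper): print the value of the first occurrence of
# field NAME in the JSON text JSON. Handles quoted values ("state":"RUNNING")
# and bare values ("success":true, "numPartitions":20).
# Assumes the value contains no double quotes, commas, or closing braces.
json_field() {
    local name="$1" json="$2"
    printf '%s' "$json" \
        | grep -o "\"$name\": *\"\{0,1\}[^\",}]*" \
        | head -n 1 \
        | sed 's/^[^:]*: *"\{0,1\}//'
}

# Example with an illustrative response.
response='{"success":"true","message":"job-0001"}'
json_field success "$response"   # prints: true
json_field message "$response"   # prints: job-0001
```

If the field is missing, the function prints nothing, so a check such as [[ "$success" != "true" ]] treats a malformed response as a failed submission.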
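The polling loop in the script exits only when the job reaches SUCCESS, FAILED, or KILLED, so it never ends if the status endpoint stops responding or keeps returning an unexpected state. The following is a minimal sketch of a bounded variant. The function wait_for_job and its three parameters (a command that prints the state, a maximum number of checks, and an interval in seconds) are hypothetical; the example drives it with a stub instead of a real LTS endpoint.

```shell
#!/bin/bash
# wait_for_job (hypothetical helper): poll a job until it reaches a terminal
# state or the maximum number of status checks is used up.
#   $1  name of a command or function that prints the current job state
#   $2  maximum number of status checks
#   $3  seconds to wait between checks
# Returns 0 on SUCCESS, 1 on FAILED or KILLED, and 2 on timeout.
wait_for_job() {
    local get_state="$1" max_checks="$2" interval="$3"
    local i=1 state
    while [ "$i" -le "$max_checks" ]; do
        state=$("$get_state")
        echo "Check $i/$max_checks, current job state: $state"
        case "$state" in
            SUCCESS) echo "Job completed successfully."; return 0 ;;
            FAILED) echo "Job failed."; return 1 ;;
            KILLED) echo "Job killed."; return 1 ;;
        esac
        # Wait before the next check, but not after the last one.
        if [ "$i" -lt "$max_checks" ]; then
            sleep "$interval"
        fi
        i=$((i + 1))
    done
    echo "Timed out after $max_checks checks."
    return 2
}

# Example: a stub that reports RUNNING twice and then SUCCESS. The stub keeps
# its counter in a file because $(...) runs it in a subshell.
counter_file=$(mktemp)
echo 0 > "$counter_file"
fake_state() {
    local n
    n=$(( $(cat "$counter_file") + 1 ))
    echo "$n" > "$counter_file"
    if [ "$n" -lt 3 ]; then echo RUNNING; else echo SUCCESS; fi
}
wait_for_job fake_state 5 0
echo "wait_for_job returned $?"
rm -f "$counter_file"
```

In the submission script, the state command could be a function that runs the curl request against STATUS_URL and extracts the state field. Passing 60 as the interval keeps the script's original polling rate, and the maximum number of checks then sets an upper bound on how long the node waits.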