Deliver data to OSS using an SDK


Before you use a software development kit (SDK) to deliver data, review the usage notes, interfaces, and parameters of the data lake delivery feature. After you create a delivery task, Tablestore automatically delivers data from the data table to an Object Storage Service (OSS) bucket for storage.

Usage notes

  • Data delivery is available in the China (Hangzhou), China (Shanghai), China (Beijing), China (Shenzhen), and China (Zhangjiakou) regions.

  • Delete operations on Tablestore data are not delivered. If you delete data from the data table, the data that has already been delivered to OSS is not deleted.

  • After you create a delivery task, initialization takes up to 1 minute.

  • When data is written at a steady rate, the delivery latency is within 3 minutes. The P99 latency of data synchronization is within 10 minutes.

    Note

    The P99 latency indicates the average latency of the slowest 1% of requests over the previous 10 seconds.
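Taken literally, the definition in the note is the mean of the slowest 1% of latency samples collected within one 10-second window. The following Java sketch computes that quantity for an array of samples. The class and method names are hypothetical, and the code illustrates only the definition stated in the note, not how Tablestore actually measures delivery latency.

```java
import java.util.Arrays;

public class SlowestPercentLatency {

    // Hypothetical helper: returns the average of the slowest 1% of latency
    // samples, following the definition in the note. The samples are assumed
    // to come from a single 10-second window.
    static double averageOfSlowestOnePercent(long[] latenciesMs) {
        if (latenciesMs.length == 0) {
            throw new IllegalArgumentException("no latency samples");
        }
        long[] sorted = latenciesMs.clone();
        Arrays.sort(sorted); // ascending, so the slowest samples are at the end

        // Size of the slowest 1%, rounded up so that at least one sample is used.
        int count = (int) Math.ceil(sorted.length / 100.0);
        long sum = 0;
        for (int i = sorted.length - count; i < sorted.length; i++) {
            sum += sorted[i];
        }
        return (double) sum / count;
    }

    public static void main(String[] args) {
        // 200 made-up samples: 198 requests at 50 ms and 2 slow requests.
        long[] samples = new long[200];
        Arrays.fill(samples, 50);
        samples[10] = 4000;
        samples[20] = 6000;
        // The slowest 1% of 200 samples is 2 samples: (4000 + 6000) / 2.
        System.out.println(averageOfSlowestOnePercent(samples)); // prints 5000.0
    }
}
```

Rounding the count up means that a window with fewer than 100 requests still averages at least its single slowest request.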

Prerequisites

  • Complete the following operation in Object Storage Service (OSS):

    Activate OSS and create a bucket in the same region as your Tablestore instance. For more information, see Activate OSS.

    Note

    Data lake delivery can deliver data to any OSS bucket in the same region as the Tablestore instance. To deliver data to other data warehouses, such as MaxCompute, submit a ticket.

  • Complete the following operations in Tablestore:

    • Obtain the endpoint of your instance from the instance details page. For more information, see Endpoints.

    • Create a Tablestore data table. For more information, see Create a data table.

  • Perform the following operations in Resource Access Management (RAM):

    • Create a RAM user and grant the AliyunOTSFullAccess permission to the RAM user so that it can manage Tablestore. For more information, see Create a RAM user and Manage the permissions of a RAM user.

    • Create an AccessKey for the RAM user. For more information, see Create an AccessKey.

      Warning

      If the AccessKey of your Alibaba Cloud account is leaked, all your cloud resources are at risk. We recommend that you use the AccessKey of a RAM user to reduce security risks.

  • Configure access credentials. For more information, see Configure access credentials.

Interfaces

Interface               Description
CreateDeliveryTask      Creates a delivery task.
ListDeliveryTask        Lists all delivery tasks for a data table.
DescribeDeliveryTask    Queries the details of a delivery task.
DeleteDeliveryTask      Deletes a delivery task.

Parameters

Parameter

Description

tableName

The name of the data table.

taskName

The name of the delivery task.

The name can contain only lowercase letters (a-z), digits, and hyphens (-). It must start and end with a lowercase letter or a digit. The length must be 3 to 16 characters.

taskConfig

The configuration of the delivery task. It includes the following options:

  • ossPrefix: The directory prefix in the OSS bucket to which Tablestore delivers data. The delivery path supports five time variables: $yyyy, $MM, $dd, $HH, and $mm.

    • If you use time variables in the delivery path, OSS directories are generated dynamically from the data write time, or from the event time if eventTimeColumn is set. This lets you partition data by time with Hive-style partition names and organize files in OSS accordingly.

    • If you do not use time variables in the delivery path, all files are delivered to the same fixed OSS prefix.

  • ossBucket: The name of the OSS bucket.

  • ossEndpoint: The endpoint of the region in which the OSS bucket is located.

  • ossStsRole: The Alibaba Cloud Resource Name (ARN) of the service-linked role for Tablestore.

  • format: The storage format of the delivered data. Only the Parquet columnar format is supported, and Parquet is the default value.

    By default, all data types are encoded with PLAIN encoding. The default configuration is sufficient for most use cases.

  • eventTimeColumn: The event time column. This parameter partitions data based on the time in a specified column. You must specify the column name and its time format (EventTimeFormat). Valid values for EventTimeFormat are RFC822, RFC850, RFC1123, RFC3339, and Unix. Specify the format based on the actual time format of your data.

    If this parameter is not set, data is partitioned based on the write time.

  • parquetSchema: The data columns to deliver. You must manually configure the source table fields, destination fields, and destination field types.

    You can give the destination fields any names and list them in any order. The order of the columns in the schema array determines the order in which the columnar data is arranged in OSS.

    Important

    The field type of the delivered data must match the field type of the data source. Otherwise, the data is discarded as dirty data. For more information about field type mappings, see Data format mapping.

taskType

The type of the delivery task. It includes the following options:

  • INC: Incremental data delivery mode. Only incremental data is synchronized.

  • BASE: Full data delivery mode. A one-time full table scan is performed for data synchronization.

  • BASE_INC: Full and incremental data delivery mode. After the full data is synchronized, Tablestore synchronizes incremental data.

    During incremental data synchronization, you can check the latest delivery time and the current delivery status.
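Before you call CreateDeliveryTask, it can help to check parameters on the client side. The following Java sketch checks a taskName against the naming rules listed above and previews the directory that an ossPrefix with time variables resolves to at a given time. Both helpers (isValidTaskName and expandOssPrefix) are hypothetical and are not part of the Tablestore SDK. The prefix expansion is only a preview of the substitution described for ossPrefix; the time zone that the service applies is not stated in this topic, so the example assumes UTC.

```java
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.regex.Pattern;

public class DeliveryTaskParamCheck {

    // taskName rules from the parameter list: lowercase letters, digits, and
    // hyphens only; starts and ends with a lowercase letter or a digit;
    // 3 to 16 characters in total (1 + 1..14 + 1).
    private static final Pattern TASK_NAME =
            Pattern.compile("^[a-z0-9][a-z0-9-]{1,14}[a-z0-9]$");

    // Hypothetical helper: returns true if the name satisfies the rules above.
    static boolean isValidTaskName(String name) {
        return name != null && TASK_NAME.matcher(name).matches();
    }

    // Hypothetical helper: substitutes the five time variables of ossPrefix
    // with zero-padded values from the given time. String.replace treats "$"
    // literally, so no regex escaping is needed.
    static String expandOssPrefix(String ossPrefix, ZonedDateTime t) {
        return ossPrefix
                .replace("$yyyy", String.format("%04d", t.getYear()))
                .replace("$MM", String.format("%02d", t.getMonthValue()))
                .replace("$dd", String.format("%02d", t.getDayOfMonth()))
                .replace("$HH", String.format("%02d", t.getHour()))
                .replace("$mm", String.format("%02d", t.getMinute()));
    }

    public static void main(String[] args) {
        System.out.println(isValidTaskName("sampledelivery")); // true
        System.out.println(isValidTaskName("Sample_Task"));    // false: uppercase and underscore

        // Assumption: the time is interpreted in UTC.
        ZonedDateTime t = ZonedDateTime.of(2024, 3, 5, 14, 7, 0, 0, ZoneOffset.UTC);
        System.out.println(expandOssPrefix("sampledeliverytask/year=$yyyy/month=$MM", t));
        // prints sampledeliverytask/year=2024/month=03
    }
}
```

With a prefix such as sampledeliverytask/year=$yyyy/month=$MM, all data for the same month shares one directory, which is the Hive-style partition layout described for ossPrefix.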

Usage

You can use the Java SDK or Go SDK to implement the data lake delivery feature. This section uses the Java SDK as an example to describe the data lake delivery operations.

The following example shows how to create a delivery task for a data table.

import com.alicloud.openservices.tablestore.ClientException;
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.TableStoreException;
import com.alicloud.openservices.tablestore.model.delivery.*;

public class DeliveryTask {

    public static void main(String[] args) {
        // Replace yourInstanceName with the name of your instance.
        final String instanceName = "yourInstanceName";
        // Replace yourEndpoint with the endpoint of your instance.
        final String endPoint = "yourEndpoint";
        // Read the AccessKey ID and AccessKey secret of the RAM user from environment variables.
        final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
        final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
        if (accessKeyId == null || accessKeySecret == null) {
            System.err.println("Set TABLESTORE_ACCESS_KEY_ID and TABLESTORE_ACCESS_KEY_SECRET before running this example.");
            return;
        }

        SyncClient client = new SyncClient(endPoint, accessKeyId, accessKeySecret, instanceName);
        try {
            createDeliveryTask(client);
            System.out.println("end");
        } catch (TableStoreException e) {
            // Server-side error: print the error code, message, and request ID for troubleshooting.
            System.err.println("Operation failed. Error code: " + e.getErrorCode() + ", message: " + e.getMessage());
            System.err.println("Request ID: " + e.getRequestId());
        } catch (ClientException e) {
            // Client-side error, for example a network error.
            System.err.println("Request failed. Details: " + e.getMessage());
        } finally {
            // Release the resources held by the client.
            client.shutdown();
        }
    }

    private static void createDeliveryTask(SyncClient client) {
        String tableName = "sampleTable";
        // 3 to 16 characters: lowercase letters, digits, and hyphens.
        String taskName = "sampledelivery";

        OSSTaskConfig taskConfig = new OSSTaskConfig();
        // Hive-style time partitions: files are written under year=<yyyy>/month=<MM>.
        taskConfig.setOssPrefix("sampledeliverytask/year=$yyyy/month=$MM");
        taskConfig.setOssBucket("datadeliverytest");
        // The bucket must be in the same region as the Tablestore instance.
        taskConfig.setOssEndpoint("oss-cn-hangzhou.aliyuncs.com");
        // ARN of the Tablestore service-linked role (the account ID is masked).
        taskConfig.setOssStsRole("acs:ram::17************45:role/aliyunserviceroleforotsdatadelivery");

        // Optional: partition data by the time in column Col1 (RFC1123 format).
        // If this is not set, data is partitioned by the time it is written to Tablestore.
        EventColumn eventColumn = new EventColumn("Col1", EventTimeFormat.RFC1123);
        taskConfig.setEventTimeColumn(eventColumn);

        // Each entry: source column, destination column, destination type.
        // The destination type must match the source data type, or the data is discarded as dirty data.
        taskConfig.addParquetSchema(new ParquetSchema("PK1", "PK1", DataType.UTF8));
        taskConfig.addParquetSchema(new ParquetSchema("PK2", "PK2", DataType.BOOL));
        taskConfig.addParquetSchema(new ParquetSchema("Col1", "Col1", DataType.UTF8));

        CreateDeliveryTaskRequest request = new CreateDeliveryTaskRequest();
        request.setTableName(tableName);
        request.setTaskName(taskName);
        request.setTaskConfig(taskConfig);
        // Deliver the full data first, then incremental data.
        request.setTaskType(DeliveryTaskType.BASE_INC);

        CreateDeliveryTaskResponse response = client.createDeliveryTask(request);
        System.out.println("requestID: " + response.getRequestId());
        System.out.println("traceID: " + response.getTraceId());
        System.out.println("create delivery task success");
    }
}