Use the DataWorks OpenAPI to build and manage a real-time data synchronization pipeline from MySQL to Hologres without using the console. This guide covers the full task lifecycle: create, start, monitor, stop, and delete.
How it works
All five operations in this guide use the DataWorks SDK for Java. CreateDIJob returns a DIJobId, and every later operation takes that ID as input.
CreateDIJob (returns DIJobId) → StartDIJob → GetDIJob (poll) → StopDIJob → DeleteDIJob
Prerequisites
Before you begin, make sure you have:
- A Maven project. See Create a new Maven project.
- A MySQL data source added to DataWorks. See MySQL data source.
- A Hologres data source added to DataWorks. See Hologres data source.
- Environment variables configured for your Alibaba Cloud AccessKey ID and AccessKey secret. See Configure environment variables in Linux, macOS, and Windows.
Set up your Maven project
Add dependencies
Open pom.xml and add the following dependencies:
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>dataworks_public20200518</artifactId>
<version>5.6.0</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>tea-openapi</artifactId>
<version>0.3.2</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>tea-console</artifactId>
<version>0.0.1</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>tea-util</artifactId>
<version>0.2.21</version>
</dependency>
The dataworks_public20200518 artifact is available in the Maven Central Repository.
Initialize the client
Create a client using your AccessKey credentials and the DataWorks endpoint:
private static final String AK_ID = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
private static final String AK_SECRET = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
private static final String ENDPOINT = "dataworks.cn-shanghai.aliyuncs.com";
public static Client createClient() throws Exception {
Config config = new Config();
config.setAccessKeyId(AK_ID);
config.setAccessKeySecret(AK_SECRET);
config.setEndpoint(ENDPOINT);
return new Client(config);
}
Read credentials from environment variables rather than hardcoding them in source code.
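One way to make a missing credential fail early, with a readable message, is to validate the environment variables before building the client. The following is a minimal sketch that uses only the Java standard library. The `EnvCredentials` class and its `requireEnv` method are hypothetical names introduced for illustration and are not part of the DataWorks SDK. The environment is passed in as a map so the helper can be exercised without real credentials.

```java
import java.util.Map;

/** Hypothetical helper (not part of the DataWorks SDK): fail fast when a variable is missing. */
final class EnvCredentials {
    private EnvCredentials() {}

    /** Returns the trimmed value of the named variable, or throws if it is absent or blank. */
    static String requireEnv(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Environment variable " + name + " is not set");
        }
        return value.trim();
    }
}
```

In the client setup, a call such as `EnvCredentials.requireEnv(System.getenv(), "ALIBABA_CLOUD_ACCESS_KEY_ID")` could replace the direct `System.getenv` lookup, so that a missing variable is reported at startup rather than as an authentication failure on the first API call.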
Step 1: Create a data integration task
Call CreateDIJob to define the synchronization pipeline. The response returns a DIJobId — save it, because every subsequent operation requires it.
Key parameters for this step:
| Parameter | Description | Example value |
|---|---|---|
| ProjectId | Your DataWorks workspace ID | 3058 |
| JobName | A unique name for the task | "api" + System.currentTimeMillis() |
| MigrationType | Synchronization mode. FullAndRealtimeIncremental performs a full load first, then continuously applies incremental changes (CDC). | "FullAndRealtimeIncremental" |
| SourceDataSourceType | Source database type | "MySQL" |
| DestinationDataSourceType | Destination database type | "Hologres" |
| resourceGroupIdentifier | The resource group that runs the task. Set the same identifier for both offlineResourceSettings (full load) and realtimeResourceSettings (incremental sync). | "S_res_group_195820716552192_1695299272182" |
For the full parameter reference, see CreateDIJob.
public static Long createDIJob() throws Exception {
System.out.println("create job started ...");
CreateDIJobRequest request = new CreateDIJobRequest();
// Basic configuration
request.setProjectId(3058L);
request.setJobName("api" + System.currentTimeMillis());
request.setMigrationType("FullAndRealtimeIncremental");
request.setSourceDataSourceType("MySQL");
request.setDestinationDataSourceType("Hologres");
// Source data source
CreateDIJobRequest.CreateDIJobRequestSourceDataSourceSettings srcDatasourceSetting =
new CreateDIJobRequest.CreateDIJobRequestSourceDataSourceSettings();
srcDatasourceSetting.setDataSourceName("dw_mysql_online");
Map<String, String> props = new HashMap<>();
props.put("TimeZone", "Asia/Shanghai");
props.put("Encoding", "utf-8");
srcDatasourceSetting.setDataSourceProperties(props);
request.setSourceDataSourceSettings(Arrays.asList(srcDatasourceSetting));
// Destination data source
CreateDIJobRequest.CreateDIJobRequestDestinationDataSourceSettings dstDatasourceSettings =
new CreateDIJobRequest.CreateDIJobRequestDestinationDataSourceSettings();
dstDatasourceSettings.setDataSourceName("dw_holo_test");
request.setDestinationDataSourceSettings(Arrays.asList(dstDatasourceSettings));
// Resource group (used for both full load and incremental sync)
CreateDIJobRequest.CreateDIJobRequestResourceSettings resourceSettings =
new CreateDIJobRequest.CreateDIJobRequestResourceSettings();
CreateDIJobRequest.CreateDIJobRequestResourceSettingsOfflineResourceSettings offlineResourceSettings =
new CreateDIJobRequest.CreateDIJobRequestResourceSettingsOfflineResourceSettings();
offlineResourceSettings.setResourceGroupIdentifier("S_res_group_195820716552192_1695299272182");
resourceSettings.setOfflineResourceSettings(offlineResourceSettings);
CreateDIJobRequest.CreateDIJobRequestResourceSettingsRealtimeResourceSettings realtimeResourceSettings =
new CreateDIJobRequest.CreateDIJobRequestResourceSettingsRealtimeResourceSettings();
realtimeResourceSettings.setResourceGroupIdentifier("S_res_group_195820716552192_1695299272182");
resourceSettings.setRealtimeResourceSettings(realtimeResourceSettings);
request.setResourceSettings(resourceSettings);
// Table mappings: select specific tables from the source database
CreateDIJobRequest.CreateDIJobRequestTableMappings tableMapping =
new CreateDIJobRequest.CreateDIJobRequestTableMappings();
CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules datasource =
new CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules();
datasource.setObjectType("Datasource");
datasource.setExpression("dw_mysql_online");
CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules database =
new CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules();
database.setObjectType("Database");
database.setExpression("cx_db_1");
CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules table =
new CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules();
table.setObjectType("Table");
table.setExpression("cx_table_1");
tableMapping.setSourceObjectSelectionRules(Arrays.asList(datasource, database, table));
request.setTableMappings(Arrays.asList(tableMapping));
// Execute
CreateDIJobResponse createDIJobResponse = createClient().createDIJob(request);
System.out.println(new Gson().toJson(createDIJobResponse.getBody()));
return createDIJobResponse.getBody().getDIJobId();
}
Record the returned DIJobId. All subsequent operations reference this ID.
Step 2: Start the task
Call StartDIJob to launch the synchronization. The task first performs a full load from MySQL, then switches to incremental synchronization.
For the full parameter reference, see StartDIJob.
public static void startJob(Long jobId) throws Exception {
StartDIJobRequest start = new StartDIJobRequest();
start.setDIJobId(jobId);
StartDIJobResponse response = createClient().startDIJob(start);
System.out.println(new Gson().toJson(response.getBody()));
}
Step 3 (Optional): Monitor the task
Call GetDIJob to check whether the task is running as expected. Set WithDetails to true to get the full task status.
For the full parameter reference, see GetDIJob.
public static GetDIJobResponseBody.GetDIJobResponseBodyData getJob(Long jobId, boolean detail) throws Exception {
System.out.println("get job started, jobId=" + jobId);
GetDIJobRequest request = new GetDIJobRequest();
request.setDIJobId(jobId);
request.setWithDetails(detail); // Pass true to retrieve full task details
GetDIJobResponse response = createClient().getDIJob(request);
System.out.println(new Gson().toJson(response.getBody()));
return response.getBody().getData();
}
Poll this operation periodically and check the status field in the response to confirm that the task is still synchronizing data.
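The polling described above can be sketched as a small, SDK-independent helper. This is an illustration under stated assumptions, not the SDK's own mechanism: `StatusPoller` and `pollUntil` are hypothetical names, and the status strings to wait for (for example a value such as "Running") are assumptions that should be checked against the GetDIJob reference. The status is supplied through a `Supplier<String>` so the loop can be tested without calling the API.

```java
import java.time.Duration;
import java.util.Set;
import java.util.function.Supplier;

/** Hypothetical polling helper; the accepted status strings are supplied by the caller. */
final class StatusPoller {
    private StatusPoller() {}

    /**
     * Calls fetchStatus until it returns one of the accepted values or maxAttempts is reached.
     * Returns the last status seen; the caller decides whether that status is acceptable.
     */
    static String pollUntil(Supplier<String> fetchStatus, Set<String> accepted,
                            int maxAttempts, Duration interval) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        String last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            last = fetchStatus.get();
            if (last != null && accepted.contains(last)) {
                return last;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(interval.toMillis());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    throw new IllegalStateException("Interrupted while polling", e);
                }
            }
        }
        return last;
    }
}
```

In practice the supplier would wrap a call to getJob and extract the status string from the returned data. A bounded number of attempts avoids waiting forever on a task that never reaches the expected state.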
Step 4: Stop the task
Call StopDIJob to pause the synchronization. Stop the task before deleting it.
For the full parameter reference, see StopDIJob.
public static void stopJob(Long jobId) throws Exception {
System.out.println("stop job started, jobId=" + jobId);
StopDIJobRequest stop = new StopDIJobRequest();
stop.setDIJobId(jobId);
StopDIJobResponse response = createClient().stopDIJob(stop);
System.out.println(new Gson().toJson(response.getBody()));
}
Step 5: Delete the task
After the task has stopped, call DeleteDIJob to remove it.
For the full parameter reference, see DeleteDIJob.
public static void deleteJob(Long jobId) throws Exception {
System.out.println("delete job started, jobId=" + jobId);
DeleteDIJobRequest request = new DeleteDIJobRequest();
request.setDIJobId(jobId);
DeleteDIJobResponse response = createClient().deleteDIJob(request);
System.out.println(new Gson().toJson(response.getBody()));
}
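Because a task should be stopped before it is deleted, it can help to make that ordering explicit and to run the cleanup even when an earlier step fails. The following is a minimal sketch of that control flow using try/finally. The `DiJobOps` interface and the `LifecycleRunner` class are hypothetical names introduced here so the flow can be shown without the SDK; in a real program each method would delegate to the corresponding method from the steps above.

```java
/** Hypothetical abstraction over the lifecycle operations (not an SDK type). */
interface DiJobOps {
    long create() throws Exception;
    void start(long jobId) throws Exception;
    void stop(long jobId) throws Exception;
    void delete(long jobId) throws Exception;
}

final class LifecycleRunner {
    private LifecycleRunner() {}

    /** Creates and starts a job, runs the body, then always attempts stop followed by delete. */
    static void runWithCleanup(DiJobOps ops, Runnable body) throws Exception {
        long jobId = ops.create();
        try {
            ops.start(jobId);
            body.run();
        } finally {
            try {
                ops.stop(jobId);
            } finally {
                // Runs even if stop fails, so the job is not left behind.
                ops.delete(jobId);
            }
        }
    }
}
```

One limitation of this sketch: if stop or delete throws, that exception replaces any earlier one. Whether stopping a task that never started succeeds is also an assumption to verify against the StopDIJob reference.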
Complete sample code
The following POM dependencies and Java sample cover all five lifecycle operations.
POM dependencies
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>dataworks_public20200518</artifactId>
<version>5.6.0</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>tea-openapi</artifactId>
<version>0.3.2</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>tea-console</artifactId>
<version>0.0.1</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>tea-util</artifactId>
<version>0.2.21</version>
</dependency>
Java sample
package com.aliyun.sample;
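import com.aliyun.tea.*;
import com.aliyun.dataworks_public20200518.*;
import com.aliyun.dataworks_public20200518.models.*;
import com.aliyun.teaopenapi.models.Config;
import com.google.gson.Gson;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;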
public class Sample {
// Read credentials from environment variables — never hardcode them
private static final String AK_ID = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
private static final String AK_SECRET = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
private static final String ENDPOINT = "dataworks.cn-shanghai.aliyuncs.com";
public static Client createClient() throws Exception {
Config config = new Config();
config.setAccessKeyId(AK_ID);
config.setAccessKeySecret(AK_SECRET);
config.setEndpoint(ENDPOINT);
return new Client(config);
}
public static Long createDIJob() throws Exception {
System.out.println("create job started ...");
CreateDIJobRequest request = new CreateDIJobRequest();
// Basic configuration
request.setProjectId(3058L);
request.setJobName("api" + System.currentTimeMillis());
request.setMigrationType("FullAndRealtimeIncremental");
request.setSourceDataSourceType("MySQL");
request.setDestinationDataSourceType("Hologres");
// Source data source
CreateDIJobRequest.CreateDIJobRequestSourceDataSourceSettings srcDatasourceSetting =
new CreateDIJobRequest.CreateDIJobRequestSourceDataSourceSettings();
srcDatasourceSetting.setDataSourceName("dw_mysql_online");
Map<String, String> props = new HashMap<>();
props.put("TimeZone", "Asia/Shanghai");
props.put("Encoding", "utf-8");
srcDatasourceSetting.setDataSourceProperties(props);
request.setSourceDataSourceSettings(Arrays.asList(srcDatasourceSetting));
// Destination data source
CreateDIJobRequest.CreateDIJobRequestDestinationDataSourceSettings dstDatasourceSettings =
new CreateDIJobRequest.CreateDIJobRequestDestinationDataSourceSettings();
dstDatasourceSettings.setDataSourceName("dw_holo_test");
request.setDestinationDataSourceSettings(Arrays.asList(dstDatasourceSettings));
// Resource group (used for both full load and incremental sync)
CreateDIJobRequest.CreateDIJobRequestResourceSettings resourceSettings =
new CreateDIJobRequest.CreateDIJobRequestResourceSettings();
CreateDIJobRequest.CreateDIJobRequestResourceSettingsOfflineResourceSettings offlineResourceSettings =
new CreateDIJobRequest.CreateDIJobRequestResourceSettingsOfflineResourceSettings();
offlineResourceSettings.setResourceGroupIdentifier("S_res_group_195820716552192_1695299272182");
resourceSettings.setOfflineResourceSettings(offlineResourceSettings);
CreateDIJobRequest.CreateDIJobRequestResourceSettingsRealtimeResourceSettings realtimeResourceSettings =
new CreateDIJobRequest.CreateDIJobRequestResourceSettingsRealtimeResourceSettings();
realtimeResourceSettings.setResourceGroupIdentifier("S_res_group_195820716552192_1695299272182");
resourceSettings.setRealtimeResourceSettings(realtimeResourceSettings);
request.setResourceSettings(resourceSettings);
// Table mappings: select specific tables from the source database
CreateDIJobRequest.CreateDIJobRequestTableMappings tableMapping =
new CreateDIJobRequest.CreateDIJobRequestTableMappings();
CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules datasource =
new CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules();
datasource.setObjectType("Datasource");
datasource.setExpression("dw_mysql_online");
CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules database =
new CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules();
database.setObjectType("Database");
database.setExpression("cx_db_1");
CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules table =
new CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules();
table.setObjectType("Table");
table.setExpression("cx_table_1");
tableMapping.setSourceObjectSelectionRules(Arrays.asList(datasource, database, table));
request.setTableMappings(Arrays.asList(tableMapping));
// Execute
CreateDIJobResponse createDIJobResponse = createClient().createDIJob(request);
System.out.println("create job finished, response is...");
System.out.println(new Gson().toJson(createDIJobResponse.getBody()));
System.out.println("DIJobId: " + createDIJobResponse.getBody().getDIJobId());
return createDIJobResponse.getBody().getDIJobId();
}
public static void startJob(Long jobId) throws Exception {
StartDIJobRequest start = new StartDIJobRequest();
start.setDIJobId(jobId);
StartDIJobResponse response = createClient().startDIJob(start);
System.out.println("start job finished, response is...");
System.out.println(new Gson().toJson(response.getBody()));
}
public static GetDIJobResponseBody.GetDIJobResponseBodyData getJob(Long jobId, boolean detail) throws Exception {
System.out.println("get job started, jobId=" + jobId);
GetDIJobRequest request = new GetDIJobRequest();
request.setDIJobId(jobId);
request.setWithDetails(detail);
GetDIJobResponse response = createClient().getDIJob(request);
System.out.println("get job finished, response is...");
System.out.println(new Gson().toJson(response.getBody()));
return response.getBody().getData();
}
public static void stopJob(Long jobId) throws Exception {
System.out.println("stop job started, jobId=" + jobId);
StopDIJobRequest stop = new StopDIJobRequest();
stop.setDIJobId(jobId);
StopDIJobResponse response = createClient().stopDIJob(stop);
System.out.println("stop job finished, response is...");
System.out.println(new Gson().toJson(response.getBody()));
}
public static void deleteJob(Long jobId) throws Exception {
System.out.println("delete job started, jobId=" + jobId);
DeleteDIJobRequest request = new DeleteDIJobRequest();
request.setDIJobId(jobId);
DeleteDIJobResponse response = createClient().deleteDIJob(request);
System.out.println("delete job finished, response is...");
System.out.println(new Gson().toJson(response.getBody()));
}
public static void main(String[] args) throws Exception {
try {
// Step 1: Create the task
Long jobId = createDIJob();
// Step 2: Start the task
startJob(jobId);
// Step 3: Monitor progress
Thread.sleep(100000);
getJob(jobId, true);
// Step 4: Stop the task
stopJob(jobId);
Thread.sleep(10000);
// Step 5: Delete the task
deleteJob(jobId);
} catch (Exception e) {
e.printStackTrace();
}
}
}