Use the AnalyticDB for MySQL Client to efficiently write data to a 2.0 cluster

更新时间:
复制 MD 格式

This topic describes how to use the AnalyticDB for MySQL Client to efficiently write data to a 2.0 cluster programmatically.

Background information

The AnalyticDB for MySQL Client provides an efficient and simple way to insert data into an AnalyticDB for MySQL 2.0 cluster. You can submit data to the AnalyticDB for MySQL Client interface to insert it directly into the AnalyticDB for MySQL 2.0 cluster. When you use the AnalyticDB for MySQL Client, you do not need to manage partition aggregation or connection pools. This provides you with more control over real-time data writing and removes the dependency on services such as DataHub.

Maven repositories

Use the following Maven dependency to manage the software development kit (SDK) version:

<dependency>
  <groupId>com.alibaba.cloud.analyticdb</groupId>
  <artifactId>adbclient</artifactId>
  <version>1.0.2</version>
</dependency>

API reference

DatabaseConfig class

API name

Description

setHost(String adbHost)

Sets the hostname or domain name of the AnalyticDB for MySQL 2.0 cluster to connect to.

setPort(int port)

Sets the port of the AnalyticDB for MySQL 2.0 cluster to connect to.

setDatabase(String database)

Sets the database name in the AnalyticDB for MySQL 2.0 cluster to connect to.

setUser(String username)

Sets the username to connect to the AnalyticDB for MySQL 2.0 cluster. Use your AccessKey ID.

For more information about how to obtain an AccessKey ID, see Accounts and Authorizations.

setPassword(String pwd)

Sets the password to connect to the AnalyticDB for MySQL 2.0 cluster. Use your AccessKey secret.

setTable(List<String> table)

Sets a list of table names to write to. Use lowercase table names.

setColumns(String tableName, List<String> columnList)

Sets the field names for a table. To insert data into all fields, use columnList.add("*"). You must set lowercase field names for all tables in the list. Otherwise, the check fails.

setIngnoreInsertError(boolean isIngnoreInsertError)

For all configured tables, specifies whether to ignore errors during insertion. The default is false.

setInsertIgnore(boolean insertIgnore)

For all configured tables, specifies whether to use the insert ignore into statement based on your business scenario.

setEmptyAsNull(boolean emptyAsNull)

For all configured tables, sets empty data to Null. The default is true.

setParallelNumber(int parallelNumber)

For all configured tables, sets the number of concurrent threads for writing to the AnalyticDB for MySQL 2.0 cluster. The default is 4.

setLogger(Logger logger)

Sets the logger object for the client. This uses slf4j.Logger.

setRetryTimes(int retryTimes)

Sets the number of retries when an exception occurs while writing to the AnalyticDB for MySQL 2.0 cluster during a commit. The default is 0.

setRetryIntervalTime(long retryIntervalTime)

Sets the retry interval in milliseconds (ms). The default is 0.

setCommitSize(long commitSize)

Sets the SQL length in bytes for autocommit. The default is 32 KB. We do not recommend changing this setting.

setInsertWithColumnName(boolean insertWithColumnName)

Specifies whether to include column names when constructing the INSERT SQL statement. The default is true. For example, insert into tableName (col1,col2) values ('value1','value2');. If set to false, the statement is similar to insert into tableName values ('value1','value2');. In this case, adding a field to the table might cause write failures. This setting only takes effect when the column list is set to *.

Row class

Interface name

Description

setColumn(int index, Object value)

Sets a value in the Row field list. You must confirm the field order. With this method, Row instances cannot be reused. Each record must use a separate Row instance.

setColumnValues(List<Object> values)

Writes a data row in List format directly to the Row.

updateColumn(int index, Object value)

Updates a value in the Row field list. With this method, Row instances can be reused. Simply update the data within the Row instance.

AdbClient class

Interface name

Description

addRow(String tableName, Row row) / addRows(String tableName, List<Row> rows)

Inserts data in Row format into the corresponding table. Data is cached in the SDK's memory and aggregated by partition. If the SQL length limit is reached, an autocommit is triggered during the addRow or addMap call. If the autocommit fails, the caller must handle the exception, which contains a list of the failed data.

addMap(String tableName, Map<String, String> dataMap) / addMaps(String tableName, List<Map<String, String>> dataMaps)

Similar to addRow, this method supports writing data in map format. If the SQL length limit is reached, an autocommit is triggered during the addRow or addMap call. If the autocommit fails, the caller must handle the exception, which contains a list of the failed data.

addStrictMap(String table, Map<String, String> dataMap) / addStrictMaps(String tableName, List<Map<String, String>> dataMaps)

Similar to addMap or addMaps, but the keys in the input Map must be table fields. If a key is not a field, the client retrieves the table schema again. If the key is still not found, an error is reported. The other functionalities are the same as those of addMap or addMaps. We do not recommend that you use this interface if there is no dynamic schema evolution, because it may affect performance.

commit()

Commits the cached data to the AnalyticDB for MySQL 2.0 cluster. If the commit fails, an exception that contains the failed SQL statements is thrown. The caller must handle this exception.

TableInfo getTableInfo(String tableName)

Retrieves the schema information for the specified table.

List<ColumnInfo> getColumnInfo(String tableName)

Retrieves the list of fields for the specified table. The field class is ColumnInfo. You can use columnInfo.isNullable() to check if a field can be null.

Connection getConnection() throws SQLException

Retrieves a mysql Connection from the client's connection pool. The caller can use the obtained Connection for non-insert operations, in the same way as a Java Database Connectivity (JDBC) connection.

Important

You must release the resources (such as ResultSet, Statement, and Connection) after use.

ColumnInfo class

API Name

Description

boolean isNullable()

Checks if the field can be null.

AdbClientException error codes

Error code

Error code

Description

SQL_LENGTH_LIMIT

100

The SQL length exceeds the limit, which is 32 KB by default.

COMMIT_ERROR_DATA_LIST

101

An exception occurred for some data during the commit. The failed data is returned. You can get the list of failed data (List<String>) using e.getErrData(). This error can occur during addMap(s), addRow(s), and commit operations. You must handle this error code separately.

COMMIT_ERROR_OTHER

102

Other exceptions during the commit.

ADD_DATA_ERROR

103

An exception occurred while adding data.

CREATE_CONNECTION_ERROR

104

An exception occurred while creating a connection.

CLOSE_CONNECTION_ERROR

105

An exception occurred while closing a connection.

CONFIG_ERROR

106

A configuration error occurred in DatabaseConfig.

STOP_ERROR

107

An error occurred while stopping the instance.

OTHER

999

Default exception error code.

Sample code

public class AdbClientUsage {
        public void demo() {
            DatabaseConfig databaseConfig = new DatabaseConfig();
            // The hostname or URL of the AnalyticDB for MySQL 2.0 cluster.
            databaseConfig.setHost("100.100.100.100");
            // The port number of the AnalyticDB for MySQL 2.0 cluster.
            databaseConfig.setPort(8888);
            // The AccessKey ID of the account for the AnalyticDB for MySQL 2.0 cluster.
            databaseConfig.setUser("your db username");
            // The AccessKey secret of the account for the AnalyticDB for MySQL 2.0 cluster.
            databaseConfig.setPassword("your db password");
            databaseConfig.setDatabase("your db name");
            // Set the list of table names to write to.
            List<String> tables = new ArrayList<String>();
            tables.add("your table name");
            tables.add("your table name 2");
            // After a new Client instance is created, the table configuration cannot be modified.
            databaseConfig.setTable(tables);

            // Set the table fields to write to.
            List<String> columns = new ArrayList<String>();
            columns.add("column1");
            columns.add("column2");
            // To write to all fields, use columns.add("*").
            databaseConfig.setColumns("your table name", columns);
            databaseConfig.setColumns("your table name 2", Collections.singletonList("*"));

            // Specifies whether to skip failed insertions.
            // Ignoring insertion failures may cause data loss.
            databaseConfig.setIgnoreInsertError(false);
              // If a column value is empty, it is set to null.
            databaseConfig.setEmptyAsNull(true);
            // Use the insert ignore into method for insertion.
            databaseConfig.setInsertIgnore(true);
            // Retry 3 times if an exception occurs when writing to the AnalyticDB for MySQL 2.0 cluster during a commit.
            databaseConfig.setRetryTimes(3);
            // The retry interval is 1s (1000 ms).
            databaseConfig.setRetryIntervalTime(1000);
            // Initialize the AdbClient. After the instance is initialized, the DatabaseConfig settings cannot be changed.
            AdbClient adbClient = new AdbClient(databaseConfig);

            // Batch data and add it in multiple parts before committing.
            for (int i = 0; i < 10; i++) {
                // Add row(s) to buffer. One instance for one record.
                Row row = new Row(columns.size());
                // Set column.
                // The column index must be the same as the sequence of columns.
                // The column value can be any type. It will be formatted internally according to the column type.
                row.setColumn(0, i); // Number value
                row.setColumn(1, "string value"); // String value
                // If the SQL length limit is reached, an autocommit is triggered during addRow or addMap.
                // If the commit fails, an AdbClientException with the error code COMMIT_ERROR_DATA_LIST is returned.
                adbClient.addRow("your table name", row);
            }
            Row row = new Row();
            row.setColumn(0, 10); // Number value
            row.setColumn(1, "2018-01-01 08:00:00"); // The data type of the second parameter is Date, Timestamp, or Time value.
            adbClient.addRow("your table name 2", row);
            // Update column. The Row instance can be reused.
            row.updateColumn(0, 11);
            row.updateColumn(1, "2018-01-02 08:00:00");
            adbClient.addRow("your table name 2", row);

            // Add the map to the cache.
            Map<String, String> rowMap = new HashMap<String, String>();
            rowMap.put("column1", "124");
            rowMap.put("column2", "string value");
            // Batch data. It is best to add data multiple times before committing.
            adbClient.addMap("your table name", rowMap);

            // Commit the cached data to the AnalyticDB for MySQL 2.0 cluster.
            // After the data is successfully committed, the cached data is purged.
            try {
                adbClient.commit();
            } catch (Exception e) {
            } finally {
                adbClient.stop();
            }
        }
    }

Precautions

  • The AnalyticDB for MySQL Client depends on druid(1.1.10), mysql-connector-java(5.1.45), commons-lang3(3.4), slf4j-api(1.7.25), and slf4j-log4j12(1.7.25). If a version conflict occurs, check the versions of these packages and resolve the conflict.

  • The AnalyticDB for MySQL Client SDK is not thread-safe. In a multi-threaded environment, each thread must maintain its own Client object.

    Note

    We strongly recommend that you do not share an SDK instance across multiple threads. This can cause thread safety issues and create a performance bottleneck at the client.

  • Data is considered successfully written to AnalyticDB for MySQL only after the commit call succeeds.

  • When the client throws an exception, the caller must determine how to handle the exception based on the error code. If a data writing issue occurs, you can retry the submission or log the problematic data and then skip it.

  • Using more write threads does not always improve performance. Because business applications often involve batching data, memory consumption can be significant. The caller must monitor the application's garbage collection (GC) status.

  • Avoid setting a small data batch size, because this reduces the effectiveness of partition-based aggregate writes. You can estimate the number of messages per batch using the following formula: Number of messages * Message length ≈ Number of partitions * 32 KB.

  • The DatabaseConfig settings cannot be changed after new AdbClient is called successfully. All configuration items must be set before the Client object is initialized.