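If you need to encrypt or decrypt data in MaxCompute SQL, you can combine a user-defined function (UDF) with a KMS instance. This topic describes how to use a KMS instance to encrypt and decrypt data in a MaxCompute UDF.
Solution overview
- Encryption: during UDF initialization, the KMS instance client is initialized and a data key is generated. During data processing, the data key is used for envelope encryption. For large data volumes, we recommend obtaining the data key through a secret, which reduces the number of data keys generated.
- Decryption: during UDF initialization, the KMS instance client, thread pool, and cache are initialized. During data processing, the data key ciphertext is decrypted, and the resulting plaintext data key is used to decrypt the data field.
- The encrypted data field is stored as a string in the following format: [length of the data key ciphertext, fixed at 4 digits][data key ciphertext][ciphertext of the original field]. In the samples in this topic, the last part is the Base64-encoded IV followed by the Base64-encoded ciphertext.
Because MaxCompute UDFs run with very high concurrency, we do not recommend encrypting the target data directly with a KMS key inside the UDF. Use envelope encryption instead: generate a data key through the KMS instance, and encrypt the target data with that data key. For more information, see Use KMS keys for envelope encryption.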
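The storage format described above can be illustrated with a minimal Python sketch that uses only the standard library. The helper names `pack_field` and `parse_field` are hypothetical and are not part of the samples in this topic. The sketch assumes a 12-byte IV (16 Base64 characters), which matches the constants used by the Java sample, and it performs no encryption itself.

```python
import base64

IV_B64_LEN = 16  # Base64 length of a 12-byte GCM IV (assumption taken from the Java constants)


def pack_field(data_key_ciphertext, iv, ciphertext_and_tag):
    """Build the stored string: 4-digit length + data key ciphertext + IV + payload."""
    header = "%04d" % len(data_key_ciphertext)
    return (header
            + data_key_ciphertext
            + base64.b64encode(iv).decode("ascii")
            + base64.b64encode(ciphertext_and_tag).decode("ascii"))


def parse_field(field):
    """Split a stored string back into (data_key_ciphertext, iv, ciphertext_and_tag)."""
    key_len = int(field[:4])
    key_end = 4 + key_len
    data_key_ciphertext = field[4:key_end]
    iv = base64.b64decode(field[key_end:key_end + IV_B64_LEN])
    payload = base64.b64decode(field[key_end + IV_B64_LEN:])
    return data_key_ciphertext, iv, payload
```

Because the data key ciphertext travels with every field, a decryption UDF only needs the stored string and access to the KMS instance to recover the plaintext.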
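Prerequisites
- A KMS software key management instance or hardware key management instance is purchased. For details, see Purchase and enable a KMS instance.
- A customer master key is created. Because envelope encryption supports only the GCM encryption mode, you must select a symmetric key type when you create the key. For details about key specifications and encryption modes, see Key management types and key specifications.
- An application access point is created and its ClientKey is saved, and the CA certificate of the instance is obtained. For details, see Create an application access point and Obtain the CA certificate of a KMS instance.
Code samples
If performance matters, we recommend SynchronousKeyEncryptUDF from the Java sample.
Java sample
1. Install dependencies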
<dependencies>
        <dependency>
            <groupId>com.aliyun.odps</groupId>
            <artifactId>odps-sdk-udf</artifactId>
            <version>0.48.6-public</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>alibabacloud-kms-java-sdk</artifactId>
            <version>1.2.5</version>
            <exclusions>
                <exclusion>
                    <groupId>com.aliyun</groupId>
                    <artifactId>tea</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>tea</artifactId>
            <version>1.2.3</version>
        </dependency>
</dependencies>
2. Define the utility class UDFUtils.java
package com.aliyun.kms.sample;
import com.aliyun.tea.utils.StringUtils;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class UDFUtils {
    private UDFUtils() {
        // do nothing
    }
    public static boolean isNull(String value) {
        return StringUtils.isEmpty(value) || "\\N".equals(value);
    }
    public static String getSynchronousKeyVersion(String udfIdentify, String keyIdentify) {
        UUID uuid = null;
        byte[] bytes = null;
        if (StringUtils.isEmpty(keyIdentify)) {
            bytes = udfIdentify.getBytes(StandardCharsets.UTF_8);
            uuid = UUID.nameUUIDFromBytes(bytes);
            return uuid.toString();
        }
        bytes = (udfIdentify + "/" + keyIdentify).getBytes(StandardCharsets.UTF_8);
        uuid = UUID.nameUUIDFromBytes(bytes);
        return uuid.toString();
    }
    public static <K, V> Map<K, V> getLRUMap(int capacity) {
        return new LRUMap<K, V>(capacity);
    }
    private static class LRUMap<K, V> extends LinkedHashMap<K, V> {
        private final int capacity;
        private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        private final Lock readLock = readWriteLock.readLock();
        private final Lock writeLock = readWriteLock.writeLock();
        public LRUMap(int capacity) {
            super(capacity, 0.75f, true);
            this.capacity = capacity;
        }
        @Override
        public V get(Object key) {
            // With accessOrder=true, get() reorders entries, so it must hold the write lock.
            writeLock.lock();
            try {
                return super.get(key);
            } finally {
                writeLock.unlock();
            }
        }
        @Override
        public V put(K key, V value) {
            writeLock.lock();
            try {
                return super.put(key, value);
            } finally {
                writeLock.unlock();
            }
        }
        @Override
        protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            return size() > capacity;
        }
    }
}
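Method description
| Method | Description | 
| isNull | Checks whether a string is empty. | 
| getSynchronousKeyVersion | Derives the version ID from the UDF identifier plus the Version argument passed to the function. | 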
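As a rough illustration of how getSynchronousKeyVersion derives a stable version ID, the following Python sketch mirrors the Java logic. Java's `UUID.nameUUIDFromBytes` produces a name-based (version 3, MD5) UUID directly from the input bytes, without a namespace. The function name `synchronous_key_version` is hypothetical, and the claim that its output equals the Java output is an assumption that you should verify before relying on it. Note that this differs from `generate_uuid` in the Python sample, which uses `uuid.uuid5` with a DNS namespace.

```python
import hashlib
import uuid


def synchronous_key_version(udf_identify, key_identify=None):
    """Derive a deterministic version ID from the UDF identifier and an optional key identifier."""
    if key_identify:
        name = udf_identify + "/" + key_identify
    else:
        name = udf_identify
    # MD5 of the raw name bytes, with the version-3 and RFC 4122 variant bits applied
    digest = hashlib.md5(name.encode("utf-8")).digest()
    return str(uuid.UUID(bytes=digest, version=3))
```

The same project and key identifier always map to the same version ID, which is what allows concurrent workers to converge on one data key per version.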
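3. Define the constants class UDFConstant.java
The constants class defines the parameter names used in the configuration file. For the parameter values, see the Configure variables step.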
package com.aliyun.kms.sample;
public class UDFConstant {
    private UDFConstant() {
        // do nothing
    }
    public static final String KMS_UDF_CONF_NAME = "kms_udf_conf.properties";
    public static final String UDF_ENCRYPT_KEY_ID_KEY = "udf.kms.keyId";
    public static final String KMS_CLIENT_KEY_FILE_KEY = "udf.kms.clientkey.file";
    public static final String KMS_CLIENT_KEY_PASSWORD_KEY = "udf.kms.clientkey.password";
    public static final String KMS_INSTANCE_CA_FILE_KEY = "udf.kms.instance.ca.file";
    public static final String KMS_INSTANCE_ENDPOINT_KEY = "udf.kms.instance.endpoint";
    public static final String KMS_SYNCHRONOUS_SECRET_NAME_KEY = "udf.synchronous.secret.name";
    public static final String KMS_RAM_SECRET_NAME_KEY = "udf.ram.secret.name";
    public static final String KMS_ENDPOINT_KEY = "udf.kms.endpoint";
    public static final String ENCRYPT_DATA_KEY_FORMAT = "%04d";
    public static final int GCM_IV_LENGTH = 12;
    public static final int GCM_IV_BASE64_LENGTH = 16;
    public static final int GCM_TAG_LENGTH = 16;
}
Parameter description
| Parameter | Description | 
| KMS_UDF_CONF_NAME | Name of the configuration file | 
| UDF_ENCRYPT_KEY_ID_KEY | KMS key ID | 
| KMS_CLIENT_KEY_FILE_KEY | ClientKey credential file | 
| KMS_CLIENT_KEY_PASSWORD_KEY | ClientKey credential password | 
| KMS_INSTANCE_CA_FILE_KEY | CA certificate of the instance | 
| KMS_INSTANCE_ENDPOINT_KEY | Endpoint of the KMS instance | 
| KMS_SYNCHRONOUS_SECRET_NAME_KEY | Name of the synchronous secret | 
| KMS_RAM_SECRET_NAME_KEY | Name of the RAM secret | 
| KMS_ENDPOINT_KEY | Endpoint of the shared gateway | 
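The numeric constants in UDFConstant are related to each other. The short Python sketch below shows why GCM_IV_BASE64_LENGTH is 16 when GCM_IV_LENGTH is 12, and what the "%04d" header looks like. The helper names `data_key_header` and `new_iv_base64` are hypothetical and exist only for illustration.

```python
import base64
import os

GCM_IV_LENGTH = 12            # IV size in bytes
GCM_IV_BASE64_LENGTH = 16     # 12 bytes encode to 4 * (12 / 3) = 16 Base64 characters
ENCRYPT_DATA_KEY_FORMAT = "%04d"


def data_key_header(ciphertext_blob):
    """Return the zero-padded 4-digit length prefix for a data key ciphertext."""
    return ENCRYPT_DATA_KEY_FORMAT % len(ciphertext_blob)


def new_iv_base64():
    """Generate a random IV and return it Base64-encoded (no padding needed for 12 bytes)."""
    return base64.b64encode(os.urandom(GCM_IV_LENGTH)).decode("ascii")
```

One consequence of the 4-digit prefix is that the format only works as intended while the data key ciphertext is shorter than 10,000 characters; a longer value would produce a 5-digit prefix and break parsing.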
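4. Encryption UDF
- For tables with large data volumes, we recommend the com.aliyun.kms.sample.SynchronousKeyEncryptUDF class as a high-performance encryption UDF.
- For tables with small data volumes and low UDF parallelism, you can use the com.aliyun.kms.sample.EncryptUDF class as the encryption UDF.
SynchronousKeyEncryptUDF example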
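SynchronousKeyEncryptUDF targets large data volumes and high parallelism. It uses KMS secret management to store each Version and its data key as a secret version in KMS. This avoids multiple data keys under the same Version and reduces the number of keys generated, which improves performance. The data key is generated as follows:
- Create the secret xxxxSynchronousSecret: in the KMS console or by calling the KMS API, create a generic secret named xxxxSynchronousSecret with an arbitrary secret value. The UDF uses it as the storage container for data keys. It corresponds to the UDF configuration item udf.synchronous.secret.name. For more information about creating secrets, see Manage and use generic secrets.
- Create the RAM secret xxxxRamSecret: in the KMS console or by calling the KMS API, create a RAM secret whose value is the AccessKey ID and AccessKey secret of the account that calls the KMS service through the shared gateway. The UDF uses it as the identity credential for calling OpenAPI. It corresponds to the UDF configuration item udf.ram.secret.name. For more information about creating RAM secrets and AccessKey pairs, see Manage and use RAM secrets and Create an AccessKey. Example secret value:
  { "AccessKeyId":"LTAI****************", "AccessKeySecret":"yourAccessKeySecret" }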
- Define the encryption implementation class SynchronousKeyEncryptUDF.java:
package com.aliyun.kms.sample;
import com.aliyun.kms.kms20160120.TransferClient;
import com.aliyun.kms.kms20160120.model.KmsConfig;
import com.aliyun.kms.kms20160120.model.KmsRuntimeOptions;
import com.aliyun.kms20160120.Client;
import com.aliyun.kms20160120.models.*;
import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.tea.TeaException;
import com.aliyun.tea.utils.StringUtils;
import com.google.gson.Gson;
import org.apache.commons.codec.binary.Base64;
import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
public class SynchronousKeyEncryptUDF extends UDF {
    private static Gson gson = new Gson();
    private static Client instanceClient;
    private static com.aliyun.kms.kms20160120.Client shareClient;
    private static Map<String, DataKeyObject> dataKeyObjectMap;
    private static Base64 base64 = new Base64();
    private String runTaskIdentify;
    private String keySecretName;
    private String keyId;
    private Properties properties = new Properties();
    @Override
    public void setup(ExecutionContext ctx) throws UDFException, IOException {
        super.setup(ctx);
        try {
            InputStream in = ctx.readResourceFileAsStream(UDFConstant.KMS_UDF_CONF_NAME);
            properties.load(in);
            // Scope of data key generation: use getRunningProject for one key per project, or getTableInfo for one key per table
            runTaskIdentify = ctx.getRunningProject();
            dataKeyObjectMap = new ConcurrentHashMap<>();
            keySecretName = properties.getProperty(UDFConstant.KMS_SYNCHRONOUS_SECRET_NAME_KEY);
            keyId = properties.getProperty(UDFConstant.UDF_ENCRYPT_KEY_ID_KEY);
            if (StringUtils.isEmpty(keySecretName)) {
                throw new UDFException("the secret name is null");
            }
            buildInstanceClient(ctx);
            checkSecretExist();
            buildShareClient();
        } catch (Exception e) {
            throw new UDFException(e);
        }
    }
    public String evaluate(String keyIdentify, String data) {
        if (UDFUtils.isNull(data)) {
            return data;
        }
        byte[] dataBytes = data.getBytes(StandardCharsets.UTF_8);
        byte[] iv = null;
        byte[] cipherTextBytes = null;
        try {
            String keyVersion = UDFUtils.getSynchronousKeyVersion(runTaskIdentify, keyIdentify);
            if (!dataKeyObjectMap.containsKey(keyVersion)) {
                dataKeyObjectMap.put(keyVersion, getDataKeyObject(keyVersion));
            }
            DataKeyObject dataKeyObject = dataKeyObjectMap.get(keyVersion);
            iv = new byte[UDFConstant.GCM_IV_LENGTH];
            SecureRandom random = new SecureRandom();
            random.nextBytes(iv);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            SecretKeySpec keySpec = new SecretKeySpec(dataKeyObject.plaintextBytes, "AES");
            GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(UDFConstant.GCM_TAG_LENGTH * 8, iv);
            cipher.init(Cipher.ENCRYPT_MODE, keySpec, gcmParameterSpec);
            cipherTextBytes = cipher.doFinal(dataBytes);
            return dataKeyObject.encryptedDataKeyPart + base64.encodeAsString(iv) + base64.encodeAsString(cipherTextBytes);
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }
    private DataKeyObject getDataKeyObject(String keyVersion) throws Exception {
        try {
            return getDataKeyObjectByGetSecretValue(keyVersion);
        } catch (TeaException e) {
            if ("Forbidden.ResourceNotFound".equals(e.code)) {
                // No key stored for this version yet: generate one and store it
                return buildDataKeyObject(keyVersion);
            }
            // Rethrow other errors instead of returning null, which ConcurrentHashMap cannot store
            throw e;
        }
    }
    private DataKeyObject getDataKeyObjectByGetSecretValue(String keyVersion) throws Exception {
        String secretData = getSecretValue(keyVersion);
        DataKeyPersistentObject dataKeyPersistentObject = gson.fromJson(secretData, DataKeyPersistentObject.class);
        byte[] plaintextBytes = base64.decode(dataKeyPersistentObject.plaintext);
        return new DataKeyObject(plaintextBytes, dataKeyPersistentObject.encryptedDataKeyPart);
    }
    private DataKeyObject buildDataKeyObject(String keyVersion) throws Exception {
        GenerateDataKeyRequest request = new GenerateDataKeyRequest();
        request.setKeyId(keyId);
        KmsRuntimeOptions runtimeOptions = new KmsRuntimeOptions();
        GenerateDataKeyResponse response = instanceClient.generateDataKeyWithOptions(request, runtimeOptions);
        String plaintext = response.getBody().plaintext;
        byte[] plaintextBytes = base64.decode(plaintext);
        String encryptedDataKeyPart = String.format(UDFConstant.ENCRYPT_DATA_KEY_FORMAT, response.getBody().ciphertextBlob.length()) + response.getBody().ciphertextBlob;
        DataKeyObject dataKeyObject = new DataKeyObject(plaintextBytes, encryptedDataKeyPart);
        DataKeyPersistentObject dataKeyPersistentObject = new DataKeyPersistentObject(plaintext, encryptedDataKeyPart);
        PutSecretValueRequest putSecretValueRequest = new PutSecretValueRequest();
        putSecretValueRequest.secretData = dataKeyPersistentObject.toJsonString();
        putSecretValueRequest.secretName = keySecretName;
        putSecretValueRequest.versionId = keyVersion;
        try {
            shareClient.putSecretValue(putSecretValueRequest);
        } catch (TeaException e) {
            if ("Rejected.ResourceExist".equals(e.code)) {
                // Another worker stored a key for this version first: use that key
                return getDataKeyObjectByGetSecretValue(keyVersion);
            }
            // Any other failure falls through: the locally generated key is used without being stored
        }
        return dataKeyObject;
    }
    @Override
    public void close() throws UDFException, IOException {
        super.close();
    }
    private void buildInstanceClient(ExecutionContext ctx) throws Exception {
        try {
            KmsConfig config = new KmsConfig();
            String clientKeyFileName = properties.getProperty(UDFConstant.KMS_CLIENT_KEY_FILE_KEY);
            byte[] clientKeyFileContent = ctx.readResourceFile(clientKeyFileName);
            config.setClientKeyContent(new String(clientKeyFileContent, StandardCharsets.UTF_8));
            String caFileName = properties.getProperty(UDFConstant.KMS_INSTANCE_CA_FILE_KEY);
            byte[] caFileContent = ctx.readResourceFile(caFileName);
            config.setCa(new String(caFileContent, StandardCharsets.UTF_8));
            config.setPassword(properties.getProperty(UDFConstant.KMS_CLIENT_KEY_PASSWORD_KEY));
            config.setEndpoint(properties.getProperty(UDFConstant.KMS_INSTANCE_ENDPOINT_KEY));
            instanceClient = new TransferClient(config);
        } catch (Exception e) {
            throw new UDFException(e);
        }
    }
    private void checkSecretExist() throws Exception {
        getSecretValue(null);
    }
    private String getSecretValue(String versionId) throws Exception {
        GetSecretValueRequest getSecretValueRequest = new GetSecretValueRequest();
        getSecretValueRequest.setSecretName(keySecretName);
        if (!StringUtils.isEmpty(versionId)) {
            getSecretValueRequest.setVersionId(versionId);
        }
        KmsRuntimeOptions runtimeOptions = new KmsRuntimeOptions();
        GetSecretValueResponse getSecretValueResponse = instanceClient.getSecretValueWithOptions(getSecretValueRequest, runtimeOptions);
        return getSecretValueResponse.body.secretData;
    }
    private void buildShareClient() throws Exception {
        String ramSecretName = properties.getProperty(UDFConstant.KMS_RAM_SECRET_NAME_KEY);
        GetSecretValueRequest getSecretValueRequest = new GetSecretValueRequest();
        getSecretValueRequest.setSecretName(ramSecretName);
        KmsRuntimeOptions runtimeOptions = new KmsRuntimeOptions();
        GetSecretValueResponse getSecretValueResponse = instanceClient.getSecretValueWithOptions(getSecretValueRequest, runtimeOptions);
        AccessKeyObject accessKeyObject = gson.fromJson(getSecretValueResponse.body.secretData, AccessKeyObject.class);
        com.aliyun.teaopenapi.models.Config shareConfig = new com.aliyun.teaopenapi.models.Config()
                .setAccessKeyId(accessKeyObject.AccessKeyId)
                .setAccessKeySecret(accessKeyObject.AccessKeySecret)
                .setEndpoint(properties.getProperty(UDFConstant.KMS_ENDPOINT_KEY));
        shareClient = new com.aliyun.kms.kms20160120.Client(shareConfig);
    }
    private static class AccessKeyObject implements Serializable {
        private String AccessKeyId;
        private String AccessKeySecret;
    }
    private final static class DataKeyObject {
        private byte[] plaintextBytes;
        private String encryptedDataKeyPart;
        public DataKeyObject() {
        }
        public DataKeyObject(byte[] plaintextBytes, String encryptedDataKeyPart) {
            this.plaintextBytes = plaintextBytes;
            this.encryptedDataKeyPart = encryptedDataKeyPart;
        }
    }
    private final static class DataKeyPersistentObject implements Serializable {
        private String plaintext;
        private String encryptedDataKeyPart;
        public DataKeyPersistentObject() {
        }
        public DataKeyPersistentObject(String plaintext, String encryptedDataKeyPart) {
            this.plaintext = plaintext;
            this.encryptedDataKeyPart = encryptedDataKeyPart;
        }
        public String toJsonString() {
            return gson.toJson(this);
        }
    }
}
Main method description
| Method | Description | 
| setup | Reads the configuration file, initializes the KMS instance SDK and the Alibaba Cloud SDK, and checks that the initial secret exists. | 
| buildInstanceClient | Initializes the KMS instance SDK. | 
| buildShareClient | Retrieves the AccessKey pair stored in the RAM secret and initializes the Alibaba Cloud SDK. | 
| getDataKeyObjectByGetSecretValue | Retrieves the data key stored in the secret. | 
| buildDataKeyObject | Generates a new data key and stores it in the secret. | 
| evaluate | Encrypts the data and returns the encrypted result. | 
Note
The following KMS APIs are used:
- Instance API: GenerateDataKey generates a data key, and GetSecretValue retrieves a secret value.
- OpenAPI: PutSecretValue stores a secret value.
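The get-or-create flow that SynchronousKeyEncryptUDF follows (read the version, generate and store on a miss, fall back to the stored value if another worker stored one first) can be sketched in Python with an in-memory stand-in for the secret. Everything in this sketch is hypothetical: `InMemorySecretStore`, `SecretVersionExists`, and `get_or_create_data_key` are illustrative names, not KMS SDK APIs, and no real KMS call is made.

```python
import threading


class SecretVersionExists(Exception):
    """Raised by the hypothetical store when a version ID is already taken."""


class InMemorySecretStore(object):
    """Hypothetical stand-in for a KMS secret: maps version ID to a stored value."""

    def __init__(self):
        self._versions = {}
        self._lock = threading.Lock()

    def get(self, version_id):
        with self._lock:
            return self._versions.get(version_id)

    def put_if_absent(self, version_id, value):
        with self._lock:
            if version_id in self._versions:
                raise SecretVersionExists(version_id)
            self._versions[version_id] = value


def get_or_create_data_key(store, version_id, generate_key):
    """Return the data key for version_id, creating it only if no worker has stored one."""
    existing = store.get(version_id)
    if existing is not None:
        return existing
    candidate = generate_key()
    try:
        store.put_if_absent(version_id, candidate)
        return candidate
    except SecretVersionExists:
        # Another worker stored its key first: discard ours and use theirs
        return store.get(version_id)
```

The important property is that the store, not the worker, decides which key wins, so all workers that share a version end up encrypting with the same data key.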
 
EncryptUDF example
Define the encryption UDF implementation class EncryptUDF.java
package com.aliyun.kms.sample;
import com.aliyun.kms.kms20160120.TransferClient;
import com.aliyun.kms.kms20160120.model.KmsConfig;
import com.aliyun.kms.kms20160120.model.KmsRuntimeOptions;
import com.aliyun.kms20160120.Client;
import com.aliyun.kms20160120.models.GenerateDataKeyRequest;
import com.aliyun.kms20160120.models.GenerateDataKeyResponse;
import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.UDFException;
import org.apache.commons.codec.binary.Base64;
import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.Properties;
public class EncryptUDF extends UDF {
    private static Client client;
    private static String plaintext;
    private static byte[] plaintextBytes;
    private static String encryptedDataKeyPart;
    private static Base64 base64 = new Base64();
    @Override
    public void setup(ExecutionContext ctx) throws UDFException, IOException {
        super.setup(ctx);
        try {
            InputStream in = ctx.readResourceFileAsStream(UDFConstant.KMS_UDF_CONF_NAME);
            Properties properties = new Properties();
            properties.load(in);
            KmsConfig config = new KmsConfig();
            String clientKeyFileName = properties.getProperty(UDFConstant.KMS_CLIENT_KEY_FILE_KEY);
            byte[] clientKeyFileContent = ctx.readResourceFile(clientKeyFileName);
            config.setClientKeyContent(new String(clientKeyFileContent, StandardCharsets.UTF_8));
            String caFileName = properties.getProperty(UDFConstant.KMS_INSTANCE_CA_FILE_KEY);
            byte[] caFileContent = ctx.readResourceFile(caFileName);
            config.setCa(new String(caFileContent, StandardCharsets.UTF_8));
            config.setPassword(properties.getProperty(UDFConstant.KMS_CLIENT_KEY_PASSWORD_KEY));
            config.setEndpoint(properties.getProperty(UDFConstant.KMS_INSTANCE_ENDPOINT_KEY));
            String keyId = properties.getProperty(UDFConstant.UDF_ENCRYPT_KEY_ID_KEY);
            client = new TransferClient(config);
            GenerateDataKeyRequest request = new GenerateDataKeyRequest();
            request.setKeyId(keyId);
            KmsRuntimeOptions runtimeOptions = new KmsRuntimeOptions();
            GenerateDataKeyResponse response = client.generateDataKeyWithOptions(request, runtimeOptions);
            plaintext = response.getBody().plaintext;
            plaintextBytes = base64.decode(plaintext);
            encryptedDataKeyPart = String.format(UDFConstant.ENCRYPT_DATA_KEY_FORMAT, response.getBody().ciphertextBlob.length()) + response.getBody().ciphertextBlob;
        } catch (Exception e) {
            throw new UDFException(e);
        }
    }
    public String evaluate(String data) {
        if (UDFUtils.isNull(data)) {
            return data;
        }
        byte[] dataBytes = data.getBytes(StandardCharsets.UTF_8);
        byte[] iv = null;
        byte[] cipherTextBytes = null;
        try {
            iv = new byte[UDFConstant.GCM_IV_LENGTH];
            SecureRandom random = new SecureRandom();
            random.nextBytes(iv);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            SecretKeySpec keySpec = new SecretKeySpec(plaintextBytes, "AES");
            GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(UDFConstant.GCM_TAG_LENGTH * 8, iv);
            cipher.init(Cipher.ENCRYPT_MODE, keySpec, gcmParameterSpec);
            cipherTextBytes = cipher.doFinal(dataBytes);
            return encryptedDataKeyPart + base64.encodeAsString(iv) + base64.encodeAsString(cipherTextBytes);
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }
    @Override
    public void close() throws UDFException, IOException {
        super.close();
    }
}
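Main method description
| Method | Description | 
| setup | Reads the configuration file, initializes the KMS instance SDK, and generates the data key ciphertext. | 
| evaluate | Encrypts the data and returns the encrypted result. | 
5. Decryption UDF
Define the decryption implementation class DecryptUDF.java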
package com.aliyun.kms.sample;
import com.aliyun.dkms.gcs.openapi.models.Config;
import com.aliyun.kms.kms20160120.TransferClient;
import com.aliyun.kms.kms20160120.model.KmsConfig;
import com.aliyun.kms.kms20160120.model.KmsRuntimeOptions;
import com.aliyun.kms20160120.Client;
import com.aliyun.kms20160120.models.DecryptRequest;
import com.aliyun.kms20160120.models.DecryptResponse;
import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.UDFException;
import org.apache.commons.codec.binary.Base64;
import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Properties;
public class DecryptUDF extends UDF {
    private static Client client;
    private static Map<String, byte[]> dataKeyMap;
    private static final int BASE_STEP=4;
    private static Base64 base64 = new Base64();
    @Override
    public void setup(ExecutionContext ctx) throws UDFException, IOException {
        super.setup(ctx);
        try {
            InputStream in = ctx.readResourceFileAsStream(UDFConstant.KMS_UDF_CONF_NAME);
            Properties properties = new Properties();
            properties.load(in);
            KmsConfig config = new KmsConfig();
            String clientKeyFileName = properties.getProperty(UDFConstant.KMS_CLIENT_KEY_FILE_KEY);
            byte[] clientKeyFileContent = ctx.readResourceFile(clientKeyFileName);
            config.setClientKeyContent(new String(clientKeyFileContent, StandardCharsets.UTF_8));
            String caFileName = properties.getProperty(UDFConstant.KMS_INSTANCE_CA_FILE_KEY);
            byte[] caFileContent = ctx.readResourceFile(caFileName);
            config.setCa(new String(caFileContent,StandardCharsets.UTF_8));
            config.setPassword(properties.getProperty(UDFConstant.KMS_CLIENT_KEY_PASSWORD_KEY));
            config.setEndpoint(properties.getProperty(UDFConstant.KMS_INSTANCE_ENDPOINT_KEY));
            dataKeyMap = UDFUtils.getLRUMap(1000);
            client = new TransferClient(config);
        } catch (Exception e) {
            throw new UDFException(e);
        }
    }
    public String evaluate(String data) {
        if (UDFUtils.isNull(data)) {
            return data;
        }
        if (data.length() <= BASE_STEP) {
            return data;
        }
        int dataLength = Integer.valueOf(data.substring(0, 4));
        if (data.length() <= BASE_STEP + dataLength) {
            return data;
        }
        String dataKeyCiphertext = data.substring(4, 4 + dataLength);
        try {
            byte[] dataKeyBytes = getDataKeyPlaintext(dataKeyCiphertext);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            SecretKeySpec keySpec = new SecretKeySpec(dataKeyBytes, "AES");
            byte[] iv = base64.decode(data.substring(4 + dataLength, 4 + dataLength + UDFConstant.GCM_IV_BASE64_LENGTH));
            byte[] cipherText = base64.decode(data.substring(4 + dataLength + UDFConstant.GCM_IV_BASE64_LENGTH));
            GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(UDFConstant.GCM_TAG_LENGTH * 8, iv);
            cipher.init(Cipher.DECRYPT_MODE, keySpec, gcmParameterSpec);
            byte[] decryptedData = cipher.doFinal(cipherText);
            return new String(decryptedData, StandardCharsets.UTF_8);
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }
    private byte[] getDataKeyPlaintext(String dataKeyCiphertext) throws Exception {
        byte[] dataKey = dataKeyMap.get(dataKeyCiphertext);
        if (dataKey == null) {
            synchronized (dataKeyCiphertext.intern()) {
                // Re-check inside the lock so that only one thread calls KMS for a given data key
                dataKey = dataKeyMap.get(dataKeyCiphertext);
                if (dataKey == null) {
                    DecryptRequest request = new DecryptRequest();
                    request.ciphertextBlob = dataKeyCiphertext;
                    KmsRuntimeOptions runtimeOptions = new KmsRuntimeOptions();
                    DecryptResponse response = client.decryptWithOptions(request, runtimeOptions);
                    dataKey = base64.decode(response.getBody().plaintext);
                    dataKeyMap.put(dataKeyCiphertext, dataKey);
                }
            }
        }
        return dataKey;
    }
    @Override
    public void close() throws UDFException, IOException {
        super.close();
    }
}
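Main method description
| Method | Description | 
| setup | Reads the configuration file and initializes the KMS instance SDK. | 
| getDataKeyPlaintext | Retrieves the plaintext data key. | 
| evaluate | Decrypts the data with the plaintext data key and returns the decrypted result. | 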
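The caching behavior that getDataKeyPlaintext relies on can be sketched as a small thread-safe LRU cache in Python. The class name `DataKeyCache` and the `loader` callback are hypothetical; in a real UDF the loader would be the KMS Decrypt call. This sketch holds one lock while loading, which is simpler than the per-key locking in the Java sample but serializes cache misses.

```python
import threading
from collections import OrderedDict


class DataKeyCache(object):
    """LRU cache keyed by data key ciphertext; each ciphertext is loaded at most once while cached."""

    def __init__(self, capacity):
        self._capacity = capacity
        self._entries = OrderedDict()
        self._lock = threading.Lock()

    def get_or_load(self, data_key_ciphertext, loader):
        with self._lock:
            if data_key_ciphertext in self._entries:
                # Move the entry to the end to mark it as most recently used
                value = self._entries.pop(data_key_ciphertext)
                self._entries[data_key_ciphertext] = value
                return value
            value = loader(data_key_ciphertext)
            if len(self._entries) >= self._capacity:
                # Evict the least recently used entry
                self._entries.popitem(last=False)
            self._entries[data_key_ciphertext] = value
            return value
```

Because encrypted rows that share a data key carry the same data key ciphertext, a cache like this turns one KMS call per row into roughly one KMS call per distinct data key.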
6. Create the UDFs in SQL
- Before you create the UDFs, add the JAR packages built from the UDF implementation classes to your MaxCompute project as resources.
- This topic creates three functions: the encryption function encrypt_udf, the high-performance encryption function sync_key_encrypt_udf, and the decryption function decrypt_udf. To omit a function that you do not need, delete the corresponding ADD JAR and CREATE FUNCTION statements.
The statements are as follows:
# Add the JAR packages built from the UDF implementation classes to the MaxCompute project
ADD JAR encrypt_udf.jar;
ADD JAR sync_key_encrypt_udf.jar;
ADD JAR decrypt_udf.jar;
# Add the configuration file to the MaxCompute project
ADD FILE kms_udf_conf.properties;
# Add the ClientKey file of the KMS instance to the MaxCompute project
ADD FILE clientkey_xxxx.json;
# Add the CA certificate file of the KMS instance to the MaxCompute project
ADD FILE PrivateKmsCA_xxxx.pem;
# Create the encryption function encrypt_udf
CREATE FUNCTION encrypt_udf AS 'com.aliyun.kms.sample.EncryptUDF'
USING 'encrypt_udf.jar,kms_udf_conf.properties,clientkey_xxxx.json,PrivateKmsCA_xxxx.pem';
# Create the high-performance encryption function sync_key_encrypt_udf
CREATE FUNCTION sync_key_encrypt_udf AS 'com.aliyun.kms.sample.SynchronousKeyEncryptUDF'
USING 'sync_key_encrypt_udf.jar,kms_udf_conf.properties,clientkey_xxxx.json,PrivateKmsCA_xxxx.pem';
# Create the decryption function decrypt_udf
CREATE FUNCTION decrypt_udf AS 'com.aliyun.kms.sample.DecryptUDF'
USING 'decrypt_udf.jar,kms_udf_conf.properties,clientkey_xxxx.json,PrivateKmsCA_xxxx.pem';
Parameter description
| Implementation class | JAR package | UDF name | Description | 
| EncryptUDF.java | encrypt_udf.jar | encrypt_udf | Encryption function | 
| SynchronousKeyEncryptUDF.java | sync_key_encrypt_udf.jar | sync_key_encrypt_udf | High-performance encryption function | 
| DecryptUDF.java | decrypt_udf.jar | decrypt_udf | Decryption function | 
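7. Configure variables
The variables required by the UDF implementation classes are stored in the configuration file kms_udf_conf.properties. The high-performance function requires additional secret-related settings.
High-performance configuration file
# Variables
# Key ID
udf.kms.keyId=key-xxxxxxxxxxxx
# ClientKey file name
udf.kms.clientkey.file=clientkey_xxxx.json
# ClientKey password
udf.kms.clientkey.password=xxxxxxxx
# CA certificate of the instance
udf.kms.instance.ca.file=PrivateKmsCA_xxxx.pem
# Endpoint of the instance
udf.kms.instance.endpoint=xxxxx.cryptoservice.kms.aliyuncs.com
# Name of the synchronous secret
udf.synchronous.secret.name=xxxxSynchronousSecretName
# Name of the RAM secret
udf.ram.secret.name=xxxxRamSecret
# Endpoint of the shared gateway
udf.kms.endpoint=kms-vpc.cn-xxx.aliyuncs.com
Compared with the standard configuration file, this file adds the secret-related settings: the synchronous secret name, the RAM secret name, and the shared gateway endpoint.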
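Before you upload kms_udf_conf.properties, it can help to check that every required key is present. The following Python sketch is one possible local check, not part of the official samples. The function names `parse_properties` and `missing_keys` are hypothetical, and the key lists are taken from the configuration files shown in this topic.

```python
BASE_KEYS = [
    "udf.kms.keyId",
    "udf.kms.clientkey.file",
    "udf.kms.clientkey.password",
    "udf.kms.instance.ca.file",
    "udf.kms.instance.endpoint",
]
# Extra keys that only the high-performance function reads
SYNC_KEYS = [
    "udf.synchronous.secret.name",
    "udf.ram.secret.name",
    "udf.kms.endpoint",
]


def parse_properties(lines):
    """Parse key=value lines, skipping blank lines and '#' comments."""
    config = {}
    for raw in lines:
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    return config


def missing_keys(config, high_performance):
    """Return the required keys that are absent or empty."""
    required = BASE_KEYS + (SYNC_KEYS if high_performance else [])
    return [key for key in required if not config.get(key)]
```

For example, calling `missing_keys(parse_properties(open("kms_udf_conf.properties")), True)` on a standard configuration file would list the three secret-related keys.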
Standard configuration file
# Variables
# Key ID
udf.kms.keyId=key-xxxxxxxxxxxx
# ClientKey file name
udf.kms.clientkey.file=clientkey_xxxx.json
# ClientKey password
udf.kms.clientkey.password=xxxxxxxx
# CA certificate of the instance
udf.kms.instance.ca.file=PrivateKmsCA_xxxx.pem
# Endpoint of the instance
udf.kms.instance.endpoint=xxxxx.cryptoservice.kms.aliyuncs.com
8. Use the functions
-- Encrypt data with the UDF
SELECT encrypt_udf(column_name) FROM my_table;
-- Encrypt data with the high-performance UDF; version is the secret version and defaults to null
SELECT sync_key_encrypt_udf(version,column_name) FROM my_table;
-- Decrypt data with the UDF
SELECT decrypt_udf(column_name) FROM my_table;
Python sample
1. Install dependencies (requirements.txt)
pyodps==0.11.6.2
alibabacloud-kms-python-sdk==1.1.3
2. Define the utility module kms_udf_utils.py
# coding=utf-8
import uuid
from collections import OrderedDict
def is_null(value):
    # Treat SQL NULL (None), empty strings, and the literal \N marker as null
    return value is None or len(value) == 0 or value == '\\N'
def generate_uuid(key_version):
    return uuid.uuid5(uuid.NAMESPACE_DNS, key_version)
def get_config_value(config_dict, dict_key):
    if dict_key not in config_dict:
        raise ValueError("can not find key[%s]" % dict_key)
    return config_dict[dict_key]
def properties_to_dict(cache_file):
    config_dict = {}
    for line in cache_file:
        line = line.strip()
        # Skip blank lines and "#" comment lines
        if not line or line.startswith("#"):
            continue
        # Split on the first "=" only, so that values may contain "="
        k, v = line.split("=", 1)
        config_dict[k.strip()] = v.strip()
    return config_dict
def object_to_dict(obj):
    return obj.__dict__
class LRUCache(object):
    def __init__(self, capacity):
        self.capacity = capacity
        self.cache = OrderedDict()
    def get(self, key):
        if key not in self.cache:
            return -1
        value = self.cache.pop(key)  # Remove the item to update the order
        self.cache[key] = value  # Re-insert it to the end
        return value
    def put(self, key, value):
        if key in self.cache:
            self.cache.pop(key)  # Remove the existing entry to update the order
        elif len(self.cache) == self.capacity:
            self.cache.popitem(last=False)  # Remove the least recently used item
        self.cache[key] = value
    def contains_key(self, key):
        return key in self.cache
3. Define the constants module kms_udf_constant.py
# coding=utf-8
UDF_CONFIG_CACHE_FILE = "kms_udf_conf.properties"
KMS_CLIENT_KEY_FILE_KEY = "udf.kms.clientkey.file"
KMS_CLIENT_KEY_PASSWORD_KEY = "udf.kms.clientkey.password"
KMS_INSTANCE_ENDPOINT_KEY = "udf.kms.instance.endpoint"
KMS_INSTANCE_CA_FILE_KEY = "udf.kms.instance.ca.file"
KMS_SYNCHRONOUS_SECRET_NAME_KEY = "udf.synchronous.secret.name"
KMS_RAM_SECRET_NAME_KEY = "udf.ram.secret.name"
KMS_REGION_KEY = "udf.kms.regionId"
UDF_ENCRYPT_KEY_ID_KEY = "udf.kms.keyId"
ENCRYPT_DATA_KEY_FORMAT = "%04d"
GCM_IV_LENGTH = 12
GCM_TAG_LENGTH = 16
DEFAULT_NUMBER_OF_BYTES = 32
Parameter description
| Parameter | Description | 
| UDF_CONFIG_CACHE_FILE | Name of the configuration file | 
| KMS_CLIENT_KEY_FILE_KEY | SDK credential file | 
| KMS_CLIENT_KEY_PASSWORD_KEY | SDK credential password | 
| KMS_INSTANCE_ENDPOINT_KEY | Endpoint of the KMS instance | 
| KMS_INSTANCE_CA_FILE_KEY | CA certificate of the instance | 
| KMS_SYNCHRONOUS_SECRET_NAME_KEY | Name of the synchronous secret | 
| KMS_RAM_SECRET_NAME_KEY | Name of the RAM secret | 
| KMS_REGION_KEY | Region ID of KMS | 
| UDF_ENCRYPT_KEY_ID_KEY | KMS key ID | 
4. Encryption UDF
Define the encryption implementation class in encrypt_udf.py
# coding=utf-8
import base64
from alibabacloud_kms20160120.models import GenerateDataKeyRequest
from alibabacloud_kms_kms20160120.models import KmsConfig, KmsRuntimeOptions
from alibabacloud_kms_kms20160120.transfer_client import TransferClient
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from odps.distcache import get_cache_file
import os
import kms_udf_constant
import kms_udf_utils
class EncryptUDF(object):
    def __init__(self):
        cache_file = get_cache_file(kms_udf_constant.UDF_CONFIG_CACHE_FILE)
        self.config_dict = kms_udf_utils.properties_to_dict(cache_file)
        cache_file.close()
        kms_config = KmsConfig(
            protocol='https',
            client_key_file=kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_CLIENT_KEY_FILE_KEY),
            password=kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_CLIENT_KEY_PASSWORD_KEY),
            endpoint=kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_INSTANCE_ENDPOINT_KEY),
        )
        self.ca_file_path = kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_INSTANCE_CA_FILE_KEY)
        self.keyId = kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.UDF_ENCRYPT_KEY_ID_KEY)
        self.instance_client = TransferClient(kms_config=kms_config)
        resp = self.generate_data_key()
        self.encrypted_data_key_part = (kms_udf_constant.ENCRYPT_DATA_KEY_FORMAT %
                                        len(resp.body.ciphertext_blob) + resp.body.ciphertext_blob)
        self.plaintext = resp.body.plaintext
        self.ciphertext_blob = resp.body.ciphertext_blob
    def evaluate(self, data):
        if kms_udf_utils.is_null(data):
            return data
        # Use a cryptographically secure random IV; GCM needs a unique IV for every encryption under the same key
        iv = os.urandom(kms_udf_constant.GCM_IV_LENGTH)
        data_bytes = data.encode("utf-8")
        encryptor = Cipher(
            algorithms.AES(base64.b64decode(self.plaintext)),
            modes.GCM(iv),
        ).encryptor()
        ciphertext = encryptor.update(data_bytes) + encryptor.finalize()
        return self.encrypted_data_key_part + str(base64.b64encode(iv), "utf-8") + str(
            base64.b64encode(ciphertext + encryptor.tag), "utf-8")
    def generate_data_key(self):
        request = GenerateDataKeyRequest()
        request.key_id = self.keyId
        request.number_of_bytes = kms_udf_constant.DEFAULT_NUMBER_OF_BYTES
        runtime = KmsRuntimeOptions(
            ca=self.ca_file_path
        )
        return self.instance_client.generate_data_key_with_options(request, runtime)
5. Decryption UDF
Define the decryption implementation class in decrypt_udf.py
# coding=utf-8
import base64
from alibabacloud_kms20160120 import models as kms_20160120_models
from alibabacloud_kms_kms20160120.models import KmsConfig, KmsRuntimeOptions
from alibabacloud_kms_kms20160120.transfer_client import TransferClient
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from odps.distcache import get_cache_file
import kms_udf_constant
import kms_udf_utils
from kms_udf_utils import LRUCache
class DecryptUDF(object):
    def __init__(self):
        cache_file = get_cache_file(kms_udf_constant.UDF_CONFIG_CACHE_FILE)
        self.config_dict = kms_udf_utils.properties_to_dict(cache_file)
        cache_file.close()
        kms_config = KmsConfig(
            protocol='https',
            client_key_file=kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_CLIENT_KEY_FILE_KEY),
            password=kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_CLIENT_KEY_PASSWORD_KEY),
            endpoint=kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_INSTANCE_ENDPOINT_KEY),
        )
        self.ca_file_path = kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.KMS_INSTANCE_CA_FILE_KEY)
        self.keyId = kms_udf_utils.get_config_value(self.config_dict, kms_udf_constant.UDF_ENCRYPT_KEY_ID_KEY)
        self.instance_client = TransferClient(kms_config=kms_config)
        self.data_key_cache = LRUCache(1000)
    def evaluate(self, data):
        if kms_udf_utils.is_null(data):
            return data
        if len(data) < 4:
            return data
        data_key_len = int(data[0:4])
        if len(data) < 4 + data_key_len:
            return data
        data_key_ciphertext = data[4:4 + data_key_len]
        if not self.data_key_cache.contains_key(data_key_ciphertext):
            decrypt_request = kms_20160120_models.DecryptRequest(
                ciphertext_blob=data_key_ciphertext
            )
            decrypt_runtime = KmsRuntimeOptions(
                ca=self.ca_file_path
            )
            decrypt_response = self.instance_client.decrypt_with_options(decrypt_request, decrypt_runtime)
            self.data_key_cache.put(data_key_ciphertext, base64.b64decode(decrypt_response.body.plaintext))
        data_key = self.data_key_cache.get(data_key_ciphertext)
        envelope_cipher_bytes = base64.b64decode(data[4 + data_key_len:])
        iv = envelope_cipher_bytes[0:kms_udf_constant.GCM_IV_LENGTH]
        cipher_bytes = envelope_cipher_bytes[
                   kms_udf_constant.GCM_IV_LENGTH:len(envelope_cipher_bytes) - kms_udf_constant.GCM_TAG_LENGTH]
        tag = envelope_cipher_bytes[len(envelope_cipher_bytes) - kms_udf_constant.GCM_TAG_LENGTH:]
        decryptor = Cipher(algorithms.AES(data_key), modes.GCM(iv, tag)).decryptor()
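        # Decode the plaintext bytes as UTF-8; str() on bytes would return the "b'...'" representation
        return (decryptor.update(cipher_bytes) + decryptor.finalize()).decode("utf-8")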
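The slicing at the end of evaluate works because the encryption side concatenates two Base64 strings: the 12-byte IV, then the ciphertext followed by the 16-byte tag. Since 12 is a multiple of 3, the IV encodes without padding, so the concatenation decodes as one Base64 string. The sketch below shows only this byte-level split, with no decryption; the function name `split_envelope` is hypothetical.

```python
import base64

GCM_IV_LENGTH = 12
GCM_TAG_LENGTH = 16


def split_envelope(encoded):
    """Split base64(iv) + base64(ciphertext + tag) into (iv, ciphertext, tag)."""
    raw = base64.b64decode(encoded)
    iv = raw[:GCM_IV_LENGTH]
    ciphertext = raw[GCM_IV_LENGTH:len(raw) - GCM_TAG_LENGTH]
    tag = raw[len(raw) - GCM_TAG_LENGTH:]
    return iv, ciphertext, tag
```

If the IV length were not a multiple of 3, the first Base64 string would end in padding characters and the two parts would have to be decoded separately, as the Java sample does.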
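6. Create the UDFs in SQL
This topic creates two functions: the encryption function encrypt_udf and the decryption function decrypt_udf. To omit a function that you do not need, delete the corresponding ADD PY and CREATE FUNCTION statements.
# Add the preceding programs and the dependencies listed in requirements.txt as resources
ADD PY decrypt_udf.py;
ADD PY encrypt_udf.py;
# Add the configuration file to the MaxCompute project
ADD FILE kms_udf_conf.properties;
# Add the ClientKey file of the KMS instance to the MaxCompute project
ADD FILE clientkey_xxxx.json;
# Add the CA certificate file of the KMS instance to the MaxCompute project
ADD FILE PrivateKmsCA_xxxx.pem;
# Create the encryption UDF (for Python UDFs, the class name is the script name followed by the class name)
CREATE FUNCTION encrypt_udf AS 'encrypt_udf.EncryptUDF'
USING 'encrypt_udf.py,kms_udf_conf.properties,clientkey_xxxx.json,PrivateKmsCA_xxxx.pem';
# Create the decryption UDF
CREATE FUNCTION decrypt_udf AS 'decrypt_udf.DecryptUDF'
USING 'decrypt_udf.py,kms_udf_conf.properties,clientkey_xxxx.json,PrivateKmsCA_xxxx.pem';
Parameter description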
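| Resource | UDF name | Description | 
| encrypt_udf.py | encrypt_udf | Encryption function | 
| decrypt_udf.py | decrypt_udf | Decryption function | 
7. Configure variables
The variables required by the UDF implementation classes are stored in the configuration file kms_udf_conf.properties.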
# Variables
# Key ID
udf.kms.keyId=key-xxxxxxxxxxxx
# ClientKey file name
udf.kms.clientkey.file=clientkey_xxxx.json
# ClientKey password
udf.kms.clientkey.password=xxxxxxxx
# CA certificate of the instance
udf.kms.instance.ca.file=PrivateKmsCA_xxxx.pem
# Endpoint of the instance
udf.kms.instance.endpoint=xxxxx.cryptoservice.kms.aliyuncs.com
8. Use the functions
-- Encrypt data with the UDF
SELECT encrypt_udf(column_name) FROM my_table;
-- Decrypt data with the UDF
SELECT decrypt_udf(column_name) FROM my_table;