本文介绍如何通过Lindorm流引擎实现用户自定义标量函数(UDF):将字符串转换为由MD5算法加密后的HexString。

背景信息

Lindorm流引擎可100%兼容Flink SQL。在Flink SQL提供的内置函数不能满足使用需求时,您可通过在流引擎中使用UDF来实现功能的扩展。

前提条件

注意事项

如果您的应用部署在ECS实例,且想要通过专有网络访问Lindorm实例,则需要确保Lindorm实例和ECS实例满足以下条件,以保证网络的连通性。
  • 所在地域相同,并建议所在可用区相同(以减少网络延时)。
  • ECS实例与Lindorm实例属于同一专有网络。

操作步骤

  1. 创建Project实现UDF,示例如下。具体语法,请参见Flink UDF实现
    public class MD5HexFunction extends ScalarFunction {
    
        public String eval(String originalString) throws NoSuchAlgorithmException{
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] md5bytes = md5.digest(originalString.getBytes(StandardCharsets.UTF_8));
            return toHexString(md5bytes);
        }
    
        public String toHexString(byte[] bytes){
            StringBuilder hexString = new StringBuilder();
            for (int i = 0; i < bytes.length; i++) {
                String hex = Integer.toHexString(0xFF & bytes[i]);
                if (hex.length() == 1) {
                    hexString.append('0');
                }
                hexString.append(hex);
            }
            return hexString.toString();
        }
    }
  2. 执行以下命令,将UDF压缩为JAR包。
    mvn clean install
  3. 将JAR包上传至文件引擎。
    ${HADOOP_HOME}/bin/hadoop fs -put <JAR包名称> hdfs://<实例ID>/jars/
    参数说明
    • JAR包名称:压缩后的JAR包名称。
    • 实例ID:Lindorm实例的ID。
  4. 使用Kafka客户端将源数据写入Kafka Topic中。
    1. 创建Kafka Topic。
      ./kafka-topics.sh --bootstrap-server <Lindorm Stream Kafka地址> --topic user_visits --create
      参数说明

      Lindorm Stream Kafka地址:控制台上获取的流引擎Kafka连接地址。例如ld-bp****-proxy-stream.lindorm.rds.aliyuncs.com:30080。如何获取,请参见查看连接地址

    2. 将源数据写入Kafka Topic中。
      ./kafka-console-producer.sh --bootstrap-server <Lindorm Stream Kafka地址> --topic user_visits
      {"id": "user_1", "ip":"192.XXX.X.4", "view_time": "1.75"}
      {"id": "user_2", "ip":"192.XXX.XX.3", "view_time": "0.93"}
      {"id": "user_3", "ip":"192.XXX.XX.7", "view_time": "1.87"}
      {"id": "user_4", "ip":"192.XXX.X.42", "view_time": "1.11"}
      {"id": "user_5", "ip":"192.XXX.X.14", "view_time": "1.43"}
      {"id": "user_6", "ip":"192.XXX.XX.56", "view_time": "0.37"}
  5. 使用宽表引擎创建结果表。
    1. 通过Lindorm-cli连接宽表引擎。连接方法,请参见通过Lindorm-cli连接并使用宽表引擎
    2. 创建结果表,用于保存计算任务的处理结果。
      CREATE TABLE IF NOT EXISTS md5hex (
        md5hex VARCHAR,
        id VARCHAR,
        ip VARCHAR,
        view_time DOUBLE,
      primary key (md5hex, id) );
  6. 通过Lindorm-cli连接流引擎。
    ./lindorm-sqlline -url <Lindorm Stream SQL地址>
    参数说明

    Lindorm Stream SQL地址:控制台上获取的流引擎SQL连接地址。例如jdbc:streamsql:url=http://ld-bp****-proxy-stream.lindorm.rds.aliyuncs.com:30060。如何获取,请参见查看连接地址

  7. 使用Flink SQL提交流引擎计算任务,读取Kafka Topic中的数据,并将处理结果保存在结果表中。
    CREATE FJOB string_to_md5hex(
        CREATE FUNCTION MD5HexFunction AS 'com.alibaba.lindorm.stream.udf.MD5HexFunction'
        USING JAR 'hdfs:///jars/stream-udf-1.0-SNAPSHOT-jar-with-dependencies.jar';--此处为UDF压缩后的JAR包名称
        --Kafka Source表
        CREATE TABLE uservisits(
            `id` VARCHAR,
            `ip` VARCHAR,
            `view_time` DOUBLE
        )WITH(
            'connector'='kafka',
            'topic'='user_visits',--此处为Kafka Topic
            'scan.startup.mode'='earliest-offset',
            'properties.bootstrap.servers'='ld-bp****-proxy-stream.lindorm.rds.aliyuncs.com:30080'--流引擎Kafka连接地址
            'format'='json');
        CREATE TABLE md5hex(
            `md5hex` VARCHAR,
            `id` VARCHAR,
            `ip` VARCHAR,
            `view_time` DOUBLE,
            PRIMARY KEY (`md5hex`, `id`) NOT ENFORCED
        ) WITH (
            'connector'='lindorm',
            'seedServer'='ld-bp****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020'--宽表引擎HBase Java API连接地址
            'userName'='root',
            'password'='root',
            'tableName'='md5hex',
            'namespace'='default');--WITH中为连接器配置
        INSERT INTO md5hex
        SELECT MD5HexFunction(id), id , ip, view_time
        FROM uservisits;
    );
    说明 在流引擎中使用宽表的连接器配置,请参见配置流引擎的宽表连接器
  8. 使用宽表引擎查询处理结果。
    1. 通过Lindorm-cli连接宽表引擎。连接方法,请参见通过Lindorm-cli连接并使用宽表引擎
    2. 查询处理结果。
      SELECT * FROM md5hex;
      查询结果如下:
      +----------------------------------+--------+---------------+-----------+
      |              md5hex              |   id   |      ip       | view_time |
      +----------------------------------+--------+---------------+-----------+
      | 15e1576abc700ddfd9438e6ad1c86100 | user_2 | 192.XXX.XX.3  | 0.93      |
      | 1ab162d05561afc0289e1533693b197c | user_6 | 192.XXX.XX.56 | 0.37      |
      | 3f49044c1469c6990a665f46ec6c0a41 | user_1 | 192.XXX.X.4   | 1.75      |
      | a6f601b7c855d45c6b5b182ab32a67c0 | user_3 | 192.XXX.XX.7  | 1.87      |
      | bb640eb8250ff322567a401240dd6a2e | user_4 | 192.XXX.X.42  | 1.11      |
      | fa890200036e527ebb5cba50e1c0450f | user_5 | 192.XXX.X.14  | 1.43      |
      +----------------------------------+--------+---------------+-----------+