本文介绍如何通过Lindorm流引擎实现用户自定义标量函数(UDF):将字符串转换为由MD5算法加密后的HexString。
背景信息
Lindorm流引擎可100%兼容Flink SQL。在Flink SQL提供的内置函数不能满足使用需求时,您可通过在流引擎中使用UDF来实现功能的扩展。
前提条件
- 已下载开源Kafka客户端。如何下载,请参见通过开源Kafka客户端写入Lindorm流引擎数据。
- 已将客户端IP地址添加至Lindorm实例的白名单中,具体操作请参见设置白名单。
- 已获取流引擎的SQL连接地址和Kafka连接地址。如何获取,请参见查看连接地址。
- 已获取宽表引擎的SQL连接地址和HBase Java API连接地址。如何获取,请参见查看连接地址。
注意事项
如果您的应用部署在ECS实例,且想要通过专有网络访问Lindorm实例,则需要确保Lindorm实例和ECS实例满足以下条件,以保证网络的连通性。
- 所在地域相同,并建议所在可用区相同(以减少网络延时)。
- ECS实例与Lindorm实例属于同一专有网络。
操作步骤
- 创建Project实现UDF,示例如下。具体语法,请参见Flink UDF实现。
public class MD5HexFunction extends ScalarFunction { public String eval(String originalString) throws NoSuchAlgorithmException{ MessageDigest md5 = MessageDigest.getInstance("MD5"); byte[] md5bytes = md5.digest(originalString.getBytes(StandardCharsets.UTF_8)); return toHexString(md5bytes); } public String toHexString(byte[] bytes){ StringBuilder hexString = new StringBuilder(); for (int i = 0; i < bytes.length; i++) { String hex = Integer.toHexString(0xFF & bytes[i]); if (hex.length() == 1) { hexString.append('0'); } hexString.append(hex); } return hexString.toString(); } }
- 执行以下命令,将UDF压缩为JAR包。
mvn clean install
- 将JAR包上传至文件引擎。
参数说明${HADOOP_HOME}/bin/hadoop fs -put <JAR包名称> hdfs://<实例ID>/jars/
- JAR包名称:压缩后的JAR包名称。
- 实例ID:Lindorm实例的ID。
- 使用Kafka客户端将源数据写入Kafka Topic中。
- 使用宽表引擎创建结果表。
- 通过Lindorm-cli连接流引擎。
参数说明./lindorm-sqlline -url <Lindorm Stream SQL地址>
Lindorm Stream SQL地址:控制台上获取的流引擎SQL连接地址。例如
jdbc:streamsql:url=http://ld-bp****-proxy-stream.lindorm.rds.aliyuncs.com:30060
。如何获取,请参见查看连接地址 - 使用Flink SQL提交流引擎计算任务,读取Kafka Topic中的数据,并将处理结果保存在结果表中。
CREATE FJOB string_to_md5hex( CREATE FUNCTION MD5HexFunction AS 'com.alibaba.lindorm.stream.udf.MD5HexFunction' USING JAR 'hdfs:///jars/stream-udf-1.0-SNAPSHOT-jar-with-dependencies.jar';--此处为UDF压缩后的JAR包名称 --Kafka Source表 CREATE TABLE uservisits( `id` VARCHAR, `ip` VARCHAR, `view_time` DOUBLE )WITH( 'connector'='kafka', 'topic'='user_visits',--此处为Kafka Topic 'scan.startup.mode'='earliest-offset', 'properties.bootstrap.servers'='ld-bp****-proxy-stream.lindorm.rds.aliyuncs.com:30080'--流引擎Kafka连接地址 'format'='json'); CREATE TABLE md5hex( `md5hex` VARCHAR, `id` VARCHAR, `ip` VARCHAR, `view_time` DOUBLE, PRIMARY KEY (`md5hex`, `id`) NOT ENFORCED ) WITH ( 'connector'='lindorm', 'seedServer'='ld-bp****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020'--宽表引擎HBase Java API连接地址 'userName'='root', 'password'='root', 'tableName'='md5hex', 'namespace'='default');--WITH中为连接器配置 INSERT INTO md5hex SELECT MD5HexFunction(id), id , ip, view_time FROM uservisits; );
说明 在流引擎中使用宽表的连接器配置,请参见配置流引擎的宽表连接器。 - 使用宽表引擎查询处理结果。