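Using the Flink Connector
Preparing the Connector
Follow "Manage custom connectors" to upload the JAR package of the igraph Connector.
After a successful upload, the connector appears as shown in the figure.
Properties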
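Name | Type | Required | Description |
endpoint | string | Yes | Endpoint of the igraph instance |
username | string | Yes | Username used for igraph requests |
password | string | Yes | Password used for igraph requests |
graph_name | string | Yes | Name of the igraph graph to write to |
label_name | string | Yes | Name of the igraph edge or node to write to |
pk_field | string | Yes | pk field of the igraph sink table |
sk_field | string | No | sk field of the igraph sink table |
cmd_field | string | Yes | cmd field of the igraph sink table. Its value is add or delete and indicates whether the record is an insert or a delete request |
request_retry | int | No (default: 0) | Number of times a request to igraph is retried |
dry_run | boolean | No (default: false) | If true, the job only performs a test run and does not actually write to igraph |
async | boolean | No (default: false) | If enabled, writes use asynchronous mode, which improves write performance |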
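The optional properties in the table can be combined in a single sink definition. The following is a minimal sketch, not a recommended configuration: the table name igraph_sink_tuned, the three-column schema, and the retry count of 3 are illustrative assumptions, and the values in braces are placeholders to be replaced. Flink SQL expects every option value in the WITH clause as a quoted string, including the int and boolean properties.

```sql
-- Hypothetical sink showing the optional properties alongside the required ones.
CREATE TEMPORARY TABLE igraph_sink_tuned (
    cmd     VARCHAR,  -- carries add or delete for each record
    pkey    VARCHAR,  -- primary key of the node
    content VARCHAR   -- illustrative payload column
) WITH (
    'connector'     = 'igraph',
    'endpoint'      = 'igraph-cn-xxxx.igraph.aliyuncs.com',
    'username'      = '{igraphUserName}',
    'password'      = '{igraphPassword}',
    'graph_name'    = '{graphName}',
    'label_name'    = '{nodeName}',
    'pk_field'      = 'pkey',
    'cmd_field'     = 'cmd',
    'request_retry' = '3',     -- assumed value; the default is 0 (no retry)
    'dry_run'       = 'false', -- set to 'true' to test without writing
    'async'         = 'true'   -- asynchronous writes; the default is false
);
```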
Flink SQL demo
Node (kv)
--********************************************************************--
-- Author: jilong.yjl-75179-searchd***@searchdump.onaliyun.com
-- Created Time: 2022-11-09 10:43:36
-- Description: Write SLS log records to an igraph node (kv)
--********************************************************************--
-- Source: records read from an SLS Logstore.
CREATE TEMPORARY TABLE bhv_source (
    cmd VARCHAR,
    content VARCHAR,
    gatewayTime BIGINT,
    instnceId VARCHAR,
    pkey VARCHAR
) WITH (
    'connector' = 'sls',
    'endPoint' = 'cn-shanghai-intranet.log.aliyuncs.com',
    'accessId' = 'xxx',
    'accessKey' = 'xxx',
    'startTime' = '2022-11-08 23:28:00',
    'project' = 'igraph-cn-shanghai',
    'logStore' = 'log',
    'consumerGroup' = 'xxx',
    'batchGetSize' = '1'
);
-- Sink: igraph node; replace the values in braces with your own.
CREATE TEMPORARY TABLE igraph_sink (
    cmd VARCHAR,
    content VARCHAR,
    gatewayTime BIGINT,
    instnceId VARCHAR,
    pkey VARCHAR
) WITH (
    'connector' = 'igraph',
    'endpoint' = 'igraph-cn-xxxx.igraph.aliyuncs.com',
    'username' = '{igraphUserName}',
    'password' = '{igraphPassword}',
    'pk_field' = 'pkey',   -- column holding the primary key
    'cmd_field' = 'cmd',   -- column holding add or delete
    'graph_name' = '{graphName}',
    'label_name' = '{nodeName}',
    'dry_run' = 'false'
);
-- Source and sink declare the same columns in the same order.
INSERT INTO igraph_sink
SELECT * FROM bhv_source;
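Because Flink SQL maps the columns of an INSERT by position, listing them explicitly can make the mapping to the sink easier to audit than SELECT *. The variant below is a sketch that would stand in place of the final INSERT statement of the node demo. It assumes the bhv_source and igraph_sink tables defined there. The WHERE clause is a defensive assumption: this document does not say how the connector treats records whose cmd value is neither add nor delete, or whose primary key is missing.

```sql
-- Forward only records with a recognized command and a non-null primary key.
INSERT INTO igraph_sink
SELECT
    cmd,
    content,
    gatewayTime,
    instnceId,
    pkey
FROM bhv_source
WHERE cmd IN ('add', 'delete')
  AND pkey IS NOT NULL;
```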
Edge (kkv)
--********************************************************************--
-- Author: jilong.yjl-75179-searchd***@searchdump.onaliyun.com
-- Created Time: 2022-11-09 10:43:36
-- Description: Write SLS log records to an igraph edge (kkv)
--********************************************************************--
-- Source: records read from an SLS Logstore.
CREATE TEMPORARY TABLE bhv_source (
    cmd VARCHAR,
    content VARCHAR,
    gatewayTime BIGINT,
    instnceId VARCHAR,
    pkey VARCHAR,
    skey VARCHAR
) WITH (
    'connector' = 'sls',
    'endPoint' = 'cn-shanghai-intranet.log.aliyuncs.com',
    'accessId' = 'xxx',
    'accessKey' = 'xxx',
    'startTime' = '2022-11-08 23:28:00',
    'project' = 'igraph-cn-shanghai',
    'logStore' = 'log',
    'consumerGroup' = 'xxx',
    'batchGetSize' = '1'
);
-- Sink: igraph edge; replace the values in braces with your own.
CREATE TEMPORARY TABLE igraph_sink (
    cmd VARCHAR,
    content VARCHAR,
    gatewayTime BIGINT,
    instnceId VARCHAR,
    pkey VARCHAR,
    skey VARCHAR
) WITH (
    'connector' = 'igraph',
    'endpoint' = 'igraph-cn-xxxx.igraph.aliyuncs.com',
    'username' = '{igraphUserName}',
    'password' = '{igraphPassword}',
    'pk_field' = 'pkey',   -- column holding the pk
    'sk_field' = 'skey',   -- column holding the sk
    'cmd_field' = 'cmd',   -- column holding add or delete
    'graph_name' = '{graphName}',
    'label_name' = '{edgeName}',
    'dry_run' = 'false'
);
-- Source and sink declare the same columns in the same order.
INSERT INTO igraph_sink
SELECT * FROM bhv_source;
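Notes:
pk_field specifies the primary key of the igraph table being written and is required.
The field named by cmd_field takes the value add or delete, indicating an insert or a delete request. By default, an add request must supply all fields of the table, and so must an update request. To support updating only some of the fields, contact the administrator to configure it.
When deleting edges, supplying only pkey deletes all edges that start from that node, while supplying both pkey and skey deletes only that single edge.
graph_name is the name of the igraph graph to write to.
label_name is the label of the node or edge in the igraph graph to write to.
request_retry sets how many times a write request to igraph is retried, to tolerate transient failures such as network jitter.
dry_run, when true, is intended for testing: requests are logged but nothing is written to igraph.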
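The delete and dry_run notes can be combined to rehearse edge deletions before running them for real. The following is a sketch under stated assumptions: it reuses the bhv_source table from the edge demo, the sink name igraph_edge_dry_run is hypothetical, and the values in braces are placeholders. Only records whose cmd is delete and that carry both pkey and skey are forwarded, so according to the notes each one would target a single edge. With dry_run set to 'true', the requests are only logged.

```sql
-- Hypothetical dry-run sink for rehearsing single-edge deletes.
CREATE TEMPORARY TABLE igraph_edge_dry_run (
    cmd VARCHAR,
    content VARCHAR,
    gatewayTime BIGINT,
    instnceId VARCHAR,
    pkey VARCHAR,
    skey VARCHAR
) WITH (
    'connector' = 'igraph',
    'endpoint' = 'igraph-cn-xxxx.igraph.aliyuncs.com',
    'username' = '{igraphUserName}',
    'password' = '{igraphPassword}',
    'pk_field' = 'pkey',
    'sk_field' = 'skey',
    'cmd_field' = 'cmd',
    'graph_name' = '{graphName}',
    'label_name' = '{edgeName}',
    'dry_run' = 'true'   -- log the requests, write nothing to igraph
);

-- Forward only delete records that identify exactly one edge.
INSERT INTO igraph_edge_dry_run
SELECT cmd, content, gatewayTime, instnceId, pkey, skey
FROM bhv_source
WHERE cmd = 'delete'
  AND pkey IS NOT NULL
  AND skey IS NOT NULL;
```

Once the logged requests look right, the same statement can be pointed at a sink with dry_run set to 'false'.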