Flink Connector 使用
Connector 准备
参考管理自定义连接器,上传igraph Connector的jar包。
上传成功后即可在查看Connector页面看到已配置的属性列表。
Propoerties详解
|
名称 |
类型 |
是否必须 |
描述 |
|
endpoint |
string |
是 |
igraph实例对应的endpoint |
|
username |
string |
是 |
igraph请求的username |
|
password |
string |
是 |
igraph请求的password |
|
graph_name |
string |
是 |
要写入的igraph图名 |
|
label_name |
string |
是 |
要写入的igraph边或者节点名 |
|
pk_field |
string |
是 |
igraph sink表的pk字段 |
|
sk_field |
string |
否 |
igraph sink表的sk字段 |
|
cmd_field |
string |
是 |
igraph sink表的cmd字段,取值为add、delete,用于表示这条记录是新增或删除请求 |
|
request_retry |
int |
否,默认0 |
请求igraph的重试次数,默认为0 |
|
dry_run |
boolean |
否,默认false |
如果dry_run为true,只做测试,不会真实写入igraph |
|
async |
boolean |
否,默认为 false |
打开会使用异步模式,写入性能会有提升 |
Flink SQL demo
节点(kv)
--********************************************************************--
-- Author: jilong.yjl-75179-searchd***@searchdump.onaliyun.com
-- Created Time: 2022-11-09 10:43:36
-- Description: Write your description here
--********************************************************************--
CREATE TEMPORARY TABLE bhv_source (
cmd varchar,
content varchar,
gatewayTime bigint,
instnceId varchar,
pkey varchar
)
WITH (
'connector' = 'sls',
'endPoint' = 'cn-shanghai-intranet.log.aliyuncs.com',
'accessId' = 'xxx',
'accessKey' = 'xxx',
'startTime' = '2022-11-08 23:28:00',
'project' = 'igraph-cn-shanghai',
'logStore' = 'log',
'consumerGroup' = 'xxx',
'batchGetSize' = '1'
);
CREATE TEMPORARY TABLE igraph_sink (
cmd varchar,
content varchar,
gatewayTime bigint,
instnceId varchar,
pkey varchar
) with (
'connector' = 'igraph',
'endpoint' = 'igraph-cn-xxxx.igraph.aliyuncs.com',
'username' = '{igraphUserName}',
'password' = '{igraphPassword}',
'pk_field' = 'pkey',
'cmd_field' = 'cmd',
'graph_name' = '{graphName}',
'label_name' = '{nodeName}',
'dry_run' = 'false'
);
INSERT INTO igraph_sink
select * from bhv_source;
边(kkv)
--********************************************************************--
-- Author: jilong.yjl-75179-searchd***@searchdump.onaliyun.com
-- Created Time: 2022-11-09 10:43:36
-- Description: Write your description here
--********************************************************************--
CREATE TEMPORARY TABLE bhv_source (
cmd varchar,
content varchar,
gatewayTime bigint,
instnceId varchar,
pkey varchar,
skey VARCHAR
)
WITH (
'connector' = 'sls',
'endPoint' = 'cn-shanghai-intranet.log.aliyuncs.com',
'accessId' = 'xxx',
'accessKey' = 'xxx',
'startTime' = '2022-11-08 23:28:00',
'project' = 'igraph-cn-shanghai',
'logStore' = 'log',
'consumerGroup' = 'xxx',
'batchGetSize' = '1'
);
CREATE TEMPORARY TABLE igraph_sink (
cmd varchar,
content varchar,
gatewayTime bigint,
instnceId varchar,
pkey varchar,
skey varchar
) with (
'connector' = 'igraph',
'endpoint' = 'igraph-cn-xxxx.igraph.aliyuncs.com',
'username' = '{igraphUserName}',
'password' = '{igraphPassword}',
'pk_field' = 'pkey',
'sk_field' = 'skey',
'cmd_field' = 'add',
'graph_name' = '{graphName}',
'label_name' = '{nodeName}',
'dry_run' = 'false'
);
INSERT INTO igraph_sink
select * from bhv_source;
需要注意内容:
-
pk_field为写入igraph表的主键,必填字段
-
cmd_field取值为add、delete,表示是插入或删除请求。如果是add请求,默认需要传入表的全部字段,如果是update请求默认需要全部字段,如果需要支持部分字段的更新连续管理员配置
-
如果对边进行delete操作,只填写pkey 则默认删除这个起点出发的全部边,如果填写pkey和skey则只删除一条边数据
-
graph_name 为写入igraph的图名
-
label_name 为写入igraph图的节点或者边的label
-
request_retry,为了避免网络抖动等原因,设置写入igraph的重试请求次数
-
dry_run,为true时用作测试,会打log,不写igraph。