Flink Connector Configuration

Using the Flink Connector

Preparing the Connector

Upload the iGraph connector JAR package by following the steps in Manage custom connectors.

After a successful upload, the connector is displayed as shown in the figures below.

[Figure: the iGraph connector after a successful upload]

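Properties

  • endpoint (string): endpoint of the iGraph instance.

  • username (string): username used for iGraph requests.

  • password (string): password used for iGraph requests.

  • graph_name (string): name of the iGraph graph to write to.

  • label_name (string): name of the iGraph edge or node to write to.

  • pk_field (string, required): pk (primary key) field of the iGraph sink table.

  • sk_field (string): sk field of the iGraph sink table; set it for edge (kkv) tables, as in the edge demo.

  • cmd_field (string): cmd field of the iGraph sink table. Its value is add or delete and indicates whether the record is an insert or a delete request.

  • request_retry (int, optional, default 0): number of times a request to iGraph is retried.

  • dry_run (boolean, optional, default false): if true, the job only runs as a test and nothing is actually written to iGraph.

  • async (boolean, optional, default false): when enabled, the connector writes in asynchronous mode, which improves write performance.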
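The sketch below shows a node sink that sets the optional properties explicitly. It is illustrative only: the table name igraph_sink_async, the reduced column list (cmd, content, pkey) and the retry count of 3 are hypothetical, and the columns must match your own iGraph table. Following the demos, every property value is written as a quoted string.

```sql
-- Sketch only: hypothetical table name, columns and retry count.
-- {igraphUserName}, {igraphPassword}, {graphName}, {nodeName} are placeholders, as in the demos.
CREATE TEMPORARY TABLE igraph_sink_async (
  cmd varchar,
  content varchar,
  pkey varchar
) WITH (
  'connector' = 'igraph',
  'endpoint' = 'igraph-cn-xxxx.igraph.aliyuncs.com',
  'username' = '{igraphUserName}',
  'password' = '{igraphPassword}',
  'graph_name' = '{graphName}',
  'label_name' = '{nodeName}',
  'pk_field' = 'pkey',
  'cmd_field' = 'cmd',
  'request_retry' = '3',   -- retry a request to iGraph up to 3 times (default 0)
  'async' = 'true',        -- asynchronous write mode (default false)
  'dry_run' = 'false'      -- really write to iGraph
);
```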

Flink SQL demo

Node (kv)

--********************************************************************--
-- Author: jilong.yjl-75179-searchd***@searchdump.onaliyun.com
-- Created Time: 2022-11-09 10:43:36
-- Description: Write SLS log records to an iGraph node (kv) table
--********************************************************************--
CREATE TEMPORARY TABLE bhv_source (
 cmd varchar,
 content varchar,
 gatewayTime bigint,
 instnceId varchar,
 pkey varchar
 )
WITH (
 'connector' = 'sls',
 'endPoint' = 'cn-shanghai-intranet.log.aliyuncs.com',
 'accessId' = 'xxx',
 'accessKey' = 'xxx',
 'startTime' = '2022-11-08 23:28:00',
 'project' = 'igraph-cn-shanghai',
 'logStore' = 'log',
 'consumerGroup' = 'xxx',
 'batchGetSize' = '1'
 );


CREATE TEMPORARY TABLE igraph_sink (
 cmd varchar,
 content varchar,
 gatewayTime bigint,
 instnceId varchar,
 pkey varchar
) with (
 'connector' = 'igraph',
 'endpoint' = 'igraph-cn-xxxx.igraph.aliyuncs.com', 
 'username' = '{igraphUserName}', 
 'password' = '{igraphPassword}', 
 'pk_field' = 'pkey', 
 'cmd_field' = 'cmd',
 'graph_name' = '{graphName}',
 'label_name' = '{nodeName}',
 'dry_run' = 'false'
);

INSERT INTO igraph_sink
 select * from bhv_source;
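In Flink SQL, an INSERT INTO without a column list maps query columns to sink columns by position, so SELECT * in the demo works only because bhv_source and igraph_sink declare the same columns in the same order. A slightly more defensive variant of the node demo's INSERT, assuming the two tables declared above, lists the columns explicitly and forwards only rows whose command is add or delete:

```sql
-- Columns listed explicitly so a reordering of either table is caught,
-- and only the two documented command values reach the iGraph sink.
INSERT INTO igraph_sink
SELECT cmd, content, gatewayTime, instnceId, pkey
FROM bhv_source
WHERE cmd IN ('add', 'delete');
```

The filter assumes the upstream logs spell the commands exactly as lowercase add and delete; adjust it if they do not.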

Edge (kkv)

--********************************************************************--
-- Author: jilong.yjl-75179-searchd***@searchdump.onaliyun.com
-- Created Time: 2022-11-09 10:43:36
-- Description: Write SLS log records to an iGraph edge (kkv) table
--********************************************************************--
CREATE TEMPORARY TABLE bhv_source (
 cmd varchar,
 content varchar,
 gatewayTime bigint,
 instnceId varchar,
 pkey varchar,
 skey VARCHAR
 )
WITH (
 'connector' = 'sls',
 'endPoint' = 'cn-shanghai-intranet.log.aliyuncs.com',
 'accessId' = 'xxx',
 'accessKey' = 'xxx',
 'startTime' = '2022-11-08 23:28:00',
 'project' = 'igraph-cn-shanghai',
 'logStore' = 'log',
 'consumerGroup' = 'xxx',
 'batchGetSize' = '1'
 );


CREATE TEMPORARY TABLE igraph_sink (
 cmd varchar,
 content varchar,
 gatewayTime bigint,
 instnceId varchar,
 pkey varchar,
 skey varchar
) with (
 'connector' = 'igraph',
 'endpoint' = 'igraph-cn-xxxx.igraph.aliyuncs.com', 
 'username' = '{igraphUserName}', 
 'password' = '{igraphPassword}', 
 'pk_field' = 'pkey', 
 'sk_field' = 'skey',
 'cmd_field' = 'cmd',
 'graph_name' = '{graphName}',
 'label_name' = '{edgeName}',
 'dry_run' = 'false'
);

INSERT INTO igraph_sink
 select * from bhv_source;
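Before pointing the edge job at live data, the sink can be exercised with dry_run enabled and a few literal rows instead of the SLS source; according to the property description, the connector then logs the requests instead of writing them to iGraph. A minimal sketch follows; the table name igraph_edge_dry_run, the placeholder {edgeName}, the reduced column list and the sample values are all hypothetical.

```sql
-- Sketch only: hypothetical table, columns and sample rows.
-- 'dry_run' = 'true' means the requests are logged, not written to iGraph.
CREATE TEMPORARY TABLE igraph_edge_dry_run (
  cmd varchar,
  content varchar,
  pkey varchar,
  skey varchar
) WITH (
  'connector' = 'igraph',
  'endpoint' = 'igraph-cn-xxxx.igraph.aliyuncs.com',
  'username' = '{igraphUserName}',
  'password' = '{igraphPassword}',
  'graph_name' = '{graphName}',
  'label_name' = '{edgeName}',
  'pk_field' = 'pkey',
  'sk_field' = 'skey',
  'cmd_field' = 'cmd',
  'dry_run' = 'true'
);

-- Two add requests for edges starting at the same vertex user_1.
INSERT INTO igraph_edge_dry_run
VALUES ('add', 'hello', 'user_1', 'item_1'),
       ('add', 'world', 'user_1', 'item_2');
```

Once the logged requests look correct, switching dry_run to false turns the same table definition into a real sink.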

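Notes:

  • pk_field is the primary key of the iGraph table being written to and is required.

  • cmd_field: the values of this field are add and delete, indicating an insert or a delete request. By default an add request must supply all fields of the table, and so must an update request; to support updating only some of the fields, contact the administrator to have it configured.

  • When deleting edges, if only pkey is supplied, all edges starting from that vertex are deleted by default; if both pkey and skey are supplied, only that single edge is deleted.

  • graph_name is the name of the iGraph graph to write to.

  • label_name is the label of the node or edge in the iGraph graph.

  • request_retry sets how many times a write request to iGraph is retried, to tolerate transient failures such as network jitter.

  • dry_run: when true, the job is used for testing; records are logged but not written to iGraph.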
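The two edge-delete cases described in the notes can be written as literal rows. This sketch assumes an edge sink declared like igraph_sink in the kkv demo, with 'cmd_field' = 'cmd', and is run as its own job in place of the SLS INSERT; the vertex IDs are hypothetical. Representing "only pkey supplied" as a NULL skey is an assumption, so check how your connector version expects an omitted skey to be passed.

```sql
-- Column order: cmd, content, gatewayTime, instnceId, pkey, skey (kkv demo's igraph_sink).
-- Vertex IDs are hypothetical; NULL literals need explicit casts in Flink SQL VALUES.
INSERT INTO igraph_sink VALUES
  -- pkey and skey both set: delete only the edge user_1 -> item_1
  ('delete', CAST(NULL AS VARCHAR), CAST(NULL AS BIGINT), CAST(NULL AS VARCHAR), 'user_1', 'item_1'),
  -- only pkey set (NULL skey is an assumption): delete all edges starting at user_2
  ('delete', CAST(NULL AS VARCHAR), CAST(NULL AS BIGINT), CAST(NULL AS VARCHAR), 'user_2', CAST(NULL AS VARCHAR));
```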