本文以公开的保险数据集为例,基于云原生数据库PolarDB,在保险理赔场景中执行图查询,以识别异常理赔记录和欺诈团伙。例如,查询与欺诈保单涉及的相同理赔人的其他保单,或识别欺诈保单投保人的社交关系,以便进行欺诈预警。PolarDB在关系型数据库的基础上,提供图分析能力,为企业的统一数据管理和分析提供强有力的支持。
关于图数据库引擎
图分析是数据科学中的一个重要领域,专注于通过图结构表示数据,并执行各种计算与分析任务。图结构由节点(或称为顶点)和边组成,节点通常代表实体,而边则表示实体之间的关系。图计算广泛应用于社交网络分析、推荐系统、知识图谱、路径优化等多个领域。
PolarDB PostgreSQL版高度兼容Apache AGE的图引擎,支持对知识图谱的存储和查询检索,能够在同一个数据库集群上同时使用标准的ANSI SQL和图查询语言openCypher进行查询。
完全兼容PolarDB PostgreSQL版
AGE是PolarDB PostgreSQL版的一个扩展,可以在现有的PolarDB数据库中使用,且无需重新构建数据库。AGE继承了PolarDB所有强大功能,包括事务、并发控制、以及多种索引和优化技术。
统一的图形和关系型查询
AGE允许同时处理关系型数据和图形数据,支持在同一个查询中混合使用SQL和图查询语言,使得处理复杂的数据模型更加容易和高效。
支持Cypher查询语言
AGE支持使用Cypher查询语言。Cypher查询语言专为图数据库设计,语法简单且灵活,提供了一种直观的方式来进行图数据的查询和操作。
高性能
结合PolarDB的优化技术和专为图数据设计的索引,AGE能够高效地处理大规模图形数据和复杂的图形查询。
综上, 借助于AGE强大的能力,PolarDB可以简单、高效地处理各类图查询。
业务场景
场景描述
保险理赔欺诈通常基于保险提供商所拥有的患者、疾病及索赔等数据,分析与被保险人相关的理赔申请、疾病等实体之间的关联关系,以识别异常理赔记录并揭示潜在的欺诈团伙。
数据和模型
数据来源于保险领域公开数据集。数据包括保险行业的基本元素,数据模型可以抽象为下图所示:
点:投保人(policyholder)、保单(incharge)、理赔(claim)、病人(patient)、疾病(disease)。
边:疾病-病人(has_disease)、投保人-理赔(policyholder_of_claim)、保单-理赔(inchagre_of_claim)、病人-理赔(insured_of_claim)、相似理赔(similar_claim)、投保人关联(policyholder_connection)。
属性:姓名(name),是否高危(high_risk)、风险分数(risk_score)、疾病名称(disease_name)、相似度(similarity_score)、关联等级(level)、理赔时间(claim_date)、保额(charge)等。
最佳实践
数据库准备
仅PolarDB PostgreSQL版 14且内核小版本14.12.24.0及以上版本支持图引擎插件,详细说明请参考快速入门。
安装插件。
CREATE EXTENSION age;
将插件加入需要使用此插件的数据库或用户的搜索路径和预加载库中。
ALTER DATABASE <dbname> SET search_path = public,ag_catalog; ALTER USER <username> SET search_path = public,ag_catalog; ALTER DATABASE <dbname> SET session_preload_libraries TO 'age'; ALTER USER <username> SET session_preload_libraries TO 'age';
数据入库
创建图。使用位于
ag_catalog
命名空间中的create_graph
函数创建图。SELECT create_graph('graph');
插入节点和边。由于下载的数据为CSV文件,未包含所需的ID信息,因此需要对数据进行转换后再进行入库操作。本文附录中提供了将数据转换为PolarDB中的vertex和edge的Python脚本,转换结果如下所示:
投保人(policyholder)
SELECT create_vlabel('graph','policyholder'); SELECT * FROM cypher('graph', $$ CREATE (:policyholder {policyholder_id:'PH3068',fname:'ADAM',lname:'OCHSENBEIN',risk_score:'88',high_risk:'1'}) $$ ) as (n agtype); SELECT * FROM cypher('graph', $$ CREATE (:policyholder {policyholder_id:'PH3069',fname:'MALINDA',lname:'MEHSERLE',risk_score:'42',high_risk:'0'}) $$ ) as (n agtype); SELECT * FROM cypher('graph', $$ CREATE (:policyholder {policyholder_id:'PH3070',fname:'SANDRA',lname:'KUHTA',risk_score:'20',high_risk:'0'}) $$ ) as (n agtype); ...
理赔(claim)
- Create vlabel SELECT create_vlabel('graph','claim'); SELECT * FROM cypher('graph', $$ CREATE (:claim {claim_id:'C3571',charge:'6517.53',claim_date:'2013-08-11 00:00:00',duration:'13',insured_id:'28523',diagnosis:'no exception',person_incharge_id:'PI23070',type:'services',policyholder_id:'PH9507'}) $$ ) as (n agtype); SELECT * FROM cypher('graph', $$ CREATE (:claim {claim_id:'C3572',charge:'49273.65',claim_date:'2017-02-10 00:00:00',duration:'3',insured_id:'1220',diagnosis:'no exception',person_incharge_id:'PI21197',type:'services',policyholder_id:'PH406'}) $$ ) as (n agtype); SELECT * FROM cypher('graph', $$ CREATE (:claim {claim_id:'C3573',charge:'52005.98',claim_date:'2014-06-29 00:00:00',duration:'27',insured_id:'23735',diagnosis:'no exception',person_incharge_id:'PI22361',type:'services',policyholder_id:'PH7911'}) $$ ) as (n agtype); ...
投保人(policyholder)和理赔(claim)的关联关系
SELECT * FROM cypher('graph', $$ MATCH (a:claim), (b:policyholder) WHERE a.claim_id = 'C1528' AND b.policyholder_id = 'PH2963' CREATE (a)-[e:RELTYPE ]->(b) RETURN e$$) as (e agtype); SELECT * FROM cypher('graph', $$ MATCH (a:claim), (b:policyholder) WHERE a.claim_id = 'C1529' AND b.policyholder_id = 'PH1353' CREATE (a)-[e:RELTYPE ]->(b) RETURN e$$) as (e agtype); SELECT * FROM cypher('graph', $$ MATCH (a:claim), (b:policyholder) WHERE a.claim_id = 'C1530' AND b.policyholder_id = 'PH1071' CREATE (a)-[e:RELTYPE ]->(b) RETURN e$$) as (e agtype); SELECT * FROM cypher('graph', $$ MATCH (a:claim), (b:policyholder) WHERE a.claim_id = 'C1531' AND b.policyholder_id = 'PH8102' CREATE (a)-[e:RELTYPE ]->(b) RETURN e$$) as (e agtype); SELECT * FROM cypher('graph', $$ MATCH (a:claim), (b:policyholder) WHERE a.claim_id = 'C1532' AND b.policyholder_id = 'PH4768' CREATE (a)-[e:RELTYPE ]->(b) RETURN e$$) as (e agtype); ...
将转换后的结果保存为sql文件,配合客户端工具,如psql等可完成数据导入。
使用示例
简单查询
数据统计
统计各种类型节点数量。
SELECT count(*) FROM cypher('graph', $$ MATCH (v) RETURN v $$) as (v agtype);
返回结果如下:
count -------- 120567
理赔(claim)
SELECT count(*) FROM cypher('graph', $$ MATCH (v:claim) RETURN v $$) as (v agtype);
返回结果如下:
count -------- 100001
投保人(policyholder)
SELECT count(*) FROM cypher('graph', $$ MATCH (v:policyholder) RETURN v $$) as (v agtype);
返回结果如下:
count ------- 10006
保单(incharge)
SELECT count(*) FROM cypher('graph', $$ MATCH (v:incharge) RETURN v $$) as (v agtype);
返回结果如下:
count ------- 10001
疾病(disease)
SELECT count(*) FROM cypher('graph', $$ MATCH (v:disease) RETURN v $$) as (v agtype);
返回结果如下:
count ------- 393
病人(patient)
SELECT count(*) FROM cypher('graph', $$ MATCH (v:patient) RETURN v $$) as (v agtype);
返回结果如下:
count ------- 166
过滤查询、排序查询
查询理赔单C4377的投保、理赔、被保情况。
SELECT 'policyholder_id' as type, policyholder_id FROM cypher('graph', $$
MATCH (:claim {claim_id: 'C4377'})-[]->(policyholder:policyholder)
RETURN policyholder.policyholder_id
$$) AS (policyholder_id agtype)
UNION
SELECT 'incharge_id', incharge_id FROM cypher('graph', $$
MATCH (:claim {claim_id: 'C4377'})-[]->(v:incharge)
RETURN v.incharge_id
$$) AS (incharge_id agtype)
UNION
SELECT 'patient_id', patient_id FROM cypher('graph', $$
MATCH (:claim {claim_id: 'C4377'})-[]->(v:patient)
RETURN v.patient_id
$$) AS (patient_id agtype);
返回结果如下:
type | policyholder_id
-----------------+-----------------
patient_id | "11279"
policyholder_id | "PH3759"
incharge_id | "PI26607"
通用场景
K阶邻居
已知保单C4377为欺诈保单,查询和理赔单C4377有相同理赔病人的理赔单,说明该理赔人有涉嫌骗保的嫌疑。
SELECT 'claim_id', claim_id FROM cypher('graph', $$ MATCH (:claim {claim_id: 'C4377'})-[]->(p:patient)<-[]-(c:claim) RETURN c.claim_id $$) AS (claim_id agtype);
返回结果如下:
?column? | claim_id ----------+---------- claim_id | "C28963" claim_id | "C3679" claim_id | "C96545" claim_id | "C26586" claim_id | "C26754" claim_id | "C87278" claim_id | "C87603" claim_id | "C69395" claim_id | "C67594" claim_id | "C96155" claim_id | "C10160"
查询已知欺诈保单C4377的投保人的社交关系,可以对这些人的理赔情况提前预警。
SELECT 'policyholder_id', policyholder_id FROM cypher('graph', $$ MATCH (:claim {claim_id: 'C4377'})-[]->(a:policyholder)-[r*1..3]->(p:policyholder) RETURN p.policyholder_id $$) AS (policyholder_id agtype);
返回结果如下:
?column? | policyholder_id -----------------+----------------- policyholder_id | "PH52532" policyholder_id | "PH11283" policyholder_id | "PH11328" policyholder_id | "PH1" policyholder_id | "PH5" policyholder_id | "PH512" policyholder_id | "PH1569" policyholder_id | "PH4722" policyholder_id | "PH4731"
路径检索
查询投保人PH3759和投保人PH4722的路径,分析投保人之间的关联关系。
SELECT *
FROM cypher('graph', $$
MATCH path = (:policyholder {policyholder_id: 'PH3759'})-[r*1..3]->(:policyholder {policyholder_id: 'PH4722'})
RETURN path
$$) AS (v agtype);
返回结果如下:
-------
[{"id": 844424930136988, "label": "policyholder", "properties": {"fname": "KURTIS", "lname": "ALKEMA", "high_risk": "1", "risk_score": "78", "policyholder_id": "PH3759"}}::vertex, {"id": 2251799813685487, "label": "RELTYPE", "end_id": 844424930133473, "start_id": 844424930136988, "properties": {"level": "65"}}::edge, {"id": 844424930133473, "label": "policyholder", "properties": {"fname": "TERRA", "lname": "SWARB", "high_risk": "0", "risk_score": "25", "policyholder_id": "PH512"}}::vertex, {"id": 2251799813685546, "label": "RELTYPE", "end_id": 844424930138502, "start_id": 844424930133473, "properties": {"level": "62"}}::edge, {"id": 844424930138502, "label": "policyholder", "properties": {"fname": "VETA", "lname": "SEDLACK", "high_risk": "0", "risk_score": "31", "policyholder_id": "PH1569"}}::vertex, {"id": 2251799813685594, "label": "RELTYPE", "end_id": 844424930136281, "start_id": 844424930138502, "properties": {"level": "92"}}::edge, {"id": 844424930136281, "label": "policyholder", "properties": {"fname": "DEANNA", "lname": "BALSER", "high_risk": "0", "risk_score": "36", "policyholder_id": "PH4722"}}::vertex]::path
共同邻居
查询保单C4377和保单C67594的共同邻居,从而找到两个保单的共同投保人。
SELECT 'policyholder_id', policyholder_id FROM cypher('graph', $$
MATCH (:claim {claim_id: 'C4377'})-[]->(p:policyholder)<-[]-(:claim {claim_id: 'C67594'})
RETURN p.policyholder_id
$$) AS (policyholder_id agtype);
返回结果如下:
?column? | policyholder_id
-----------------+-----------------
policyholder_id | "PH3759"
协同推荐
已知保单C4377为欺诈保单,查找和保单C4377有共同投保人的保单,从而找到欺诈疑似涉诈保单。
SELECT 'claim_id', claim_id FROM cypher('graph', $$ MATCH (:claim {claim_id: 'C4377'})-[]->(p:policyholder)<-[]-(c:claim) RETURN c.claim_id $$) AS (claim_id agtype);
返回结果如下:
?column? | claim_id ----------+---------- claim_id | "C28963" claim_id | "C96545" claim_id | "C3679" claim_id | "C87603" claim_id | "C26754" claim_id | "C26586" claim_id | "C87278" claim_id | "C69395" claim_id | "C67594" claim_id | "C96155" claim_id | "C10160"
与已知涉诈保单C4377相似度最大的保单,返回前20个:
WITH t AS ( SELECT claim_id, replace(trim(both '"' from to_jsonb(properties)::text), '\"', '"') AS similarity FROM cypher('graph', $$ MATCH (:claim {claim_id: 'C4377'})-[e]->(c:claim) RETURN properties(e), c.claim_id $$) AS (properties agtype, claim_id agtype) ) SELECT claim_id, replace((similarity::jsonb->'similarity_score')::text, '"','')::integer AS s FROM t ORDER BY s DESC LIMIT 20;
返回结果如下:
claim_id | s ----------+---- "C67594" | 13 "C69395" | 13 "C10160" | 13 "C87603" | 13 "C28963" | 13 "C3679" | 13 "C26754" | 13 "C96155" | 13 "C26586" | 13 "C87278" | 13 "C96545" | 13 "C20113" | 8 "C70759" | 8 "C28785" | 8 "C12793" | 8 "C59736" | 8 "C38059" | 8 "C34068" | 8 "C71827" | 8 "C15760" | 8
总结
利用PolarDB PostgreSQL版的图分析能力进行图数据分析。PolarDB结合AGE扩展,提供图数据计算分析的功能,包括使用Cypher查询语言,高效处理查询图数据,为企业的统一数据管理和分析,提供强有力的支撑。
试用体验
您可以访问PolarDB免费试用页面,选择试用“云原生数据库PolarDB PostgreSQL版”,体验Ganos的图计算能力。
附录
数据转换脚本。
import csv
import os
def convert_vertex_csv(file_path, graph):
file_name = os.path.splitext(os.path.basename(file_path))[0].lower()
# create vlabel
print("------------------------------------------------")
print("-- Create vlabel")
print("SELECT create_vlabel('{}','{}');".format(graph, file_name))
with open(file_path, 'r') as csvfile:
reader = csv.reader(csvfile, delimiter=',')
header = next(reader)
for row in reader:
p = ""
for h in header:
if p != "":
p += ","
else:
p += "{"
p += "{}:'{}'".format(h.lower(), row[header.index(h)].strip())
if p != "":
p += "}"
print("SELECT * FROM cypher('{}', $$ CREATE (:{} {}) $$ ) as (n agtype);".format(graph, file_name, p))
def convert_edge_csv(file_path, graph, from_type, to_type):
file_name = os.path.splitext(os.path.basename(file_path))[0].lower()
with open(file_path, 'r') as csvfile:
reader = csv.reader(csvfile, delimiter=',')
header = next(reader)
for row in reader:
p = ""
for h in header:
if (h.endswith("ID")):
continue;
if p != "":
p += ","
else:
p += "{"
p += "{}:'{}'".format(h.lower(), row[header.index(h)].strip())
if p != "":
p += "}"
print("SELECT * FROM cypher('{0}', $$ MATCH (a:{1}), (b:{2}) WHERE a.{1}_id = '{3}' AND "
"b.{2}_id = '{4}' CREATE (a)-[e:RELTYPE {5} ]->(b) RETURN e$$) as (e agtype);".format(graph, from_type, to_type, row[0].strip(), row[1].strip(), p))
def generate_graph_csv(directory, graph):
print("------------------------------------------------")
print("-- Create graph")
print("SELECT create_graph('{}');".format(graph))
print("------------------------------------------------")
print("-- Create vertex")
convert_vertex_csv(directory + "/POLICYHOLDER.csv", graph)
convert_vertex_csv(directory + "/INCHARGE.csv", graph)
convert_vertex_csv(directory + "/PATIENT.csv", graph)
convert_vertex_csv(directory + "/CLAIM.csv", graph)
convert_vertex_csv(directory + "/DISEASE.csv", graph)
print("------------------------------------------------")
print("-- Create edge")
convert_edge_csv(directory + "/POLICYHOLDER_CONNECTION.csv", graph, 'policyholder','policyholder')
convert_edge_csv(directory + "/INCHARGE_OF_CLAIM.csv", graph,'claim', 'incharge')
convert_edge_csv(directory + "/CLAIM_SIMILARITY.csv", graph, 'claim','claim')
convert_edge_csv(directory + "/POLICYHOLDER_OF_CLAIM.csv", graph,'claim', 'policyholder')
convert_edge_csv(directory + "/INSURED_OF_CLAIM.csv", graph, 'claim','patient')
convert_edge_csv(directory + "/HAS_DISEASE.csv", graph, 'patient','disease')
generate_graph_csv("analyzing-insurance-claims-using-ibm-db2-graph-master/data", "graph")