Using Neo4j to Display Data Job Dependencies and Related Attributes

2019-06-21  风筝flying

Background

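At work, I wanted to show the attributes of our complex data Jobs and the dependencies between them as a graph, so that dependency paths are easy to follow. The metadata of the existing data Jobs is stored in a relational database in the following format (the data shown is test data):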


[Image: Job metadata table (test data)]

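Field meanings:
- id: Job ID
- name: Job name
- lvl: Job level (1 = low, 2 = medium, 3 = high)
- owner: the owner the Job belongs to
- user: the user who created the Job
- depencies: IDs of the Jobs this Job depends on, separated by ":"; empty if it has none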

The graph database is Neo4j, the development language is Python 3.7, and the IDEs are PyCharm and Jupyter.

Initializing the graph database

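First, load the current full data set into a DataFrame. To keep the explanation simple, I downloaded the data to a local Excel file; in the actual deployment this is replaced by a connection to the relational database.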

from py2neo import Graph,Node,Relationship,NodeMatcher,RelationshipMatcher
import numpy as np
import pandas as pd
basic_dataframe=pd.read_excel('D:\\jobrelation\\test.xlsx',sheet_name=0,usecols='A:F')

After loading the data, check its quality and do any necessary cleaning:

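print(basic_dataframe.isnull().any())#check whether each column contains null values
print(basic_dataframe.dtypes)#check the data type of each column
basic_dataframe['id']=basic_dataframe['id'].astype(str)#convert IDs to strings so they match the dependency IDs split from text
basic_dataframe['lvl']=basic_dataframe['lvl'].astype(str)#lvl_dict below uses string keys
basic_dataframe['depencies']=basic_dataframe['depencies'].fillna('config_time_run')#fill nulls: Jobs without upstream dependencies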
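The string conversion matters because the dependency IDs are produced by splitting text on ":", so they are always strings, and Neo4j does not treat the integer 123 and the string '123' as equal when matching. One pitfall worth checking (an assumption about your data, not something shown above): when pandas reads a numeric column that also contains blank cells, it stores the values as floats, and `astype(str)` then yields '123.0', which never matches '123'. A minimal sketch of a hypothetical `normalize_job_id` helper that guards against this:

```python
import math


def normalize_job_id(value):
    """Hypothetical helper: turn a raw cell value into a canonical string ID.

    Returns None for blank cells (None, NaN, empty string); integral floats
    such as 123.0 become '123'; anything else is stripped and kept as text.
    """
    if value is None:
        return None
    if isinstance(value, float):  # numpy.float64 is a float subclass too
        if math.isnan(value):
            return None
        if value.is_integer():
            return str(int(value))
    text = str(value).strip()
    return text or None


print(normalize_job_id(123.0))         # 123
print(normalize_job_id("  456 "))      # 456
print(normalize_job_id(float("nan")))  # None
```

It could be applied with `basic_dataframe['id'].map(normalize_job_id)` in place of `astype(str)`.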

Once data quality is ensured, connect to the graph database:

graph=Graph("http://localhost:7474",username='neo4j',password='passwd')

Then initialize each kind of node:

for i in range(basic_dataframe.shape[0]):
    try:
        n=Node('Job',name=basic_dataframe.iloc[i]['name'],JobId=basic_dataframe.iloc[i]['id'])
        graph.create(n)
    except Exception as e:
        print(e)
        print(basic_dataframe.iloc[i]['id'])
        print(i)
lvl_dict={'1':'low','2':'medium','3':'high'}
for name,group in basic_dataframe.groupby('lvl'):
    na=Node('Lvl',LvlId=name,LvlName=lvl_dict.get(name))
    graph.create(na)
owner_dict={}
owner_start_id=100000
for name,group in basic_dataframe.groupby('owner'):
    owner_dict[name]=owner_start_id
    na=Node('Owner',OwnerId=owner_start_id,OwnerName=name)
    graph.create(na)
    owner_start_id+=1
user_dict={}
user_start_id=200000
for name,group in basic_dataframe.groupby('user'):    
    user_dict[name]=user_start_id
    na=Node('User',UserId=user_start_id,UserName=name)
    graph.create(na)
    user_start_id+=1
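Each `graph.create` call above runs as its own transaction, which is fine for test data but costs one round trip per node when there are thousands of Jobs. A common alternative is to send all rows in one Cypher `UNWIND` statement. The sketch below assumes the cleaned `id` and `name` columns are passed in as plain lists (for example via `basic_dataframe['id'].tolist()`); `create_job_nodes` and `RecordingGraph` are hypothetical names, and the helper only needs an object with a py2neo-style `run(cypher, **parameters)` method, so it is demonstrated with a stand-in rather than a live server.

```python
# Cypher that creates one :Job node per element of the $rows list
CREATE_JOBS = """
UNWIND $rows AS row
CREATE (:Job {name: row.name, JobId: row.id})
"""


def build_job_rows(ids, names):
    """Pair Job IDs and names into the list-of-dicts shape UNWIND expects."""
    return [{"id": str(job_id), "name": str(name)} for job_id, name in zip(ids, names)]


def create_job_nodes(graph, ids, names):
    """Hypothetical helper: create all Job nodes with a single query.

    `graph` is expected to behave like py2neo's Graph, whose run() accepts
    query parameters as keyword arguments.
    """
    rows = build_job_rows(ids, names)
    graph.run(CREATE_JOBS, rows=rows)
    return len(rows)


class RecordingGraph:
    """Stand-in for a Graph object that only records the queries it receives."""

    def __init__(self):
        self.calls = []

    def run(self, cypher, **parameters):
        self.calls.append((cypher, parameters))


# Hypothetical sample values; with a real server pass py2neo's Graph instead
fake = RecordingGraph()
count = create_job_nodes(fake, ["1001", "1002"], ["load_orders", "clean_orders"])
print(count)  # 2
```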

With that, the four kinds of nodes (Job, Lvl, Owner, User) have been created. Next, create the relationships between them:

matcher=NodeMatcher(graph)
for row in basic_dataframe.itertuples(index=True,name='Pandas'):
    job_node=matcher.match('Job').where(JobId=getattr(row,'id')).first()
    user_node=matcher.match('User').where(UserId=user_dict.get(getattr(row,'user'))).first()
    r=Relationship(user_node,'Create',job_node)
    graph.create(r)
for row in basic_dataframe.itertuples(index=True,name='Pandas'):
    job_node=matcher.match('Job').where(JobId=getattr(row,'id')).first()
    owner_node=matcher.match('Owner').where(OwnerId=owner_dict.get(getattr(row,'owner'))).first()
    r=Relationship(job_node,'Belongto',owner_node)
    graph.create(r)
for row in basic_dataframe.itertuples(index=True,name='Pandas'):
    job_node=matcher.match('Job').where(JobId=getattr(row,'id')).first()
    lvl_node=matcher.match('Lvl').where(LvlId=getattr(row,'lvl')).first()
    r=Relationship(job_node,'Level',lvl_node)
    graph.create(r)
for row in basic_dataframe.itertuples(index=True,name='Pandas'):
    job_node=matcher.match('Job').where(JobId=getattr(row,'id')).first()
    depencies_job=str(getattr(row,'depencies'))
    if depencies_job!='config_time_run' and depencies_job.find(':')!=-1:
        for index,val in enumerate(depencies_job.split(':')):
            depency_job_node=matcher.match('Job').where(JobId=val).first()
            if not depency_job_node is None:
                r=Relationship(job_node,'Depency',depency_job_node)
                graph.create(r)
    elif depencies_job!='config_time_run' and depencies_job.find(':')==-1:
        depency_job_node=matcher.match('Job').where(JobId=depencies_job).first()
        if not depency_job_node is None:
            r=Relationship(job_node,'Depency',depency_job_node)
            graph.create(r)
    else:
        pass
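The two branches in the dependency loop (with and without ":") can be merged, because `str.split(':')` on a string without a colon simply returns a one-element list. A minimal sketch of that parsing step, assuming the same `config_time_run` placeholder for Jobs without upstream dependencies; `parse_dependencies` is a hypothetical name:

```python
import math

PLACEHOLDER = "config_time_run"  # value used above to fill empty dependency cells


def parse_dependencies(raw):
    """Split a colon-separated dependency cell into a list of upstream Job IDs.

    Blank cells (None, NaN, empty string) and the placeholder yield [];
    empty fragments such as the one left by a trailing ':' are dropped.
    """
    if raw is None or (isinstance(raw, float) and math.isnan(raw)):
        return []
    text = str(raw).strip()
    if not text or text == PLACEHOLDER:
        return []
    return [part.strip() for part in text.split(":") if part.strip()]


print(parse_dependencies("1001:1002"))        # ['1001', '1002']
print(parse_dependencies("1003"))             # ['1003']
print(parse_dependencies("config_time_run"))  # []
```

Inside the loop, `for val in parse_dependencies(getattr(row,'depencies')):` followed by the existing lookup and `Relationship(job_node,'Depency',...)` would then cover both cases.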

At this point the graph database has been initialized with the full data set. Next comes incremental updating.

Incremental data updates

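I considered two approaches for incremental updates: one is to enable MySQL's binlog and synchronize metadata changes in real time; the other is to use a last-updated-time field to fetch the changed rows every 10 minutes and apply them. Weighing the project's real-time requirements against development complexity, I chose the second approach.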
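The second approach comes down to remembering when the last sync happened and selecting only rows changed since then. A minimal sketch, using an in-memory SQLite database as a stand-in for MySQL; the table name `job_meta` and the column `update_time` are assumptions, since the article does not name the last-update field. With MySQL drivers the placeholder is usually `%s` rather than `?`.

```python
import sqlite3

# Hypothetical schema: the six columns used in this article plus an update timestamp
SCHEMA = """
CREATE TABLE job_meta (
    id TEXT PRIMARY KEY, name TEXT, lvl TEXT, owner TEXT, user TEXT,
    depencies TEXT, update_time TEXT
)
"""


def fetch_increment(conn, last_sync):
    """Return rows changed after last_sync, plus the new watermark to store.

    Timestamps are 'YYYY-MM-DD HH:MM:SS' strings, so string comparison
    orders them chronologically.
    """
    cur = conn.execute(
        "SELECT id, name, lvl, owner, user, depencies, update_time "
        "FROM job_meta WHERE update_time > ? ORDER BY update_time",
        (last_sync,),
    )
    rows = cur.fetchall()
    new_sync = rows[-1][-1] if rows else last_sync  # latest update_time seen
    return rows, new_sync


conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)
conn.executemany(
    "INSERT INTO job_meta VALUES (?, ?, ?, ?, ?, ?, ?)",
    [  # hypothetical test rows
        ("1001", "job_a", "1", "team_x", "alice", None, "2019-06-20 09:00:00"),
        ("1002", "job_b", "2", "team_x", "bob", "1001", "2019-06-21 10:05:00"),
    ],
)
rows, watermark = fetch_increment(conn, "2019-06-21 10:00:00")
print([r[0] for r in rows], watermark)  # ['1002'] 2019-06-21 10:05:00
```

A scheduler (cron, for example) would call this every 10 minutes and persist the watermark. Rows written later with the same timestamp as the watermark would be skipped by the strict `>`, so a small overlap window is a reasonable safeguard; re-processing a row is harmless here because each Job is rebuilt anyway.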

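incre_dataframe=pd.read_excel('d:\\jobrelation\\test.xlsx',sheet_name=1,usecols='A:F')
#apply the same cleaning as for the full data set
incre_dataframe['id']=incre_dataframe['id'].astype(str)
incre_dataframe['lvl']=incre_dataframe['lvl'].astype(str)
incre_dataframe['depencies']=incre_dataframe['depencies'].fillna('config_time_run')
nodematcher=NodeMatcher(graph)#the incremental code below refers to the node matcher by this name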
max_owner_id=int(graph.run('match (owner:Owner) return max(owner.OwnerId)').to_data_frame().iat[0,0])#find the current largest OwnerId and convert it to int
for name,group in incre_dataframe.groupby('owner'):
    if(nodematcher.match("Owner").where(OwnerName=name).first() is None):
        max_owner_id+=1
        na=Node('Owner',OwnerId=max_owner_id,OwnerName=name)
        graph.create(na)
max_user_id=int(graph.run('match (user:User) return max(user.UserId)').to_data_frame().iat[0,0])
for name,group in incre_dataframe.groupby('user'):
    if(nodematcher.match("User").where(UserName=name).first() is None):
        max_user_id+=1
        na=Node('User',UserId=max_user_id,UserName=name)
        graph.create(na)
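The Owner and User blocks follow the same pattern: take the current maximum ID, then hand out consecutive IDs only to names not yet in the graph. The pattern fits in one pure function, sketched below; `allocate_ids` is a hypothetical helper, and `existing` stands for the name-to-ID mapping already stored in Neo4j. One caveat: `max(...)` returns null when no node with that label exists, so the `int(...)` call above would fail on an empty graph; the sketch falls back to a start value instead.

```python
def allocate_ids(new_names, existing, start):
    """Assign consecutive IDs to names that are not in `existing` yet.

    new_names -- names seen in the incremental batch (duplicates allowed)
    existing  -- dict mapping already-known names to their IDs
    start     -- first ID to use when `existing` is empty (e.g. 100000 for owners)
    Returns a dict containing only the newly assigned name -> ID pairs.
    """
    next_id = max(existing.values()) + 1 if existing else start
    assigned = {}
    for name in sorted(set(new_names)):  # sorted order, like DataFrame.groupby keys
        if name in existing:
            continue
        assigned[name] = next_id
        next_id += 1
    return assigned


known_owners = {"team_x": 100000, "team_y": 100001}  # hypothetical existing owners
print(allocate_ids(["team_y", "team_z", "team_z"], known_owners, 100000))
# {'team_z': 100002}
print(allocate_ids(["alice"], {}, 200000))
# {'alice': 200000}
```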

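Next comes the Job information in the incremental data. The approach taken here: for each JobId in the incremental data, if a Job node with that ID already exists, delete the node together with its relationships, then rebuild the node and its relationships from the incremental data.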

#job_node_dict={}
for row in incre_dataframe.itertuples(index=True,name='Pandas'):
    job_node=nodematcher.match('Job').where(JobId=getattr(row,'id')).first()
    #job_node_dict[getattr(row,'id')]=(0 if(job_node is None) else 1)
    if not job_node is None:
        graph.run('match (job:Job {JobId:$job_id}) detach delete job',job_id=str(getattr(row,'id')))#parameterized query instead of string concatenation
    n=Node('Job',name=getattr(row,'name'),JobId=getattr(row,'id'))
    graph.create(n)
for row in incre_dataframe.itertuples(index=True,name='Pandas'):
    job_node=nodematcher.match('Job').where(JobId=getattr(row,'id')).first()
    name=str(getattr(row,'name'))
    depencies_job=str(getattr(row,'depencies'))
    user=str(getattr(row,'user'))
    owner=str(getattr(row,'owner'))
    lvl=str(getattr(row,'lvl'))
    #relationships between the Job and the Jobs it depends on
    if depencies_job!='config_time_run' and depencies_job.find(':')!=-1:
        for index,val in enumerate(depencies_job.split(':')):
            depency_job_node=nodematcher.match('Job').where(JobId=val).first()
            if not depency_job_node is None:
                r=Relationship(job_node,'Depency',depency_job_node)
                graph.create(r)
    elif  depencies_job!='config_time_run' and depencies_job.find(':')==-1:
        depency_job_node=nodematcher.match('Job').where(JobId=depencies_job).first()
        if not depency_job_node is None:
            r=Relationship(job_node,'Depency',depency_job_node)
            graph.create(r)
    else:
        pass
    #relationship between the Job and its Owner
    owner_node=nodematcher.match('Owner').where(OwnerName=owner).first()
    r=Relationship(job_node,'Belongto',owner_node)
    graph.create(r)
    #relationship between the Job and its level
    lvl_node=nodematcher.match('Lvl').where(LvlId=lvl).first()
    r=Relationship(job_node,'Level',lvl_node)
    graph.create(r)
    #relationship between the creating User and the Job
    user_node=nodematcher.match('User').where(UserName=user).first()
    r=Relationship(user_node,'Create',job_node)
    graph.create(r)
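Note that `DETACH DELETE` removes every relationship attached to the Job, including the incoming `Depency` relationships from downstream Jobs. Those edges are recreated only if the downstream Jobs are in the same incremental batch, so an update to an upstream Job can cut it off from its dependents. An alternative, sketched here as an assumption rather than the method used above, keeps the node and clears only the relationships the second loop rebuilds: its outgoing `Depency`, `Belongto`, and `Level` edges and its incoming `Create` edge. `reset_job` is a hypothetical helper that expects a py2neo-style `run(cypher, **parameters)`; it would replace the body of the first loop, and the second loop could run unchanged.

```python
# Upsert the Job, then delete only the edges that the rebuild loop recreates.
# DELETE on a null from OPTIONAL MATCH is a no-op, so new Jobs are fine too.
RESET_JOB = """
MERGE (job:Job {JobId: $job_id})
SET job.name = $name
WITH job
OPTIONAL MATCH (job)-[old_rel:Depency|Belongto|Level]->()
DELETE old_rel
WITH DISTINCT job
OPTIONAL MATCH (:User)-[old_create:Create]->(job)
DELETE old_create
"""


def reset_job(graph, job_id, name):
    """Hypothetical helper: upsert a Job and clear the edges that get rebuilt.

    Incoming Depency edges from downstream Jobs are left untouched.
    """
    graph.run(RESET_JOB, job_id=str(job_id), name=str(name))


class RecordingGraph:
    """Stand-in that records queries instead of talking to Neo4j."""

    def __init__(self):
        self.calls = []

    def run(self, cypher, **parameters):
        self.calls.append((cypher, parameters))


fake = RecordingGraph()
reset_job(fake, 1002, "job_b")  # hypothetical Job
print(fake.calls[0][1])  # {'job_id': '1002', 'name': 'job_b'}
```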

Current results and next steps

The result in the graph database looks like this:


[Image: Job dependency graph in Neo4j]

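For now, the results have to be queried with Cypher in Neo4j's browser interface. Next, I'd like to build a web page so that non-developers can use it too. PS: I glanced at a few examples of combining a graph database with a web front end, and front-end work really gives me a headache! If I can't find front-end help, I'll just have to bite the bullet and do it myself O(∩_∩)O haha~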
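Until such a page exists, a typical Cypher query for tracing a Job's dependency chain follows `Depency` relationships with a variable-length pattern. Since the relationships above point from a Job to the Jobs it depends on, `(job)-[:Depency*]->(other)` finds everything upstream. A minimal sketch, with `dependency_chain` as a hypothetical helper and the same py2neo-style `run(...)` assumption; in py2neo the returned cursor's `.data()` turns the records into a list of dicts.

```python
UPSTREAM = """
MATCH (job:Job {JobId: $job_id})-[:Depency*1..%d]->(other:Job)
RETURN DISTINCT other.JobId AS JobId, other.name AS name
"""
DOWNSTREAM = """
MATCH (other:Job)-[:Depency*1..%d]->(job:Job {JobId: $job_id})
RETURN DISTINCT other.JobId AS JobId, other.name AS name
"""


def dependency_chain(graph, job_id, direction="upstream", max_depth=10):
    """Hypothetical helper: list all Jobs up- or downstream of job_id.

    Cypher does not accept parameters for variable-length bounds, so
    max_depth is coerced to int and formatted into the query text.
    Any direction other than 'upstream'/'downstream' raises KeyError.
    """
    template = {"upstream": UPSTREAM, "downstream": DOWNSTREAM}[direction]
    query = template % int(max_depth)
    return graph.run(query, job_id=str(job_id)).data()


class FakeCursor:
    """Stand-in for a py2neo Cursor holding fixed records."""

    def __init__(self, records):
        self._records = records

    def data(self):
        return self._records


class FakeGraph:
    """Stand-in Graph that records queries and returns one canned record."""

    def __init__(self):
        self.queries = []

    def run(self, cypher, **parameters):
        self.queries.append((cypher, parameters))
        return FakeCursor([{"JobId": "1001", "name": "job_a"}])


fake = FakeGraph()
print(dependency_chain(fake, "1002"))  # [{'JobId': '1001', 'name': 'job_a'}]
```

In the Neo4j Browser itself, binding the path (`MATCH p = ... RETURN p`) renders the chain as a graph instead of a table.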
