python nebula图数据库常用操作

2022-12-22  本文已影响0人  越大大雨天

前言

使用场景为:依赖NebulaGraph3.2.0图数据库,对一些数据节点做关联拓线查询,比如输入IP, 可查询展示该IP归属的地理位置、关联的域名、并继续往下根据域名查询解析的URL 、注册信息等,并以为图形的形式进行展示, 技术栈语言为Python。


展示样例

依赖安装及使用

常用查询

1. python创建nebula连接

from nebula3.gclient.net import ConnectionPool
from nebula3.Config import Config
# define a config
config = Config()
config.max_connection_pool_size = 10
# init connection pool
connection_pool = ConnectionPool()
# if the given servers are ok, return true, else return false
ok = connection_pool.init([('127.0.0.1', 9669)], config)

# option 1 control the connection release yourself
# get session from the pool
session = connection_pool.get_session('root', 'nebula')

# select space
session.execute('USE nba')

# show tags
result = session.execute('SHOW TAGS')
print(result)

# release session
session.release()

# option 2 with session_context, session will be released automatically
with connection_pool.session_context('root', 'nebula') as session:
    session.execute('USE nba')
    result = session.execute('SHOW TAGS')
    print(result)

# close the pool
connection_pool.close()

2. 常用nGQL语句及说明

2.1 统计查询

通过以下三步可获取图空间的点、边数量统计信息:

SUBMIT JOB STATS;
SHOW JOB <job_id>;
SHOW STATS;

2.2 常用数据查询语法

最常用match语法查询,更加灵活
match pattern详细语法参考: https://docs.nebula-graph.com.cn/3.2.0/3.ngql-guide/1.nGQL-overview/3.graph-patterns/

类代码示例

代码包含python flask nebula客户端初始化、关联查询、结果解析

from nebula3.data import DataObject
from nebula3.gclient.net import ConnectionPool
from nebula3.Config import Config
import pandas as pd
from typing import Dict, Union
from nebula3.data.ResultSet import ResultSet
from nebula3.mclient import MetaCache
from nebula3.sclient.GraphStorageClient import GraphStorageClient
from collections import defaultdict


class NebulaClient:
    """
    nebula 查询客户端,定制化为tl图谱查询
    """
    def __init__(self, graph_servers=None, meta_servers=None, **nebula_config):
        self.graph_servers = graph_servers
        self.meta_servers = meta_servers
        self.username = None
        self.password = None
        self.space = None
        self.config = Config()
        self.graph_pool = ConnectionPool()
        self.graph_storage_client = None
        self._init_config(**nebula_config)

    def _init_config(self, **nebula_config):
        self.username = nebula_config.pop("username", "")
        self.password = nebula_config.pop("password", "")
        self.space = nebula_config.pop("space", "")
        for key, value in nebula_config.items():
            setattr(self.config, key, value)
        if self.graph_servers:
            self.graph_pool.init(self.graph_servers, self.config)
        if self.meta_servers:
            self.graph_storage_client = GraphStorageClient(MetaCache(self.meta_servers))

    def init_app(self, app):
        nebula_conf = app.config.get("nebula") or {}
        nebula_graphd_conf = nebula_conf.pop("graphd", {})
        nebula_metad_conf = nebula_conf.pop("metad", {})

        graph_hosts = nebula_graphd_conf.pop("host", [])
        meta_hosts = nebula_metad_conf.pop("host", [])

        self.graph_servers = [(item.split(":")[0], item.split(":")[1]) for item in graph_hosts]
        self.meta_servers = [(item.split(":")[0], item.split(":")[1]) for item in meta_hosts]
        self._init_config(**nebula_conf)

    def ngql_query(self, gql, space=None):
        if not space:
            space = self.space
        with self.graph_pool.session_context(self.username, self.password) as session:
            session.execute(f'USE {space}')
            result = session.execute(gql)
            return result

    def match_id_relation_edge(self, vid, space=None, variable_length_edges: Union[None, tuple, list] = None) -> ResultSet:
        """
        space: 命名空间
        vid:起始查询vid
        variable_length_edges: 指定路径长度范围, 用两个元素的数组或者元祖表示最小->最大长度, 例如[1,2]
        return: 边和点的查询集ResultSet

        """
        if not space:
            space = self.space
        if not variable_length_edges:
            # 默认只查询一个层级
            variable_length_edges = [0, 1]
        # 根据起始vid,向下关联查询
        gql = f'MATCH (source)-[e*{variable_length_edges[0]}..{variable_length_edges[1]}]->(target) WHERE ( id(source)  == "{vid}") RETURN source as source,e,target as target LIMIT 100'
        print(f"gql: {gql}")
        e_result = self.ngql_query(gql, space=space)
        return e_result

    def match_id_relation_edge_result_to_df(self, result: ResultSet) -> Union[pd.DataFrame, None]:
        """
        build list for each column, and transform to dataframe
        """
        if result.is_succeeded():
            source_v = result.column_values("source")
            edge_v = result.column_values("e")
            target_v = result.column_values("target")
            d: Dict[str, list] = {
                "source": [self._parse_node(source) for source in source_v],
                "edge": [self._parse_edge(edge) for edge in edge_v],
                "target_v": [self._parse_node(target) for target in target_v]
            }
            return pd.DataFrame.from_dict(d)
        return None

    def match_id_relation_edge_result_to_struct(self, result: ResultSet):
        if result.is_succeeded() and not result.is_empty():
            source_v = result.column_values("source")
            edge_v = result.column_values("e")
            target_v = result.column_values("target")

            source_data = self._parse_node(source_v[0])
            target_data_list = [self._parse_node(target) for target in target_v]
            relationship = defaultdict(list)
            for target in target_data_list:
                target_type = target.get("type")
                # 若目标点和原点是同一个对象, 不做关联
                if target.get("id") == source_data.get("id"):
                    continue
                relationship[target_type].append(target)

            source_data.update({
                "relation": relationship
            })
            return source_data

    def _parse_node(self, node):
        node = node.as_node()
        tag = node.tags()[0]
        node_parsed = {
            "id": node.get_id().as_string(),
            "type": tag,
            "isAlarm": False,
            "info": {k: self._parse_wrapper_value(v) for k, v in node.properties(tag).items()}
        }
        return node_parsed

    def _parse_edge(self, edge):
        edge = edge.as_list()[0].as_relationship()
        edge_parsed = {
            "edge_name": edge.edge_name(),
            "info": {k: self._parse_wrapper_value(v) for k, v in edge.properties().items()},
            "start_vertex_id": edge.start_vertex_id().as_string(),
            "end_vertex_id": edge.end_vertex_id().as_string(),
        }
        return edge_parsed

    @staticmethod
    def _parse_wrapper_value(value: DataObject.ValueWrapper):
        _value = value.get_value().value
        if isinstance(_value, bytes):
            return _value.decode(encoding="utf-8")
        return _value

    def scan_vertex(self, tag_name, space_name=None, *args, **kwargs):
        if not space_name:
            space_name = self.space
        resp = self.graph_storage_client.scan_vertex(
            space_name=space_name,
            tag_name=tag_name,
            *args, **kwargs)
        all_res = []
        while resp.has_next():
            result = resp.next()
            for vertex_data in result:
                all_res.append(vertex_data)
                print(vertex_data)
        return all_res

    def close(self):
        if hasattr(self.graph_pool, "close"):
            self.graph_pool.close()


if __name__ == '__main__':
    graph_servers = [("localhost", 9669), ("localhost", 9669), ("localhost", 9669)]
    meta_servers = [("localhost", 9559), ("localhost", 9559), ("localhost", 9559)]
    nebula_tool = NebulaClient(graph_servers=graph_servers, meta_servers=meta_servers, username="root", password="nebula", space="test", max_connection_pool_size=10)
    # result = nebula_tool.ngql_query(space="tl_vast", gql='match (v) return v limit 10')
    # query_id_relations = 'match (a)-[e]-(b) where id(a) == "002b500b73952c997db130214ef03b26" return e;'
    # result = nebula_tool.ngql_query(space="tl_vast", gql='MATCH p = (source_v)-[e*1..1]->(target_v) WHERE ( id(source_v)  == "002b500b73952c997db130214ef03b26") RETURN p LIMIT 100')
    result = nebula_tool.match_id_relation_edge(vid="002b500b73952c997db130214ef03b26")
    df_result = nebula_tool.match_id_relation_edge_result_to_df(result)
    output_result = nebula_tool.match_id_relation_edge_result_to_struct(result)
    # print(nebula_tool.scan_vertex())
    print(result)
    print(df_result)
    print(output_result)
上一篇下一篇

猜你喜欢

热点阅读