天门市规划建设局网站,中国足球最新消息,成都做公众号推广的公司,百度文库个人登录文章目录 环境准备拓扑设计构建流程设计文件流设计交互解析算法实现数据库交互NER解析相似度计算 基于数据库的文件生成从数据库中读取字段将字段后处理后保存为文件 基于文件的知识图谱构建bug修改与算法优化图数据库连接问题批量构建知识图谱问题批量删除边问题空值处理问题去… 文章目录 环境准备拓扑设计构建流程设计文件流设计交互解析算法实现数据库交互NER解析相似度计算 基于数据库的文件生成从数据库中读取字段将字段后处理后保存为文件 基于文件的知识图谱构建bug修改与算法优化图数据库连接问题批量构建知识图谱问题批量删除边问题空值处理问题去重时的大小写问题加速构建边优化 环境准备
本任务中知识图谱里的内容主要有两个来源一个是数据库中的字段值一个是从数据库中的描述字段中通过NER提取出的关键词。
因此除了在服务器上安装neo4j图数据库环境在本地安装py2neo包负责与图数据库间交互还要安装pymysql包负责与数据库间交互安装paddlenlp完成NER任务。
拓扑设计
从数据库中提取有用的信息并对应于节点和关系使用presson绘制出知识图谱的整体拓扑。其中蓝色为节点黄色为不同节点间的关系虚线为反向边绿色同同一类型节点间的关系基于相似性度量。
选中节点点击数据属性可以为不同节点添加属性并记录其在数据库中的来源
构建流程设计
其实知识图谱的构建常常是节点和边同时构建的但由于我们的数据量非常大同时构建逻辑比较复杂所以采取了先统一建节点后统一建边的逻辑。
节点构建时有两个来源一个是从数据库直接读取一个是从数据库中的一个描述字段中通过NER提取关键词来构建。
边在构建时除了这两个来源还有一种来源是同一类型节点间基于相似性计算出连边。
文件流设计
构建节点时为了让流程更为清晰可控首先根据预先定义的节点数据库CSV文件和NER解析JSON文件去生成节点数据CSV文件再根据节点数据CSV文件去生成节点。
节点数据库CSV文件名为节点名称内容包含数据库名、字段和属性三列其中字段是数据库中的字段名属性是节点中的属性名。
注意第一行的第三列必须是name第二列如果以加号开头说明要确保字段的唯一性下面是一个常规示例 写这个文件时可以做一些骚操作例如在字段中将SQL语句融入进去比如说我想构建一个坐标节点其名字为“坐标X坐标Y”这个文件的后两列就可以写成CONCAT_WS(,, zbx, zby),name。
节点数据CSV文件第一列是name后面是其他属性相当于把上面的节点数据库CSV文件横过来。以坐标节点为例 构建边时首先根据预先定义的关系数据库CSV文件和NER解析JSON文件以及相似性计算算法去生成关系数据CSV文件再根据关系数据CSV文件去生成边。
关系数据库CSV文件名为关系名称内容只有一行包含数据库,起点字段,起点label,起点所用属性,终点字段,终点label,终点所用属性这几列。其中label指的是头实体和尾实体的节点类型属性是查询时用到的属性。
关系数据CSV文件列数不固定第一部分是连接时用到的头实体节点属性形如name1, axx1, bxx1,...第二部分是连接时用到的尾实体节点属性形如name2, axx2, bxx2, ...最后两列固定为label1, label2对应头实体节点类型和尾实体节点类型以时间相似的关系为例 交互解析算法实现
为了将数据库中字段解析为知识图谱的内容需要完成以下几部分的功能一是直接与数据库交互二是通过NER解析文本描述内容三是计算时间和地点间的相似度。
数据库交互
定义下述数据库交互类输入SQL查询语句后将返回结果封装为dataframe。
import pymysql
import pandas as pd
class SQLSelector:def __init__(self):# 初始化数据库连接self.db_config {host:,user:,password:,db:,charset:utf8}self.cursor, self.connection self.connect_db()def connect_db(self):try:connection pymysql.connect(**self.db_config)cursor connection.cursor()return cursor, connectionexcept Exception as e:print(fError connecting to the database: {e})raisedef close_db(self):try:self.cursor.close()self.connection.close()except Exception as e:print(fError closing the database connection: {e})def __del__(self):# 关闭数据库连接self.close_db()def execute_db(self, sql_query):# 返回执行状态和结果try:self.cursor.execute(sql_query)execute_result pd.DataFrame(self.cursor.fetchall(), columns[desc[0] for desc in self.cursor.description])execute_state Trueexcept Exception as e:execute_result str(e)execute_state Falsereturn execute_state, execute_resultNER解析
定义下列文本解析类由于没有经过训练直接使用paddlenlp做解析并把结果保存为json因此结果不是很准确后续需要后处理下面的代码做了脱敏处理。
from paddlenlp import Taskflow
import pandas as pd
import json
from utils import write_json_file, read_json_file, is_id_card, count_elements_in_nested_list
from tqdm import tqdmtqdm.pandas()class PaddleNLP:def __init__(self):self.ner_file_path ./Data/NER_datadef paddle_ner(self, batch_size, input_list):ner Taskflow(ner, batch_sizebatch_size)print(start NER)result_list ner(input_list)return result_listdef paddle_ie(self, batch_size, input_list, schema):ner Taskflow(knowledge_mining, batch_sizebatch_size)print(start IE)result_list ner(input_list)return result_listdef ner_data_save(self, path):ner_source_data pd.read_json(path)ner_source_data[] ner_source_data[].fillna(此处为空)input_list ner_source_data[].tolist()ner_result_list self.paddle_ner(batch_size50, input_listinput_list)write_json_file(self.ner_file_path ./ner_result_no_na.json, ner_result_list)def ie_data_save(self, path):ner_source_data pd.read_json(path)ner_source_data[] ner_source_data[].fillna(此处为空)input_list ner_source_data[].tolist()[0:100]schema []ner_result_list self.paddle_ie(batch_size10, input_listinput_list, schemaschema)print(ner_result_list)def combine_ner_data(self, ner_path, source_data_path):ner_data read_json_file(ner_path)print(len(ner_data))source_df pd.read_json(source_data_path)print(len(source_df) - len(ner_data))source_df[ner_result] ner_dataoutput_file_path self.ner_file_path /ner_data.jsonsource_df.to_json(output_file_path, force_asciiFalse, orientrecords)def analysis_ner_data(self, data_path):ner_df pd.read_json(data_path)results ner_df.progress_apply(combined_entity_check, axis1)ner_df[] results[]ner_df[] results[]ner_df.drop(ner_result, axis1, inplaceTrue)output_file_path self.ner_file_path /updated_ner_data.jsonner_df.to_json(output_file_path, force_asciiFalse, orientrecords)def check_data(self, data_path):ner_data pd.read_json(data_path)qtr_xm ner_data[qtr_xm]print(count_elements_in_nested_list(qtr_xm))def combined_entity_check(row):# 提取人物类实体human_entities set(item[0] for item in row[ner_result] if item[1] 人物类_实体)xy row[].split(,) if row[] else []sa row[].split(,) if row[] else []check_result []qtr_xm []return pd.Series([check_result, qtr_xm], index[check_result, qtr_xm])if __name__ __main__:paddle_nlp PaddleNLP()ner_data_path paddle_nlp.ner_file_path /updated_ner_data.jsonsource_data_path paddle_nlp.ner_file_path /ner_source.jsonpaddle_nlp.check_data(ner_data_path)相似度计算
基于KD-Tree计算坐标和时间相似度
def filter_coordinate_nodes(nodes, label, threshold_distance0.02):print(计算坐标相似度中...)coordinates np.array([(float(node[1]), float(node[2])) for node in nodes if node[1] ! NULL and node[2] ! NULL
])# Build KD-treekdtree cKDTree(coordinates)# Query pairs within the threshold distancepairs kdtree.query_pairs(threshold_distance, output_typendarray)unique_pairs set()for i, j in pairs:pair (min(nodes[i][0], nodes[j][0]), max(nodes[i][0], nodes[j][0]), label, label)unique_pairs.add(pair)print(计算坐标相似度完成)return list(unique_pairs)def filter_time_nodes(nodes, label, threshold21600):print(Calculating time similarity...)# Extract timestamps from nodestimestamps np.array([int(datetime.strptime(time_str, %Y-%m-%d %H:%M:%S).timestamp())if time_str.strip() and time_str.strip() ! NULLelse Nonefor time_str in nodes])# Build KD-treekdtree cKDTree(timestamps.reshape(-1, 1))# Query pairs within the threshold time differencepairs kdtree.query_pairs(threshold, output_typendarray)unique_pairs set()for i, j in pairs:pair (min(nodes[i], nodes[j]), max(nodes[i], nodes[j]), label, label)unique_pairs.add(pair)print(Time similarity calculation completed!)return list(unique_pairs)经过比较其运行速度不如下面这种矩阵运算
def filter_coordinate_nodes(nodes, label, threshold_distance0.001):print(计算坐标相似度中...)# 提取坐标信息并转换为NumPy数组coordinates []for node in nodes:if node[1] ! NULL and node[2] ! NULL:x_str re.sub([^0-9.], , str(node[1]))y_str re.sub([^0-9.], , str(node[2]))if x_str and y_str: # 确保字符串不为空coordinates.append((float(x_str), float(y_str)))coordinates np.array(coordinates)# 使用广播计算两两之间的绝对值距离unique_pairs set()for i in tqdm(range(len(coordinates)), desc相似度计算):# 计算点i与其他所有点的距离distances np.sqrt(np.sum((coordinates - coordinates[i]) ** 2, axis1))# 找到距离小于阈值的点close_points np.where(distances threshold_distance)[0]# 避免将点i与自身比较close_points close_points[close_points ! i]for j in close_points:pair (min(nodes[i][0], nodes[j][0]), max(nodes[i][0], nodes[j][0]), label, label)unique_pairs.add(pair)print(计算坐标相似度完成)return list(unique_pairs)def filter_time_nodes(nodes, label, threshold10800):print(计算时间相似度中...)placeholder -1# 将时间字符串转换为时间戳对于无效值使用占位符timestamps np.array([int(datetime.strptime(time_str, %Y-%m-%d %H:%M:%S).timestamp())if time_str.strip() and time_str.strip() ! NULLelse placeholder # 使用占位符代替 Nonefor time_str in nodes])# 使用广播计算两两之间的距离unique_pairs set()# 使用 tqdm 包裹您的循环来显示进度条for i in tqdm(range(len(timestamps)), desc计算进度):if timestamps[i] is not None:# 计算时间点 i 与其他所有时间点的差异distances np.abs(timestamps - timestamps[i])# 找到小于阈值且不是自己的时间点close_points np.where((distances threshold) (distances ! 0))[0]for j in close_points:pair (min(nodes[i], nodes[j]), max(nodes[i], nodes[j]), label, label)unique_pairs.add(pair)print(计算时间相似度完成)return list(unique_pairs)基于数据库的文件生成
下面是根据生成节点和关系数据库CSV文件去生成数据CSV文件的部分基于NER和基于相似度的文件生成逻辑与之类似在此略过。
从数据库中读取字段
import os
import pandas as pd
class DbInfoReader:def __init__(self):# 数据库CSV文件存放路径self.node_file_path ./NodeDbself.rel_file_path ./RelDb# 必要时只对filter里面的做更新self.node_filter []def generate_node_info(self):node_files os.listdir(self.node_file_path)for node_file in node_files:node_db_dict {}df pd.read_csv(os.path.join(self.node_file_path, node_file), index_colFalse)# CSV文件名代表节点label第一列是数据库名第二列是数据库字段第三列是对应属性名# 注意第一行的第三列必须是name# 第二列如果以加号开头说明要确保该字段的唯一性assert df.iloc[0,2] namelabel os.path.splitext(os.path.basename(node_file))[0]if len(self.node_filter) 0 or label in self.node_filter:for _, row in df.iterrows():# 转化为字典data_dict row.to_dict()key_name data_dict.pop(数据库)if key_name not in node_db_dict:node_db_dict[key_name] {}node_db_dict[key_name][data_dict[字段]] data_dict[属性]yield label, node_db_dictdef generate_rel_info(self):rel_files os.listdir(self.rel_file_path)for rel_file in rel_files:# CSV格式库,起点字段,起点label,起点所用属性,终点字段,终点label,终点所用属性df pd.read_csv(os.path.join(self.rel_file_path, rel_file), index_colFalse)label os.path.splitext(os.path.basename(rel_file))[0]for _, row in df.iterrows():# 转化为字典data_dict row.to_dict()yield label, data_dictif __name__ __main__:dbinforeader DbInfoReader()dbinforeader.generate_rel_info()将字段后处理后保存为文件
from SQLSelector import *
from DbInfoReader import *
from utils import generate_sql_dict, read_json, is_name
from tqdm import tqdm
class DbInfoSaver:def __init__(self):# 属性CSV文件存放路径self.node_file_path ./NodeFileself.rel_file_path ./RelFileself.ner_file_path ./Data/NER_dataself.split_files []self.delete_unknown_files []self.dbinforeader DbInfoReader()self.sqlselector SQLSelector()self.sql_dict generate_sql_dict()def save_node_file_in_sql(self):# 节点来源一从数据库里直接建节点for label, node_db_dict in self.dbinforeader.generate_node_info():data pd.DataFrame()for db, pair in node_db_dict.items():# 添加唯一约束columns []for db_name, node_name in pair.items():ddb_name db_nameif db_name[0] :ddb_name DISTINCT db_name[1:]columns.append({} AS {}.format(ddb_name, node_name))column_str , .join(columns)sql_query SELECT {} FROM {}.format(column_str, db)results self.sqlselector.execute_db(sql_query)[1]data pd.concat([data, results], ignore_indexTrue)# 使用字典对字段值做映射异常值映射为未知if label in self.sql_dict:for col, dic in self.sql_dict[label].items():data[col] data[col].map(lambda x: dic.get(x, 未知))# print(col, set(data[col]))# 对多值情况做分割if label in self.split_files:new_df data[name].str.split(,, expandTrue).reset_index(dropTrue).drop_duplicates().stack().reset_index(dropTrue).drop_duplicates()new_df pd.DataFrame(new_df)new_df.columns [name]data new_df# 对每一列删除多余空格for col in data.columns:data[col] data[col].apply(lambda x: x.strip() if isinstance(x, str) else x)data data.drop_duplicates(subset data.columns).reset_index(dropTrue)# 删除每行全部未知的节点if label in self.delete_unknown_files:cols_to_check data.columns[1:]to_drop data[cols_to_check].eq(未知) | data[cols_to_check].isna() | data[cols_to_check].eq()data data[~to_drop.all(axis1)]data.to_csv(os.path.join(self.node_file_path, f{label}.csv), indexFalse)def save_rel_file_in_sql(self):# 关系来源一从数据库里直接建关系for label, rel_db_dict in tqdm(self.dbinforeader.generate_rel_info()):# 构建起点和终点的 SELECT 子句start_column {} AS {}.format(rel_db_dict[起点字段], rel_db_dict[起点所用属性]1)end_column {} AS {}.format(rel_db_dict[终点字段], rel_db_dict[终点所用属性]2)# 构建 SQL 查询语句sql_query SELECT DISTINCT {}, {} FROM {}.format(start_column, end_column, rel_db_dict[库])# 执行 SQL 查询results self.sqlselector.execute_db(sql_query)[1]results[label1] rel_db_dict[起点label]results[label2] rel_db_dict[终点label]# 对每一列删除多余空格for col in results.columns:results[col] results[col].apply(lambda x: x.strip() if isinstance(x, str) else x)results results.drop_duplicates(subsetresults.columns).reset_index(dropTrue)# 保存结果到 CSV 文件results.to_csv(os.path.join(self.rel_file_path, f{label}.csv), indexFalse)if __name__ __main__:dbinfosaver DbInfoSaver()dbinfosaver.save_node_file_in_sql()基于文件的知识图谱构建
在生成了节点和关系的数据CSV文件之后直接读取文件内容并生成知识图谱中的节点和关系全流程走完。
import pandas as pd
from py2neo import Graph, Node, NodeMatcher, Relationship
import os
from tqdm import tqdm
import logging# Set up logging configuration
logging.basicConfig(filenameerror_log.txt, levellogging.ERROR)class GraphGenerator:def __init__(self) - None:self.node_file_path ./NodeFileself.rel_file_path ./RelFileself.graph Graph(http://localhost:7474, auth(neo4j, 123456), nameneo4j)def generate_nodes_list(self, csv_file_path):将节点CSV文件中的数据导入到图数据库中:param graph: 图数据库:param csv_file_path: csv文件的路径# 读取CSV文件df pd.read_csv(csv_file_path, index_colFalse)# df.dropna(subset[name], inplaceTrue)df.fillna(NULL, inplaceTrue)nodes []label os.path.splitext(os.path.basename(csv_file_path))[0]# 遍历数据将数据导入到图数据库中for _, row in tqdm(df.iterrows(), totallen(df), descProcessing Rows):# 转化为字典data_dict row.to_dict()# 创建一个节点并添加到列表中node Node(label, **data_dict)nodes.append(node)return nodesdef generate_relationship_list(self, graph: Graph, csv_file_path):将边CSV文件中的数据导入到图数据库中:param graph: 图数据库:param csv_file_path: csv文件的路径# 读取CSV文件df pd.read_csv(csv_file_path, index_colFalse)df.fillna(NULL, inplaceTrue)relationship_label os.path.splitext(os.path.basename(csv_file_path))[0]start_label df[label1][0]end_label df[label2][0]start_keys [x for x in list(df.columns[0:-2]) if x[-1] 1]node_start_keys [x[:-1] for x in start_keys]end_keys [x for x in list(df.columns[0:-2]) if x[-1] 2]node_end_keys [x[:-1] for x in end_keys]# 预先加载所有需要的节点实现节点的大小写不敏感快速匹配算法start_nodes list(NodeMatcher(graph).match(start_label))end_nodes list(NodeMatcher(graph).match(end_label))start_nodes_map {tuple(node[key].lower() if isinstance(node[key], str) else node[key] for key in node_start_keys): node for node in start_nodes}end_nodes_map {tuple(node[key].lower() if isinstance(node[key], str) else node[key] for key in node_end_keys): node for node in end_nodes}# 创建一个空的关系列表relationships []# 遍历数据将数据导入到图数据库中for _, row in tqdm(df.iterrows(), totallen(df), descProcessing Rows):# 转化为字典data_dict row.to_dict()# 查询节点start_node start_nodes_map.get(tuple(data_dict[key].lower() if isinstance(data_dict[key], str) else data_dict[key] for key in start_keys))end_node end_nodes_map.get(tuple(data_dict[key].lower() if isinstance(data_dict[key], str) else data_dict[key] for key in end_keys))# 未找到相关节点if start_node is None or end_node is None:error_message fError: Node not found for relation {data_dict}print(error_message) # This will print the error message to the consolelogging.error(error_message)continue# 创建关系并添加到关系列表relationship Relationship(start_node, relationship_label, end_node)relationships.append(relationship)return relationshipsdef create_nodes_or_relationships(self, graph: Graph, nodes_or_relations):batch_size 10000for batch in [nodes_or_relations[i:ibatch_size] for i in range(0, len(nodes_or_relations), batch_size)]:tx graph.begin()for data in batch:tx.create(data)graph.commit(tx)def generate_all(self, modeall):根据指定的模式生成节点、关系或两者。参数:- mode (str): 指定操作模式。all 生成节点和关系node 仅生成节点rel 仅生成关系。返回:无if mode in [all, node]:# 遍历节点文件夹下的所有节点文件# 每次先清空图数据库self.graph.delete_all()for node_file in tqdm(os.listdir(self.node_file_path), desc处理节点csv中:):if node_file.endswith(.csv):csv_path os.path.join(self.node_file_path, node_file)nodes self.generate_nodes_list(csv_path)self.create_nodes_or_relationships(graphself.graph, nodes_or_relationsnodes)if mode in [all, rel]:# 遍历边文件夹下的所有边文件# 每次先清空所有边if mode rel:rel_types self.graph.schema.relationship_typesfor rel_type in rel_types:query fMATCH (n)-[r:{rel_type}]-(m) DELETE rself.graph.run(query)for edge_file in tqdm(os.listdir(self.rel_file_path), desc处理边csv中:):if edge_file.endswith(.csv):csv_path os.path.join(self.rel_file_path, edge_file)relationships self.generate_relationship_list(self.graph, csv_path)self.create_nodes_or_relationships(graphself.graph, nodes_or_relationsrelationships)if __name__ __main__:graph_generator GraphGenerator()graph_generator.generate_all(modenode)print(graph_generator)进入浏览器后查看知识图谱建立成功 bug修改与算法优化
图数据库连接问题
一开始使用Graph(http://localhost:7474, auth(neo4j, 123456))连接图数据库在执行tx graph.begin()这句会报错
py2neo.errors.ProtocolError: Cannot decode response content as JSON连接的时候加上name参数就好了。
批量构建知识图谱问题
构建函数的实现一开始为
def create_nodes_or_relationships(self, graph: Graph, nodes_or_relations):tx graph.begin()for node_or_relation in nodes_or_relations:tx.create(node_or_relation)graph.commit(tx)这样会导致下列两种问题
py2neo.errors.ProtocolError: Cannot decode response content as JSON
[Transaction.TransactionNotFound] Unrecognized transaction id. Transaction may have timed out and been rolled back.如果改成下面这种一个一个建就不会报错所以分析应该是一次建立的节点或关系太多了
def create_nodes_or_relationships(self, graph: Graph, nodes_or_relations):for node_or_relation in nodes_or_relations:tx graph.begin()tx.create(node_or_relation)graph.commit(tx)为了加速改成了现在这种一次批量建10000个的方法。
批量删除边问题
由于建边时出了一点问题试图将所有边删除删边语句一开始为MATCH ()-[r]-() DELETE r会导致下列错误
py2neo.errors.DatabaseError: [Statement.ExecutionFailed] Java heap space应该是边的数量太多了所以我改成现在这种每次删除一类边。
空值处理问题
图数据库中节点的属性不能为空值解决方法是将CSV数据文件的空值替换为NULL也就是这句
df.fillna(NULL, inplaceTrue)去重时的大小写问题
数据库中使用DISTINCT关键词去重这是大小写不敏感的无法区分大写和小写字母dataframe数据使用drop_duplicates方法去重这是大小写敏感的
当这两种方法同时用于生成数据文件就会造成不匹配问题后来通过在节点匹配时加入lower()方法统一转换为小写解决。
加速构建边优化
在构建边的时候首先要找到对应的头实体和尾实体之前的匹配算法是使用了内置的全局匹配
matcher NodeMatcher(graph)
start_node matcher.match(data_dict[label1], **start_property).first()
end_node matcher.match(data_dict[label2], **end_property).first()这样跑是能跑但是速度会非常慢因为每次都从所有的节点里面找。我们可以观察到对某种特定的关系头实体和尾实体都属于某种特定的节点类型因此可以先把所有这一类型的节点存到一个字典里再在这个字典里做匹配这也是目前实现的算法。