简介
本文通过在 Neo4j Workspace 中查询,串联代码片段并可视化所构建的知识图谱。使用 Google Cloud Vertex AI 而不是 OpenAI 进行嵌入,以及 Neo4j Workspace 中一些很酷的图可视化。
知识图谱是知识的结构化表示,通常以实体(如人、地点、文档或概念)和它们之间的关系为形式。它是为复杂领域建模和理解不同信息之间联系的一种强大方法。知识图谱应用广泛,包括语义搜索、推荐系统、网络分析等。知识图谱也是 RAG(检索增强生成)的绝佳选择,因为它们强加了一个概念结构,而不是随机大小块的简单重叠。在我之前的文章中,我注意到与 RAG 相比,知识图谱(KG)会减少很多幻觉。
Neo4j 是一种流行的图数据库管理系统,专门用于存储、查询和分析图结构数据。它擅长管理相互连接的数据,由于具有原生图存储和处理能力,因此非常适合构建知识图谱。
以下是 Neo4j 如何帮助构建知识图谱:
图形存储: Neo4j将数据存储在节点(代表实体)和关系(代表实体间的连接/链接)中。这种结构非常适合表示知识图谱中的复杂关系。
关系作为核心数据: 在Neo4j中,关系带有属性并能被有效地遍历。这样就可以对实体间的关系进行丰富的建模,捕捉数据的各种细微差别。
图形查询语言(Cypher): Neo4j 使用 Cypher 作为其查询语言。Cypher 专为表达图形数据中的模式和执行图形操作而设计。它是一种声明式语言,允许用户指定要从图中检索的内容。它几乎类似于 SQL 语言,但由于需要专业知识,因此有一定的学习曲线。
遍历和模式匹配:使用 Cypher,你可以遍历图以查找实体之间的模式、关系和路径。这对于查询知识图谱以提取有意义的见解和回答复杂问题至关重要。
可扩展性: Neo4j具有高度的可扩展性,可以处理拥有数百万甚至数十亿节点和关系的大规模知识图谱。它提供集群和分片等功能,可将数据分布到多个服务器上,以提高性能和容错性。在这里,由于我们使用的是免费账户,因此处理能力有限。
集成: Neo4j 提供多种集成选项,允许从数据库、.dump 文件、CSV、TSV 或 API 等不同来源获取数据。这使得从不同来源获取相关数据填充知识图谱变得更加容易。
可视化和分析:Neo4j提供了可视化和分析图表数据的工具,允许用户探索知识图谱的结构,识别模式,并通过交互式可视化获得洞察力。
本文较长,涉及大量代码,而且故障排除也并非易事,因为谷歌搜索有时会带来这样的结果:
看来我们来对地方了,此外,它还非常有用。在这里,我不会给你 pip 安装,所以你必须找出如何设置 Python 环境。
让我们开始吧。首先,访问 https://neo4j.com/,注册免费账户并创建一个新实例:
然后,打开并连接:
你会得到一个带有默认数据库 neo4j 的干净实例:
提示:如果你弄乱了数据库,可以随时将其重置为空白。重新启动需要几分钟时间,但随后就可以正常使用了。
从现在起,我将文章分为三个部分:
第1部分
在为LangChain、Neo4j 和 Vertex AI 创建的环境中启动 Jupyter Notebook。在 Google Cloud Platform 上进行身份验证:
gcloud auth login
现在,我们定义一些变量并导入库。你的秘密管理器中应该有一个来自 Google Cloud 的 key.json 服务账户文件(转到 IAM/Service Accounts),文件名为 GOOGLE_APPLICATION_CREDENTIALS。
import os
from langchain_community.graphs import Neo4jGraph
NEO4J_URI = "neo4j+s://YOUR-DATABASE-NUMBER.databases.neo4j.io"
NEO4J_USERNAME = "neo4j"
NEO4J_PASSWORD = "YOUR-NEO4J-PASSWORD"
NEO4J_DATABASE = "neo4j"
from google.cloud import secretmanager
from google.cloud import aiplatform
import vertexai
import warnings
import json
import textwrap
# Langchain
from langchain_community.graphs import Neo4jGraph
from langchain_community.vectorstores import Neo4jVector
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains import RetrievalQAWithSourcesChain
with warnings.catch_warnings():
warnings.simplefilter('ignore')
def access_secret_version(secret_version_id):
client = secretmanager.SecretManagerServiceClient()
response = client.access_secret_version(name=secret_version_id)
return response.payload.data.decode('UTF-8')
secret_version_id = f"projects/YOUR-PROJECT-NUMBER/secrets/GOOGLE_APPLICATION_CREDENTIALS/versions/latest"
key=access_secret_version(secret_version_id)
os.getenv(key)
vertexai.init(project='YOUR-PROJECT', location='us-central1')
在 Neo4j 终端运行以下命令
SHOW DATABASES yield name
从课程中获取 0000950170-23-027948.json (Form) 文件。也可通过SEC的 EDGAR 系统获取数据,并做好准备:
现在,我们将定义 Neo4j 变量:
## CONSTRUCT KG
VECTOR_INDEX_NAME = 'form_10k_chunks'
VECTOR_NODE_LABEL = 'Chunk'
VECTOR_SOURCE_PROPERTY = 'text'
VECTOR_EMBEDDING_PROPERTY = 'textEmbedding'
json_file = json.load(open("0000950170-23-027948.json"))
json_file['item1'][0:200]
>项目 1. \nBusiness\n\n\nOverview\n\n\nNetApp,Inc.(NetApp,我们或公司)是一家以云为主导、以数据为中心的全球性软件公司。公司成立于 1992 年,总部位于加利福尼亚州圣何塞。在三十多年的创新基础上,我们为客户提供跨混合多云环境管理应用程序和数据的自由。
现在,我们创建 RecursiveCharacterTextSplitter,以便使用 LangChain 创建分块:在这里,分块大小为 1000,分块重叠度为 500 的结果要比 2000 / 200 好得多。
splitter = RecursiveCharacterTextSplitter(
chunk_size = 1000,
chunk_overlap = 500,
length_function = len,
is_separator_regex = False,
)
下面是分割 JSON 数据的函数。我们使用 [:30] 来限制计算需求,因为我们使用的是免费实例:
def split_json_data(file):
chunks = [] # use this to accumlate chunk records
for item in ['item1','item1a','item7','item7a']: # pull these keys from the json
print(f'Processing {item} from {file}')
item_text = json_file[item] # grab the text of the item
item_text_chunks = splitter.split_text(item_text) # split the text into chunks
chunk_seq_id = 0
for chunk in item_text_chunks[:30]: # only take the first 30 chunks
# extract form id from file name
file="0000950170-23-027948.json"
form_id = str(file[0:-6])+"1"
# finally, construct a record with metadata and the chunk text
chunks.append({
'text': chunk,
# metadata from looping...
'f10kItem': item,
'chunkSeqId': chunk_seq_id,
# constructed metadata...
'formId': f'{form_id}', # pulled from the filename
'chunkId': f'{form_id}-{item}-chunk{chunk_seq_id:04d}',
# metadata from file...
'names': json_file['names'],
'cik': json_file['cik'],
'cusip6': json_file['cusip6'],
'source': json_file['source'],
})
chunk_seq_id += 1
print(f'\tSplit into {chunk_seq_id} chunks')
return chunks
然后,我们创建分块:
## CREATE CHUNKS
json_chunks=split_json_data(json_file)
json_chunks[0]
现在,我们使用类似 Neo4j SQL 的语法(Cypher)创建节点,每个节点代表一个特定的块。这样,每个数据块都将有 8 个属性:名称、formId、cik、cusip6、source、f10kItem、chunkSeqId 和文本,这些属性将与 JSON 文件相匹配。
## CREATE NODE WITH PROPERTIES
merge_chunk_node = """
MERGE(mergedChunk:Chunk {chunkId: $chunkParam.chunkId})
ON CREATE SET
mergedChunk.names = $chunkParam.names,
mergedChunk.formId = $chunkParam.formId,
mergedChunk.cik = $chunkParam.cik,
mergedChunk.cusip6 = $chunkParam.cusip6,
mergedChunk.source = $chunkParam.source,
mergedChunk.f10kItem = $chunkParam.f10kItem,
mergedChunk.chunkSeqId = $chunkParam.chunkSeqId,
mergedChunk.text = $chunkParam.text
RETURN mergedChunk
"""
让我们试着理解一下这个Cypher:
MERGE(mergedChunk:Chunk {chunkId: $chunkParam.chunkId}): 此行创建或合并一个标为 Chunk 的节点,该节点的 chunkId 属性等于 $chunkParam.chunkId。如果已经存在具有该 chunkId 的节点,则将合并(如果尚未合并)该节点,并标记为 Chunk。如果不存在,则会创建一个带有该 chunkId 和标签 Chunk 的新节点。
关于创建集: 只有在合并操作后创建了新节点时,才会执行这部分查询。它将设置新创建或合并节点的属性。
mergedChunk.names = $chunkParam.names ... 等等
这些行将根据 $chunkParam 变量中提供的值设置合并后的 Chunk 节点的各种属性(names、formId、cik、cusip6、source、f10kItem、chunkSeqId、text)。
返回合并后的 Chunk: 此行将返回 MERGE 操作后的合并后的 Chunk 节点。如果节点已经存在,则返回现有节点;如果节点是创建或合并的,则返回新创建或合并的节点。
创建具有 8 个属性的节点后,我们将用 JSON 文件中的块填充节点,其中键与属性相匹配:
### POPULATE NODES WITH CHUNKS
node_count = 0
for chunk in json_chunks:
print(f"Creating `:Chunk` node for chunk ID {chunk['chunkId']}")
kg.query(merge_chunk_node,
params={
'chunkParam': chunk
})
node_count += 1
print(f"Created {node_count} nodes")
然后,我们为数据块创建一个向量索引。我将在向量中使用维数 768 和余弦相似度来获得 top_k 结果:
## CREATE VECTOR INDEX
kg.query("""
CREATE VECTOR INDEX `form_10k_chunks` IF NOT EXISTS
FOR (c:Chunk) ON (c.textEmbedding)
OPTIONS { indexConfig: {
`vector.dimensions`: 768,
`vector.similarity_function`: 'cosine'
}}
""")
让我们来看看指数。在这里,你也可以使用 Neo4j 工作区的终端
## SHOW INDEXES
kg.query("""
SHOW VECTOR INDEXES
"""
)
[{‘id’: 2,
‘name’: ‘form_10k_chunks’,
‘state’: ‘ONLINE’,
‘populationPercent’: 100.0,
‘type’: ‘VECTOR’,
‘entityType’: ‘NODE’,
‘labelsOrTypes’: [‘Chunk’],
‘properties’: [‘textEmbedding’],
‘indexProvider’: ‘vector-2.0’,
‘owningConstraint’: None,
‘lastRead’: None,
‘readCount’: None}
现在,我们应该获得 Google Cloud访问令牌了。请记住,谷歌云访问令牌有设定的过期时间,默认情况下是一小时,但根据配置的不同,过期时间可能会更长。因此,如果你明天尝试运行同一台笔记本,它将无法工作,你可能需要重新生成访问令牌。运行:
!gcloud auth print-access-token
复制输出并添加到 "YOUR-GOOGLE-CLOUD-TOKEN "和你的 GCP 项目中,因为现在我们将用Vertex AI Embeddings填充索引。
## POPULATE INDEX
kg.query("""
MATCH (chunk:Chunk) WHERE chunk.textEmbedding IS NULL
WITH chunk, genai.vector.encode(
chunk.text,
"VertexAI",{token: "YOUR-GOOGLE-CLOUD-TOKEN", projectId: 'YOUR-PROJECT'})
AS vector
CALL db.create.setNodeVectorProperty(chunk, "textEmbedding", vector)
""")
让我们来看看 KG 模式:我们还没有关系,只有节点。
## GET SCHEMA
kg.refresh_schema()
print(kg.schema)
让我们测试矢量索引:
def neo4j_vector_search(question):
vector_search_query = """
WITH genai.vector.encode(
$question,
"VertexAI",{token: "YOUR-GOOGLE-CLOUD-TOKEN", projectId: 'YOUR-PROJECT'})
AS question_embedding
CALL db.index.vector.queryNodes($index_name, $top_k, question_embedding) yield node, score
RETURN score, node.text AS text
"""
similar = kg.query(vector_search_query,
params={
'question': question,
'index_name':VECTOR_INDEX_NAME,
'top_k': 10})
return similar
search_results = neo4j_vector_search(
'In a single sentence, tell me about Netapp.'
)
search_results[0]
让我们来看看 Neo4j Workspace 中的节点/块:它们相互之间是断开的。
让我们检查并添加属性:
cypher = """
MATCH (anyChunk:Chunk)
WITH anyChunk
RETURN anyChunk { .names, .source, .formId, .cik, .cusip6 } as formInfo
"""
form_info_list = kg.query(cypher)
node_form=form_info_list[0]['formInfo']
node_form
cypher = """
MERGE (f:Form {formId: $formInfoParam.formId })
ON CREATE
SET f.names = $formInfoParam.names
SET f.source = $formInfoParam.source
SET f.cik = $formInfoParam.cik
SET f.cusip6 = $formInfoParam.cusip6
"""
kg.query(cypher, params={'formInfoParam': node_form})
现在,我们将按部分对表单的块进行排序,顺序为:chunk0000、chunk0001、chunck0002 ...
cypher = """
MATCH (from_same_section:Chunk)
WHERE from_same_section.formId = $formIdParam
AND from_same_section.f10kItem = $f10kItemParam
RETURN from_same_section { .formId, .f10kItem, .chunkId, .chunkSeqId }
ORDER BY from_same_section.chunkSeqId ASC
LIMIT 10
"""
kg.query(cypher, params={'formIdParam': node_form['formId'],
'f10kItemParam': 'item1'})
现在,我们为节点创建一个 NEXT 链接(关系),该链接将以 (:Chunk) - [:NEXT]->(:Chunk) 的格式显示在 KG 模式中。请注意,"Chunk "和 "NEXT "可以是你选择的任何单词。
cypher = """
MATCH (from_same_section:Chunk)
WHERE from_same_section.formId = $formIdParam
AND from_same_section.f10kItem = $f10kItemParam
WITH from_same_section
ORDER BY from_same_section.chunkSeqId ASC
WITH collect(from_same_section) as section_chunk_list
CALL apoc.nodes.link(
section_chunk_list,
"NEXT",
{avoidDuplicates: true}
)
RETURN size(section_chunk_list)
"""
kg.query(cypher, params={'formIdParam': node_form['formId'],
'f10kItemParam': 'item1'})
kg.refresh_schema()
print(kg.schema)
现在我们有了第一个关系:
让我们创建Foem 10 所有部分之间的所有有序关系。
## CREATE ALL RELATIONSHIPS BETWEEN FORM 10 SECTIONS
cypher = """
MATCH (from_same_section:Chunk)
WHERE from_same_section.formId = $formIdParam
AND from_same_section.f10kItem = $f10kItemParam
WITH from_same_section
ORDER BY from_same_section.chunkSeqId ASC
WITH collect(from_same_section) as section_chunk_list
CALL apoc.nodes.link(
section_chunk_list,
"NEXT",
{avoidDuplicates: true}
)
RETURN size(section_chunk_list)
"""
for form10kItemName in ['item1', 'item1a', 'item7', 'item7a']:
kg.query(cypher, params={'formIdParam':node_form['formId'],
'f10kItemParam': form10kItemName})
检查创建的序列:在图片中它是逆时针方向的。
现在,我们在Form10 中创建了 "块 "关系,并将这些 "块 "作为 PART_OF 连接到它们的父节点--F 10K。请注意正在形成的连接: Chunk PART_OF Form
## Connect chunks to their parent form with a PART_OF relationship
cypher = """
MATCH (c:Chunk), (f:Form)
WHERE c.formId = f.formId
MERGE (c)-[newRelationship:PART_OF]->(f)
RETURN count(newRelationship)
"""
kg.query(cypher)
接下来我们要做的是:
## - Modify `NEXT` relationship to have variable length, FROM ZERO TO 1
cypher = """
MATCH (f:Form)-[r:SECTION]->(first:Chunk)
WHERE f.formId = $formIdParam
AND r.f10kItem = $f10kItemParam
RETURN first.chunkId as chunkId, first.text as text
"""
first_chunk_info = kg.query(cypher, params={
'formIdParam': node_form['formId'],
'f10kItemParam': 'item1'
})[0]
cypher = """
MATCH (first:Chunk)-[:NEXT]->(nextChunk:Chunk)
WHERE first.chunkId = $chunkIdParam
RETURN nextChunk.chunkId as chunkId, nextChunk.text as text
"""
kg.query(cypher,
params={'chunkIdParam': first_chunk_info['chunkId']})
next_chunk_info = kg.query(cypher, params={
'chunkIdParam': first_chunk_info['chunkId']
})[0]
cypher = """
MATCH window = (c1:Chunk)-[:NEXT]->(c2:Chunk)-[:NEXT]->(c3:Chunk)
WHERE c1.chunkId = $chunkIdParam
RETURN length(window) as windowPathLength
"""
kg.query(cypher,
params={'chunkIdParam': next_chunk_info['chunkId']})
## CREATE NEXT RELATIONSHIPS
cypher = """
MATCH window=(c1:Chunk)-[:NEXT]->(c2:Chunk)-[:NEXT]->(c3:Chunk)
WHERE c2.chunkId = $chunkIdParam
RETURN nodes(window) as chunkList
"""
# pull the chunk ID from the first
kg.query(cypher,
params={'chunkIdParam': first_chunk_info['chunkId']})
## ADD VARIABLE LENGTH TO NEXT RELATIONSHIPS
cypher = """
MATCH window=
(:Chunk)-[:NEXT*0..1]->(c:Chunk)-[:NEXT*0..1]->(:Chunk)
WHERE c.chunkId = $chunkIdParam
RETURN length(window)
"""
kg.query(cypher,
params={'chunkIdParam': first_chunk_info['chunkId']})
cypher = """
MATCH window=
(:Chunk)-[:NEXT*0..1]->(c:Chunk)-[:NEXT*0..1]->(:Chunk)
WHERE c.chunkId = $chunkIdParam
WITH window as longestChunkWindow
ORDER BY length(window) DESC LIMIT 1
RETURN length(longestChunkWindow)
"""
kg.query(cypher,
params={'chunkIdParam': first_chunk_info['chunkId']})
通过 NEXT 和 PART_OF 语句,我们在 Neo4j 工作区中得到了这个图表:
MATCH p=(Company)-[:NEXT|PART_OF*]->()
RETURN DISTINCT p
LIMIT 25;
如果我们选择其中一个节点,就会看到它的属性(JSON 数据中的键值):
表单的块现在是有序的,所有的块都是表单的 PART_OF。
让我们用 LangChain 测试一下目前所做的工作:创建 Cypher 并定义Vector Store 和 Retriever:
from langchain_community.llms import VertexAI
from langchain_google_vertexai import VertexAIEmbeddings
retrieval_query_window = """
MATCH window=
(:Chunk)-[:NEXT*0..1]->(node)-[:NEXT*0..1]->(:Chunk)
WITH node, score, window as longestWindow
ORDER BY length(window) DESC LIMIT 1
WITH nodes(longestWindow) as chunkList, node, score
UNWIND chunkList as chunkRows
WITH collect(chunkRows.text) as textList, node, score
RETURN apoc.text.join(textList, " \n ") as text,
score,
node {.source} AS metadata
"""
vector_store_extra_text = Neo4jVector.from_existing_index(
embedding=VertexAIEmbeddings(),
url=NEO4J_URI,
username=NEO4J_USERNAME,
password=NEO4J_PASSWORD,
database="neo4j",
index_name=VECTOR_INDEX_NAME,
text_node_property=VECTOR_SOURCE_PROPERTY,
retrieval_query=retrieval_query_window,
)
# Create a retriever from the vector store
retriever = vector_store_extra_text.as_retriever()
# Create a chatbot Question & Answer chain from the retriever
chain = RetrievalQAWithSourcesChain.from_chain_type(
VertexAI(temperature=0),
chain_type="stuff",
retriever=retriever
)
chain('Who is Netapp ?')
{'question': 'Netapp 是谁?',
'answer': ' Netapp 是一家数据管理公司。\n',
'sources': ' https://www.sec.gov/Archives/edgar/data/1002047 /000095017023027948/0000950170-23-027948-index.htm' }
第2部分
现在,我们将处理课程中的 form13.csv,如下所示:
让我们把它转换成字典:
## ADD COLLECTION FORMS 13s
import csv
all_form13s = []
with open('form13.csv', mode='r') as csv_file:
csv_reader = csv.DictReader(csv_file)
for row in csv_reader: # each row will be a dictionary
all_form13s.append(row)
all_form13s[0:3]
[{‘source’: ‘https://sec.gov/Archives/edgar/data/1000275/0001140361-23-039575.txt',
‘managerCik’: ‘1000275’,
‘managerAddress’: ‘ROYAL BANK PLAZA, 200 BAY STREET, TORONTO, A6, M5J2J5’,
‘managerName’: ‘Royal Bank of Canada’,
‘reportCalendarOrQuarter’: ‘2023–06–30’,
‘cusip6’: ‘64110D’,
‘cusip’: ‘64110D104’,
‘companyName’: ‘NETAPP INC’,
‘value’: ‘64395000000.0’,
‘shares’: ‘842850’},{…..
现在,我们将在图表中创建公司节点,并更新公司值,使其与 10-K 表相匹配:
first_form13 = all_form13s[0]
cypher = """
MERGE (com:Company {cusip6: $cusip6})
ON CREATE
SET com.companyName = $companyName,
com.cusip = $cusip
"""
kg.query(cypher, params={
'cusip6':first_form13['cusip6'],
'companyName':first_form13['companyName'],
'cusip':first_form13['cusip']
})
cypher = """
MATCH (com:Company), (form:Form)
WHERE com.cusip6 = form.cusip6
RETURN com.companyName, form.names
"""
kg.query(cypher)
cypher = """
MATCH (com:Company), (form:Form)
WHERE com.cusip6 = form.cusip6
SET com.names = form.names
"""
kg.query(cypher)
现在,我们在公司和 Form-13 节点之间创建 FILED 关系,这意味着 X 公司通过使用 cusip6 识别字段提交了 Form 13。
kg.query("""
MATCH (com:Company), (form:Form)
WHERE com.cusip6 = form.cusip6
MERGE (com)-[:FILED]->(form)
""")
然后,我们为所有提交了 13 号表格以报告其对 NETAPP 投资的公司创建经理节点,从单个节点开始:
kg.query("""
CREATE CONSTRAINT unique_manager
IF NOT EXISTS
FOR (n:Manager)
REQUIRE n.managerCik IS UNIQUE
""")
同时创建经理全文索引,以便进行文字匹配之外的文本搜索:
kg.query("""
CREATE FULLTEXT INDEX fullTextManagerNames
IF NOT EXISTS
FOR (mgr:Manager)
ON EACH [mgr.managerName]
""")
查询全文索引,检查 "royal bank "是否返回 "Royal Bank of Canada":
kg.query("""
CALL db.index.fulltext.queryNodes("fullTextManagerNames",
"royal bank") YIELD node, score
RETURN node.managerName, score
""")
现在,为所有提交 13 号表格的公司创建节点:
cypher = """
MERGE (mgr:Manager {managerCik: $managerParam.managerCik})
ON CREATE
SET mgr.managerName = $managerParam.managerName,
mgr.managerAddress = $managerParam.managerAddress
"""
# loop through all Form 13s
for form13 in all_form13s:
kg.query(cypher, params={'managerParam': form13 })
注意 Neo4j Workspace 中 Form 10 块周围的浅绿色节点。Cypher为
MATCH (n) OPTIONAL MATCH (n)-[r]-(m) RETURN COLLECT(n) AS nodes, COLLECT(r) AS relationships
此时,Form 10K 的 Chunk 会依次连接到 NETAPP 节点,但投资 NETAPP 股份的公司是孤立的。
要查看加拿大皇家银行在 NETAPP INC 的投资情况,请运行此 Cypher:
first_form13 = all_form13s[0]
cypher = """
MATCH (mgr:Manager {managerCik: $investmentParam.managerCik}),
(com:Company {cusip6: $investmentParam.cusip6})
RETURN mgr.managerName, com.companyName, $investmentParam as investment
"""
kg.query(cypher, params={
'investmentParam': first_form13
})
好极了。现在我们来做以下工作:
cypher = """
MATCH (mgr:Manager {managerCik: $ownsParam.managerCik}),
(com:Company {cusip6: $ownsParam.cusip6})
MERGE (mgr)-[owns:OWNS_STOCK_IN {
reportCalendarOrQuarter: $ownsParam.reportCalendarOrQuarter
}]->(com)
ON CREATE
SET owns.value = toFloat($ownsParam.value),
owns.shares = toInteger($ownsParam.shares)
RETURN mgr.managerName, owns.reportCalendarOrQuarter, com.companyName
"""
kg.query(cypher, params={ 'ownsParam': first_form13 })
kg.query("""
MATCH (mgr:Manager {managerCik: $ownsParam.managerCik})
-[owns:OWNS_STOCK_IN]->
(com:Company {cusip6: $ownsParam.cusip6})
RETURN owns { .shares, .value }
""", params={ 'ownsParam': first_form13 })
我们还定义了 OWNS_STOCK_IN 的属性,将参数设置为价值和股份,并在所有经理人中循环。
cypher = """
MATCH (mgr:Manager {managerCik: $ownsParam.managerCik}),
(com:Company {cusip6: $ownsParam.cusip6})
MERGE (mgr)-[owns:OWNS_STOCK_IN {
reportCalendarOrQuarter: $ownsParam.reportCalendarOrQuarter
}]->(com)
ON CREATE
SET owns.value = toFloat($ownsParam.value),
owns.shares = toInteger($ownsParam.shares)
"""
#loop through all Form 13s
for form13 in all_form13s:
kg.query(cypher, params={'ownsParam': form13 })
现在,让我们检查一下我们的 KG 模式:
kg.refresh_schema()
print(textwrap.fill(kg.schema, 60))
创建的关系如下:
(:Chunk)-[:NEXT]->(:Chunk)
(:Chunk)-[:PART_OF]->(:Form)
(:Form)-[:SECTION]->(:Chunk)
(:公司)-[:文件]->(:表格)
(:经理)-[:OWNS_STOCK_IN]->(:公司)
我们的知识图谱外观如何?运行下面的 Cypher:两个数据集群之间存在连接。
MATCH (n)
OPTIONAL MATCH (n)-[r]-(m)
RETURN COLLECT(DISTINCT n) AS nodes, COLLECT(DISTINCT r) AS relationships
将公司限制在 25 家以内,并仔细研究 KG 的这一部分:
MATCH p=()-[:OWNS_STOCK_IN]->() RETURN p LIMIT 25;
cypher = """
MATCH (chunk:Chunk)
RETURN chunk.chunkId as chunkId
"""
chunk_rows = kg.query(cypher)
chunk_first_row = chunk_rows[0]
ref_chunk_id = chunk_first_row['chunkId']
cypher = """
MATCH (:Chunk {chunkId: $chunkIdParam})-[:PART_OF]->(f:Form)
RETURN f.source
"""
for i in range(0,len(chunk_rows)):
chunk_first_row = chunk_rows[i]
kg.query(cypher, params={'chunkIdParam': chunk_first_row['chunkId']})
我们对公司 FILED Form13 做同样的处理:
cypher = """
MATCH (:Chunk {chunkId: $chunkIdParam})-[:PART_OF]->(f:Form),
(com:Company)-[:FILED]->(f)
RETURN com.companyName as name
"""
for i in range(0,len(chunk_rows)):
chunk_first_row = chunk_rows[i]
kg.query(cypher, params={'chunkIdParam': chunk_first_row['chunkId']})
为了获得投资者的数量,我们要运行 Cypher:
cypher = """
MATCH (:Chunk {chunkId: $chunkIdParam})-[:PART_OF]->(f:Form),
(com:Company)-[:FILED]->(f),
(mgr:Manager)-[:OWNS_STOCK_IN]->(com)
RETURN com.companyName,
count(mgr.managerName) as numberOfinvestors
"""
for i in range(0,len(chunk_rows)):
chunk_first_row = chunk_rows[i]
kg.query(cypher, params={
'chunkIdParam': chunk_first_row['chunkId']
})
让我们获得一份投资于 NETAPP INC(拥有其股份)的公司名单..:
cypher = """
MATCH (:Chunk {chunkId: $chunkIdParam})-[:PART_OF]->(f:Form),
(com:Company)-[:FILED]->(f),
(mgr:Manager)-[owns:OWNS_STOCK_IN]->(com)
RETURN mgr.managerName + " owns " + owns.shares +
" shares of " + com.companyName +
" at a value of $" +
apoc.number.format(toInteger(owns.value)) AS text
LIMIT 10
"""
kg.query(cypher, params={
'chunkIdParam': ref_chunk_id
})
最后部分
现在一切准备就绪,让我们来查询整个知识图谱:我们将提供 VertexAI 响应的指令,以及 Cyphers 的示例,以便 LangChain 生成 Cyphers 并根据生成的 Cyphers 给出结果:
from langchain.prompts.prompt import PromptTemplate
from langchain.chains import GraphCypherQAChain
prompt_template = """Task:Generate Cypher statement to query a graph database.
Instructions:
Use only the provided relationship types and properties in the schema.
Do not use any other relationship types or properties that are not provided.
Schema:
{schema}
Do not respond to any questions that might ask anything else than for you to construct a Cypher statement.
Examples: Here are a few examples of generated Cypher statements for particular questions:
# What investment firms are in San Francisco?
MATCH (mgr:Manager)-[:LOCATED_AT]->(mgrAddress:Address)
WHERE mgrAddress.city = 'San Francisco'
RETURN mgr.managerName
# What investment firms are near Santa Clara?
MATCH (address:Address)
WHERE address.city = "Santa Clara"
MATCH (mgr:Manager)-[:LOCATED_AT]->(managerAddress:Address)
WHERE point.distance(address.location,
managerAddress.location) < 10000
RETURN mgr.managerName, mgr.managerAddress
# What does Palo Alto Networks do?
CALL db.index.fulltext.queryNodes(
"fullTextCompanyNames",
"Palo Alto Networks"
) YIELD node, score
WITH node as com
MATCH (com)-[:FILED]->(f:Form),
(f)-[s:SECTION]->(c:Chunk)
WHERE s.f10kItem = "item1"
RETURN c.text
# Give me a list of 10 companies and the value invested by them in NETAPP INC.
MATCH
(com:Company)-[:FILED]->(f),
(mgr:Manager)-[owns:OWNS_STOCK_IN]->(com)
RETURN mgr.managerName + " owns " + owns.shares +
" shares of " + com.companyName +
" at a value of $" +
apoc.number.format(toInteger(owns.value)) AS text
LIMIT 10
The question is:
{question}"""
cypher_prompt = PromptTemplate(
input_variables=["schema", "question"],
template=prompt_template
)
cypherChain = GraphCypherQAChain.from_llm(
VertexAI(temperature=0),
graph=kg,
verbose=True,
cypher_prompt=cypher_prompt,
)
让我们运行问题,检查答案和生成的密码:
cypherChain.run("How many shares of NETAPP INC does Royal Bank of Canada own?")
让我们将文本答案与之前生成的投资者名单进行比较:
太好了。现在,让我们检查一下生成的加密语言在 Neo4j Workspace 中运行时是否有效:
842850. 同样正确。
现在,让我们来了解一下 KG 本身的结构:请注意,我没有说 NETAPP INC.
答案和 Cypher 又说对了:
另一个也是对的:
我们还可以查询 10K 表格的分块:
然而,我们无法利用这一公开数据集对图表进行追踪,因为没有上下文关联。
总而言之,虽然代码肯定还有修补和整理的余地,但完成最小可行图是我的目标。我们已经掌握了基本要领--构建数据库、导入和处理 JSON 和 CSV 文件、设置节点和连接,以及制作一些非常酷的可视化效果。