In this article, we combine these two generative AI use cases to prototype a data analyst chatbot. The chatbot can draw on report data that is either pre-created or retrieved on the fly, and it can learn and improve the quality of its responses by using Google BigQuery Vector Store to save past successful answers to user questions for later reference.
Google BigQuery Vector Store is a fully managed service that lets users store and query vector embeddings efficiently inside Google BigQuery. Because it is integrated directly into BigQuery, the vector store inherits BigQuery's scalability, performance, and robust security infrastructure, offering a cost-effective way to manage very large vector datasets.
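Under the hood, the embeddings land in ordinary BigQuery tables, so they can also be queried directly with BigQuery's VECTOR_SEARCH SQL function. The following is a minimal standalone sketch, not part of the original code; the 'content' and 'embedding' column names and the placeholder query vector are assumptions about the stored layout.

# Sketch: querying stored embeddings directly with BigQuery's VECTOR_SEARCH.
from google.cloud import bigquery

client = bigquery.Client(project="my-project")  # assumed project id
query_vector = [0.12, -0.03, 0.41]              # placeholder embedding vector

sql = """
SELECT base.content, distance
FROM VECTOR_SEARCH(
  TABLE `my-project.my_dataset.pl_reports_vector_storage`,
  'embedding',
  (SELECT @query_vector AS embedding),
  top_k => 3,
  distance_type => 'COSINE')
"""
job_config = bigquery.QueryJobConfig(
    query_parameters=[bigquery.ArrayQueryParameter("query_vector", "FLOAT64", query_vector)]
)
for row in client.query(sql, job_config=job_config):
    print(f"{row['distance']:.4f}  {row['content'][:80]}")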
LangChain provides a higher-level abstraction for interacting with the BigQuery vector store, simplifying embedding generation, vector storage, and similarity search. LangChain handles generating the embeddings (using a pre-trained model), uploading the vectors to a BigQuery table, and running similarity searches with the chosen distance metric.
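As a quick illustration of that abstraction, a round trip through the store takes two calls: add_texts embeds the texts with the configured model and writes the vectors to the BigQuery table, and similarity_search embeds the query and returns the nearest documents. A minimal sketch, assuming vector_store is the BigQueryVectorStore initialized in the snippets further below:

# Sketch of the LangChain round trip: embed-and-store, then search.
# Assumes `vector_store` is configured as shown later in this article.
vector_store.add_texts(
    ["May 2024 revenue grew 12% month over month ..."],  # illustrative text
    metadatas=[{"month": "2024-05"}],
)

docs = vector_store.similarity_search("What happened to revenue in May?", k=2)
for doc in docs:
    print(doc.metadata, doc.page_content[:80])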
The generative AI example used in this article is written in Python and, for development purposes, runs in a Google BigQuery Python notebook. The diagram below shows the key components and how they work together to answer a user's question.
1. Data source (BigQuery): The profit-and-loss data lives in BigQuery, organized into tables such as pl_reports and profit_and_loss_report_account_group.
2. Vector store (BigQuery): A BigQueryVectorStore table holds embeddings of the pre-created profit-and-loss analysis texts, enabling efficient similarity search so that context relevant to a user's question can be found quickly. The table pl_reports_vector_storage stores these embeddings. A separate vector store table, successful_qa_pairs, stores successful question-answer pairs so the system can learn and improve over time (a sketch of that second store follows the loading code below).
# Required imports; the BigQueryVectorStore import path may differ
# slightly depending on your LangChain version.
from google.cloud import bigquery
from langchain_google_community import BigQueryVectorStore
from langchain_google_vertexai import VertexAIEmbeddings

# Create a BigQuery client
client = bigquery.Client.from_service_account_json(service_account_file)

# Initialize embedding model
embedding_model = VertexAIEmbeddings(
    model_name="textembedding-gecko@latest",
    project=project,
)

# Initialize BigQueryVectorStore containing P&L report analysis texts
vector_store = BigQueryVectorStore(
    project_id=project,
    dataset_name=dataset,
    table_name="pl_reports_vector_storage",
    location=location,
    embedding=embedding_model,
)
def load_vector_storage():
    """Load the monthly P&L analysis texts from BigQuery into the vector store."""
    query = f"""
        SELECT date_month AS month,
               COALESCE(report_analysis, '')
               || COALESCE(invoice_analysis, '')
               || COALESCE(recurring_payments_analysis, '') AS content
        FROM `{project}.{dataset}.pl_reports`
    """
    df = client.query(query).to_dataframe()
    for _, row in df.iterrows():
        # The month is the only metadata needed to identify each analysis text
        metadata = {'month': row['month']}
        vector_store.add_texts([row['content']], metadatas=[metadata])
    print("Vector storage loaded successfully.")
3. LLM (OpenAI's GPT-4): The core intelligence is provided by OpenAI's GPT-4, which acts as the question-answering engine, decides how likely the pre-created analyses are to answer a question, and supplies the natural-language processing capabilities.
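The snippets in this article reference an llm handle whose creation is not shown; a minimal sketch of one way to create it, assuming the langchain-openai integration package (model name and temperature are illustrative choices):

# Sketch: the `llm` handle used by the chains and agents below.
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4", temperature=0)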
import re

from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate

def should_query_vector_store(question):
    """Ask the LLM whether the vector store is likely to answer the question."""
    prompt = PromptTemplate(
        input_variables=["question", "content_description", "valid_time_range"],
        template="""Given the following question and description of the content in a vector store,
determine if the vector store is likely to contain information that can answer the question.
Consider the following:
1. Does the question ask about any of the key metrics or analyses mentioned in the content description?
2. Does the question fall within the time frame covered by the vector store? Valid time range: {valid_time_range}
3. Is the level of detail requested (category, subcategory, account group) available in the vector store?
Respond with 'Yes' if the vector store is likely to contain relevant information, or 'No' if it's unlikely or unclear.
Question: {question}
Vector Store Content Description:
{content_description}
Decision (Yes/No):
Explanation:""",
    )
    chain = LLMChain(llm=llm, prompt=prompt)
    # `vector_store_content_description` and `valid_time_range` are defined elsewhere
    response = chain.run(
        question=question,
        content_description=vector_store_content_description,
        valid_time_range=(
            f"{valid_time_range[0].strftime('%B %Y')} to "
            f"{valid_time_range[1].strftime('%B %Y')}"
        ),
    )
    # Extract the decision from the response
    decision_match = re.search(r'Decision \(Yes/No\):\s*(Yes|No)', response, re.IGNORECASE)
    if decision_match:
        return decision_match.group(1).lower() == 'yes'
    # Fall back to checking for a 'Yes' at the start of the response
    return response.strip().lower().startswith('yes')
4. LangChain: LangChain orchestrates the interactions between the different components. It manages the agents, memory, chains, and toolkits to provide a coherent, efficient workflow.
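As a small illustration of that orchestration role (not from the original code), conversation memory can be attached to a chain so the chatbot keeps context across turns:

# Sketch: LangChain conversation memory shared across turns; names illustrative.
from langchain.chains import ConversationChain
from langchain.memory import ConversationBufferMemory

memory = ConversationBufferMemory()
conversation = ConversationChain(llm=llm, memory=memory)
reply = conversation.predict(input="Summarize May's profit and loss.")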
5. SQL agent (LangChain): A SQL agent is used to query BigQuery directly when the vector store does not contain enough information to answer the question (see the sketch below).
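A minimal sketch of such an agent, assuming the sqlalchemy-bigquery dialect is installed so SQLDatabase can open a bigquery:// URI; the connection string and agent settings are illustrative:

# Sketch: a LangChain SQL agent that can run ad-hoc queries against BigQuery.
from langchain_community.agent_toolkits import create_sql_agent
from langchain_community.utilities import SQLDatabase

db = SQLDatabase.from_uri(f"bigquery://{project}/{dataset}")
sql_agent = create_sql_agent(llm=llm, db=db, agent_type="openai-tools", verbose=True)
result = sql_agent.invoke({"input": "What was total revenue in May 2024?"})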
6. Embedding model (Vertex AI): Google Vertex AI's textembedding-gecko@latest creates vector embeddings of the text data, enabling semantic search over the vector store to find examples of successful answers to similar past questions, which are then used to refine the response to the user's query.
from langchain_core.documents import Document

# Initialize embedding model
embedding_model = VertexAIEmbeddings(
    model_name="textembedding-gecko@latest",
    project=project,
)

def get_similar_qa(question: str, k: int = 3):
    """Retrieve similar Q&A pairs from the vector store."""
    print(f"Searching for similar Q&A pairs to: {question}")
    similar_qa = qa_vector_store.similarity_search(question, k=k)
    print(f"Retrieved {len(similar_qa)} similar Q&A pairs")
    converted_results = []
    for i, item in enumerate(similar_qa):
        if isinstance(item, Document):
            converted_results.append(item)
            print(f"Document {i+1} (already Document):")
            print(f"  Page content: {item.page_content[:100]}...")
            print(f"  Metadata: {item.metadata}")
        else:
            # Handle cases where the item might be a dict or have a different structure
            text = item.get('text', item.get('page_content', ''))
            metadata = {key: value for key, value in item.items() if key not in ['text', 'page_content']}
            doc = Document(page_content=text, metadata=metadata)
            converted_results.append(doc)
            print(f"Document {i+1} (converted to Document):")
            print(f"  Page content: {text[:100]}...")
            print(f"  Metadata: {metadata}")
    return converted_results
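The retrieved pairs only help once they are folded back into the model's context. Below is a minimal sketch of formatting them as few-shot examples for the prompt; build_fewshot_context is a hypothetical helper, not part of the original code:

def build_fewshot_context(question: str) -> str:
    """Hypothetical helper: format similar past Q&A pairs as few-shot examples
    to prepend to the prompt sent to GPT-4."""
    examples = get_similar_qa(question, k=3)
    blocks = [doc.page_content for doc in examples]
    return (
        "Here are past questions that were answered successfully, for reference:\n\n"
        + "\n\n".join(blocks)
        + f"\n\nNew question: {question}"
    )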
So how do these components and data stores come together to answer a user query? The question-answering flow is shown in the diagram below: