Understanding RAG
Before diving into the code, let's briefly discuss RAG (Retrieval-Augmented Generation). It combines the text-generation abilities of models like GPT-4 with information retrieval to deliver precise, contextually relevant answers. Large language models (LLMs) do not always have access to the specific data we need; RAG lets us feed in relevant data (such as our podcast transcripts) so the model can answer from that material.
Here is the basic flow: we load our data (the podcast transcript), split it into smaller chunks, embed those chunks, and store them in a vector database.
When we issue a query, the system compares our question against these chunks, retrieves the most relevant text, and feeds it to the LLM together with our question to generate an accurate answer. In essence, that is all RAG does: it retrieves the pieces of information relevant to your query and uses them to produce a grounded response.
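To make that pipeline concrete, here is a minimal sketch using the same era of LangChain APIs the app relies on. The transcript string, API key, and model name are placeholders, and the simple RetrievalQA chain here is illustrative only; the app itself uses a conversational chain, shown later.
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from langchain.chat_models import ChatOpenAI
from langchain.chains import RetrievalQA
transcript = "..."  # hypothetical podcast transcript, loaded however you like
# 1. Split the transcript into smaller chunks
chunks = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50).create_documents([transcript])
# 2. Embed the chunks and store them in a vector database
vectordb = Chroma.from_documents(chunks, OpenAIEmbeddings(openai_api_key="sk-..."))
# 3. Retrieve the most relevant chunks for a question and hand them to the LLM
qa = RetrievalQA.from_chain_type(llm=ChatOpenAI(model_name="gpt-3.5-turbo", openai_api_key="sk-..."), retriever=vectordb.as_retriever())
print(qa.run("What sleep tips were mentioned?"))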
How the App Works
The app is straightforward to use: paste a YouTube podcast URL into the Streamlit interface, enter your OpenAI API key, and you get a summary of that podcast. You can then ask specific questions, such as "What books were recommended?" or "What were the best sleep tips?"
The code is organized into three main parts: fetching data from YouTube, generating summaries and answers, and the Streamlit front end. There is also an optional fourth part for a more advanced RAG setup, but the basic version already works well. Let's dive into the code!
Part 1: Interacting with YouTube
First, our app connects to YouTube to retrieve the video title and captions, the raw material for understanding the podcast's content.
Function 1: Fetching the Video Title
This function takes a video URL and fetches the YouTube video's title. It sends a web request, parses the HTML to find the title meta tag, and returns the title. This is our first step toward knowing what each video is about.
import requests
from bs4 import BeautifulSoup
def get_youtube_video_title(video_url):
response = requests.get(video_url)
soup = BeautifulSoup(response.content, 'html.parser')
title = soup.find('meta', property='og:title')
return title['content'] if title else "Title not found"
Function 2: Collecting Captions
Next, we fetch the video's captions. We reuse the title function and use LangChain's YoutubeLoader to load the transcript, prepending the title so the model knows what the conversation is about. These transcripts feed our RAG system so it can generate relevant responses.
from langchain.document_loaders import YoutubeLoader
from langchain.schema import Document
def fetch_youtube_captions(video_url):
title = get_youtube_video_title(video_url)
loader = YoutubeLoader.from_youtube_url(video_url)
docs = loader.load()
if docs and len(docs) > 0:
intro_sentence = "This is the title of the video/transcription/conversation: "
title_content = intro_sentence + title
docs[0] = Document(page_content=title_content + "\n\n" + docs[0].page_content)
return docs
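As a quick sanity check, you could call these two helpers directly. The URL below is a placeholder, not one from the article:
# Hypothetical usage; substitute a real podcast episode URL.
url = "https://www.youtube.com/watch?v=VIDEO_ID"
print(get_youtube_video_title(url))
captions = fetch_youtube_captions(url)
print(captions[0].page_content[:300])  # first 300 characters, starting with the injected title line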
Part 2: Data Processing and AI Integration
Core app logic: turning conversation into data
We split the podcast into digestible chunks, transform them so our AI can process them, and store them for easy retrieval and summary generation.
Environment setup and global variables
We start by setting up the programming environment and defining global variables, including initializing a database and a memory system for our conversation.
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from sklearn.cluster import KMeans
from langchain.chat_models import ChatOpenAI
from langchain.chains.summarize import load_summarize_chain
from langchain.schema import Document
import numpy as np
from langchain.chains import ConversationalRetrievalChain
import logging
from langchain.prompts import PromptTemplate
from langchain.memory import ConversationBufferMemory
logging.basicConfig(level=logging.INFO)
# Initialize global variables
global_chromadb = None
global_documents = None
global_short_documents = None
Data management functions
These functions are essential for managing the app's data: reset_globals resets the global variables, and init_chromadb initializes a vector database from the processed data.
# Initialize the memory outside the function so it persists across different calls
conversation_memory = ConversationBufferMemory(
memory_key="chat_history",
max_len=50,
input_key="question",
output_key="answer",
return_messages=True,
)
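This memory records each question/answer turn so that follow-up questions carry context. As a hedged illustration of how it behaves (not part of the app itself):
# Illustrative only: ConversationBufferMemory accumulates (question, answer) turns.
conversation_memory.save_context({"question": "Who was the guest?"}, {"answer": "A sleep researcher."})
print(conversation_memory.load_memory_variables({})["chat_history"])  # -> list of Human/AI messages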
# Function to reset global variables
def reset_globals():
global global_chromadb, global_documents, global_short_documents
global_chromadb = None
global_documents = None
global_short_documents = None
# Reset the conversation memory
if conversation_memory:
conversation_memory.clear()
def init_chromadb(openai_api_key):
global global_chromadb, global_short_documents
if global_chromadb is None and global_short_documents is not None:
global_chromadb = Chroma.from_documents(documents=global_short_documents, embedding=OpenAIEmbeddings(openai_api_key=openai_api_key))
Processing captions and generating a summary
process_and_cluster_captions: here we process the YouTube captions to get them ready for analysis and answer generation. It creates longer chunks for summarization and shorter chunks for Q&A, then uses KMeans clustering over the embeddings to pick one representative chunk per cluster. Here's what happens:
def process_and_cluster_captions(captions, openai_api_key, num_clusters=12):
global global_documents, global_short_documents
logging.info("Processing and clustering captions")
# Log the first 500 characters of the captions to check their format
logging.info(f"Captions received (first 500 characters): {captions[0].page_content[:500]}")
caption_content = captions[0].page_content
# Ensure captions is a string before processing
if not isinstance(caption_content, str):
logging.error("Captions are not in the expected string format")
return []
# Create longer chunks for summary
summary_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0, separators=["\n\n", "\n", " ", ""])
summary_docs = summary_splitter.create_documents([caption_content])
# Create shorter chunks for QA
qa_splitter = RecursiveCharacterTextSplitter(chunk_size=200, chunk_overlap=0, separators=["\n\n", "\n", " ", ""])
qa_docs = qa_splitter.create_documents([caption_content])
# Process for summary
summary_embeddings = OpenAIEmbeddings(openai_api_key=openai_api_key).embed_documents([x.page_content for x in summary_docs])
kmeans = KMeans(n_clusters=num_clusters, random_state=42).fit(summary_embeddings)
closest_indices = [np.argmin(np.linalg.norm(summary_embeddings - center, axis=1)) for center in kmeans.cluster_centers_]
representative_docs = [summary_docs[i] for i in closest_indices]
# Store documents globally
global_documents = summary_docs # For summary
global_short_documents = qa_docs # For QA
init_chromadb(openai_api_key) # Initialize the vector database with the shorter QA chunks
return representative_docs
generate_summary: this function takes the processed captions and uses an OpenAI chat model to produce a clear, concise summary of the video. The approach keeps the summary comprehensive yet direct, capturing the essence of the podcast. Here's how it works:
def generate_summary(representative_docs, openai_api_key, model_name):
logging.info("Generating summary")
llm4 = ChatOpenAI(model_name=model_name, temperature=0.2, openai_api_key=openai_api_key)
# Concatenate texts for summary
summary_text = "\n".join([doc.page_content for doc in representative_docs])
summary_prompt_template = PromptTemplate(
template=(
"Create a concise summary of a podcast conversation based on the text provided below. The text consists of selected, representative sections from different parts of the conversation. "
"Your task is to synthesize these sections into a single cohesive and concise summary. Focus on the overarching themes and main points discussed throughout the podcast. "
"The summary should give a clear and complete understanding of the conversation's key topics and insights, while omitting any extraneous details. It should be engaging and easy to read, ideally in one or two paragraphs. Keep it short where possible"
"\n\nSelected Podcast Sections:\n{text}\n\nSummary:"
),
input_variables=["text"]
)
# Load summarizer chain
summarize_chain = load_summarize_chain(llm=llm4, chain_type="stuff", prompt=summary_prompt_template)
# Run the summarizer chain
summary = summarize_chain.run([Document(page_content=summary_text)])
logging.info("Summary generation completed")
return summary
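Putting the pieces together so far, a hedged end-to-end sketch (the URL, key, and model are placeholders):
# Hypothetical usage of the functions defined above.
captions = fetch_youtube_captions("https://www.youtube.com/watch?v=VIDEO_ID")
representative_docs = process_and_cluster_captions(captions, "sk-...")
print(generate_summary(representative_docs, "sk-...", "gpt-3.5-turbo"))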
Answering user questions
answer_question: the heart of our RAG system. This function uses the processed data to answer specific user queries, showing retrieval-augmented generation in action. Through this process, users get targeted, grounded responses that deepen their engagement with the podcast content.
def answer_question(question, openai_api_key, model_name):
llm4 = ChatOpenAI(model_name=model_name, temperature=0, openai_api_key=openai_api_key)
global global_chromadb, global_short_documents
if global_chromadb is None and global_short_documents is not None:
init_chromadb(openai_api_key)
logging.info(f"Answering question: {question}")
chatTemplate = """
You are an AI assistant tasked with answering questions based on context from a podcast conversation. Use the provided context and relevant chat messages to answer. If unsure, say so. Keep your answer to three sentences or less, focusing on the most relevant information.
Chat Messages (if relevant): {chat_history}
Question: {question}
Context from Podcast: {context}
Answer:"""
QA_CHAIN_PROMPT = PromptTemplate(input_variables=["context", "question", "chat_history"], template=chatTemplate)
qa_chain = ConversationalRetrievalChain.from_llm(
llm=llm4,
chain_type="stuff",
retriever=global_chromadb.as_retriever(search_type="mmr", search_kwargs={"k":12}),
memory=conversation_memory,
#return_source_documents=True,
combine_docs_chain_kwargs={'prompt': QA_CHAIN_PROMPT},
)
# Log the current chat history
current_chat_history = conversation_memory.load_memory_variables({})
logging.info(f"Current Chat History: {current_chat_history}")
response = qa_chain({"question": question})
logging.info(f"this is the result: {response}")
output = response['answer']
return output
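Two design choices worth noting: the retriever uses MMR (maximal marginal relevance) so the twelve retrieved chunks are relevant without being near-duplicates, and temperature is 0 so answers stay close to the retrieved context. A hypothetical call, assuming a video has already been processed:
# Hypothetical usage (placeholder key and model).
print(answer_question("Were any books recommended?", "sk-...", "gpt-3.5-turbo"))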
Part 3: The Streamlit User Interface
Bringing the app to life
Finally, we use Streamlit to build an interactive web app that ties everything together. Users can paste a YouTube link, ask questions, and get AI-generated summaries and answers. The code below is my take on the interface; feel free to use it as-is or customize it to your liking.
import streamlit as st
from youtuber import fetch_youtube_captions
from agent import process_and_cluster_captions, generate_summary, answer_question, reset_globals
# Set Streamlit page configuration with custom tab title
st.set_page_config(page_title="GPTpod", page_icon="🎧", layout="wide")
def user_query(question, openai_api_key, model_name):
"""Process and display the query response."""
# Add the user's question to the conversation
st.session_state.conversation.append((f"{question}", "user-message"))
# Check if this query has been processed before
if question not in st.session_state.processed_questions:
# Process the query
answer = answer_question(question, openai_api_key, model_name)
if isinstance(answer, str):
st.session_state.conversation.append((f"{answer}", "grimoire-message"))
else:
st.session_state.conversation.append(("Could not find a proper answer.", "grimoire-message"))
st.rerun()
# Mark this question as processed
st.session_state.processed_questions.add(question)
# Initialize session state
if 'conversation' not in st.session_state:
st.session_state.conversation = []
st.session_state.asked_questions = set()
st.session_state.processed_questions = set()
# Sidebar for input and operations
with st.sidebar:
st.title("GPT Podcast Surfer")
st.image("img.png")
# Expandable Instructions
with st.expander("How to use:", expanded=False):
st.markdown("""
- **Enter your OpenAI API Key.**
- **Paste a YouTube URL.**
- **Click 'Run it' to process.**
- **Ask questions in the chat.**
""")
# Model selection in the sidebar
model_choice = st.sidebar.selectbox("Choose Model:",
("GPT-4 Turbo", "GPT-3.5 Turbo"),
index=0) # Default to GPT-4 Turbo
# Map friendly names to actual model names
model_name_mapping = {
"GPT-4 Turbo": "gpt-4-1106-preview",
"GPT-3.5 Turbo": "gpt-3.5-turbo"
}
selected_model = model_name_mapping[model_choice]
st.session_state['selected_model'] = selected_model
# Input for OpenAI API Key
openai_api_key = st.text_input("Enter your OpenAI API Key:", type="password")
# Save the API key in session state if it's entered
if openai_api_key:
st.session_state['openai_api_key'] = openai_api_key
youtube_url = st.text_input("Enter YouTube URL:")
# Button to trigger processing
if st.button("Run it"):
if openai_api_key:
if youtube_url and 'processed_data' not in st.session_state:
reset_globals()
with st.spinner('GPT is cooking up your podcast... hang tight for a few secs'):
captions = fetch_youtube_captions(youtube_url)
if captions:
representative_docs = process_and_cluster_captions(captions, st.session_state['openai_api_key'])
summary = generate_summary(representative_docs, st.session_state['openai_api_key'], selected_model)
st.session_state.processed_data = (representative_docs, summary)
if 'summary_displayed' not in st.session_state:
st.session_state.conversation.append((f"Here's a rundown of the conversation: {summary}", "summary-message"))
guiding_message = "Feel free to ask me anything else about it! :)"
st.session_state.conversation.append((guiding_message, "grimoire-message"))
st.session_state['summary_displayed'] = True
else:
st.error("Failed to fetch captions.")
else:
st.warning("Please add the OpenAI API key first.")
# Main app logic
for message, css_class in st.session_state.conversation:
role = "assistant" if css_class in ["grimoire-message", "summary-message", "suggestion-message"] else "user"
with st.chat_message(role):
st.markdown(message)
# Chat input field
if prompt := st.chat_input("Ask me anything about the podcast..."):
user_query(prompt, st.session_state.get('openai_api_key', ''), st.session_state.get('selected_model', 'gpt-4-1106-preview'))
With the YouTube helpers in youtuber.py and the RAG logic in agent.py (as the imports above assume), you launch the UI by pointing streamlit run at this script. Here's what the front end looks like:
Part 4: Optional RAG-Fusion for Better Results
The idea behind RAG-Fusion
RAG-Fusion is an experimental feature I added as a small upgrade. It is not required, but it gives better results at the cost of a bit more code and a little speed. The goal is to improve how well the AI understands the question and how accurately it responds.
Why use RAG-Fusion?
RAG-Fusion aims to decode not only what the user asked but what they meant to ask, surfacing insights that are easily overlooked.
The RAG-Fusion workflow
As implemented below, the flow is: generate several variants of the user's question, retrieve documents for each variant, fuse the ranked lists with reciprocal rank fusion, and answer from the fused results.
Key functions of RAG-Fusion
from langchain.load import dumps, loads
from langchain.chains import LLMChain
from langchain.schema.output_parser import StrOutputParser
def reciprocal_rank_fusion(results: list[list], k=60):
fused_scores = {}
for docs in results:
# the docs are returned in sorted order of relevance
for rank, doc in enumerate(docs):
doc_str = dumps(doc)
logging.info(f"Serialized Document: {doc_str}")
if doc_str not in fused_scores:
fused_scores[doc_str] = 0
previous_score = fused_scores[doc_str]
fused_scores[doc_str] += 1 / (rank + k)
reranked_results = [
loads(doc)
for doc, score in sorted(fused_scores.items(), key=lambda x: x[1], reverse=True)
]
logging.info(f"Reciprocal Rank Fusion applied. Reranked Results: {reranked_results[:10]}") # Log top 10 results
return reranked_results
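To see what the 1/(rank + k) scoring does, here is a toy example with plain strings in place of retrieved documents. The real function serializes LangChain (Document, score) tuples, but the arithmetic is the same:
# Toy reciprocal rank fusion, independent of LangChain.
ranked_a = ["doc1", "doc2", "doc3"]
ranked_b = ["doc2", "doc1", "doc4"]
k = 60
fused = {}
for docs in (ranked_a, ranked_b):
    for rank, doc in enumerate(docs):
        fused[doc] = fused.get(doc, 0) + 1 / (rank + k)
# doc1 and doc2 each score 1/60 + 1/61 and tie at the top; doc3 and doc4 trail with 1/62.
print(sorted(fused.items(), key=lambda x: x[1], reverse=True))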
def generate_multiple_queries(question, llm):
prompt = PromptTemplate(
input_variables=["question"],
template="""You are an AI language model assistant. Your task is to OUTPUT 4
different versions of the given user question to retrieve relevant documents from a vector
database. By generating multiple perspectives on the user question, your goal is to help
the user overcome some of the limitations of the distance-based similarity search.
Provide these alternative questions separated by newlines.
Original question: {question}""",
)
# Create a chain with the language model and the prompt
llm_chain = LLMChain(llm=llm, prompt=prompt, output_parser=StrOutputParser())
# Run the chain
response = llm_chain.run({"question": question})
queries = response.split("\n")
logging.info(f"Generated Queries: {queries}")
return queries
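A hedged example of calling this helper on its own (the exact wording of the output varies by model):
# Hypothetical usage; in the app, the llm is created inside answer_question.
llm = ChatOpenAI(model_name="gpt-3.5-turbo", openai_api_key="sk-...")
queries = generate_multiple_queries("What books were recommended?", llm)
# queries is a list of four reworded questions, split on newlines.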
def answer_question(question, openai_api_key, model_name):
llm4 = ChatOpenAI(model_name=model_name, temperature=0.1, openai_api_key=openai_api_key)
global global_chromadb, global_short_documents
if global_chromadb is None and global_short_documents is not None:
init_chromadb(openai_api_key)
logging.info(f"Answering question: {question}")
# Generate multiple queries
queries = generate_multiple_queries(question, llm4)
# Retrieve documents for each query
results = []
for query in queries:
retrieved_docs_with_scores = global_chromadb.similarity_search_with_score(query, k=8)
# Log the number of documents retrieved for each query and the first 3 docs
logging.info(f"Retrieved {len(retrieved_docs_with_scores)} documents for query '{query}': {retrieved_docs_with_scores[:3]}")
results.append(retrieved_docs_with_scores)
# Apply reciprocal rank fusion
reranked_results = reciprocal_rank_fusion(results)
logging.info(f"Number of reranked documents: {len(reranked_results)}")
# Extract the Document objects only
reranked_documents = [doc for doc, _ in reranked_results]
# Create a new Chroma instance with reranked results
custom_chromadb = Chroma.from_documents(documents=reranked_documents, embedding=OpenAIEmbeddings(openai_api_key=openai_api_key))
chatTemplate = """
You are an AI assistant tasked with answering questions based on context from a podcast conversation.
Use the provided context and relevant chat messages to answer. If unsure, say so. Keep your answer to four sentences or less, focusing on the most relevant information.
Chat Messages (if relevant): {chat_history}
Question: {question}
Context from Podcast: {context}
Answer:"""
QA_CHAIN_PROMPT = PromptTemplate(input_variables=["context", "question", "chat_history"], template=chatTemplate)
qa_chain = ConversationalRetrievalChain.from_llm(
llm=llm4,
chain_type="stuff",
retriever=custom_chromadb.as_retriever(search_type="similarity", search_kwargs={"k":10}),
memory=conversation_memory,
return_source_documents=True,
combine_docs_chain_kwargs={'prompt': QA_CHAIN_PROMPT},
)
# Log the current chat history
current_chat_history = conversation_memory.load_memory_variables({})
logging.info(f"Current Chat History: {current_chat_history}")
response = qa_chain({"question": question})
logging.info(f"Final response: {response}")
output = response['answer']
return output
Wrapping up
And that's it: a step-by-step guide to building an interactive, AI-powered podcast app.