打造你的AI播客助手

2023年12月19日 由 alex 发表 277 0

了解RAG


在深入研究代码之前,让我们简要地讨论一下RAG。这是一种结合了像GPT-4这样的模型的文本生成能力与信息检索的方法,以提供精确、上下文相关的信息。大型语言模型(LLM)可能并不总是能够访问我们需要的特定数据。RAG允许我们输入相关数据(比如我们的播客剧本),这样模型就可以提供相关的答案。


这里是一个简单的流程:我们加载我们的数据(比如播客剧本),将其分解成更小的片段,对这些片段进行嵌入,并将它们存储在一个向量数据库中。


2


当我们进行查询时,模型会将我们的问题与这些数据段进行比较。最相关的文本被检索出来,并连同我们的问题一起输入到大型语言模型(LLM)中,生成一个准确的回答。本质上,RAG检索相关信息并使用它来回答我们的询问。


3


总的来说,这就是RAG所做的。它检索与你的查询相关的信息片段,然后利用这些信息来生成答案。


应用程序如何工作


这个应用程序非常好用:将YouTube播客的URL粘贴到Streamlit界面,输入你的OpenAI密钥,你就会得到该播客的摘要。然后,你可以提出具体问题,比如,“有哪些书籍推荐?”或“有哪些最佳的睡眠提示?”


代码分为三个主要部分:从YouTube获取数据、生成摘要和答案,以及Streamlit前端界面。还有一个可选的第四部分,用于高级RAG用法,但是基础版本就很好用了。我们来深入代码吧!


第一部分:与YouTube互动


首先,我们的应用连接到YouTube,检索视频标题和字幕——了解播客内容的关键。


功能1:获取视频标题


该功能使用视频URL获取YouTube视频的标题。它发送一个网络请求,解析HTML以找到标题标签,并返回标题。这是我们了解每个视频内容的第一步。


import requests
from bs4 import BeautifulSoup
def get_youtube_video_title(video_url):
    response = requests.get(video_url)
    soup = BeautifulSoup(response.content, 'html.parser')
    title = soup.find('meta', property='og:title')
    return title['content'] if title else "Title not found"


功能 2:收集字幕


接下来,我们获取视频的字幕。我们使用标题提取功能并运用LangChain的YoutubeLoader来加载字幕。这些转录文本将用于我们的RAG系统以生成相关回应。


from langchain.document_loaders import YoutubeLoader
from langchain.schema import Document
def fetch_youtube_captions(video_url):
    title = get_youtube_video_title(video_url)
    loader = YoutubeLoader.from_youtube_url(video_url)
    docs = loader.load()
    if docs and len(docs) > 0:
        intro_sentence = "This is the title of the video/transcription/conversation: "
        title_content = intro_sentence + title
        docs[0] = Document(page_content=title_content + "\n\n" + docs[0].page_content)
    return docs


第二部分:数据处理与人工智能整合


核心应用逻辑:把对话转换成数据


我们将把播客分割成可消化的片段,为我们的人工智能转换处理,并存储它们以便轻松检索和生成摘要。


环境设置和全局变量


我们从设置编程环境和定义全局变量开始。这包括初始化一个数据库和我们对话的内存系统。


from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from sklearn.cluster import KMeans
from langchain.chat_models import ChatOpenAI
from langchain.chains.summarize import load_summarize_chain
from langchain.schema import Document
import numpy as np
from langchain.chains import ConversationalRetrievalChain
import logging
from langchain.prompts import PromptTemplate
from langchain.memory import ConversationBufferMemory

logging.basicConfig(level=logging.INFO) 
# Initialize global variables
global_chromadb = None
global_documents = None
global_short_documents = None


数据管理功能


这些函数对于管理你的应用程序数据至关重要。reset_globals 用于重置全局变量,init_chromadb 初始化一个已处理数据的数据库。


# Initialize the memory outside the function so it persists across different calls
conversation_memory = ConversationBufferMemory(
    memory_key="chat_history",
    max_len=50,
    input_key="question",
    output_key="answer",
    return_messages=True,
)
# Function to reset global variables
def reset_globals():
    global global_chromadb, global_documents, global_short_documents
    global_chromadb = None
    global_documents = None
    global_short_documents = None
    # Reset the conversation memory
    if conversation_memory:
        conversation_memory.clear()
def init_chromadb(openai_api_key):
    global global_chromadb, global_short_documents
    if global_chromadb is None and global_short_documents is not None:
        global_chromadb = Chroma.from_documents(documents=global_short_documents, embedding=OpenAIEmbeddings(openai_api_key=openai_api_key))


处理字幕和生成摘要


process_and_cluster_captions:在这里,我们处理YouTube字幕,为分析和回应生成做准备。这里是处理过程:


  1. 初始数据检查:我们开始时会验证字幕的格式,确保它们适合处理。
  2. 划分字幕:字幕被划分为更小的片段,为两个目的服务:一部分用于总结,另一部分用于问答。这种划分对于有效处理特定任务至关重要。
  3. 按相关性聚类:我们使用KMeans算法对总结片段进行聚类。这是一个策略性的举措,用来过滤冗余内容,并仅关注播客中最有代表性的部分。通过从每个群集中选择一个片段,我们确保AI收到既多样性又简洁的信息,完美适合创建有意义的摘要。
  4. 全球可访问性:分割后的字幕全球存储,便于摘要和查询问题的访问。


def process_and_cluster_captions(captions, openai_api_key, num_clusters=12):
    global global_documents, global_short_documents 
    logging.info("Processing and clustering captions")
    
    # Log the first 500 characters of the captions to check their format
    logging.info(f"Captions received (first 500 characters): {captions[0].page_content[:500]}")
    caption_content = captions[0].page_content
    # Ensure captions is a string before processing
    if not isinstance(caption_content, str):
        logging.error("Captions are not in the expected string format")
        return []
    
    # Create longer chunks for summary
    summary_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0, separators=["\n\n", "\n", " ", ""])
    summary_docs = summary_splitter.create_documents([caption_content])
    # Create shorter chunks for QA
    qa_splitter = RecursiveCharacterTextSplitter(chunk_size=200, chunk_overlap=0, separators=["\n\n", "\n", " ", ""])
    qa_docs = qa_splitter.create_documents([caption_content])
    
    # Process for summary
    summary_embeddings = OpenAIEmbeddings(openai_api_key=openai_api_key).embed_documents([x.page_content for x in summary_docs])
    kmeans = KMeans(n_clusters=num_clusters, random_state=42).fit(summary_embeddings)
    closest_indices = [np.argmin(np.linalg.norm(summary_embeddings - center, axis=1)) for center in kmeans.cluster_centers_]
    representative_docs = [summary_docs[i] for i in closest_indices]
    # Store documents globally
    global_documents = summary_docs  # For summary
    global_short_documents = qa_docs  # For QA
    init_chromadb(openai_api_key)  # Initialize database with longer chunks
    return representative_docs


generate_summary: 该函数采用处理后的字幕,并利用OpenAI的AI模型生成视频的清晰简洁摘要。具体操作如下:


  1. 结合关键片段:我们将聚类过程中选定的片段融合,形成连续的叙述。这确保了摘要覆盖了播客不同的方面。
  2. 引导式AI摘要:使用自定义提示,我们指导AI专注于创建一个简洁且包含信息丰富的摘要。这一步对于引导AI的注意力集中于总体主题和重要观点非常关键,避免了不必要的细节。
  3. 执行摘要链:AI接下来处理这段合并的文本,并生成摘要,为用户快速了解播客的核心内容提供帮助。


这种方法确保了生成的摘要既全面又直接,捕捉了播客的精髓。


def generate_summary(representative_docs, openai_api_key, model_name):generate_summary(representative_docs, openai_api_key, model_name):
    logging.info("Generating summary")
    llm4 = ChatOpenAI(model_name=model_name, temperature=0.2, openai_api_key=openai_api_key)
    # Concatenate texts for summary
    summary_text = "\n".join([doc.page_content for doc in representative_docs])
    summary_prompt_template = PromptTemplate(
        template=(
            "Create a concise summary of a podcast conversation based on the text provided below. The text consists of selected, representative sections from different parts of the conversation. "
            "Your task is to synthesize these sections into a single cohesive and concise summary. Focus on the overarching themes and main points discussed throughout the podcast. "
            "The summary should give a clear and complete understanding of the conversation's key topics and insights, while omitting any extraneous details. It should be engaging and easy to read, ideally in one or two paragraphs. Keep it short where possible"
            "\n\nSelected Podcast Sections:\n{text}\n\nSummary:"
        ),
        input_variables=["text"]
    )
    # Load summarizer chain
    summarize_chain = load_summarize_chain(llm=llm4, chain_type="stuff", prompt=summary_prompt_template)
    # Run the summarizer chain
    summary = summarize_chain.run([Document(page_content=summary_text)])
    logging.info("Summary generation completed")
    return summarypythonCopy code


回答用户问题


answer_question:我们 RAG 系统的核心。该函数使用处理后的数据来回答特定的用户查询,展示了检索增强生成的强大功能。


  1. 数据库初始化:它首先检查包含处理过的片段的数据库是否就绪。如果没有,它会初始化数据库以便进行高效检索。
  2. 检索和响应生成:然后,该功能结合用户的查询与从我们数据库检索到的相关数据。这种检索是为了寻找与问题最相关的上下文,确保AI的回应准确且相关。
  3. 生成上下文感知的答案:AI装备了问题和最匹配的上下文,并制定了一个简明扼要、直接解答用户查询的答案。


通过这个过程,用户接收到针对性的、有根据的回应,增强了他们与播客内容的互动


def answer_question(question, openai_api_key, model_name):
    llm4 = ChatOpenAI(model_name=model_name, temperature=0, openai_api_key=openai_api_key)
    global global_chromadb, global_short_documents
    if global_chromadb is None and global_short_documents is not None:
        init_chromadb(openai_api_key, documents=global_short_documents)
    
    logging.info(f"Answering question: {question}")
    chatTemplate = """
    You are an AI assistant tasked with answering questions based on context from a podcast conversation. Use the provided context and relevant chat messages to answer. If unsure, say so. Keep your answer to three sentences or less, focusing on the most relevant information.
    Chat Messages (if relevant): {chat_history}
    Question: {question} 
    Context from Podcast: {context} 
    Answer:"""
    QA_CHAIN_PROMPT = PromptTemplate(input_variables=["context", "question", "chat_history"],template=chatTemplate)
    qa_chain = ConversationalRetrievalChain.from_llm(
        llm=llm4, 
        chain_type="stuff", 
        retriever=global_chromadb.as_retriever(search_type="mmr", search_kwargs={"k":12}),
        memory=conversation_memory,
        #return_source_documents=True,
        combine_docs_chain_kwargs={'prompt': QA_CHAIN_PROMPT},
    )
    # Log the current chat history
    current_chat_history = conversation_memory.load_memory_variables({})
    logging.info(f"Current Chat History: {current_chat_history}")
    response = qa_chain({"question": question}) 
    logging.info(f"this is the result: {response}")
    output = response['answer']    
    return output


第三部分:Streamlit 用户界面


激活应用程序


最后,我们使用 Streamlit 创建一个交互式的 web 应用程序,将所有内容整合在一起。用户可以输入 YouTube 链接,提问,并获取 AI 生成的摘要和答案。下面的代码是我自己的界面版本,你可以随意使用我的或根据你的偏好自定义它。


import streamlit as st
from youtuber import fetch_youtube_captions
from agent import process_and_cluster_captions, generate_summary, answer_question, reset_globals
# Set Streamlit page configuration with custom tab title
st.set_page_config(page_title="?GPTpod", page_icon="?", layout="wide")
def user_query(question, openai_api_key, model_name):
    """Process and display the query response."""
    # Add the user's question to the conversation
    st.session_state.conversation.append((f"{question}", "user-message"))
    # Check if this query has been processed before
    if question not in st.session_state.processed_questions:
        # Process the query
        answer = answer_question(question, openai_api_key, model_name)
        if isinstance(answer, str):
            st.session_state.conversation.append((f"{answer}", "grimoire-message"))
        else:
            st.session_state.conversation.append(("Could not find a proper answer.", "grimoire-message"))
        
        st.rerun()
        # Mark this question as processed
        st.session_state.processed_questions.add(question)

# Initialize session state
if 'conversation' not in st.session_state:
    st.session_state.conversation = []
    st.session_state.asked_questions = set()
    st.session_state.processed_questions = set()
# Sidebar for input and operations
with st.sidebar:
    st.title("GPT Podcast Surfer???")
    st.image("img.png") 
    # Expandable Instructions
    with st.expander("? How to use:", expanded=False):
        st.markdown("""
            - ? **Enter your OpenAI API Key.**
            - ? **Paste a YouTube URL.**
            - ?‍♂️ **Click 'Run it' to process.**
            - ?️‍♂️ **Ask questions in the chat.**
        """)
    # Model selection in the sidebar
    model_choice = st.sidebar.selectbox("Choose Model:", 
                                        ("GPT-4 Turbo", "GPT-3.5 Turbo"), 
                                        index=0)  # Default to GPT-4 Turbo
    # Map friendly names to actual model names
    model_name_mapping = {
        "GPT-4 Turbo": "gpt-4-1106-preview",
        "GPT-3.5 Turbo": "gpt-3.5-turbo"
    }
    selected_model = model_name_mapping[model_choice]
    st.session_state['selected_model'] = model_name_mapping[model_choice]

    # Input for OpenAI API Key
    openai_api_key = st.text_input("Enter your OpenAI API Key:", type="password")
    # Save the API key in session state if it's entered
    if openai_api_key:
        st.session_state['openai_api_key'] = openai_api_key
    youtube_url = st.text_input("Enter YouTube URL:")
    # Button to trigger processing
    if st.button("?Run it"):
        if openai_api_key:
            if youtube_url and 'processed_data' not in st.session_state:
                reset_globals()
                with st.spinner('?‍? GPT is cooking up your podcast... hang tight for a few secs?'):
                    captions = fetch_youtube_captions(youtube_url)
                    if captions:
                        representative_docs = process_and_cluster_captions(captions, st.session_state['openai_api_key'])
                        summary = generate_summary(representative_docs, st.session_state['openai_api_key'], selected_model)
                        st.session_state.processed_data = (representative_docs, summary)
                        if 'summary_displayed' not in st.session_state:
                            st.session_state.conversation.append((f"Here's a rundown of the conversation: {summary}", "summary-message"))
                            guiding_message = "Feel free to ask me anything else about it! :)"
                            st.session_state.conversation.append((guiding_message, "grimoire-message"))
                            st.session_state['summary_displayed'] = True
                    else:
                        st.error("Failed to fetch captions.")
        else:
            st.warning("Please add the OpenAI API key first.")

# Main app logic
for message, css_class in st.session_state.conversation:
    role = "assistant" if css_class in ["grimoire-message", "summary-message", "suggestion-message"] else "user"
    with st.chat_message(role):
        st.markdown(message)

# Chat input field
if prompt := st.chat_input("Ask me anything about the podcast..."):
    user_query(prompt, st.session_state.get('openai_api_key', ''), st.session_state.get('selected_model', 'gpt-4-1106-preview'))


这是前端的样子:


4


第四部分:可选的RAG-Fusion以增强结果


RAG-Fusion的概念


RAG-Fusion 是我为进行一点升级而添加的实验性功能。虽然这不是必需的,但它提供了改进的结果,但代价是稍微多了一些代码和一点速度。这个想法是为了提高人工智能的理解和响应准确性。


为什么要使用RAG-Fusion?


  • 填补空缺:它通过生成和重新评估用户查询的多个版本,克服了标准RAG的一些局限性。这确保了更广泛和更精确的搜索。
  • 更好的搜索结果:它结合了互惠排序融合和自定义向量评分,导致更全面和精确的答案。


RAG-Fusion旨在不仅解码用户询问的内容,还解码他们想要询问的内容,更深入地挖掘常被忽视的洞见。


RAG-Fusion的工作流程


  1. 转化查询:我们首先使用一个语言模型将原始用户查询翻译成几个相似但不同的问题。这个多角度的方法对于彻底的搜索至关重要。
  2. 增强向量搜索:每个新查询都进行向量搜索,收集多样化的结果。
  3. 智能重排名:使用互惠排序融合,我们重新组织所有这些结果,优先考虑最相关的结果。
  4. 精心制作的最终输出:最顶部的结果,连同新的查询,引导语言模型生成一个由更广泛上下文信息所支持的回答。


RAG-Fusion的关键功能


  • reciprocal_rank_fusion:这个函数根据搜索结果的相关性分数重新排列它们,确保首先考虑最优答案。
  • generate_multiple_queries:它会创建初始查询的几个变体,扩展搜索范围。
  • answer_question:这是所有元素汇集的地方。该函数首先生成多个查询,检索每个查询的文档,并应用互惠排序融合。然后它使用一个包含这些精炼结果的自定义数据库来指导AI制作出更为明智和准确的回答。


def reciprocal_rank_fusion(results: list[list], k=60):
    fused_scores = {}
    for docs in results:
        # the docs are returned in sorted order of relevance
        for rank, doc in enumerate(docs):
            doc_str = dumps(doc)
            logging.info(f"Serialized Document: {doc_str}")
            if doc_str not in fused_scores:
                fused_scores[doc_str] = 0
            previous_score = fused_scores[doc_str]
            fused_scores[doc_str] += 1 / (rank + k)
    reranked_results = [
        loads(doc)
        for doc, score in sorted(fused_scores.items(), key=lambda x: x[1], reverse=True)
    ]
    logging.info(f"Reciprocal Rank Fusion applied. Reranked Results: {reranked_results[:10]}")  # Log top 10 results
    return reranked_results

def generate_multiple_queries(question, llm):
    prompt = PromptTemplate(
    input_variables=["question"],
    template="""You are an AI language model assistant. Your task is to OUTPUT 4 
    different versions of the given user question to retrieve relevant documents from a vector 
    database. By generating multiple perspectives on the user question, your goal is to help
    the user overcome some of the limitations of the distance-based similarity search. 
    Provide these alternative questions separated by newlines.
    Original question: {question}""",
)
    # Create a chain with the language model and the prompt
    llm_chain = LLMChain(llm=llm, prompt=prompt, output_parser=StrOutputParser())
    # Run the chain
    response = llm_chain.run({"question": question})
    queries = response.split("\n")
    
    logging.info(f"Generated Queries: {queries}")
    return queries

def answer_question(question, openai_api_key, model_name):
    llm4 = ChatOpenAI(model_name=model_name, temperature=0.1, openai_api_key=openai_api_key)
    global global_chromadb, global_short_documents
    if global_chromadb is None and global_short_documents is not None:
        init_chromadb(openai_api_key, documents=global_short_documents)
    
    logging.info(f"Answering question: {question}")
    # Generate multiple queries
    queries = generate_multiple_queries(question, llm4)
    # Retrieve documents for each query
    results = []
    for query in queries:
        retrieved_docs_with_scores = global_chromadb.similarity_search_with_score(query, k=8)
        # Log the number of documents retrieved for each query and the first 3 docs
        logging.info(f"Retrieved {len(retrieved_docs_with_scores)} documents for query '{query}': {retrieved_docs_with_scores[:3]}")
        results.append(retrieved_docs_with_scores)
    # Apply reciprocal rank fusion
    reranked_results = reciprocal_rank_fusion(results)
    logging.info(f"Number of reranked documents: {len(reranked_results)}")
    #extract the Document object only
    reranked_documents = [doc for doc, _ in reranked_results]
    # Create a new Chroma instance with reranked results
    custom_chromadb = Chroma.from_documents(documents=reranked_documents, embedding=OpenAIEmbeddings(openai_api_key=openai_api_key))
    chatTemplate = """
    You are an AI assistant tasked with answering questions based on context from a podcast conversation.
    Use the provided context and relevant chat messages to answer. If unsure, say so. Keep your answer to four sentences or less, focusing on the most relevant information.
    Chat Messages (if relevant): {chat_history}
    Question: {question} 
    Context from Podcast: {context} 
    Answer:"""
    QA_CHAIN_PROMPT = PromptTemplate(input_variables=["context", "question", "chat_history"],template=chatTemplate)
    qa_chain = ConversationalRetrievalChain.from_llm(
        llm=llm4, 
        chain_type="stuff", 
        retriever=custom_chromadb.as_retriever(search_type="similarity", search_kwargs={"k":10}),
        memory=conversation_memory,
        return_source_documents=True,
        combine_docs_chain_kwargs={'prompt': QA_CHAIN_PROMPT},
    )
    # Log the current chat history
    current_chat_history = conversation_memory.load_memory_variables({})
    logging.info(f"Current Chat History: {current_chat_history}")
    response = qa_chain({"question": question}) 
    logging.info(f"Final response: {response}")
    output = response['answer']   
    return output


总结


就这样!一份关于创建一个互动式、人工智能驱动的播客应用程序的逐步指南。

文章来源:https://medium.com/gitconnected/building-your-ai-podcast-assistant-4944d7262f26
欢迎关注ATYUN官方公众号
商务合作及内容投稿请联系邮箱:bd@atyun.com
评论 登录
热门职位
Maluuba
20000~40000/月
Cisco
25000~30000/月 深圳市
PilotAILabs
30000~60000/年 深圳市
写评论取消
回复取消