如何使用Qdrant DB创建基于矢量的电影推荐系统?

2023年11月22日 由 alex 发表 625 0

介绍


电影推荐系统从机器学习时代开始就不断发展,发展到现在的变形金刚和矢量数据库。


从基于支持向量机的传统推荐系统开始,我们现在已经进入了 Transformer 的世界。在本文中,我们将探讨如何在矢量数据库中有效存储数千个视频文件,以获得最佳推荐引擎。


4


传统推荐系统


随着变形金刚被引入人工智能领域,支持向量机(SVM)等机器学习算法已经演变成传统的电影推荐系统。电影推荐系统利用机器学习算法来预测用户对电影的偏好和评分。这些系统大致可分为三种主要类型:


  • 协同过滤: 通过收集许多拥有相似观点的用户的偏好来预测用户的兴趣。


  • 基于内容的过滤: 根据属性和描述推荐项目,重点关注用户过去的交互。


  • 混合系统: 结合协作和基于内容的方法来提高效率,并解决冷启动和数据稀疏等问题。


5


各种机器学习技术,如基于实例的最近邻学习,协同过滤的矩阵分解,以及神经网络的深度学习,都有助于提高推荐系统的质量。这些系统会遇到冷启动问题和数据稀疏性等挑战。伦理考虑、可扩展性和上下文信息的集成进一步增加了设计有效和负责任的推荐系统的复杂性。


向量数据库的录入


向量数据库对高效的相似度搜索很有帮助。使用这种相似度搜索在电影推荐系统中特别有用,其目标是找到与用户已经看过和喜欢的电影相似的电影。通过将电影表示为高维空间中的向量,我们可以使用距离度量(如余弦相似度或欧几里得距离)来识别彼此“接近”的电影,表明相似性。


6


随着电影和用户数量的增长,数据库的大小也在增长。矢量数据库设计用于处理大规模数据,同时保持高查询性能。这种可扩展性对于电影推荐系统至关重要,尤其是那些拥有大量电影库和用户基础的大型流媒体平台。


在这种情况下,我们将使用Qdrant数据库,因为它利用了快速近似最近邻搜索,特别是带有余弦相似度搜索的HNSW算法。


7


推荐系统架构


当我们使用矢量数据库时,让我们了解一下推荐系统是如何工作的。根据模型在一部电影中观察到的情绪来推荐电影。该体系结构分为两部分:


候选人的一代


候选人生成是推荐系统功能的重要组成部分。对于成千上万的视频,最初的步骤是根据口音或语言过滤内容。例如,对于西班牙电影,它将只在推荐中显示西班牙电影。这种过滤过程称为启发式过滤。


8


第二步是基于文本将视频转换为文本嵌入。hug Face上有许多模型可以将文本信息转换为向量嵌入。然而,为了获得文本信息,我们首先需要提取视频的音频格式。使用像Whisper或SpeechRecognition这样的音频到文本模型,我们可以检索文本信息作为文本。


利用嵌入模型,我们将文本信息转换为向量嵌入。将这些载体存储在安全可靠的数据库中至关重要。此外,向量数据库简化了相似性搜索。我们将把嵌入保存在Qdrant数据库中。


在很短的响应时间内,我们将基于Qdrant数据库的余弦相似度搜索得到相似的视频。类似视频的检索构成候选生成的最后一步。


重新排名


重新排名本质上是在推荐系统中进行的,根据文本信息中表达的情感来排列电影。在大型语言模型的帮助下,我们可以得到文本信息的意见评分。根据评分,电影将被重新排序,以供推荐。


9


使用 Qdrant 实施代码


在理解了推荐系统的架构之后,现在是时候在代码中实现理论了。我们理解了这个理论,我们知道如何分析电影文本的情感,但关键问题是如何将mp4格式的视频文件转换为文本嵌入。


为了这个代码实现,我从YouTube上提取了30部电影预告片。我们需要安装以后会用到的重要库。


!pip install -q torch 
!pip install -q openai moviepy
!pip install SpeechRecognition
!pip install -q transformers
!pip install -q datasets
!pip install -q qdrant_client


然后,我们将导入代码实现中所需的所有包。


import os
import moviepy.editor as mp
import os
import glob
import speech_recognition as sr
import csv
import numpy as np
import pandas as pd
from qdrant_client import QdrantClient
from qdrant_client.http import models
from transformers import AutoModel, AutoTokenizer
import torch


现在,我们将创建一个目录,在那里我们将保存我们的音频转录。


# specify your path
path = "/content/my_directory"
# create directory
os.makedirs(path, exist_ok=True)


制作目录后,我们将使用以下代码将视频转换为文本信息:


#directory containing video files
source_videos_file_path = r"/content/drive/MyDrive/qdrant_videos"
#directory for storing audio files
destination_audio_files_path = r"/content/my_directory/audios"
# CSV file for storing transcripts
csv_file_path = r"/content/my_directory/transcripts.csv"
# Create the destination directory if it doesn't exist
os.makedirs(destination_audio_files_path, exist_ok=True)
# Initialize recognizer class (for recognizing the speech)
r = sr.Recognizer()
# Open the CSV file in write mode
with open(csv_file_path, 'w', newline='') as csvfile:
    # Create a CSV writer
    writer = csv.writer(csvfile)
    # Write the header row
    writer.writerow(["Video File", "Transcript"])
    # Process video frame by frame
    for video_file in glob.glob(os.path.join(source_videos_file_path, '*.mp4')):
        # Convert video to audio
        video_clip = mp.VideoFileClip(video_file)
        audio_file_path = os.path.join(destination_audio_files_path, os.path.basename(video_file).replace("'", "").replace(" ", "_") + '.wav')
        video_clip.audio.write_audiofile(audio_file_path)
        # Transcribe audio to text
        with sr.AudioFile(audio_file_path) as source:
            # read the audio file
            audio_text = r.listen(source)
            # convert speech to text
            try:
                transcript = r.recognize_google(audio_text)
            except sr.UnknownValueError:
                print("Google Speech Recognition could not understand audio")
                transcript = "Error: Could not understand audio"
            except sr.RequestError as e:
                print("Could not request results from Google Speech Recognition service; {0}".format(e))
                transcript = "Error: Could not request results from Google Speech Recognition service; {0}".format(e)
        # Write the transcript to the CSV file
        writer.writerow([video_file, transcript])


然后,我们将看到数据帧格式的转录本。


data = pd.read_csv('/content/my_directory/transcripts.csv')
data.head()


有一些“语音识别”无法理解的转录本,因此我们将从数据框中删除该行。


data = data[~data['Transcript'].str.startswith('Error')]
data.head()


现在,我们将使用内存中的数据库创建一个QdrantClient实例。


client = QdrantClient(":memory:")


我们将创建一个集合来存储向量嵌入,使用余弦相似度搜索来测量距离。


my_collection = "text_collection"
client.recreate_collection(
    collection_name=my_collection,
    vectors_config=models.VectorParams(size=768, distance=models.Distance.COSINE)
)


我们将使用一个预训练的模型来帮助我们从数据集中提取嵌入层。我们将使用变压器库和GPT-2模型来完成此任务。


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
tokenizer = AutoTokenizer.from_pretrained('gpt2')
model = AutoModel.from_pretrained('gpt2')#.to(device) # switch this for GPU


我们需要提取电影名称并创建一个新列,这样我们就可以知道哪个嵌入属于哪个电影。


def extract_movie_name(file_path):
    file_name = file_path.split("/")[-1]  # Get the last part of the path
    movie_name = file_name.replace(".mp4", "").strip()
    return movie_name
# Apply the function to create the new column
data['Movie_Name'] = data['Video File'].apply(extract_movie_name)
# Display the DataFrame
data[['Video File', 'Movie_Name', 'Transcript']]


现在,我们将创建一个辅助函数,用它来获取每个电影预告脚本的嵌入。


def get_embeddings(row):
    tokenizer = AutoTokenizer.from_pretrained('gpt2')
    tokenizer.add_special_tokens({'pad_token': '[PAD]'})
    inputs = tokenizer(row['Transcript'], padding=True, truncation=True, max_length=128, return_tensors="pt")
    # Disable gradient computation for the following operations.
    with torch.no_grad():
      outputs = model(**inputs).last_hidden_state.mean(dim=1).cpu().numpy()
    # Return the computed embeddings.
    return outputs


然后,我们将嵌入函数应用到我们的数据集。之后,我们将保存嵌入,这样我们就不必再次加载它们。


data['embeddings'] = data.apply(get_embeddings, axis=1)
np.save("vectors", np.array(data['embeddings']))


10


现在,我们将为每个电影脚本创建一个包含元数据的有效负载。


payload = data[['Transcript', 'Movie_Name', 'embeddings']].to_dict(orient="records")


我们将为令牌嵌入的平均池创建一个辅助函数。然后,我们将遍历成绩单列中的每个成绩单,以创建文本嵌入。


# Set the expected size for the vector embeddings
expected_vector_size = 768
# Define a function for mean pooling of token embeddings
def mean_pooling(model_output, attention_mask):
    # Extract token embeddings from the model output
    token_embeddings = model_output[0]
    # Expand the attention mask to match the size of token embeddings
    input_mask_expanded = (attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float())
    # Calculate the sum of token embeddings, considering the attention mask
    sum_embeddings = torch.sum(token_embeddings * input_mask_expanded, 1)
    # Calculate the sum of the attention mask (clamped to avoid division by zero)
    sum_mask = torch.clamp(input_mask_expanded.sum(1), min=1e-9)
    # Return the mean-pooled embeddings
    return sum_embeddings / sum_mask
# Initialize a list to store text embeddings
text_embeddings = []
# Loop through each transcript in the 'Transcript' column of the 'data' variable
for transcript in data['Transcript']:
    # Tokenize the transcript, ensuring padding and truncation, and return PyTorch tensors
    inputs = tokenizer(transcript, padding=True, truncation=True, max_length=128, return_tensors="pt")
    # Perform inference using the model with the tokenized inputs
    with torch.no_grad():
        embs = model(**inputs)
    # Calculate mean-pooled embeddings using the defined function
    embedding = mean_pooling(embs, inputs["attention_mask"])
    # Ensure the embeddings are of the correct size by trimming or padding
    embedding = embedding[:, :expected_vector_size]
    
    # Append the resulting embedding to the list
    text_embeddings.append(embedding)


为了在Qdrant数据库集合中为每个转录本分配显式ID,我们将创建一个ID列表,然后将ID、向量和有效负载的组合放在一起。


ids = list(range(len(data)))
# Convert PyTorch tensors to lists of floats
text_embeddings_list = [[float(num) for num in emb.numpy().flatten().tolist()[:expected_vector_size]] for emb in text_embeddings]
client.upsert(collection_name=my_collection,
              points=models.Batch(
                  ids=ids,
                  vectors=text_embeddings_list,
                  payloads=payload
                  )
              )


使用情感分析模型,你可以生成一个情感得分,其中将计算-1和1之间的情感极性。得分为-1表示消极情绪,0表示中立情绪,1表示积极情绪。


from textblob import TextBlob
def calculate_sentiment_score(text):
    # Create a TextBlob object
    blob = TextBlob(text)
    # Get the sentiment polarity (-1 to 1, where -1 is negative, 0 is neutral, and 1 is positive)
    sentiment_score = blob.sentiment.polarity
    return sentiment_score
# Example usage:
text_example = data['Transcript'].iloc[0]
sentiment_score_example = calculate_sentiment_score(text_example)
print(f"Sentiment Score: {sentiment_score_example}")


对于这个例子,最终的情绪得分将是0.75。现在,我们将应用辅助函数来计算“data”数据框架的情感得分。


data['Sentiment Score'] = data['Transcript'].apply(calculate_sentiment_score)
data.head()


你可以取每个电影脚本的向量嵌入的平均值,并将其与情感得分结合起来,得到最终的意见得分。


data['avg_embeddings'] = data['embeddings'].apply(lambda x: np.mean(x, axis=0))
data['Opinion_Score'] = 0.7 * data['avg_embeddings'] + 0.3 * data['Sentiment']


在上面的代码中,我给嵌入分配了更多的权重,因为它们捕获了语义内容和电影脚本之间的相似性。内在的内容相似性在决定整体意见得分方面更为关键。“情感”一栏定义了电影文本的情感基调。我给它分配了一个较低的权重,因为在计算总体意见得分时,情感作为一个因素并不像语义内容那么重要。权重是任意的(就像我们在分割数据集时给训练集和测试集赋予权重一样)。


11


然后创建一个电影推荐函数,在其中传递一个电影名称并获得所需的推荐电影数量。


def get_recommendations(movie_name):
    # Find the row corresponding to the given movie name
    query_row = data[data['Movie_Name'] == movie_name]
    if not query_row.empty:
      # Convert the 'Opinion_Score' column to a NumPy array
      opinion_scores_array = np.array(data['Opinion_Score'].tolist())
      # Upsert the 'Opinion_Score' vectors to the Qdrant collection
      opinion_scores_ids = list(range(len(data)))
      # Convert the 'Opinion_Score' array to a list of lists
      opinion_scores_list = opinion_scores_array.reshape(-1, 1).tolist()
      client.upsert(
          collection_name=my_collection,
          points=models.Batch(
              ids=opinion_scores_ids,
              vectors=opinion_scores_list
              )
          )
      # Define a query vector based on the opinion score you want to find similar movies for
      query_opinion_score = np.array([0.8] * 768)  # Adjust as needed
      # Perform a similarity search
      search_results = client.search(
          collection_name=my_collection,
          query_vector=query_opinion_score.tolist(),
          limit=3)
       # Extract movie recommendations from search results
      recommended_movie_ids = [result.id for result in search_results]
      recommended_movies = data.loc[data.index.isin(recommended_movie_ids)]
      # Display recommended movies
      print("Recommended Movies:")
      print(recommended_movies[['Movie_Name', 'Opinion_Score']])
    else:
      print(f"Movie '{movie_name}' not found in the dataset.")
# Example usage:
get_recommendations("Star Wars_ The Last Jedi Trailer (Official)")


这样,我们就可以使用Qdrant数据库创建一个电影推荐系统。


12


结论


矢量数据库有很多用例。在这些用例中,电影推荐系统通过余弦相似度搜索和大型语言模型得到了显着改进。


13


使用 Qdrant 数据库创建电影推荐系统非常有趣。
文章来源:https://medium.com/@akriti.upadhyay/how-to-create-a-vector-based-movie-recommendation-system-using-qdrant-db-e026554f49ec
欢迎关注ATYUN官方公众号
商务合作及内容投稿请联系邮箱:bd@atyun.com
评论 登录
写评论取消
回复取消