通过Python Notebook和OpenAI CLIP为视频构建矢量嵌入

2024年09月26日 由 alex 发表 65 0

简介

本文讨论了矢量嵌入在视频分析中的重要性,并通过一个简单的示例逐步介绍了如何构建这些嵌入。


创建 SingleStore 云账户

创建免费 SingleStore 云账户的步骤。我们将使用免费共享层,并使用默认的工作区和数据库名称。


导入笔记本

我们将从 GitHub 下载笔记本。


在 SingleStore 云门户的左侧导航窗格中,我们将选择 DEVELOP > Data Studio。


在网页右上方,我们将选择New Notebook > Import From File


运行笔记本

检查连接到 SingleStore 工作区后,我们将逐个运行单元格。


我们先从 GitHub 下载一段示例视频,然后直接在笔记本中播放这段短视频。示例视频长 142 秒。


对比语言-图像预训练(CLIP)是 OpenAI 的一个模型,它通过在共享嵌入空间中关联图像和文本来理解图像和文本。我们将以如下方式加载它:


device = "cuda" if torch.cuda.is_available() else "cpu"
model, preprocess = clip.load("ViT-B/32", device = device)


我们将把一段视频分解成以下各个画格:


def extract_frames(video_path):
    frames = []
    cap = cv2.VideoCapture(video_path)
    frame_rate = cap.get(cv2.CAP_PROP_FPS)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    total_seconds = total_frames / frame_rate
    target_frame_count = int(total_seconds)
    target_frame_index = 0
    for i in range(target_frame_count):
        cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame_index)
        ret, frame = cap.read()
        if not ret:
            break
        frames.append(frame)
        target_frame_index += int(frame_rate)
    cap.release()
    return frames


接下来,我们将以更简单的形式总结图片中发生的事情:


def generate_embedding(frame):
    frame_tensor = preprocess(PILImage.fromarray(frame)).unsqueeze(0).to(device)
    with torch.no_grad():
        embedding = model.encode_image(frame_tensor).cpu().numpy()
    return embedding[0]


现在,我们将从视频中提取视觉信息并将其总结为结构化格式,以便进一步分析:


def store_frame_embedding_and_image(video_path):
    frames = extract_frames(video_path)
    data = [
        (i+1, generate_embedding(frame), frame)
        for i, frame in enumerate(tqdm(
            frames,
            desc = "Processing frames",
            bar_format = "{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]")
        )
    ]
    return pd.DataFrame(data, columns = ["frame_number", "embedding_data", "frame_data"])


让我们来看看存储在 DataFrame 中的数据的大小特征:


embedding_lengths = df["embedding_data"].str.len()
frame_lengths = df["frame_data"].str.len()
# Calculate min and max lengths for embeddings and frames
min_embedding_length, max_embedding_length = embedding_lengths.min(), embedding_lengths.max()
min_frame_length, max_frame_length = frame_lengths.min(), frame_lengths.max()
# Print results
print(f"Min length of embedding vectors: {min_embedding_length}")
print(f"Max length of embedding vectors: {max_embedding_length}")
print(f"Min length of frame data vectors: {min_frame_length}")
print(f"Max length of frame data vectors: {max_frame_length}")


输出示例:


Min length of embedding vectors: 512
Max length of embedding vectors: 512
Min length of frame data vectors: 1080
Max length of frame data vectors: 1080


现在,我们来量化查询嵌入与 DataFrame 中每个帧嵌入的相似程度,以衡量查询与帧之间的相似度:


def calculate_similarity(query_embedding, df):
    # Convert the query embedding to a tensor
    query_tensor = torch.tensor(query_embedding, dtype = torch.float32).to(device)
    # Convert the list of embeddings to a numpy array
    embeddings_np = np.array(df["embedding_data"].tolist())
    # Create a tensor from the numpy array
    embeddings_tensor = torch.tensor(embeddings_np, dtype = torch.float32).to(device)
    # Compute similarities using matrix multiplication
    similarities = torch.mm(embeddings_tensor, query_tensor.unsqueeze(1)).squeeze().tolist()
    return similarities


现在,我们用更简单的数字形式来概括文本查询的含义:


def encode_text_query(query):
    # Tokenize the query text
    tokens = clip.tokenize([query]).to(device)
    
    # Compute text features using the pretrained model
    with torch.no_grad():
        text_features = model.encode_text(tokens)
    
    # Convert the tensor to a NumPy array and return it
    return text_features.cpu().numpy().flatten()


并在出现提示时输入查询字符串 “Ultra-Fast Ingestion”:


query = input("Enter your query: ")
text_query_embedding = encode_text_query(query)
text_query_embedding /= np.linalg.norm(text_query_embedding)
text_similarities = calculate_similarity(text_query_embedding, df)
df["text_similarity"] = text_similarities


现在我们将得到前 5 个最佳文本匹配:


# Retrieve the top 5 text matches based on similarity
top_text_matches = df.nlargest(5, "text_similarity")
print("Top 5 best matches:")
print(top_text_matches[["frame_number", "text_similarity"]].to_string(index = False))


输出示例


Top 5 best matches:
 frame_number  text_similarity
           40         0.346581
           39         0.345179
           43         0.301896
           53         0.298285
           52         0.294805


我们还可以绘制框架图:


def plot_frames(frames, frame_numbers):
    num_frames = len(frames)
    fig, axes = plt.subplots(1, num_frames, figsize = (15, 5))
    
    for ax, frame_data, frame_number in zip(axes, frames, frame_numbers):
        ax.imshow(frame_data)
        ax.set_title(f"Frame {frame_number}")
        ax.axis("off")
    
    plt.tight_layout()
    plt.show()
# Collect frame data and numbers for the top text matches
top_text_matches_indices = top_text_matches.index.tolist()
frames = [df.at[index, "frame_data"] for index in top_text_matches_indices]
frame_numbers = [df.at[index, "frame_number"] for index in top_text_matches_indices]
# Plot the frames
plot_frames(frames, frame_numbers)


现在,我们用更简单的数字形式来概括图像查询:


def encode_image_query(image):
    # Preprocess the image and add batch dimension
    image_tensor = preprocess(image).unsqueeze(0).to(device)
    
    # Extract features using the model
    with torch.no_grad():
        image_features = model.encode_image(image_tensor)
    
    # Convert features to NumPy array and flatten
    return image_features.cpu().numpy().flatten()


并下载用于查询的示例图像:


image_url = "https://github.com/VeryFatBoy/clip-demo/raw/main/thumbnails/1_what_makes_singlestore_unique.png"
response = requests.get(image_url)
if response.status_code == 200:
    display(Image(url = image_url))
    image_file = PILImage.open(BytesIO(response.content))
    image_query_embedding = encode_image_query(image_file)
    image_query_embedding /= np.linalg.norm(image_query_embedding)
    image_similarities = calculate_similarity(image_query_embedding, df)
    df["image_similarity"] = image_similarities
else:
    print("Failed to download the image, status code:", response.status_code)


现在,我们将得到图像匹配度最高的 5 张图片:


top_image_matches = df.nlargest(5, "image_similarity")
print("Top 5 best matches:")
print(top_image_matches[["frame_number", "image_similarity"]].to_string(index = False))


输出示例:


Top 5 best matches:
 frame_number  image_similarity
            7          0.877372
            6          0.607051
            9          0.591181
            4          0.513214
           15          0.502777


我们还可以绘制框架图:


# Collect frame data and numbers for the top image matches
top_image_matches_indices = top_image_matches.index.tolist()
frames = [df.at[index, "frame_data"] for index in top_image_matches_indices]
frame_numbers = [df.at[index, "frame_number"] for index in top_image_matches_indices]
# Plot the frames
plot_frames(frames, frame_numbers)


现在,让我们使用元素平均法将文字和图像结合起来:


combined_query_embedding = (text_query_embedding + image_query_embedding) / 2
combined_similarities = calculate_similarity(combined_query_embedding, df)
df["combined_similarity"] = combined_similarities


现在,我们将选出综合成绩最好的前 5 场比赛:


top_combined_matches = df.nlargest(5, "combined_similarity")
print("Top 5 best matches:")
print(top_combined_matches[["frame_number", "combined_similarity"]].to_string(index = False))


输出示例:


Top 5 best matches:
 frame_number  combined_similarity
            7             0.516626
            6             0.413325
            9             0.380147
            4             0.363691
            3             0.355250


我们还可以绘制框架图:


# Collect frame data and numbers for the top combined matches
top_combined_matches_indices = top_combined_matches.index.tolist()
frames = [df.at[index, "frame_data"] for index in top_combined_matches_indices]
frame_numbers = [df.at[index, "frame_number"] for index in top_combined_matches_indices]
# Plot the frames
plot_frames(frames, frame_numbers)


接下来,我们将在 SingleStore 中存储数据。首先,我们要准备数据:


frames_df = df.copy()
frames_df.drop(
    columns = ["text_similarity", "image_similarity", "combined_similarity"],
    inplace = True
)
query_string = combined_query_embedding.copy()


我们还需要进行一下数据清理:


def process_data(arr):
    return np.array2string(arr, separator = ",").replace("\n", "")
frames_df["embedding_data"] = frames_df["embedding_data"].apply(process_data)
frames_df["frame_data"] = frames_df["frame_data"].apply(process_data)
query_string = process_data(query_string)


我们将检查是否在免费共享层上运行:


shared_tier_check = %sql SHOW VARIABLES LIKE "is_shared_tier"
if not shared_tier_check or shared_tier_check[0][1] == "OFF":
    %sql DROP DATABASE IF EXISTS video_db;
    %sql CREATE DATABASE IF NOT EXISTS video_db;


然后获取数据库连接:


from sqlalchemy import *
db_connection = create_engine(connection_url)


我们将确保有一个表格来存储数据:


DROP TABLE IF EXISTS frames;
CREATE TABLE IF NOT EXISTS frames (
    frame_number INT(10) UNSIGNED NOT NULL,
    embedding_data VECTOR(512) NOT NULL,
    frame_data TEXT,
    KEY(frame_number)
);


然后将 DataFrame 写入 SingleStore:


frames_df.to_sql(
    "frames",
    con = db_connection,
    if_exists = "append",
    index = False,
    chunksize = 1000
)


我们可以从 SingleStore 读回一些数据:


SELECT frame_number,
    SUBSTRING(embedding_data, 1, 50) AS embedding_data,
    SUBSTRING(frame_data, 1, 50) AS frame_data
FROM frames
LIMIT 1;


我们还可以创建 ANN 索引:


ALTER TABLE frames ADD VECTOR INDEX (embedding_data)
     INDEX_OPTIONS '{
          "index_type":"AUTO",
          "metric_type":"DOT_PRODUCT"
     }';


首先,让我们运行一个不使用 ANN 索引的查询:


SELECT frame_number,
    embedding_data <*> :query_string AS similarity
FROM frames
ORDER BY similarity USE INDEX () DESC
LIMIT 5;


输出示例:


frame_number          similarity
           7  0.5166257619857788
           6  0.4133252203464508
           9 0.38014671206474304
           4 0.36369115114212036
           3 0.35524997115135193


现在,我们将使用 ANN 索引运行一次查询:


SELECT frame_number,
    embedding_data <*> :query_string AS similarity
FROM frames
ORDER BY similarity DESC
LIMIT 5;


输出示例:


frame_number          similarity
           7  0.5166257619857788
           6  0.4133252203464508
           9 0.38014671206474304
           4 0.36369115114212036
           3 0.35524997115135193


我们还可以使用 Python 作为替代:


sql_query = """
SELECT frame_number, embedding_data, frame_data
FROM frames
ORDER BY embedding_data <*> %s DESC
LIMIT 5;
"""
new_frames_df = pd.read_sql(
    sql_query,
    con = db_connection,
    params = (query_string,)
)
new_frames_df.head()


由于我们只存储了少量数据(142 行),因此无论是否使用 ANN 索引,结果都是相同的。我们查询数据库的结果与之前合并查询的结果一致。


总结

在本文中,我们使用 Python 和 OpenAI 的 CLIP 模型将向量嵌入应用于视频分析。我们了解了如何从视频中提取帧,为每个帧生成嵌入,并使用这些嵌入根据文本和图像查询执行相似性搜索。这让我们能够检索相关的视频片段,使其成为视频内容分析的有用工具。


文章来源:https://medium.com/@VeryFatBoy/quick-tip-build-vector-embeddings-for-video-via-python-notebook-openai-clip-809e3ce5cd17
欢迎关注ATYUN官方公众号
商务合作及内容投稿请联系邮箱:bd@atyun.com
评论 登录
热门职位
Maluuba
20000~40000/月
Cisco
25000~30000/月 深圳市
PilotAILabs
30000~60000/年 深圳市
写评论取消
回复取消