想象一下,你正在构建一个医疗领域的问题解答系统。你希望确保该系统能在用户提问时准确检索到相关的医学文章。但是,一般的嵌入模型可能难以应对医学术语的高度专业化词汇和细微差别。
这就是微调的作用所在!
在这篇文章中,我们将深入探讨针对特定领域微调嵌入模型的过程。我们将专门为你的领域生成一个数据集,并用它来训练模型,以便更好地理解你所选领域中的微妙语言模式和概念。
最后,你将拥有一个功能更强大的嵌入模型,该模型针对你的领域进行了优化,能够为你的 NLP 任务提供更准确的检索和更好的结果。
嵌入: 理解概念
嵌入是一种强大的文本或图像数字表示方法,可以捕捉语义关系。将文本或音频想象成多维空间中的一个点,在这个点上,相似的单词或短语比不相似的单词或短语靠得更近。
嵌入对许多 NLP 任务至关重要,例如:
语义相似性: 查找两幅图像或文本的相似程度。
文本分类: 根据数据的含义将其归类。
问题解答: 查找与回答问题最相关的文档。
检索增强生成 (RAG): 将用于检索的嵌入模型和用于文本生成的语言模型相结合,以提高生成文本的质量和相关性。
Matryoshka 表征学习
Matryoshka 表征学习(MRL)是一种创建 “可截断 ”嵌入向量的技术。想象一下一系列嵌套的玩偶,每个玩偶内部都包含一个较小的玩偶。MRL 嵌入文本的方式是,前面的维度(如外部的玩偶)包含最重要的信息,后面的维度则增加细节。这样就可以在需要时只使用嵌入向量的一部分,从而降低存储和计算成本。
Bge-base-en
BAAI/bge-base-en-v1.5模型由北京人工智能学会(BAAI)开发,是一个功能强大的文本嵌入模型。它擅长各种 NLP 任务,在 MTEB 和 C-MTEB 等基准测试中表现出色。bge-base-en 模型对于计算资源有限的应用来说是一个不错的选择。
为什么要微调嵌入模型?
针对特定领域微调嵌入模型对于优化 RAG 系统至关重要。这一过程可确保模型对相似性的理解与特定领域的上下文和语言细微差别相一致。经过微调的嵌入模型能够更好地检索与问题最相关的文档,最终使 RAG 系统做出更准确、更相关的回答。
数据集格式: 为微调奠定基础
你可以使用各种数据集格式进行微调。
以下是最常见的类型:
在这篇文章中,我们将创建一个问题和答案的数据集,以微调我们的 bge-base-en-v1.5 模型。
损失函数: 指导训练过程
损失函数对于训练嵌入模型至关重要。它们衡量模型预测与实际标签之间的差异,为模型调整权重提供信号。
不同的损失函数适用于不同的数据集格式:
代码示例
安装依赖项
我们首先安装必要的库。我们将使用datasets、sentence-transformers和google-generativeai来处理数据集、嵌入模型和文本生成。
apt-get -qq install poppler-utils tesseract-ocr
pip install datasets sentence-transformers google-generativeai
pip install -q --user --upgrade pillow
pip install -q unstructured["all-docs"] pi_heif"all-docs"] pi_heif
pip install -q --upgrade unstructured
pip install --upgrade nltk
我们还将安装用于 PDF 解析的 unstructured 和用于文本处理的 nltk。
PDF 解析和文本提取
我们将使用unstructured库从 PDF 文件中提取文本和表格。
import nltk
import os
from unstructured.partition.pdf import partition_pdf
from collections import Counter
nltk.download('punkt')
nltk.download('averaged_perceptron_tagger')
nltk.download('punkt_tab')
def process_pdfs_in_folder(folder_path):
total_text = [] # To accumulate the text from all PDFs
# Get list of all PDF files in the folder
pdf_files = [f for f in os.listdir(folder_path) if f.endswith('.pdf')]
for pdf_file in pdf_files:
pdf_path = os.path.join(folder_path, pdf_file)
print(f"Processing: {pdf_path}")
# Apply the partition logic
elements = partition_pdf(pdf_path, strategy="auto")
# Display the types of elements
display(Counter(type(element) for element in elements))
# Join the elements to form text and add it to total_text list
text = "\n\n".join([str(el) for el in elements])
total_text.append(text)
# Return the total concatenated text
return "\n\n".join(total_text)
folder_path = "data"
all_text = process_pdfs_in_folder(folder_path)
我们会查看指定文件夹中的每个 PDF 文件,并将内容划分为文本、表格和数字。
然后,我们将文本元素合并为单一文本表示法。
自定义文本分块
我们使用 nltk 将提取的文本分成易于管理的块。这对于使文本更适合 llm 处理至关重要。
import nltk
nltk.download('punkt')download('punkt')
def nltk_based_splitter(text: str, chunk_size: int, overlap: int) -> list:
"""
Splits the input text into chunks of a specified size, with optional overlap between chunks.
Parameters:
- text: The input text to be split.
- chunk_size: The maximum size of each chunk (in terms of characters).
- overlap: The number of overlapping characters between consecutive chunks.
Returns:
- A list of text chunks, with or without overlap.
"""
from nltk.tokenize import sent_tokenize
# Tokenize the input text into individual sentences
sentences = sent_tokenize(text)
chunks = []
current_chunk = ""
for sentence in sentences:
# If the current chunk plus the next sentence doesn't exceed the chunk size, add the sentence to the chunk
if len(current_chunk) + len(sentence) <= chunk_size:
current_chunk += " " + sentence
else:
# Otherwise, add the current chunk to the list of chunks and start a new chunk with the current sentence
chunks.append(current_chunk.strip()) # Strip to remove leading spaces
current_chunk = sentence
# After the loop, if there is any leftover text in the current chunk, add it to the list of chunks
if current_chunk:
chunks.append(current_chunk.strip())
# Handle overlap if it's specified (overlap > 0)
if overlap > 0:
overlapping_chunks = []
for i in range(len(chunks)):
if i > 0:
# Calculate the start index for overlap from the previous chunk
start_overlap = max(0, len(chunks[i-1]) - overlap)
# Combine the overlapping portion of the previous chunk with the current chunk
chunk_with_overlap = chunks[i-1][start_overlap:] + " " + chunks[i]
# Append the combined chunk, making sure it's not longer than chunk_size
overlapping_chunks.append(chunk_with_overlap[:chunk_size])
else:
# For the first chunk, there's no previous chunk to overlap with
overlapping_chunks.append(chunks[i][:chunk_size])
return overlapping_chunks # Return the list of chunks with overlap
# If overlap is 0, return the non-overlapping chunks
return chunks
chunks = nltk_based_splitter(text=all_text,
chunk_size=2048,
overlap=0)
数据集生成器
在本节中,我们定义了两个函数:
prompt 函数为 Google Gemini 创建一个提示,要求根据提供的文本块生成一对问答。
import google.generativeai as genai
import pandas as pd
# Replace with your valid Google API key
GOOGLE_API_KEY = "xxxxxxxxxxxx"
# Prompt generator with an explicit request for structured output
def prompt(text_chunk):
return f"""
Based on the following text, generate one Question and its corresponding Answer.
Please format the output as follows:
Question: [Your question]
Answer: [Your answer]
Text: {text_chunk}
"""
# Function to interact with Google's Gemini and return a QA pair
def generate_with_gemini(text_chunk:str, temperature:float, model_name:str):
genai.configure(api_key=GOOGLE_API_KEY)
generation_config = {"temperature": temperature}
# Initialize the generative model
gen_model = genai.GenerativeModel(model_name, generation_config=generation_config)
# Generate response based on the prompt
response = gen_model.generate_content(prompt(text_chunk))
# Extract question and answer from response using keyword
try:
question, answer = response.text.split("Answer:", 1)
question = question.replace("Question:", "").strip()
answer = answer.strip()
except ValueError:
question, answer = "N/A", "N/A" # Handle unexpected format in response
return question, answer
generate_with_gemini 函数与 Gemini 模型交互,并使用创建的提示生成 QA 对。
运行问答生成
我们使用 process_text_chunks 函数,利用双子座模型为每个文本块生成 QA 对。
def process_text_chunks(text_chunks:list, temperature:int, model_name=str):
"""
Processes a list of text chunks to generate questions and answers using a specified model.
Parameters:
- text_chunks: A list of text chunks to process.
- temperature: The sampling temperature to control randomness in the generated outputs.
- model_name: The name of the model to use for generating questions and answers.
Returns:
- A Pandas DataFrame containing the text chunks, questions, and answers.
"""
results = []
# Iterate through each text chunk
for chunk in text_chunks:
question, answer = generate_with_gemini(chunk, temperature, model_name)
results.append({"Text Chunk": chunk, "Question": question, "Answer": answer})
# Convert results into a Pandas DataFrame
df = pd.DataFrame(results)
return df
# Process the text chunks and get the DataFrame
df_results = process_text_chunks(text_chunks=chunks,
temperature=0.7,
model_name="gemini-1.5-flash")
df_results.to_csv("generated_qa_pairs.csv", index=False)
然后将这些结果存储在 Pandas DataFrame 中。
加载数据集
接下来,我们将 CSV 文件中生成的 QA 对加载到 HuggingFace 数据集中。我们要确保数据格式正确,以便进行微调。
from datasets import load_dataset
# Load the CSV file into a Hugging Face Dataset
dataset = load_dataset('csv', data_files='generated_qa_pairs.csv')
def process_example(example, idx):
return {
"id": idx, # Add unique ID based on the index
"anchor": example["Question"],
"positive": example["Answer"]
}
dataset = dataset.map(process_example,
with_indices=True ,
remove_columns=["Text Chunk", "Question", "Answer"])
加载模型
我们从 HuggingFace 加载 BAAI/bge-base-en-v1.5 模型,确保选择适当的执行设备(CPU 或 GPU)。
import torch
from sentence_transformers import SentenceTransformer
from sentence_transformers.evaluation import (
InformationRetrievalEvaluator,
SequentialEvaluator,
)
from sentence_transformers.util import cos_sim
from datasets import load_dataset, concatenate_datasets
from sentence_transformers.losses import MatryoshkaLoss, MultipleNegativesRankingLoss
model_id = "BAAI/bge-base-en-v1.5"
# Load a model
model = SentenceTransformer(
model_id, device="cuda" if torch.cuda.is_available() else "cpu"
)
定义损失函数
在这里,我们要配置 Matryoshka 损失函数,指定用于截断嵌入的维度。
# Important: large to small
matryoshka_dimensions = [768, 512, 256, 128, 64]
inner_train_loss = MultipleNegativesRankingLoss(model)
train_loss = MatryoshkaLoss(
model, inner_train_loss, matryoshka_dims=matryoshka_dimensions
)
内部损失函数 MultipleNegativesRankingLoss 可以帮助模型生成适合检索任务的嵌入。
定义训练参数
我们使用 SentenceTransformerTrainingArguments 来定义训练参数。这包括输出目录、历时次数、批量大小、学习率和评估策略。
from sentence_transformers import SentenceTransformerTrainingArguments
from sentence_transformers.training_args import BatchSamplers
# define training arguments
args = SentenceTransformerTrainingArguments(
output_dir="bge-finetuned", # output directory and hugging face model ID
num_train_epochs=1, # number of epochs
per_device_train_batch_size=4, # train batch size
gradient_accumulation_steps=16, # for a global batch size of 512
per_device_eval_batch_size=16, # evaluation batch size
warmup_ratio=0.1, # warmup ratio
learning_rate=2e-5, # learning rate, 2e-5 is a good value
lr_scheduler_type="cosine", # use constant learning rate scheduler
optim="adamw_torch_fused", # use fused adamw optimizer
tf32=True, # use tf32 precision
bf16=True, # use bf16 precision
batch_sampler=BatchSamplers.NO_DUPLICATES, # MultipleNegativesRankingLoss benefits from no duplicate samples in a batch
eval_strategy="epoch", # evaluate after each epoch
save_strategy="epoch", # save after each epoch
logging_steps=10, # log every 10 steps
save_total_limit=3, # save only the last 3 models
load_best_model_at_end=True, # load the best model when training ends
metric_for_best_model="eval_dim_128_cosine_ndcg@10", # Optimizing for the best ndcg@10 score for the 128 dimension
)
注意:如果你使用的是 Tesla T4,并在训练过程中遇到错误,请尝试注释掉 tf32=True 和 bf16=True 行,禁用 TF32 和 BF16 精度。
创建评估器
我们创建了一个评估器来测量模型在训练过程中的性能。评估器使用 InformationRetrievalEvaluator 对 Matryoshka loss 中的每个维度评估模型的检索性能。
corpus = dict(dict(
zip(dataset['train']['id'],
dataset['train']['positive'])
) # Our corpus (cid => document)
queries = dict(
zip(dataset['train']['id'],
dataset['train']['anchor'])
) # Our queries (qid => question)
# Create a mapping of relevant document (1 in our case) for each query
relevant_docs = {} # Query ID to relevant documents (qid => set([relevant_cids])
for q_id in queries:
relevant_docs[q_id] = [q_id]
matryoshka_evaluators = []
# Iterate over the different dimensions
for dim in matryoshka_dimensions:
ir_evaluator = InformationRetrievalEvaluator(
queries=queries,
corpus=corpus,
relevant_docs=relevant_docs,
name=f"dim_{dim}",
truncate_dim=dim, # Truncate the embeddings to a certain dimension
score_functions={"cosine": cos_sim},
)
matryoshka_evaluators.append(ir_evaluator)
# Create a sequential evaluator
evaluator = SequentialEvaluator(matryoshka_evaluators)
微调前评估模型
我们对基本模型进行评估,以便在微调之前获得基准性能。
results = evaluator(model)
for dim in matryoshka_dimensions:
key = f"dim_{dim}_cosine_ndcg@10"
print(f"{key}: {results[key]}")
定义训练器
我们创建一个 SentenceTransformerTrainer 对象,指定模型、训练参数、数据集、损失函数和评估器。
from sentence_transformers import SentenceTransformerTrainer
trainer = SentenceTransformerTrainer(
model=model, # our embedding model
args=args, # training arguments we defined above
train_dataset=dataset.select_columns(
["positive", "anchor"]
),
loss=train_loss, # Matryoshka loss
evaluator=evaluator, # Sequential Evaluator
)
开始微调
trainer.train() 方法启动微调过程,使用提供的数据和损失函数更新模型的权重。
# start training
trainer.train()
# save the best model
trainer.save_model()
训练完成后,我们会将表现最好的模型保存到指定的输出目录中。
微调后的评估
最后,我们将加载微调后的模型,并使用相同的评估器对其进行评估,以衡量微调后性能的提升。
from sentence_transformers import SentenceTransformer
fine_tuned_model = SentenceTransformer(
args.output_dir, device="cuda" if torch.cuda.is_available() else "cpu"
)
# Evaluate the model
results = evaluator(fine_tuned_model)
# Print the main score
for dim in matryoshka_dimensions:
key = f"dim_{dim}_cosine_ndcg@10"
print(f"{key}: {results[key]}")
通过为你的领域微调嵌入模型,你的 nlp 应用程序就能更深入地理解该领域的特定语言和概念,从而显著改善问题解答、文档检索和文本生成等任务。
本文中讨论的技术,如利用 mrl 和使用 bge-base-en 等强大的模型,为构建特定领域的嵌入模型提供了一条实用的途径。虽然我们关注的重点是微调过程,但请记住,数据集的质量同样至关重要,精心策划一个能准确反映领域细微差别的数据集对于实现最佳结果至关重要。
随着NLP领域的不断进步,我们有望看到更强大的嵌入模型和微调策略的出现,通过不断了解情况和调整方法,你可以充分发挥嵌入模型的潜力,构建符合你特定需求的高质量 nlp 应用程序。