Recently, at the first ice hockey championship (3-on-3) held in Lima, I wanted to add an AI element, so I used PyTorch, computer vision techniques, and a convolutional neural network (CNN) to build a model that tracks players and teams and gathers some basic performance stats.
This article is meant as a quick guide to designing and deploying that model. The model still needs some fine-tuning, but I hope it helps anyone get a first look at the fascinating world of computer vision applied to sports.
Architecture
Before starting this project, I did some quick research to find a working baseline and avoid reinventing the wheel. I found a lot of interesting computer-vision work on tracking soccer players (no surprise, since soccer is the most popular team sport in the world), but not many resources for ice hockey. Roboflow has some interesting pre-trained models and datasets you can use to train your own model, but working with a hosted model introduced latency issues that I'll explain further on. In the end, I used the soccer material to read the video frames and obtain the individual track IDs, following the fundamentals and the tracking method presented in this tutorial.
With the track IDs in place, I charted my own course. In this article, we'll see how the project evolved from a simple object-detection task into a model that fully detects players and teams and delivers some basic performance metrics.
The Tracking Mechanism
The tracking mechanism is the backbone of the model. It ensures that every object detected in the video is identified, assigned a unique identifier, and keeps that identifier across frames. As the code below shows, its main components are YOLO for object detection, ByteTrack (via the supervision package) for multi-object tracking, and OpenCV for reading and writing video.
To build our tracking mechanism, we'll start with the first two steps: detecting the objects in each frame and assigning each one a unique track ID. For the steps below, we'll need these Python packages:
pip install ultralytics
pip install supervision
pip install opencv-python
Next, we'll import our libraries and specify the paths to the sample video file and the pickle file (if it exists; if not, the code will create one and save it at the same path):
#**********************************LIBRARIES*********************************#
from ultralytics import YOLO
import supervision as sv
import pickle
import os
import cv2
# INPUT-video file
video_path = 'D:/PYTHON/video_input.mp4'
# OUTPUT-Video File
output_video_path = 'D:/PYTHON/output_video.mp4'
# PICKLE FILE (IF AVAILABLE, LOADS IT; IF NOT, SAVES IT TO THIS PATH)
pickle_path = 'D:/PYTHON/stubs/track_stubs.pkl'
Now, let's go ahead and define our tracking mechanism:
#*********************************TRACKING MECHANISM**************************#
class HockeyAnalyzer:
def __init__(self, model_path):
self.model = YOLO(model_path)
self.tracker = sv.ByteTrack()
def detect_frames(self, frames):
batch_size = 20
detections = []
for i in range(0, len(frames), batch_size):
detections_batch = self.model.predict(frames[i:i+batch_size], conf=0.1)
detections += detections_batch
return detections
#********LOAD TRACKS FROM FILE OR DETECT OBJECTS-SAVES PICKLE FILE************#
def get_object_tracks(self, frames, read_from_stub=False, stub_path=None):
if read_from_stub and stub_path is not None and os.path.exists(stub_path):
with open(stub_path, 'rb') as f:
tracks = pickle.load(f)
return tracks
detections = self.detect_frames(frames)
tracks = {"person": []}
for frame_num, detection in enumerate(detections):
cls_names = detection.names
cls_names_inv = {v: k for k, v in cls_names.items()}
# Tracking Mechanism
detection_supervision = sv.Detections.from_ultralytics(detection)
detection_with_tracks = self.tracker.update_with_detections(detection_supervision)
tracks["person"].append({})
for frame_detection in detection_with_tracks:
bbox = frame_detection[0].tolist()
cls_id = frame_detection[3]
track_id = frame_detection[4]
if cls_id == cls_names_inv.get('person', None):
tracks["person"][frame_num][track_id] = {"bbox": bbox}
if stub_path is not None:
with open(stub_path, 'wb') as f:
pickle.dump(tracks, f)
return tracks
#***********************BOUNDING BOXES AND TRACK-IDs**************************#
def draw_annotations(self, video_frames, tracks):
output_video_frames = []
for frame_num, frame in enumerate(video_frames):
frame = frame.copy()
player_dict = tracks["person"][frame_num]
# Draw Players
for track_id, player in player_dict.items():
color = player.get("team_color", (0, 0, 255))
bbox = player["bbox"]
x1, y1, x2, y2 = map(int, bbox)
# Bounding boxes
cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
# Track_id
cv2.putText(frame, str(track_id), (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 2)
output_video_frames.append(frame)
return output_video_frames
This method starts by initializing the YOLO model and the ByteTrack tracker. Next, the frames are processed in batches of 20, using the YOLO model to detect and collect the objects in each batch. If a pickle file is available at its path, the precomputed tracks are loaded from that file. If not (the first time you run the code, or after deleting a previous pickle file), get_object_tracks converts each detection into the format ByteTrack expects, updates the tracker with those detections, and stores the tracking information in a new pickle file at the specified path.
To run the tracker and save a new output video with the bounding boxes and track IDs, you can use the following code:
#*************** EXECUTES TRACKING MECHANISM AND OUTPUT VIDEO****************#
# Read the video frames
video_frames = []
cap = cv2.VideoCapture(video_path)
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
video_frames.append(frame)
cap.release()
#********************* EXECUTE TRACKING METHOD WITH YOLO**********************#
tracker = HockeyAnalyzer('D:/PYTHON/yolov8x.pt')
tracks = tracker.get_object_tracks(video_frames, read_from_stub=True, stub_path=pickle_path)
annotated_frames = tracker.draw_annotations(video_frames, tracks)
#*********************** SAVES VIDEO FILE ************************************#
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
height, width, _ = annotated_frames[0].shape
out = cv2.VideoWriter(output_video_path, fourcc, 30, (width, height))  # assumes a 30-fps source video
for frame in annotated_frames:
out.write(frame)
out.release()
If the code works properly, you should get a video output similar to the one in Sample Clip 01.
The Ice Rink
This first step already surfaces a couple of challenges. First, as expected, the model picks up every moving object: players, referees, even objects outside the rink. Second, those red bounding boxes make the player tracking a bit unclear and don't look very tidy for presentation. In this section we'll focus on narrowing detection to objects inside the rink only. We'll also swap the bounding boxes for ellipses at the players' feet, for cleaner visibility.
Let's start by switching from boxes to ellipses. To do so, we just add a new method above the labels and bounding-boxes method in our existing code:
#************ Design of Ellipse for tracking players instead of Bounding boxes**************#
def draw_ellipse(self, frame, bbox, color, track_id=None, team=None):
y2 = int(bbox[3])
x_center = (int(bbox[0]) + int(bbox[2])) // 2
width = int(bbox[2]) - int(bbox[0])
color = (255, 0, 0)  # hard-coded for now; overrides the color argument (team colors come later)
text_color = (255, 255, 255)
cv2.ellipse(
frame,
center=(x_center, y2),
axes=(int(width) // 2, int(0.35 * width)),
angle=0.0,
startAngle=-45,
endAngle=235,
color=color,
thickness=2,
lineType=cv2.LINE_4
)
if track_id is not None:
rectangle_width = 40
rectangle_height = 20
x1_rect = x_center - rectangle_width // 2
x2_rect = x_center + rectangle_width // 2
y1_rect = (y2 - rectangle_height // 2) + 15
y2_rect = (y2 + rectangle_height // 2) + 15
cv2.rectangle(frame,
(int(x1_rect), int(y1_rect)),
(int(x2_rect), int(y2_rect)),
color,
cv2.FILLED)
x1_text = x1_rect + 12
if track_id > 99:
x1_text -= 10
font_scale = 0.4
cv2.putText(
frame,
f"{track_id}",
(int(x1_text), int(y1_rect + 15)),
cv2.FONT_HERSHEY_SIMPLEX,
font_scale,
text_color,
thickness=2
)
return frame
We also need to update the annotation step, replacing the bounding boxes and IDs with a call to the ellipse method:
#***********************BOUNDING BOXES AND TRACK-IDs**************************#
def draw_annotations(self, video_frames, tracks):
output_video_frames = []
for frame_num, frame in enumerate(video_frames):
frame = frame.copy()
player_dict = tracks["person"][frame_num]
# Draw Players
for track_id, player in player_dict.items():
bbox = player["bbox"]
# Draw ellipse and tracking IDs
self.draw_ellipse(frame, bbox, (0, 255, 0), track_id)
output_video_frames.append(frame)
return output_video_frames
With these changes in place, the output video should look much cleaner, as shown in Sample Clip 02.
Now, to handle the rink boundaries, we need some basics about video resolution. In our use case we're working with 720p (1280x720 pixels), which means every frame or image we process has dimensions of 1280 pixels (width) by 720 pixels (height).
What does working in 720p (1280x720 pixels) imply? It means the image is made up of 1280 pixels horizontally and 720 pixels vertically. Coordinates in this format start at (0, 0) in the top-left corner of the image, with the x-coordinate increasing as you move right and the y-coordinate increasing as you move down. These coordinates are used to mark specific regions of the image, for example (x1, y1) for the top-left corner of a box and (x2, y2) for its bottom-right corner. Understanding this helps us measure distances and speeds and decide where to focus our analysis in the video.
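If you're not sure about your own footage, you can check its resolution and frame rate with OpenCV before going further (a quick standalone check, using the same sample video path as above):
#****************Check video resolution and frame rate*******************
import cv2
cap = cv2.VideoCapture('D:/PYTHON/video_input.mp4')
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))    # e.g., 1280
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))  # e.g., 720
fps = cap.get(cv2.CAP_PROP_FPS)                   # e.g., 30.0
print(f"Resolution: {width}x{height} at {fps:.1f} fps")
cap.release()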
With that in mind, we'll start by marking the frame borders with green lines, using the following code:
#********************* Border Definition for Frame***********************
import cv2
video_path = 'D:/PYTHON/video_input.mp4'
cap = cv2.VideoCapture(video_path)
#**************Read, Define and Draw corners of the frame****************
ret, frame = cap.read()
bottom_left = (0, 720)
bottom_right = (1280, 720)
upper_left = (0, 0)
upper_right = (1280, 0)
cv2.line(frame, bottom_left, bottom_right, (0, 255, 0), 2)
cv2.line(frame, bottom_left, upper_left, (0, 255, 0), 2)
cv2.line(frame, bottom_right, upper_right, (0, 255, 0), 2)
cv2.line(frame, upper_left, upper_right, (0, 255, 0), 2)
#*******************Save the frame with marked corners*********************
output_image_path = 'rink_area_marked_VALIDATION.png'
cv2.imwrite(output_image_path, frame)
print("Rink area saved:", output_image_path)
The result should be a green rectangle like the one shown in (a) of Sample Clip 03. But to track moving objects inside the rink only, we need a delimitation closer to the one shown in (b).
Getting (b) right is a trial-and-error process: you test different coordinates until you find the boundary that best fits your model. Initially, I aimed for a perfect match with the rink's edges, but the tracking system struggled near them. To improve accuracy, I expanded the boundary slightly, making sure to capture every tracked object inside the rink while excluding those outside of it. The result shown in the figure is the best I could get (you may well find a better setup), defined by these coordinates:
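(These are the same corner values assigned to self.rink_coordinates in the full code further below, listed from the bottom corners to the top:)
#**************RINK BOUNDARY****************
Bottom Left Corner: (-450, 710)
Bottom Right Corner: (2030, 710)
Upper Right Corner: (948, 61)
Upper Left Corner: (352, 61)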
Finally, we'll define two more areas: the offensive zones for the white and yellow teams (the zones where each team is trying to score). This will let us gather some basic positional statistics and pressure metrics for each team inside its opponent's zone.
#**************YELLOW TEAM OFFENSIVE ZONE****************
Bottom Left Corner: (-450, 710)
Bottom Right Corner: (2030, 710)
Upper Left Corner: (200, 150)
Upper Right Corner: (1160, 150)
#**************WHITE TEAM OFFENSIVE ZONE****************
Bottom Left Corner: (180, 150)
Bottom Right Corner: (1100, 150)
Upper Left Corner: (352, 61)
Upper Right Corner: (900, 61)
Let's set these coordinates aside for now; in the next step we'll explain how to classify each team. Then we'll bring it all together into our original tracking method.
Team Prediction with Deep Learning
The 1943 paper "A Logical Calculus of the Ideas Immanent in Nervous Activity" by Warren McCulloch and Walter Pitts laid a solid foundation for early neural-network research. Later, in 1957, a simplified mathematical model of a neuron (one that receives inputs, applies weights to them, sums them, and outputs a binary result) inspired Frank Rosenblatt to build the Mark I. The quest to make computers think like us hasn't stopped since.
Before diving into the code, let's go over the basic building blocks of its architecture: convolutional layers that extract visual features from the images, max-pooling layers that downsample those features, fully connected layers that map them to our three classes, and dropout to reduce overfitting. We'll train the network over a full cycle of 10 epochs.
That said, to run our CNN model we'll need the following Python packages:
pip install torch torchvision
pip install matplotlib
pip install scikit-learn
Next, we'll import our libraries and specify the path to the sample dataset:
# ************CONVOLUTIONAL NEURAL NETWORK-THREE CLASSES DETECTION**************************
# REFEREE
# WHITE TEAM (Team_away)
# YELLOW TEAM (Team_home)
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as transforms
import torchvision.datasets as datasets
from torch.utils.data import DataLoader
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import matplotlib.pyplot as plt
#Training and Validation Datasets
#Download the teams_sample_dataset file from the project's GitHub repository
data_dir = 'D:/PYTHON/teams_sample_dataset'
First we'll make sure every image is the same size (resized to 150x150 pixels), then convert them into a format the code can work with (in PyTorch, input data is typically represented as tensor objects). Finally, we'll adjust the colors to make them easier for the model to process (normalization) and set up the procedure that loads the images. These steps help prepare and organize the images so the model can start learning effectively, avoiding bias introduced by the data format.
#******************************Data transformation***********************************
transform = transforms.Compose([
transforms.Resize((150, 150)),
transforms.ToTensor(),
transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])
# Load dataset
train_dataset = datasets.ImageFolder(os.path.join(data_dir, 'train'), transform=transform)
val_dataset = datasets.ImageFolder(os.path.join(data_dir, 'val'), transform=transform)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
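Before training, it's worth checking the folder-to-index mapping ImageFolder inferred, since ImageFolder assigns indices alphabetically by folder name, and that order must match the class list we use at inference time. A quick way to verify it (the printed dictionary is only an illustration; yours depends on your folder names):
# Check how ImageFolder mapped each class folder to an index
print(train_dataset.class_to_idx)
# e.g., {'referee': 0, 'team_away': 1, 'team_home': 2}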
Next, we'll define the CNN's architecture:
#********************************CNN Model Architecture**************************************
class CNNModel(nn.Module):
def __init__(self):
super(CNNModel, self).__init__()
self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
self.fc1 = nn.Linear(128 * 18 * 18, 512)
self.dropout = nn.Dropout(0.5)
self.fc2 = nn.Linear(512, 3) #Three Classes (Referee, Team_away,Team_home)
def forward(self, x):
x = self.pool(F.relu(self.conv1(x)))
x = self.pool(F.relu(self.conv2(x)))
x = self.pool(F.relu(self.conv3(x)))
x = x.view(-1, 128 * 18 * 18)
x = F.relu(self.fc1(x))
x = self.dropout(x)
x = self.fc2(x)
return x
In the next step, we initialize the model, configure categorical cross-entropy as the loss function (commonly used for classification tasks), and specify Adam as the optimizer. As mentioned earlier, we'll run the model over a full cycle of 10 epochs.
#********************************CNN TRAINING**********************************************
# Model-loss function-optimizer
model = CNNModel()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
#*********************************Training*************************************************
num_epochs = 10
train_losses, val_losses = [], []
for epoch in range(num_epochs):
model.train()
running_loss = 0.0
for inputs, labels in train_loader:
optimizer.zero_grad()
outputs = model(inputs)
labels = labels.type(torch.LongTensor)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
running_loss += loss.item()
train_losses.append(running_loss / len(train_loader))
model.eval()
val_loss = 0.0
all_labels = []
all_preds = []
with torch.no_grad():
for inputs, labels in val_loader:
outputs = model(inputs)
labels = labels.type(torch.LongTensor)
loss = criterion(outputs, labels)
val_loss += loss.item()
_, preds = torch.max(outputs, 1)
all_labels.extend(labels.tolist())
all_preds.extend(preds.tolist())
To keep an eye on performance, we'll add some code that follows training progress, prints the validation metrics, and plots them. Finally, we save the model as hockey_team_classifier.pth to a specified path of your choice.
#********************************METRICS & PERFORMANCE************************************
val_losses.append(val_loss / len(val_loader))
val_accuracy = accuracy_score(all_labels, all_preds)
val_precision = precision_score(all_labels, all_preds, average='macro', zero_division=1)
val_recall = recall_score(all_labels, all_preds, average='macro', zero_division=1)
val_f1 = f1_score(all_labels, all_preds, average='macro', zero_division=1)
print(f"Epoch [{epoch + 1}/{num_epochs}], "
f"Loss: {train_losses[-1]:.4f}, "
f"Val Loss: {val_losses[-1]:.4f}, "
f"Val Acc: {val_accuracy:.2%}, "
f"Val Precision: {val_precision:.4f}, "
f"Val Recall: {val_recall:.4f}, "
f"Val F1 Score: {val_f1:.4f}")
#*******************************SHOW METRICS & PERFORMANCE**********************************
plt.plot(train_losses, label='Train Loss')
plt.plot(val_losses, label='Validation Loss')
plt.legend()
plt.show()
# SAVE THE MODEL FOR THE GH_CV_track_teams CODE
torch.save(model.state_dict(), 'D:/PYTHON/hockey_team_classifier.pth')
After running all of the steps above, you should see an output similar to the following (metrics may vary slightly):
#**************CNN PERFORMANCE ACROSS TRAINING EPOCHS************************
Epoch [1/10], Loss: 1.5346, Val Loss: 1.2339, Val Acc: 47.37%, Val Precision: 0.7172, Val Recall: 0.5641, Val F1 Score: 0.4167
Epoch [2/10], Loss: 1.1473, Val Loss: 1.1664, Val Acc: 55.26%, Val Precision: 0.6965, Val Recall: 0.6296, Val F1 Score: 0.4600
Epoch [3/10], Loss: 1.0139, Val Loss: 0.9512, Val Acc: 57.89%, Val Precision: 0.6054, Val Recall: 0.6054, Val F1 Score: 0.5909
Epoch [4/10], Loss: 0.8937, Val Loss: 0.8242, Val Acc: 60.53%, Val Precision: 0.7222, Val Recall: 0.5645, Val F1 Score: 0.5538
Epoch [5/10], Loss: 0.7936, Val Loss: 0.7177, Val Acc: 63.16%, Val Precision: 0.6667, Val Recall: 0.6309, Val F1 Score: 0.6419
Epoch [6/10], Loss: 0.6871, Val Loss: 0.7782, Val Acc: 68.42%, Val Precision: 0.6936, Val Recall: 0.7128, Val F1 Score: 0.6781
Epoch [7/10], Loss: 0.6276, Val Loss: 0.5684, Val Acc: 78.95%, Val Precision: 0.8449, Val Recall: 0.7523, Val F1 Score: 0.7589
Epoch [8/10], Loss: 0.4198, Val Loss: 0.5613, Val Acc: 86.84%, Val Precision: 0.8736, Val Recall: 0.8958, Val F1 Score: 0.8653
Epoch [9/10], Loss: 0.3959, Val Loss: 0.3824, Val Acc: 92.11%, Val Precision: 0.9333, Val Recall: 0.9213, Val F1 Score: 0.9243
Epoch [10/10], Loss: 0.2509, Val Loss: 0.2651, Val Acc: 97.37%, Val Precision: 0.9762, Val Recall: 0.9792, Val F1 Score: 0.9769
Across the 10 epochs, the CNN's performance metrics improve. At the start, in epoch 1, the model has a training loss of 1.5346 and a validation accuracy of 47.37%. How should we read this starting point?
Accuracy is one of the most common metrics for evaluating classification performance. In our case, it represents the proportion of correctly predicted classes out of the total. However, high accuracy alone doesn't guarantee good overall performance; the model could still predict specific classes poorly (as I experienced in early trials). As for the training loss, it measures how efficiently the model learns to map the input data to the correct labels. Since we're doing classification, the cross-entropy loss quantifies the divergence between the predicted class probabilities and the actual labels. A starting value of 1.5346 indicates a significant gap between predicted and actual classes; ideally, this value should approach 0 as training progresses. Over time, we observe the training loss drop substantially while validation accuracy climbs. In the final epoch, the training and validation losses reach lows of 0.2509 and 0.2651, respectively.
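To build some intuition for what those loss values mean, here's a minimal standalone sketch (not part of the project code) showing how cross-entropy rewards a confident correct prediction and penalizes a confident wrong one:
#**************Cross-entropy intuition (standalone example)**************
import torch
import torch.nn as nn
criterion = nn.CrossEntropyLoss()
target = torch.tensor([0])  # the true class is class 0
confident_right = torch.tensor([[4.0, 0.1, 0.2]])  # logits favoring class 0
confident_wrong = torch.tensor([[0.2, 4.0, 0.1]])  # logits favoring class 1
print(criterion(confident_right, target).item())  # small loss, close to 0 (~0.04)
print(criterion(confident_wrong, target).item())  # large loss (~3.8)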
To test our CNN model, we can pick a sample of player images and evaluate its predictive ability. To run the test, you can use the following code:
# *************TEST CNN MODEL WITH SAMPLE DATASET***************************
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
from PIL import Image
# SAMPLE DATASET FOR VALIDATION
test_dir = 'D:/PYTHON/validation_dataset'
# CNN MODEL FOR TEAM PREDICTIONS
class CNNModel(nn.Module):
def __init__(self):
super(CNNModel, self).__init__()
self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
self.fc1 = nn.Linear(128 * 18 * 18, 512)
self.dropout = nn.Dropout(0.5)
self.fc2 = nn.Linear(512, 3)
def forward(self, x):
x = self.pool(F.relu(self.conv1(x)))
x = self.pool(F.relu(self.conv2(x)))
x = self.pool(F.relu(self.conv3(x)))
x = x.view(-1, 128 * 18 * 18)
x = F.relu(self.fc1(x))
x = self.dropout(x)
x = self.fc2(x)
return x
# CNN MODEL PREVIOUSLY SAVED
model = CNNModel()
model.load_state_dict(torch.load('D:/PYTHON/hockey_team_classifier.pth'))
model.eval()
transform = transforms.Compose([
transforms.Resize((150, 150)),
transforms.ToTensor(),
transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])
#******************ITERATION ON SAMPLE IMAGES-ACCURACY TEST*****************************
class_names = ['team_referee', 'team_away', 'team_home']
def predict_image(image_path, model, transform):
# LOADS DATASET
image = Image.open(image_path).convert('RGB')  # ensure 3-channel input
image = transform(image).unsqueeze(0)
# MAKES PREDICTIONS
with torch.no_grad():
output = model(image)
_, predicted = torch.max(output, 1)
team = class_names[predicted.item()]
return team
for image_name in os.listdir(test_dir):
image_path = os.path.join(test_dir, image_name)
if os.path.isfile(image_path):
predicted_team = predict_image(image_path, model, transform)
print(f'Image {image_name}: The player belongs to {predicted_team}')
The output should look something like this:
# *************CNN MODEL TEST - OUTPUT ***********************************#
Image Away_image04.jpg: The player belongs to team_away
Image Away_image12.jpg: The player belongs to team_away
Image Away_image14.jpg: The player belongs to team_away
Image Home_image07.jpg: The player belongs to team_home
Image Home_image13.jpg: The player belongs to team_home
Image Home_image16.jpg: The player belongs to team_home
Image Referee_image04.jpg: The player belongs to team_referee
Image Referee_image09.jpg: The player belongs to team_referee
Image Referee_image10.jpg: The player belongs to team_referee
Image Referee_image11.jpg: The player belongs to team_referee
As you can see, the model shows a pretty good ability to identify the teams and to exclude the referee as a team member.
Putting It All Together
Bringing everything together requires a few adjustments to the tracking mechanism described earlier. Here's a step-by-step walkthrough of the updated code.
First, we set up the required libraries and paths. Note that we now specify paths for both the pickle file and the CNN model. This time, the code will fail if it can't find the pickle file at its path. If needed, generate the pickle file with the earlier code first, then run the updated version for the video analysis:
import cv2
import numpy as np
from ultralytics import YOLO
import pickle
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
from PIL import Image
# MODEL INPUTS
model_path = 'D:/PYTHON/yolov8x.pt'
video_path = 'D:/PYTHON/video_input.mp4'
output_path = 'D:/PYTHON/output_video.mp4'
tracks_path = 'D:/PYTHON/stubs/track_stubs.pkl'
classifier_path = 'D:/PYTHON/hockey_team_classifier.pth'
Next, we load the models, specify the rink coordinates, and, as before, start detecting objects in batches of 20 frames. Note that for now we only use the rink boundary to focus the analysis on the rink. In the article's final steps, when we add the performance statistics, we'll use the offensive-zone coordinates.
#*************************** Loads models and rink coordinates********************#
class_names = ['Referee', 'Tm_white', 'Tm_yellow']
class HockeyAnalyzer:
def __init__(self, model_path, classifier_path):
self.model = YOLO(model_path)
self.classifier = self.load_classifier(classifier_path)
self.transform = transforms.Compose([
transforms.Resize((150, 150)),
transforms.ToTensor(),
transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])
self.rink_coordinates = np.array([[-450, 710], [2030, 710], [948, 61], [352, 61]])
self.zone_white = [(180, 150), (1100, 150), (900, 61), (352, 61)]
self.zone_yellow = [(-450, 710), (2030, 710), (1160, 150), (200, 150)]
#******************** Detect objects in each frame **********************************#
def detect_frames(self, frames):
batch_size = 20
detections = []
for i in range(0, len(frames), batch_size):
detections_batch = self.model.predict(frames[i:i+batch_size], conf=0.1)
detections += detections_batch
return detections
Next, we add the process that predicts each player's team:
#*********************** Loads CNN Model**********************************************#
def load_classifier(self, classifier_path):
model = CNNModel()
model.load_state_dict(torch.load(classifier_path, map_location=torch.device('cpu')))
model.eval()
return model
def predict_team(self, image):
with torch.no_grad():
output = self.classifier(image)
_, predicted = torch.max(output, 1)
predicted_index = predicted.item()
team = class_names[predicted_index]
return team
In the next step, we add the method described earlier for switching from bounding boxes to ellipses:
#************ Ellipse for tracking players instead of Bounding boxes*******************#
def draw_ellipse(self, frame, bbox, color, track_id=None, team=None):
y2 = int(bbox[3])
x_center = (int(bbox[0]) + int(bbox[2])) // 2
width = int(bbox[2]) - int(bbox[0])
if team == 'Referee':
color = (0, 255, 255)
text_color = (0, 0, 0)
else:
color = (255, 0, 0)
text_color = (255, 255, 255)
cv2.ellipse(
frame,
center=(x_center, y2),
axes=(int(width) // 2, int(0.35 * width)),
angle=0.0,
startAngle=-45,
endAngle=235,
color=color,
thickness=2,
lineType=cv2.LINE_4
)
if track_id is not None:
rectangle_width = 40
rectangle_height = 20
x1_rect = x_center - rectangle_width // 2
x2_rect = x_center + rectangle_width // 2
y1_rect = (y2 - rectangle_height // 2) + 15
y2_rect = (y2 + rectangle_height // 2) + 15
cv2.rectangle(frame,
(int(x1_rect), int(y1_rect)),
(int(x2_rect), int(y2_rect)),
color,
cv2.FILLED)
x1_text = x1_rect + 12
if track_id > 99:
x1_text -= 10
font_scale = 0.4
cv2.putText(
frame,
f"{track_id}",
(int(x1_text), int(y1_rect + 15)),
cv2.FONT_HERSHEY_SIMPLEX,
font_scale,
text_color,
thickness=2
)
return frame
Now it's time to add the analyzer. It reads the pickle file, narrows the analysis to the rink boundary we defined earlier, and calls the CNN model to identify each player's team and add labels. Note that we also include functionality to label the referee differently and change the color of their ellipse. The code finishes by writing the processed frames to the output video.
#******************* Loads Tracked Data (pickle file )**********************************#
def analyze_video(self, video_path, output_path, tracks_path):
with open(tracks_path, 'rb') as f:
tracks = pickle.load(f)
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
print("Error: Could not open video.")
return
fps = cap.get(cv2.CAP_PROP_FPS)
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fourcc = cv2.VideoWriter_fourcc(*'XVID')
out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))
frame_num = 0
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
#***********Checks if the player falls within the rink area**********************************#
mask = np.zeros(frame.shape[:2], dtype=np.uint8)
cv2.fillConvexPoly(mask, self.rink_coordinates, 1)
mask = mask.astype(bool)
# Draw rink area
#cv2.polylines(frame, [self.rink_coordinates], isClosed=True, color=(0, 255, 0), thickness=2)
# Get tracks from frame
player_dict = tracks["person"][frame_num]
for track_id, player in player_dict.items():
bbox = player["bbox"]
# Check if the player is within the Rink Area
x_center = int((bbox[0] + bbox[2]) / 2)
y_center = int((bbox[1] + bbox[3]) / 2)
if not mask[y_center, x_center]:
continue
#**********************************Team Prediction********************************************#
x1, y1, x2, y2 = map(int, bbox)
cropped_image = frame[y1:y2, x1:x2]
cropped_pil_image = Image.fromarray(cv2.cvtColor(cropped_image, cv2.COLOR_BGR2RGB))
transformed_image = self.transform(cropped_pil_image).unsqueeze(0)
team = self.predict_team(transformed_image)
#************ Ellipse for tracked players and labels******************************************#
self.draw_ellipse(frame, bbox, (0, 255, 0), track_id, team)
font_scale = 1
text_offset = -20
if team == 'Referee':
rectangle_width = 60
rectangle_height = 25
x1_rect = x1
x2_rect = x1 + rectangle_width
y1_rect = y1 - 30
y2_rect = y1 - 5
# Different setup for Referee
cv2.rectangle(frame,
(int(x1_rect), int(y1_rect)),
(int(x2_rect), int(y2_rect)),
(0, 0, 0),
cv2.FILLED)
text_color = (255, 255, 255)
else:
if team == 'Tm_white':
text_color = (255, 215, 0) # White Team: Blue labels
else:
text_color = (0, 255, 255) # Yellow Team: Yellow labels
# Draw Team labels
cv2.putText(
frame,
team,
(int(x1), int(y1) + text_offset),
cv2.FONT_HERSHEY_PLAIN,
font_scale,
text_color,
thickness=2
)
# Write output video
out.write(frame)
frame_num += 1
cap.release()
out.release()
Finally, we add the CNN's architecture (as defined during the CNN design step) and run the HockeyAnalyzer:
#**********************CNN Model Architecture ******************************#
class CNNModel(nn.Module):
def __init__(self):
super(CNNModel, self).__init__()
self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
self.fc1 = nn.Linear(128 * 18 * 18, 512)
self.dropout = nn.Dropout(0.5)
self.fc2 = nn.Linear(512, len(class_names))
def forward(self, x):
x = self.pool(F.relu(self.conv1(x)))
x = self.pool(F.relu(self.conv2(x)))
x = self.pool(F.relu(self.conv3(x)))
x = x.view(-1, 128 * 18 * 18)
x = F.relu(self.fc1(x))
x = self.dropout(x)
x = self.fc2(x)
return x
#*********Execute HockeyAnalyzer/classifier and Save Output************#
analyzer = HockeyAnalyzer(model_path, classifier_path)
analyzer.analyze_video(video_path, output_path, tracks_path)
After all of these steps, the video output should look like this:
Notice that in this last update, object detection happens only inside the rink, and the teams and the referee are distinguished. Although the CNN model still needs fine-tuning and occasionally loses stability on some players, it remains mostly reliable and accurate throughout the video.
Speed, Distance, and Offensive Pressure
Being able to track teams and players opens up exciting possibilities for measuring performance, such as generating heatmaps, analyzing speed and distance covered, tracking actions like zone entries or exits, and diving into detailed player metrics. To get a taste of this, we'll add three performance metrics: average speed per player, skating distance per team, and offensive pressure (measured as the percentage of each team's skating distance covered inside the opponent's zone).
We start by converting the rink coordinates from pixel-based measurements to approximate meters, so we can read our data in meters rather than pixels. The real dimensions of the rink in the video are roughly 15 m x 30 m (15 m wide and 30 m high). To support the conversion, we introduce a method that translates pixel coordinates into meters. By defining the rink's real dimensions and using the pixel coordinates of its four corners (left to right, top to bottom), we obtain conversion factors. These will help us estimate distances in meters and speeds in meters per second. (Another interesting technique you could explore and apply here is a perspective transform; see the sketch after the next code block.)
#*********************Loads models and rink coordinates*****************#
class_names = ['Referee', 'Tm_white', 'Tm_yellow']
class HockeyAnalyzer:
def __init__(self, model_path, classifier_path):
*
*
*
*
*
*
self.pixel_to_meter_conversion() #<------ Add this utility method
#***********Pixel-based measurements to meters***************************#
def pixel_to_meter_conversion(self):
#Rink real dimensions in meters
rink_width_m = 15
rink_height_m = 30
#Pixel coordinates for rink dimensions
left_pixel, right_pixel = self.rink_coordinates[0][0], self.rink_coordinates[1][0]
top_pixel, bottom_pixel = self.rink_coordinates[2][1], self.rink_coordinates[0][1]
#Conversion factors
self.pixels_per_meter_x = (right_pixel - left_pixel) / rink_width_m
self.pixels_per_meter_y = (bottom_pixel - top_pixel) / rink_height_m
def convert_pixels_to_meters(self, distance_pixels):
#Convert pixels to meters
return distance_pixels / self.pixels_per_meter_x, distance_pixels / self.pixels_per_meter_y
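As an aside, here's a minimal sketch of the perspective-transform alternative mentioned above, assuming the same four rink corners and the 15 m x 30 m rink dimensions (an illustration, not part of the project code):
#**********Perspective transform sketch (alternative approach)***********
import cv2
import numpy as np
# Rink corners in pixels (bottom-left, bottom-right, upper-right, upper-left)
src = np.float32([[-450, 710], [2030, 710], [948, 61], [352, 61]])
# The same corners on a 15 m x 30 m rink plane, in meters
dst = np.float32([[0, 30], [15, 30], [15, 0], [0, 0]])
M = cv2.getPerspectiveTransform(src, dst)
def pixel_to_rink_meters(x, y):
    # Projects one pixel coordinate onto the rink plane, in meters
    point = np.float32([[[x, y]]])
    return cv2.perspectiveTransform(point, M)[0][0]
print(pixel_to_rink_meters(790, 385))  # a point roughly mid-rink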
Now we're ready to add each player's speed in meters per second. To do so, we need three changes. First, create an empty dictionary named previous_positions in the HockeyAnalyzer class to help us compare a player's current and previous positions. Likewise, we create a team_stats structure to store each team's statistics for visualization later on.
Next, we add a speed method that estimates a player's speed in pixels per second and then converts it to meters per second using the conversion factors explained earlier. Finally, we call the new speed method inside analyze_video and add the speed to every tracked object (players and referee). Here's what the changes look like:
#*********************Loads models and rink coordinates*****************#
class_names = ['Referee', 'Tm_white', 'Tm_yellow']
class HockeyAnalyzer:
def __init__(self, model_path, classifier_path):
*
*
*
*
*
*
*
self.pixel_to_meter_conversion()
self.previous_positions = {} #<------ Add this.Initializes empty dictionary
self.team_stats = {
'Tm_white': {'distance': 0, 'speed': [], 'count': 0, 'offensive_pressure': 0},
'Tm_yellow': {'distance': 0, 'speed': [], 'count': 0, 'offensive_pressure': 0}
} #<------ Add this.Initializes empty dictionary
#**************** Speed: meters per second********************************#
def calculate_speed(self, track_id, x_center, y_center, fps):
current_position = (x_center, y_center)
if track_id in self.previous_positions:
prev_position = self.previous_positions[track_id]
distance_pixels = np.linalg.norm(np.array(current_position) - np.array(prev_position))
distance_meters_x, distance_meters_y = self.convert_pixels_to_meters(distance_pixels)
speed_meters_per_second = (distance_meters_x**2 + distance_meters_y**2)**0.5 * fps
else:
speed_meters_per_second = 0
self.previous_positions[track_id] = current_position
return speed_meters_per_second
#******************* Loads Tracked Data (pickle file )**********************************#
def analyze_video(self, video_path, output_path, tracks_path):
with open(tracks_path, 'rb') as f:
tracks = pickle.load(f)
*
*
*
*
*
*
*
*
# Draw Team label
cv2.putText(
frame,
team,
(int(x1), int(y1) + text_offset),
cv2.FONT_HERSHEY_PLAIN,
font_scale,
text_color,
thickness=2
)
#**************Add these lines of code --->:
speed = self.calculate_speed(track_id, x_center, y_center, fps)
# Speed label
speed_font_scale = 0.8
speed_y_position = int(y1) + 20
if speed_y_position > int(y1) - 5:
speed_y_position = int(y1) - 5
cv2.putText(
frame,
f"Speed: {speed:.2f} m/s",
(int(x1), speed_y_position),
cv2.FONT_HERSHEY_PLAIN,
speed_font_scale,
text_color,
thickness=2
)
# Write output video
out.write(frame)
frame_num += 1
cap.release()
out.release()
The video output at this point should look like this (note the speed added to each player's label):
Finally, let's add a stats board where we can track the average speed of each team's players, along with other metrics such as distance covered and offensive pressure inside the opponent's zone.
We've already defined the offensive zones and integrated them into the code. Now we need to track how often each player enters the opponent's zone. To do so, we'll implement a method using the ray casting algorithm, which checks whether a player's position lies inside the white or yellow team's offensive zone. It works by casting an imaginary ray from the player's position and counting how many of the zone's edges it crosses: crossing one edge means the player is inside the zone, while crossing more (in our case, two of the four edges) means they're outside. In general, an odd number of crossings means inside, an even number means outside. The code then scans the full video to determine each tracked object's zone status.
#************ Locate player's position in Target Zone***********************#
def is_inside_zone(self, position, zone):
x, y = position
n = len(zone)
inside = False
p1x, p1y = zone[0]
for i in range(n + 1):
p2x, p2y = zone[i % n]
if y > min(p1y, p2y):
if y <= max(p1y, p2y):
if x <= max(p1x, p2x):
if p1y != p2y:
xinters = (y - p1y) * (p2x - p1x) / (p2y - p1y) + p1x
if p1x == p2x or x <= xinters:
inside = not inside
p1x, p1y = p2x, p2y
return inside
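A quick, hypothetical sanity check of this logic, assuming a HockeyAnalyzer instance like the one created in the previous section and the yellow team's zone defined earlier:
#**************Sanity check for the ray casting method**************
zone_yellow = [(-450, 710), (2030, 710), (1160, 150), (200, 150)]
print(analyzer.is_inside_zone((640, 400), zone_yellow))  # True: inside the zone
print(analyzer.is_inside_zone((640, 100), zone_yellow))  # False: above the zone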
Now we'll handle the performance metrics by adding a method that displays, in table form, each team's average player speed, total distance covered, and offensive pressure (the percentage of distance covered inside the opponent's zone). We'll use OpenCV to format these metrics as a table overlaid on the video, plus a dynamic update mechanism that keeps the statistics current as the game unfolds.
#*******************************Performance metrics*********************************************#
def draw_stats(self, frame):
avg_speed_white = np.mean(self.team_stats['Tm_white']['speed']) if self.team_stats['Tm_white']['count'] > 0 else 0
avg_speed_yellow = np.mean(self.team_stats['Tm_yellow']['speed']) if self.team_stats['Tm_yellow']['count'] > 0 else 0
distance_white = self.team_stats['Tm_white']['distance']
distance_yellow = self.team_stats['Tm_yellow']['distance']
offensive_pressure_white = self.team_stats['Tm_white'].get('offensive_pressure', 0)
offensive_pressure_yellow = self.team_stats['Tm_yellow'].get('offensive_pressure', 0)
Pressure_ratio_W = offensive_pressure_white/distance_white *100 if self.team_stats['Tm_white']['distance'] > 0 else 0
Pressure_ratio_Y = offensive_pressure_yellow/distance_yellow *100 if self.team_stats['Tm_yellow']['distance'] > 0 else 0
table = [
["", "Away_White", "Home_Yellow"],
["Average Speed\nPlayer", f"{avg_speed_white:.2f} m/s", f"{avg_speed_yellow:.2f} m/s"],
["Distance\nCovered", f"{distance_white:.2f} m", f"{distance_yellow:.2f} m"],
["Offensive\nPressure %", f"{Pressure_ratio_W:.2f} %", f"{Pressure_ratio_Y:.2f} %"],
]
text_color = (0, 0, 0)
start_x, start_y = 10, 590
row_height = 30 # Manage Height between rows
column_width = 150 # Manage Width between columns
font_scale = 1
def put_multiline_text(frame, text, position, font, font_scale, color, thickness, line_type, line_spacing=1.0):
y0, dy = position[1], int(font_scale * 20 * line_spacing) # Adjust line spacing here
for i, line in enumerate(text.split('\n')):
y = y0 + i * dy
cv2.putText(frame, line, (position[0], y), font, font_scale, color, thickness, line_type)
for i, row in enumerate(table):
for j, text in enumerate(row):
if i in [1,2, 3]:
put_multiline_text(
frame,
text,
(start_x + j * column_width, start_y + i * row_height),
cv2.FONT_HERSHEY_PLAIN,
font_scale,
text_color,
1,
cv2.LINE_AA,
line_spacing= 0.8
)
else:
cv2.putText(
frame,
text,
(start_x + j * column_width, start_y + i * row_height),
cv2.FONT_HERSHEY_PLAIN,
font_scale,
text_color,
1,
cv2.LINE_AA,
)
#****************** Track and update game stats****************************************#
def update_team_stats(self, team, speed, distance, position):
if team in self.team_stats:
self.team_stats[team]['speed'].append(speed)
self.team_stats[team]['distance'] += distance
self.team_stats[team]['count'] += 1
if team == 'Tm_white':
if self.is_inside_zone(position, self.zone_white):
self.team_stats[team]['offensive_pressure'] += distance
elif team == 'Tm_yellow':
if self.is_inside_zone(position, self.zone_yellow):
self.team_stats[team]['offensive_pressure'] += distance
For the statistics to show up in the video, we have to call the method inside analyze_video, so be sure to add these extra lines of code after the speed label is defined and before the output video is written:
*
*
*
*
*
*
*
#Speed label
speed_font_scale = 0.8
speed_y_position = int(y1) + 20
if speed_y_position > int(y1) - 5:
speed_y_position = int(y1) - 5
cv2.putText(
frame,
f"Speed: {speed:.2f} m/s",
(int(x1), speed_y_position),
cv2.FONT_HERSHEY_PLAIN,
speed_font_scale,
text_color,
thickness=2
)
#**************Add these lines of code--->:
distance = speed / fps
position = (x_center, y_center)
self.update_team_stats(team, speed, distance, position)
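# Note: for the stats board to appear, draw_stats must also run on every
# frame before it's written out (presumably right at this point):
self.draw_stats(frame)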
# Write output video
out.write(frame)
frame_num += 1
将每个播放器的速度(以米/秒为单位)除以帧频(以帧/秒为单位),就能计算出每个播放器移动的距离(以米为单位)。通过这种计算方法,我们可以估算出每个播放器在视频中每帧变化之间的移动距离。如果一切顺利,最终的视频输出应该是这样的: