使用Python实现高效相机流

2023年08月30日 由 alex 发表 415 0

我们来谈谈如何通过 Python 使用网络摄像头。我有一个简单的任务,需要从摄像头读取帧并对每个帧运行神经网络。对于一个特定的摄像头,我在设置目标fps时遇到了问题,所以深入研究FFmpeg,看看它是否有助于解决问题。


最终,我让OpenCV和FFmpeg都正常工作了,但我发现了一件非常有趣的事情:在我的主要使用场景中,FFmpeg的性能优于OpenCV。实际上,使用FFmpeg读取帧的速度提高了15倍,整个流水线的速度提高了32%。我简直不敢相信这个结果,多次重新检查了所有内容,但结果始终如一。


注意:当我只是连续读取帧时,性能完全相同,但在读取帧之后运行其他操作(需要时间)时,FFmpeg更快。下面我将详细说明我的意思。


现在,让我们来看一下代码。首先是使用OpenCV读取摄像头帧的类:


class VideoStreamCV:
    """Camera frame reader backed by ``cv2.VideoCapture``.

    The constructor opens the device, requests the given resolution, the
    MJPG fourcc and the frame rate, then reads a batch of warm-up frames.
    """

    def __init__(self, src: int, fps: int, resolution: Tuple[int, int]):
        """Store the settings, open the capture and warm the camera up.

        Args:
            src: Value handed to ``cv2.VideoCapture``.
            fps: Frame rate requested from the capture.
            resolution: ``(width, height)`` requested from the capture.
        """
        self.src = src
        self.fps = fps
        self.resolution = resolution
        self.cap = self._open_camera()
        self.wait_for_cam()

    def _open_camera(self):
        """Create the capture object and apply the requested properties."""
        capture = cv2.VideoCapture(self.src)
        # Properties are applied in this fixed order: size, fourcc, fps.
        properties = (
            (cv2.CAP_PROP_FRAME_WIDTH, self.resolution[0]),
            (cv2.CAP_PROP_FRAME_HEIGHT, self.resolution[1]),
            (cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"MJPG")),
            (cv2.CAP_PROP_FPS, self.fps),
        )
        for prop_id, value in properties:
            capture.set(prop_id, value)
        return capture

    def read(self):
        """Return the next frame, or ``None`` when the capture reports failure."""
        ok, frame = self.cap.read()
        return frame if ok else None

    def release(self):
        """Release the underlying capture object."""
        self.cap.release()

    def wait_for_cam(self):
        """Read 30 warm-up frames and report whether the last one was valid.

        Returns:
            ``True`` if the 30th read produced a frame, ``False`` otherwise.
        """
        last_frame = None
        for _ in range(30):
            last_frame = self.read()
        return last_frame is not None


我使用wait_for_cam函数,因为摄像头通常需要一段时间来“热身”。同样的预热方式也在FFmpeg类中使用:


class VideoStreamFFmpeg:
    """Camera frame reader backed by an ``ffmpeg`` subprocess.

    ``ffmpeg`` is started with raw ``bgr24`` output on stdout; each call to
    :meth:`read` pulls exactly one frame's worth of bytes from that pipe and
    reshapes it to ``(height, width, 3)``.
    """

    def __init__(self, src: int, fps: int, resolution: Tuple[int, int]):
        """Store the settings, start ffmpeg and warm the camera up.

        Args:
            src: Camera index used to build the platform-specific device name.
            fps: Frame rate passed to ffmpeg via ``-r``.
            resolution: ``(width, height)`` passed to ffmpeg via ``-video_size``.

        Raises:
            ValueError: If the current operating system is not supported.
        """
        self.src = src
        self.fps = fps
        self.resolution = resolution
        self.pipe = self._open_ffmpeg()
        self.frame_shape = (self.resolution[1], self.resolution[0], 3)
        # Plain int so the pipe read size is never a NumPy scalar.
        self.frame_size = int(np.prod(self.frame_shape))
        self.wait_for_cam()

    def _build_command(self, os_name=None):
        """Return the ffmpeg argument list for the given operating system.

        Args:
            os_name: Value as returned by ``platform.system()``; detected
                automatically when ``None``.

        Raises:
            ValueError: If ``os_name`` is not Darwin, Linux or Windows.
        """
        if os_name is None:
            os_name = platform.system()

        extra_input_options = []
        if os_name == "Darwin":  # macOS
            input_format = "avfoundation"
            video_device = f"{self.src}:none"
        elif os_name == "Linux":
            input_format = "v4l2"
            # V4L2 devices are addressed by path, so a bare index is mapped
            # to /dev/videoN; anything else is passed through unchanged.
            if str(self.src).isdigit():
                video_device = f"/dev/video{self.src}"
            else:
                video_device = f"{self.src}"
            # Must follow "-f v4l2" as its own option pair; inserting it
            # between "-f" and the format name breaks the command line.
            extra_input_options = ["-input_format", "mjpeg"]
        elif os_name == "Windows":
            input_format = "dshow"
            video_device = f"video={self.src}"
        else:
            raise ValueError("Unsupported OS")

        return [
            "ffmpeg",
            "-f", input_format,
            *extra_input_options,
            "-r", str(self.fps),
            "-video_size", f"{self.resolution[0]}x{self.resolution[1]}",
            "-i", video_device,
            # NOTE(review): placed after -i this is an output option and the
            # later "-vcodec rawvideo" presumably supersedes it; kept so the
            # macOS/Windows command line is unchanged.
            "-vcodec", "mjpeg",
            "-an", "-vcodec", "rawvideo",  # emit decoded raw video, no audio
            "-pix_fmt", "bgr24",
            "-vsync", "2",
            "-f", "image2pipe", "-",
        ]

    def _open_ffmpeg(self):
        """Start ffmpeg with its stdout connected to a large buffered pipe."""
        return subprocess.Popen(
            self._build_command(),
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            bufsize=10**8,
        )

    def read(self):
        """Return the next frame as a ``uint8`` array, or ``None`` on a short read."""
        raw_image = self.pipe.stdout.read(self.frame_size)
        if len(raw_image) != self.frame_size:
            return None
        return np.frombuffer(raw_image, dtype=np.uint8).reshape(self.frame_shape)

    def release(self):
        """Stop ffmpeg, close the pipe and reap the child process."""
        self.pipe.terminate()
        if self.pipe.stdout is not None:
            self.pipe.stdout.close()
        try:
            self.pipe.wait(timeout=5)
        except subprocess.TimeoutExpired:
            # ffmpeg did not exit after terminate(); force it.
            self.pipe.kill()
            self.pipe.wait()

    def wait_for_cam(self):
        """Read 30 warm-up frames and report whether the last one was valid.

        Returns:
            ``True`` if the 30th read produced a frame, ``False`` otherwise.
        """
        frame = None
        for _ in range(30):
            frame = self.read()
        return frame is not None


为了计时运行函数,我使用了装饰器:


def timeit(func):
    """Decorate ``func`` so each call prints its wall-clock duration.

    The wrapper forwards all arguments, prints ``Main function time: <s>s``
    (rounded to four decimals) and returns ``func``'s result unchanged.
    """
    # Local import keeps the decorator self-contained.
    from functools import wraps

    @wraps(func)  # preserve the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        t0 = time.perf_counter()
        result = func(*args, **kwargs)
        t1 = time.perf_counter()
        print(f"Main function time: {round(t1-t0, 4)}s")
        return result

    return wrapper


作为一个重型的合成任务,我使用了这个简单的函数(也可以只是time.sleep)。这是非常重要的一部分,因为如果没有任何任务,OpenCV和FFmpeg的读取速度是相同的:


def computation_task(iterations: int = 5000000):
    """Burn CPU time as a stand-in for per-frame processing.

    Args:
        iterations: Number of loop iterations; the default matches the
            original fixed workload.
    """
    for _ in range(iterations):
        # The product is discarded; only the elapsed time matters.
        9999 * 9999


现在是一个循环函数,在循环中我读取帧,计时,然后运行computation_task:


@timeit
def run(cam: VideoStreamCV | VideoStreamFFmpeg, run_task: bool):
    """Read 100 frames from ``cam`` and return the mean time spent in ``read``.

    Args:
        cam: Opened stream; it is released before this function returns.
        run_task: When true, ``computation_task`` runs after every read.

    Returns:
        Mean duration of ``cam.read()`` in seconds, rounded to four decimals.
    """
    read_times = []
    try:
        for _ in range(100):
            started = time.perf_counter()
            cam.read()
            read_times.append(time.perf_counter() - started)
            if run_task:
                computation_task()
    finally:
        # Release the camera even if a read or the task raises.
        cam.release()
    return round(np.mean(read_times), 4)



最后,在main函数中,我设置了几个参数,使用OpenCV和FFmpeg初始化了两个视频流,并分别在不运行和运行computation_task的情况下测试它们:


def main():
    """Benchmark both stream backends, without and then with the extra task.

    Each stream is opened immediately before its own run, and ``run``
    releases it afterwards, so the two backends do not hold camera 0 at the
    same time and neither sits open while the other is being measured.
    """
    fps = 30
    resolution = (1920, 1080)
    backends = (("FFMPEG", VideoStreamFFmpeg), ("CV2", VideoStreamCV))
    for run_task in [False, True]:
        for label, stream_cls in backends:
            cam = stream_cls(src=0, fps=fps, resolution=resolution)
            print(f"{label}, task {run_task}:")
            print(f"Mean frame read time: {run(cam=cam, run_task=run_task)}s\n")


下面是我得到的结果:


FFMPEG, task False:
Main function time: 3.2334s
Mean frame read time: 0.0323s
CV2, task False:
Main function time: 3.3934s
Mean frame read time: 0.0332s
FFMPEG, task True:
Main function time: 4.461s
Mean frame read time: 0.0014s
CV2, task True:
Main function time: 6.6833s
Mean frame read time: 0.023s


因此,在没有合成任务的情况下,两者的读取时间几乎相同:0.0323秒和0.0332秒。但是加入合成任务后,读取时间分别为0.0014秒和0.023秒,因此FFmpeg明显更快。


下面是一个图表,显示每次迭代所需的时间:读取帧、使用yolov8s模型(在CPU上)处理帧和保存检测到对象的帧:


2-1


下面是一个包含合成测试的完整脚本:


import platform
import subprocess
import time
from typing import Tuple
import cv2
import numpy as np

class VideoStreamFFmpeg:
    """Camera frame reader backed by an ``ffmpeg`` subprocess.

    ``ffmpeg`` is started with raw ``bgr24`` output on stdout; each call to
    :meth:`read` pulls exactly one frame's worth of bytes from that pipe and
    reshapes it to ``(height, width, 3)``.
    """

    def __init__(self, src: int, fps: int, resolution: Tuple[int, int]):
        """Store the settings, start ffmpeg and warm the camera up.

        Args:
            src: Camera index used to build the platform-specific device name.
            fps: Frame rate passed to ffmpeg via ``-r``.
            resolution: ``(width, height)`` passed to ffmpeg via ``-video_size``.

        Raises:
            ValueError: If the current operating system is not supported.
        """
        self.src = src
        self.fps = fps
        self.resolution = resolution
        self.pipe = self._open_ffmpeg()
        self.frame_shape = (self.resolution[1], self.resolution[0], 3)
        # Plain int so the pipe read size is never a NumPy scalar.
        self.frame_size = int(np.prod(self.frame_shape))
        self.wait_for_cam()

    def _build_command(self, os_name=None):
        """Return the ffmpeg argument list for the given operating system.

        Args:
            os_name: Value as returned by ``platform.system()``; detected
                automatically when ``None``.

        Raises:
            ValueError: If ``os_name`` is not Darwin, Linux or Windows.
        """
        if os_name is None:
            os_name = platform.system()

        extra_input_options = []
        if os_name == "Darwin":  # macOS
            input_format = "avfoundation"
            video_device = f"{self.src}:none"
        elif os_name == "Linux":
            input_format = "v4l2"
            # V4L2 devices are addressed by path, so a bare index is mapped
            # to /dev/videoN; anything else is passed through unchanged.
            if str(self.src).isdigit():
                video_device = f"/dev/video{self.src}"
            else:
                video_device = f"{self.src}"
            # Must follow "-f v4l2" as its own option pair; inserting it
            # between "-f" and the format name breaks the command line.
            extra_input_options = ["-input_format", "mjpeg"]
        elif os_name == "Windows":
            input_format = "dshow"
            video_device = f"video={self.src}"
        else:
            raise ValueError("Unsupported OS")

        return [
            "ffmpeg",
            "-f", input_format,
            *extra_input_options,
            "-r", str(self.fps),
            "-video_size", f"{self.resolution[0]}x{self.resolution[1]}",
            "-i", video_device,
            # NOTE(review): placed after -i this is an output option and the
            # later "-vcodec rawvideo" presumably supersedes it; kept so the
            # macOS/Windows command line is unchanged.
            "-vcodec", "mjpeg",
            "-an", "-vcodec", "rawvideo",  # emit decoded raw video, no audio
            "-pix_fmt", "bgr24",
            "-vsync", "2",
            "-f", "image2pipe", "-",
        ]

    def _open_ffmpeg(self):
        """Start ffmpeg with its stdout connected to a large buffered pipe."""
        return subprocess.Popen(
            self._build_command(),
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            bufsize=10**8,
        )

    def read(self):
        """Return the next frame as a ``uint8`` array, or ``None`` on a short read."""
        raw_image = self.pipe.stdout.read(self.frame_size)
        if len(raw_image) != self.frame_size:
            return None
        return np.frombuffer(raw_image, dtype=np.uint8).reshape(self.frame_shape)

    def release(self):
        """Stop ffmpeg, close the pipe and reap the child process."""
        self.pipe.terminate()
        if self.pipe.stdout is not None:
            self.pipe.stdout.close()
        try:
            self.pipe.wait(timeout=5)
        except subprocess.TimeoutExpired:
            # ffmpeg did not exit after terminate(); force it.
            self.pipe.kill()
            self.pipe.wait()

    def wait_for_cam(self):
        """Read 30 warm-up frames and report whether the last one was valid.

        Returns:
            ``True`` if the 30th read produced a frame, ``False`` otherwise.
        """
        frame = None
        for _ in range(30):
            frame = self.read()
        return frame is not None

class VideoStreamCV:
    """Camera frame reader backed by ``cv2.VideoCapture``.

    The constructor opens the device, requests the given resolution, the
    MJPG fourcc and the frame rate, then reads a batch of warm-up frames.
    """

    def __init__(self, src: int, fps: int, resolution: Tuple[int, int]):
        """Store the settings, open the capture and warm the camera up.

        Args:
            src: Value handed to ``cv2.VideoCapture``.
            fps: Frame rate requested from the capture.
            resolution: ``(width, height)`` requested from the capture.
        """
        self.src = src
        self.fps = fps
        self.resolution = resolution
        self.cap = self._open_camera()
        self.wait_for_cam()

    def _open_camera(self):
        """Create the capture object and apply the requested properties."""
        capture = cv2.VideoCapture(self.src)
        # Properties are applied in this fixed order: size, fourcc, fps.
        properties = (
            (cv2.CAP_PROP_FRAME_WIDTH, self.resolution[0]),
            (cv2.CAP_PROP_FRAME_HEIGHT, self.resolution[1]),
            (cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"MJPG")),
            (cv2.CAP_PROP_FPS, self.fps),
        )
        for prop_id, value in properties:
            capture.set(prop_id, value)
        return capture

    def read(self):
        """Return the next frame, or ``None`` when the capture reports failure."""
        ok, frame = self.cap.read()
        return frame if ok else None

    def release(self):
        """Release the underlying capture object."""
        self.cap.release()

    def wait_for_cam(self):
        """Read 30 warm-up frames and report whether the last one was valid.

        Returns:
            ``True`` if the 30th read produced a frame, ``False`` otherwise.
        """
        last_frame = None
        for _ in range(30):
            last_frame = self.read()
        return last_frame is not None

def timeit(func):
    """Decorate ``func`` so each call prints its wall-clock duration.

    The wrapper forwards all arguments, prints ``Main function time: <s>s``
    (rounded to four decimals) and returns ``func``'s result unchanged.
    """
    # Local import keeps the decorator self-contained.
    from functools import wraps

    @wraps(func)  # preserve the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        t0 = time.perf_counter()
        result = func(*args, **kwargs)
        t1 = time.perf_counter()
        print(f"Main function time: {round(t1-t0, 4)}s")
        return result

    return wrapper

def computation_task(iterations: int = 5000000):
    """Burn CPU time as a stand-in for per-frame processing.

    Args:
        iterations: Number of loop iterations; the default matches the
            original fixed workload.
    """
    for _ in range(iterations):
        # The product is discarded; only the elapsed time matters.
        9999 * 9999

@timeit
def run(cam: VideoStreamCV | VideoStreamFFmpeg, run_task: bool):
    """Read 100 frames from ``cam`` and return the mean time spent in ``read``.

    Args:
        cam: Opened stream; it is released before this function returns.
        run_task: When true, ``computation_task`` runs after every read.

    Returns:
        Mean duration of ``cam.read()`` in seconds, rounded to four decimals.
    """
    read_times = []
    try:
        for _ in range(100):
            started = time.perf_counter()
            cam.read()
            read_times.append(time.perf_counter() - started)
            if run_task:
                computation_task()
    finally:
        # Release the camera even if a read or the task raises.
        cam.release()
    return round(np.mean(read_times), 4)

def main():
    """Benchmark both stream backends, without and then with the extra task.

    Each stream is opened immediately before its own run, and ``run``
    releases it afterwards, so the two backends do not hold camera 0 at the
    same time and neither sits open while the other is being measured.
    """
    fps = 30
    resolution = (1920, 1080)
    backends = (("FFMPEG", VideoStreamFFmpeg), ("CV2", VideoStreamCV))
    for run_task in [False, True]:
        for label, stream_cls in backends:
            cam = stream_cls(src=0, fps=fps, resolution=resolution)
            print(f"{label}, task {run_task}:")
            print(f"Mean frame read time: {run(cam=cam, run_task=run_task)}s\n")

# Run the benchmark only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()


注意:这个脚本是在Apple的M1 Pro芯片上测试的。

文章来源:https://pub.towardsai.net/efficient-camera-stream-with-python-b6adf93fab32
欢迎关注ATYUN官方公众号
商务合作及内容投稿请联系邮箱:bd@atyun.com
评论 登录
写评论取消
回复取消