YOLOv11 视频处理脚本:编写一个 Python 脚本,读取一个视频文件,逐
编程语言 · 技术分享
🎪 摸鱼匠:个人主页 🎒 个人专栏:《YOLOv11实战专栏》 🥇 没有好的理念,只有脚踏实地! 文章目录 一、YOLOv11视频处理基础概述1.1 视频处理与目标检测的结合价值1.2 YOLOv11视频处理的基本流程1.3 视频处理中的关键概念解析 二、环境准备与依赖安装2.1 Python环境配置2.2 必要库的安装与配置2.3 YOLOv11模型的获取与加载 三、视频读取与处理基础3.1 使用OpenCV读取视频文件3.2 逐帧处理视频的基本方法3.3 视频属性获取与调整 四、YOLOv11模型加载与推理4.1 YOLOv11模型架构简介4.2 加载预训练模型与自定义模型4.3 单帧图像推理详解4.4 批量推理与性能优化 五、视频处理脚本实现5.1 基础视频处理脚本框架5.2 视频编码格式详解与选择5.3 帧率调整与时间控制 六、高级视频处理技巧6.1 多线程视频处理优化6.2 目标跟踪与轨迹可视化6.3 实时视频流处理 七、完整项目实战7.1 项目需求分析与设计7.2 项目代码实现7.3 项目使用示例示例1:处理视频文件示例2:处理摄像头输入示例3:处理网络视频流示例4:启用目标跟踪示例5:使用配置文件 八、总结与展望8.1 技术要点回顾8.2 性能优化建议8.3 常见问题与解决方案8.4 未来发展方向 一、YOLOv11视频处理基础概述 1.1 视频处理与目标检测的结合价值 视频目标检测是计算机视觉领域的一项核心技术,它将静态图像目标检测扩展到了时序维度,使我们能够分析视频中连续帧之间的目标运动和行为。
帧率越高,视频越流畅,但文件也越大。
帧跳过:对于实时应用,可以考虑跳过一些帧以保持稳定的帧率。
4.4 批量推理与性能优化 当处理视频时,我们通常需要连续处理多帧图像。
8.3 常见问题与解决方案 在使用YOLOv11进行视频处理时,你可能会遇到一些常见问题。
这个脚本将读取视频文件,逐帧进行YOLOv11推理,并将带有检测结果的帧保存为新的视频文件。
不同的编码格式有不同的压缩率和兼容性。
YOLOv11作为目前最先进的目标检测模型之一,其出色的速度和精度平衡使其成为视频处理任务的理想选择。
接着,我们获取视频的基本信息,如帧率、总帧数、宽度和高度。
多线程/多进程:使用多线程或多进程可以并行处理不同的任务,如读取、处理和写入。
为了管理项目依赖,强烈建议使用虚拟环境。
在视频处理中,我们可能需要调整输出视频的帧率,例如: 降低帧率以减小文件大小提高帧率以创建慢动作效果保持原始帧率但改变播放速度 以下是一个支持帧率调整和时间控制的视频处理函数: import cv2 import numpy as np from ultralytics import YOLO import time def process_video_with_fps_control( input_path, output_path, model, output_fps=None, speed_factor=1.0, frame_interval=None, use_time_stretching=False ): """ 支持帧率调整和时间控制的视频处理 参数: input_path: 输入视频路径 output_path: 输出视频路径 model: YOLOv11模型 output_fps: 输出视频帧率 (None表示使用原视频帧率) speed_factor: 播放速度因子 (1.0表示正常速度, 0.5表示慢动作, 2.0表示快放) frame_interval: 帧间隔 (None表示不跳帧, 2表示每2帧处理1帧) use_time_stretching: 是否使用时间拉伸 (改变播放速度但不改变帧率) """ # 打开输入视频 cap = cv2.VideoCapture(input_path) if not cap.isOpened(): print(f"Error: Could not open video file {input_path}") return # 获取视频基本信息 original_fps = cap.get(cv2.CAP_PROP_FPS) original_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) original_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) # 计算输出参数 output_fps = output_fps if output_fps is not None else original_fps frame_interval = frame_interval if frame_interval is not None else 1 # 如果使用时间拉伸,计算调整后的帧率 if use_time_stretching: adjusted_fps = original_fps * speed_factor else: adjusted_fps = output_fps # 创建输出视频 fourcc = cv2.VideoWriter_fourcc(*'mp4v') out = cv2.VideoWriter(output_path, fourcc, adjusted_fps, (original_width, original_height)) if not out.isOpened(): print(f"Error: Could not create output video {output_path}") cap.release() return # 计算处理参数 total_output_frames = int(frame_count / frame_interval) if use_time_stretching: total_output_frames = int(frame_count * speed_factor) print(f"Input video: {original_width}x{original_height}, {original_fps} fps, {frame_count} frames") print(f"Output video: {original_width}x{original_height}, {adjusted_fps} fps, ~{total_output_frames} frames") print(f"Speed factor: {speed_factor}x, Frame interval: {frame_interval}") # 处理视频 input_frame_count = 0 output_frame_count = 0 start_time = time.time() while True: ret, frame = cap.read() if not ret: break # 跳帧处理 if input_frame_count % frame_interval != 0: input_frame_count += 1 continue # 进行YOLOv11推理 results = model(frame, verbose=False) # 在帧上绘制检测结果 annotated_frame = results[0].plot() # 如果使用时间拉伸,可能需要重复或跳过帧 if use_time_stretching: # 慢动作:重复帧 if speed_factor < 1.0: repeat_count = int(1.0 / speed_factor) for _ in range(repeat_count): out.write(annotated_frame) output_frame_count += 1 # 快放:跳过一些帧 elif speed_factor > 1.0: skip_count = int(speed_factor) if input_frame_count % skip_count == 0: out.write(annotated_frame) output_frame_count += 1 # 正常速度 else: out.write(annotated_frame) output_frame_count += 1 else: # 不使用时间拉伸,直接写入 out.write(annotated_frame) output_frame_count += 1 input_frame_count += 1 # 显示进度 if input_frame_count % 10 == 0: elapsed_time = time.time() - start_time fps_processing = output_frame_count / elapsed_time progress = input_frame_count / frame_count * 100 print(f"Processed {input_frame_count}/{frame_count} frames ({progress:.1f}%), " f"Output {output_frame_count} frames, {fps_processing:.2f} fps") # 释放资源 cap.release() out.release() # 计算处理时间 end_time = time.time() processing_time = end_time - start_time fps_processing = output_frame_count / processing_time print(f"Video processing completed in {processing_time:.2f} seconds") print(f"Processing speed: {fps_processing:.2f} fps") print(f"Output: {output_frame_count} frames, saved to {output_path}") # 使用示例 from ultralytics import YOLO # 加载模型 model = YOLO('yolov11s.pt') # 示例1: 降低帧率 print("Example 1: Reducing frame rate") process_video_with_fps_control( input_path='input_video.mp4', output_path='output_video_15fps.mp4', model=model, output_fps=15 # 将帧率降低到15fps ) # 示例2: 慢动作效果 print("\nExample 2: Creating slow motion effect") process_video_with_fps_control( input_path='input_video.mp4', output_path='output_video_slow_motion.mp4', model=model, speed_factor=0.5, # 0.5倍速度(慢动作) use_time_stretching=True ) # 示例3: 快放效果 print("\nExample 3: Creating fast forward effect") process_video_with_fps_control( input_path='input_video.mp4', output_path='output_video_fast_forward.mp4', model=model, speed_factor=2.0, # 2倍速度(快放) use_time_stretching=True ) # 示例4: 跳帧处理 print("\nExample 4: Frame skipping") process_video_with_fps_control( input_path='input_video.mp4', output_path='output_video_skip_frames.mp4', model=model, frame_interval=2 # 每2帧处理1帧 ) 这个函数提供了多种帧率调整和时间控制选项: output_fps:直接设置输出视频的帧率speed_factor:调整播放速度,小于1.0为慢动作,大于1.0为快放frame_interval:跳帧处理,例如设置为2表示每2帧处理1帧use_time_stretching:是否使用时间拉伸,改变播放速度但不改变帧率 通过组合这些参数,你可以实现各种视频效果,如慢动作、快放、降低帧率以减小文件大小等。
以下是一些常见的视频编码格式: 编码格式特点优点缺点适用场景H.264 (AVC)广泛使用的视频编码标准兼容性好,压缩效率高相对较旧,压缩效率不如H.265大多数视频平台和设备H.265 (HEVC)H.264的继任者压缩效率比H.264高50%计算复杂度高,兼容性不如H.2644K/8K视频,流媒体VP9Google开发的开源编码格式免版税,压缩效率高兼容性不如H.264YouTube,Web视频AV1Alliance for Open Media开发的开源编码格式压缩效率比H.265更高计算复杂度非常高,硬件支持有限未来流媒体标准MPEG-4包含多种编码格式的标准支持多种内容类型压缩效率不如H.264旧设备和系统 在OpenCV中,我们可以使用cv2.VideoWriter_fourcc()函数指定编码格式。
自适应处理:系统可能会根据视频内容和计算资源自动调整处理参数,实现最佳的性能和质量平衡。
如果你还没有安装Python,可以从Python官网下载并安装。
解决方案:安装额外的编解码器,或者使用FFmpeg进行格式转换。
六、高级视频处理技巧 6.1 多线程视频处理优化 视频处理是一个计算密集型任务,特别是当使用高分辨率的视频或大型YOLOv11模型时。
四、YOLOv11模型加载与推理 4.1 YOLOv11模型架构简介 YOLOv11是YOLO(You Only Look Once)系列目标检测模型的最新版本,它在保持高速推理的同时,进一步提高了检测精度。
安装时记得勾选"Add Python to PATH"选项,这样可以在命令行中直接使用Python。
确保安装了正确版本的CUDA和cuDNN。
比特率:每秒传输的数据量,影响视频质量和文件大小。
锚点无关检测:直接预测边界框的中心点、宽高和类别,不需要预设锚点框。
容器格式:用于封装视频、音频等数据的文件格式,如MP4、AVI、MOV等。
帧率调整:我们学习了如何调整输出视频的帧率,以及如何实现慢动作和快放效果。
解决方案:使用更高质量的编码格式,调整编码参数,或者避免多次编码解码。
此外,调整帧率通常只会影响读取速度,而不会改变原始视频的帧率。
你需要根据你的GPU内存大小和视频分辨率来选择合适的批量大小。
在实际应用中,我们需要在process_frame()函数中添加YOLOv11检测逻辑。
端到端视频理解:未来的模型可能会直接处理视频序列,而不是逐帧处理,从而更好地利用时序信息。
如果速度是关键,选择较小的模型(如YOLOv11n);如果精度更重要,选择较大的模型(如YOLOv11x)。
虚拟环境可以隔离不同项目的依赖,避免版本冲突。
不同视频格式兼容性问题: 问题:某些视频格式无法正确读取或写入。
视频编码格式:我们探讨了不同的视频编码格式,以及如何选择合适的编码格式来平衡质量和文件大小。
编码格式:用于压缩视频数据的算法,常见的有H.264、H.265、MPEG-4等。
二、环境准备与依赖安装 2.1 Python环境配置 在开始编写YOLOv11视频处理脚本之前,我们需要确保有一个合适的Python环境。
总之,YOLOv11视频处理是一个充满活力的领域,有着广阔的应用前景。
最后,我们读取并显示视频的第一帧。
这个项目将具备以下功能: 支持多种输入源(视频文件、摄像头、网络视频流)支持多种输出格式(MP4、AVI等)支持调整输出视频的帧率和分辨率支持目标跟踪和轨迹可视化支持多线程处理以提高性能提供友好的命令行界面支持配置文件和参数保存 下面是项目的整体架构图: 输入源 视频读取器 帧队列 帧处理器 结果队列 视频写入器 输出视频 配置管理器 跟踪器 性能监控器 命令行界面 7.2 项目代码实现 以下是完整的项目代码实现: #!/usr/bin/env python3 """ YOLOv11 Video Processing Tool A comprehensive tool for processing videos with YOLOv11, supporting various input sources, output formats, and advanced features like object tracking and multi-threading. """ import cv2 import numpy as np from ultralytics import YOLO import time import threading import queue import argparse import json import os import random from collections import deque import logging # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) class Config: """配置管理类""" def __init__(self, config_file=None): """ 初始化配置 参数: config_file: 配置文件路径 """ # 默认配置 self.config = { "model": { "path": "yolov11s.pt", "conf_threshold": 0.5, "iou_threshold": 0.45, "device": "auto" # auto, cpu, cuda }, "input": { "source": "", # 视频文件路径、摄像头ID或视频流URL "type": "file" # file, camera, stream }, "output": { "path": "output.mp4", "fps": None, # None表示使用原视频帧率 "width": None, # None表示使用原视频宽度 "height": None, # None表示使用原视频高度 "codec": "mp4v" }, "processing": { "enable_tracking": False, "show_trajectory": True, "trajectory_length": 20, "num_threads": 2, "batch_size": 1, "save_frames": False, "frames_dir": "frames" }, "display": { "show_fps": True, "show_confidence": True, "show_class": True, "font_scale": 0.5, "thickness": 2, "bbox_color": [0, 255, 0], # BGR格式 "text_color": [0, 0, 0] # BGR格式 } } # 如果提供了配置文件,加载配置 if config_file and os.path.exists(config_file): self.load_config(config_file) def load_config(self, config_file): """ 从文件加载配置 参数: config_file: 配置文件路径 """ try: with open(config_file, 'r') as f: file_config = json.load(f) # 递归更新配置 self._update_config(self.config, file_config) logger.info(f"Configuration loaded from {config_file}") except Exception as e: logger.error(f"Error loading configuration: {e}") def save_config(self, config_file): """ 保存配置到文件 参数: config_file: 配置文件路径 """ try: with open(config_file, 'w') as f: json.dump(self.config, f, indent=4) logger.info(f"Configuration saved to {config_file}") except Exception as e: logger.error(f"Error saving configuration: {e}") def _update_config(self, base_config, new_config): """ 递归更新配置 参数: base_config: 基础配置 new_config: 新配置 """ for key, value in new_config.items(): if key in base_config and isinstance(base_config[key], dict) and isinstance(value, dict): self._update_config(base_config[key], value) else: base_config[key] = value def get(self, key_path, default=None): """ 获取配置值 参数: key_path: 配置键路径,如 "model.conf_threshold" default: 默认值 返回: 配置值 """ keys = key_path.split('.') value = self.config try: for key in keys: value = value[key] return value except (KeyError, TypeError): return default def set(self, key_path, value): """ 设置配置值 参数: key_path: 配置键路径,如 "model.conf_threshold" value: 配置值 """ keys = key_path.split('.') config = self.config # 导航到最后一级的父级 for key in keys[:-1]: if key not in config: config[key] = {} config = config[key] # 设置值 config[keys[-1]] = value class SimpleTracker: """简单目标跟踪器""" def __init__(self, max_disappeared=10, max_distance=50): """ 初始化跟踪器 参数: max_disappeared: 目标消失的最大帧数 max_distance: 最大关联距离 """ self.next_object_id = 0 self.objects = {} self.disappeared = {} self.max_disappeared = max_disappeared self.max_distance = max_distance # 为每个跟踪对象分配颜色 self.colors = {} def register(self, centroid): """注册新对象""" self.objects[self.next_object_id] = { 'centroid': centroid, 'trajectory': deque(maxlen=20) # 保存最近20个位置点 } self.disappeared[self.next_object_id] = 0 self.colors[self.next_object_id] = ( random.randint(0, 255), random.randint(0, 255), random.randint(0, 255) ) self.next_object_id += 1 def deregister(self, object_id): """注销对象""" if object_id in self.objects: del self.objects[object_id] if object_id in self.disappeared: del self.disappeared[object_id] if object_id in self.colors: del self.colors[object_id] def update(self, rects): """更新跟踪器""" if len(rects) == 0: # 标记所有现有对象为消失 for object_id in list(self.disappeared.keys()): self.disappeared[object_id] += 1 # 如果对象消失时间过长,注销它 if self.disappeared[object_id] > self.max_disappeared: self.deregister(object_id) return self.objects # 计算输入矩形的中心点 input_centroids = np.zeros((len(rects), 2), dtype="int") for (i, (start_x, start_y, end_x, end_y)) in enumerate(rects): c_x = int((start_x + end_x) / 2.0) c_y = int((start_y + end_y) / 2.0) input_centroids[i] = (c_x, c_y) # 如果当前没有跟踪的对象,注册所有输入中心点 if len(self.objects) == 0: for i in range(len(input_centroids)): self.register(input_centroids[i]) else: # 获取现有对象的中心点 object_centroids = [obj['centroid'] for obj in self.objects.values()] # 计算距离矩阵 D = np.linalg.norm(np.array(object_centroids)[:, np.newaxis] - input_centroids, axis=2) # 找到最小距离的行和列索引 rows = D.min(axis=1).argsort() cols = D.argmin(axis=1)[rows] # 跟踪已使用的行和列索引 used_row_idxs = set() used_col_idxs = set() # 遍历(行,列)索引元组的组合 for (row, col) in zip(rows, cols): # 如果已经检查过这个行或列,忽略它 if row in used_row_idxs or col in used_col_idxs: continue # 如果距离大于最大距离,不关联 if D[row, col] > self.max_distance: continue # 获取对象ID并更新其中心点 object_id = list(self.objects.keys())[row] self.objects[object_id]['centroid'] = input_centroids[col] self.objects[object_id]['trajectory'].append(input_centroids[col]) self.disappeared[object_id] = 0 # 标记为已使用 used_row_idxs.add(row) used_col_idxs.add(col) # 计算未使用的行和列索引 unused_row_idxs = set(range(0, D.shape[0])).difference(used_row_idxs) unused_col_idxs = set(range(0, D.shape[1])).difference(used_col_idxs) # 如果对象数量大于等于输入中心点数量,检查是否有对象消失 if D.shape[0] >= D.shape[1]: for row in unused_row_idxs: object_id = list(self.objects.keys())[row] self.disappeared[object_id] += 1 # 如果对象消失时间过长,注销它 if self.disappeared[object_id] > self.max_disappeared: self.deregister(object_id) # 否则注册新的输入中心点作为可跟踪对象 else: for col in unused_col_idxs: self.register(input_centroids[col]) return self.objects class VideoReader: """视频读取器""" def __init__(self, source, source_type="file"): """ 初始化视频读取器 参数: source: 视频源(文件路径、摄像头ID或视频流URL) source_type: 源类型(file、camera、stream) """ self.source = source self.source_type = source_type self.cap = None self.video_info = {} def open(self): """ 打开视频源 返回: bool: 是否成功打开 """ if self.source_type == "camera": # 摄像头 try: camera_id = int(self.source) self.cap = cv2.VideoCapture(camera_id) except ValueError: logger.error(f"Invalid camera ID: {self.source}") return False else: # 视频文件或流 self.cap = cv2.VideoCapture(self.source) if not self.cap.isOpened(): logger.error(f"Could not open video source: {self.source}") return False # 获取视频基本信息 if self.source_type == "camera": # 摄像头可能不支持获取所有属性 self.video_info = { 'fps': 30.0, # 假设30fps 'width': int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)), 'height': int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)), 'frame_count': -1 # 摄像头没有总帧数 } else: self.video_info = { 'fps': self.cap.get(cv2.CAP_PROP_FPS), 'width': int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)), 'height': int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)), 'frame_count': int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT)) } logger.info(f"Video source opened: {self.video_info['width']}x{self.video_info['height']}, " f"{self.video_info['fps']} fps, {self.video_info['frame_count']} frames") return True def read(self): """ 读取一帧 返回: tuple: (ret, frame) ret表示是否成功读取,frame是图像帧 """ return self.cap.read() def release(self): """释放资源""" if self.cap is not None: self.cap.release() def get_info(self): """ 获取视频信息 返回: dict: 视频信息 """ return self.video_info class VideoWriter: """视频写入器""" def __init__(self, output_path, fps, width, height, codec='mp4v'): """ 初始化视频写入器 参数: output_path: 输出视频路径 fps: 帧率 width: 宽度 height: 高度 codec: 编码格式 """ self.output_path = output_path self.fps = fps self.width = width self.height = height self.codec = codec self.out = None # 创建输出目录(如果不存在) output_dir = os.path.dirname(output_path) if output_dir and not os.path.exists(output_dir): os.makedirs(output_dir) def open(self): """ 打开视频写入器 返回: bool: 是否成功打开 """ fourcc = cv2.VideoWriter_fourcc(*self.codec) self.out = cv2.VideoWriter(self.output_path, fourcc, self.fps, (self.width, height)) if not self.out.isOpened(): logger.error(f"Could not create output video: {self.output_path}") return False logger.info(f"Output video opened: {self.width}x{self.height}, {self.fps} fps, codec: {self.codec}") return True def write(self, frame): """ 写入一帧 参数: frame: 图像帧 """ if self.out is not None: self.out.write(frame) def release(self): """释放资源""" if self.out is not None: self.out.release() class FrameProcessor: """帧处理器""" def __init__(self, config): """ 初始化帧处理器 参数: config: 配置对象 """ self.config = config # 加载YOLOv11模型 model_path = config.get("model.path") device = config.get("model.device") logger.info(f"Loading YOLOv11 model: {model_path}") self.model = YOLO(model_path) if device != "auto": self.model.to(device) # 初始化跟踪器(如果需要) self.tracker = None if config.get("processing.enable_tracking"): self.tracker = SimpleTracker() # 获取阈值 self.conf_threshold = config.get("model.conf_threshold") self.iou_threshold = config.get("model.iou_threshold") # 获取显示参数 self.show_fps = config.get("display.show_fps") self.show_confidence = config.get("display.show_confidence") self.show_class = config.get("display.show_class") self.font_scale = config.get("display.font_scale") self.thickness = config.get("display.thickness") self.bbox_color = tuple(config.get("display.bbox_color")) self.text_color = tuple(config.get("display.text_color")) # 性能统计 self.frame_count = 0 self.start_time = time.time() self.fps = 0 def process(self, frame): """ 处理一帧 参数: frame: 输入帧 返回: numpy.ndarray: 处理后的帧 """ # 进行YOLOv11推理 results = self.model( frame, conf=self.conf_threshold, iou=self.iou_threshold, verbose=False ) # 获取检测结果 boxes = results[0].boxes # 如果启用跟踪,更新跟踪器 if self.tracker is not None and len(boxes) > 0: # 提取边界框坐标 rects = [] for box in boxes: x1, y1, x2, y2 = box.xyxy[0].cpu().numpy() rects.append((int(x1), int(y1), int(x2), int(y2))) # 更新跟踪器 objects = self.tracker.update(rects) # 在帧上绘制跟踪结果 for (object_id, obj_data) in objects.items(): centroid = obj_data['centroid'] trajectory = obj_data['trajectory'] color = self.tracker.colors[object_id] # 绘制中心点 cv2.circle(frame, (int(centroid[0]), int(centroid[1])), 4, color, -1) # 绘制ID cv2.putText( frame, f"ID {object_id}", (int(centroid[0]) - 10, int(centroid[1]) - 10), cv2.FONT_HERSHEY_SIMPLEX, self.font_scale, color, self.thickness ) # 绘制轨迹 if self.config.get("processing.show_trajectory") and len(trajectory) > 1: for i in range(1, len(trajectory)): # 计算轨迹线的粗细(越新的点越粗) thickness = int(2 * (i / len(trajectory))) # 绘制轨迹线 cv2.line( frame, (int(trajectory[i-1][0]), int(trajectory[i-1][1])), (int(trajectory[i][0]), int(trajectory[i][1])), color, thickness ) # 在帧上绘制检测结果 annotated_frame = results[0].plot( conf=self.show_confidence, labels=self.show_class, font_size=self.font_scale, line_width=self.thickness, color=self.bbox_color ) # 更新性能统计 self.frame_count += 1 current_time = time.time() elapsed_time = current_time - self.start_time self.fps = self.frame_count / elapsed_time # 显示FPS if self.show_fps: cv2.putText( annotated_frame, f"FPS: {self.fps:.1f}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, self.font_scale, (0, 255, 0), self.thickness ) return annotated_frame def get_fps(self): """ 获取当前FPS 返回: float: 当前FPS """ return self.fps class VideoProcessor: """视频处理器主类""" def __init__(self, config): """ 初始化视频处理器 参数: config: 配置对象 """ self.config = config # 初始化组件 self.reader = None self.writer = None self.frame_processor = None # 线程相关 self.running = False self.frame_queue = queue.Queue(maxsize=10) self.result_queue = queue.Queue(maxsize=10) self.capture_thread = None self.process_thread = None self.write_thread = None # 性能统计 self.processed_frames = 0 self.start_time = None def setup(self): """ 设置处理器 返回: bool: 是否成功设置 """ # 获取输入源配置 source = self.config.get("input.source") source_type = self.config.get("input.type") # 创建视频读取器 self.reader = VideoReader(source, source_type) if not self.reader.open(): return False # 获取视频信息 video_info = self.reader.get_info() # 获取输出配置 output_path = self.config.get("output.path") output_fps = self.config.get("output.fps") or video_info['fps'] output_width = self.config.get("output.width") or video_info['width'] output_height = self.config.get("output.height") or video_info['height'] output_codec = self.config.get("output.codec") # 创建视频写入器 self.writer = VideoWriter(output_path, output_fps, output_width, output_height, output_codec) if not self.writer.open(): self.reader.release() return False # 创建帧处理器 self.frame_processor = FrameProcessor(self.config) logger.info("Video processor setup completed") return True def capture_frames(self): """捕获帧的线程函数""" while self.running: ret, frame = self.reader.read() if not ret: logger.info("No more frames to capture") break # 如果队列已满,等待一段时间 if self.frame_queue.full(): time.sleep(0.01) continue # 将帧放入队列 self.frame_queue.put(frame) # 放入结束标志 self.frame_queue.put(None) def process_frames(self): """处理帧的线程函数""" while True: try: # 从队列获取帧 frame = self.frame_queue.get(timeout=1.0) # 检查结束标志 if frame is None: break # 处理帧 processed_frame = self.frame_processor.process(frame) # 如果结果队列已满,等待一段时间 if self.result_queue.full(): time.sleep(0.01) continue # 将结果放入队列 self.result_queue.put(processed_frame) # 更新统计 self.processed_frames += 1 # 定期打印进度 if self.processed_frames % 30 == 0: elapsed_time = time.time() - self.start_time fps = self.processed_frames / elapsed_time logger.info(f"Processed {self.processed_frames} frames, {fps:.2f} fps") except queue.Empty: continue # 放入结束标志 self.result_queue.put(None) def write_frames(self): """写入帧的线程函数""" while True: try: # 从队列获取处理后的帧 frame = self.result_queue.get(timeout=1.0) # 检查结束标志 if frame is None: break # 写入帧 self.writer.write(frame) # 保存帧(如果需要) if self.config.get("processing.save_frames"): frame_dir = self.config.get("processing.frames_dir") if not os.path.exists(frame_dir): os.makedirs(frame_dir) frame_path = os.path.join(frame_dir, f"frame_{self.processed_frames:06d}.jpg") cv2.imwrite(frame_path, frame) except queue.Empty: continue def start(self): """ 开始处理 返回: bool: 是否成功开始 """ if not self.setup(): return False self.running = True self.start_time = time.time() # 创建并启动线程 self.capture_thread = threading.Thread(target=self.capture_frames) self.process_thread = threading.Thread(target=self.process_frames) self.write_thread = threading.Thread(target=self.write_frames) self.capture_thread.start() self.process_thread.start() self.write_thread.start() logger.info("Video processing started") return True def stop(self): """停止处理""" self.running = False # 等待线程结束 if self.capture_thread: self.capture_thread.join() if self.process_thread: self.process_thread.join() if self.write_thread: self.write_thread.join() # 释放资源 if self.reader: self.reader.release() if self.writer: self.writer.release() # 计算处理时间 if self.start_time: elapsed_time = time.time() - self.start_time fps = self.processed_frames / elapsed_time logger.info(f"Video processing completed in {elapsed_time:.2f} seconds") logger.info(f"Processed {self.processed_frames} frames, {fps:.2f} fps") logger.info("Video processing stopped") def wait(self): """等待处理完成""" if self.capture_thread: self.capture_thread.join() if self.process_thread: self.process_thread.join() if self.write_thread: self.write_thread.join() def parse_arguments(): """解析命令行参数""" parser = argparse.ArgumentParser(description='YOLOv11 Video Processing Tool') # 输入源 input_group = parser.add_mutually_exclusive_group(required=True) input_group.add_argument('--video', type=str, help='Input video file path') input_group.add_argument('--camera', type=int, help='Camera ID') input_group.add_argument('--stream', type=str, help='Video stream URL') # 输出 parser.add_argument('--output', type=str, default='output.mp4', help='Output video file path') # 模型 parser.add_argument('--model', type=str, default='yolov11s.pt', help='YOLOv11 model path') parser.add_argument('--conf', type=float, default=0.5, help='Confidence threshold') parser.add_argument('--iou', type=float, default=0.45, help='IOU threshold') parser.add_argument('--device', type=str, default='auto', choices=['auto', 'cpu', 'cuda'], help='Device to use') # 输出视频参数 parser.add_argument('--fps', type=float, default=None, help='Output video FPS') parser.add_argument('--width', type=int, default=None, help='Output video width') parser.add_argument('--height', type=int, default=None, help='Output video height') parser.add_argument('--codec', type=str, default='mp4v', help='Output video codec') # 处理选项 parser.add_argument('--tracking', action='store_true', help='Enable object tracking') parser.add_argument('--no-trajectory', action='store_true', help='Disable trajectory visualization') parser.add_argument('--threads', type=int, default=2, help='Number of processing threads') parser.add_argument('--save-frames', action='store_true', help='Save processed frames') parser.add_argument('--frames-dir', type=str, default='frames', help='Directory to save frames') # 配置文件 parser.add_argument('--config', type=str, help='Configuration file path') parser.add_argument('--save-config', type=str, help='Save configuration to file') return parser.parse_args() def main(): """主函数""" # 解析命令行参数 args = parse_arguments() # 创建配置 config = Config(args.config) # 从命令行参数更新配置 if args.video: config.set("input.source", args.video) config.set("input.type", "file") elif args.camera: config.set("input.source", str(args.camera)) config.set("input.type", "camera") elif args.stream: config.set("input.source", args.stream) config.set("input.type", "stream") config.set("output.path", args.output) config.set("model.path", args.model) config.set("model.conf_threshold", args.conf) config.set("model.iou_threshold", args.iou) config.set("model.device", args.device) if args.fps: config.set("output.fps", args.fps) if args.width: config.set("output.width", args.width) if args.height: config.set("output.height", args.height) config.set("output.codec", args.codec) config.set("processing.enable_tracking", args.tracking) if args.no_trajectory: config.set("processing.show_trajectory", False) config.set("processing.num_threads", args.threads) config.set("processing.save_frames", args.save_frames) config.set("processing.frames_dir", args.frames_dir) # 保存配置(如果需要) if args.save_config: config.save_config(args.save_config) # 创建并启动视频处理器 processor = VideoProcessor(config) try: if processor.start(): # 等待处理完成 processor.wait() except KeyboardInterrupt: logger.info("Processing interrupted by user") finally: processor.stop() if __name__ == "__main__": main() 这个项目提供了一个完整的YOLOv11视频处理解决方案,具有以下特点: 模块化设计:将功能分解为多个类,每个类负责特定功能配置管理:支持配置文件和命令行参数,灵活配置各种选项多线程处理:使用多线程提高处理性能目标跟踪:集成了简单的目标跟踪功能多种输入源:支持视频文件、摄像头和网络视频流性能监控:实时显示处理速度和进度错误处理:完善的错误处理和日志记录 7.3 项目使用示例 以下是使用这个项目的几个示例: 示例1:处理视频文件 python yolov11_video_processor.py --video input.mp4 --output output.mp4 --model yolov11s.pt --conf 0.5 示例2:处理摄像头输入 python yolov11_video_processor.py --camera 0 --output camera_output.mp4 --model yolov11m.pt --conf 0.6 示例3:处理网络视频流 python yolov11_video_processor.py --stream rtsp://example.com/stream --output stream_output.mp4 --model yolov11l.pt 示例4:启用目标跟踪 python yolov11_video_processor.py --video input.mp4 --output tracked_output.mp4 --tracking --model yolov11s.pt 示例5:使用配置文件 python yolov11_video_processor.py --config config.json --video input.mp4 --output output.mp4 配置文件示例(config.json): { "model": { "path": "yolov11s.pt", "conf_threshold": 0.5, "iou_threshold": 0.45, "device": "auto" }, "input": { "source": "input.mp4", "type": "file" }, "output": { "path": "output.mp4", "fps": null, "width": null, "height": null, "codec": "mp4v" }, "processing": { "enable_tracking": true, "show_trajectory": true, "trajectory_length": 20, "num_threads": 2, "batch_size": 1, "save_frames": false, "frames_dir": "frames" }, "display": { "show_fps": true, "show_confidence": true, "show_class": true, "font_scale": 0.5, "thickness": 2, "bbox_color": [0, 255, 0], "text_color": [0, 0, 0] } } 八、总结与展望 8.1 技术要点回顾 在这篇文章中,我们系统地学习了如何使用YOLOv11进行视频处理,从基础概念到高级技巧,从单线程处理到多线程优化。
视频输出质量差: 问题:输出视频质量明显低于输入视频。
这些场景都需要我们掌握如何将YOLOv11应用于视频处理。
以下是训练YOLOv11模型的简要步骤: 准备数据集:按照YOLO格式组织数据,包括图像文件和对应的标注文件创建配置文件:定义模型结构、训练参数等运行训练:使用model.train()方法开始训练评估模型:使用model.val()方法评估模型性能导出模型:使用model.export()方法导出模型为不同格式 训练模型的详细步骤超出了本文的范围,但你可以参考Ultralytics的官方文档获取更多信息。
例如,某些属性(如亮度、对比度)可能只适用于摄像头捕获的视频,而不适用于预录制的视频文件。
解决方案:减少批量大小,使用更小的模型,或者逐帧处理而不是批量处理。
当队列满时,我们会丢弃旧的帧,以避免内存堆积。
单帧处理:我们详细讲解了如何对单帧图像进行YOLOv11推理,包括阈值设置、结果提取和可视化。
实时视频流处理:我们实现了实时视频流处理,能够处理摄像头输入和网络视频流。
以下是加载不同类型模型的代码示例: from ultralytics import YOLO # 加载预训练模型 # 可选模型: yolov11n.pt, yolov11s.pt, yolov11m.pt, yolov11l.pt, yolov11x.pt pretrained_model = YOLO('yolov11s.pt') # 加载small版本的预训练模型 # 加载自定义训练的模型 # 假设你已经训练了自己的模型并保存为custom_model.pt custom_model = YOLO('path/to/your/custom_model.pt') # 加载模型时可以指定设备 # 使用GPU (如果可用) gpu_model = YOLO('yolov11s.pt').to('cuda') # 使用CPU cpu_model = YOLO('yolov11s.pt').to('cpu') # 打印模型信息 print("Pretrained model info:") pretrained_model.info() print("\nCustom model info:") custom_model.info() 如果你想要使用自定义训练的模型,首先需要准备训练数据并训练模型。
理解这些概念有助于我们在处理视频时做出合适的选择,比如在保证质量的前提下选择合适的编码格式来减小文件大小,或者根据需求调整输出视频的帧率。
想象一下,你正在开发一个智能监控系统,需要实时识别视频中的人、车、物体;或者你正在制作一个视频内容分析工具,想要自动标记视频中的所有物体;又或者你只是想在自己的视频上添加一些酷炫的检测效果。
除了批量推理,还有其他一些性能优化技巧: 模型量化:将模型权重从FP32转换为FP16或INT8,可以减少内存使用并提高推理速度。
Ultralytics提供了多种预训练模型,从轻量级的YOLOv11n到高精度的YOLOv11x,你可以根据你的需求选择合适的模型。
8.2 性能优化建议 在实际应用中,你可能需要进一步优化视频处理的性能。
使用TensorRT:NVIDIA的TensorRT可以进一步优化模型推理速度。
硬件加速:使用GPU进行推理可以显著提高处理速度。
6.3 实时视频流处理 除了处理预录制的视频文件,YOLOv11也可以用于处理实时视频流,如摄像头捕获、网络视频流等。
处理速度慢: 问题:视频处理速度太慢,无法满足实时需求。
YOLOv11有多个变体,从轻量级的YOLOv11n到高精度的YOLOv11x,适用于不同的应用场景。
以下是批量推理的代码示例: import cv2 import numpy as np from ultralytics import YOLO import time def batch_inference(model, frames, conf_threshold=0.5, iou_threshold=0.45): """ 对多帧图像进行批量YOLOv11推理 参数: model: YOLOv11模型 frames: 输入图像列表 (numpy数组列表) conf_threshold: 置信度阈值 iou_threshold: IOU阈值 (用于NMS) 返回: results_list: 检测结果列表 annotated_frames: 带有检测框的图像列表 """ # 确保输入是numpy数组列表 if not all(isinstance(frame, np.ndarray) for frame in frames): raise ValueError("All input frames must be numpy arrays") # 进行批量推理 results = model( frames, conf=conf_threshold, iou=iou_threshold, verbose=False # 不打印推理信息 ) # 处理结果 results_list = [] annotated_frames = [] for result in results: results_list.append(result) annotated_frames.append(result.plot()) return results_list, annotated_frames def process_video_with_batch_inference(video_path, output_path, batch_size=8): """ 使用批量推理处理视频 参数: video_path: 输入视频路径 output_path: 输出视频路径 batch_size: 批量大小 """ # 加载模型 model = YOLO('yolov11s.pt') # 打开视频文件 cap = cv2.VideoCapture(video_path) if not cap.isOpened(): print(f"Error: Could not open video file {video_path}") return # 获取视频基本信息 fps = cap.get(cv2.CAP_PROP_FPS) width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) # 创建视频写入对象 fourcc = cv2.VideoWriter_fourcc(*'mp4v') out = cv2.VideoWriter(output_path, fourcc, fps, (width, height)) # 批量处理视频 frame_count = 0 batch_frames = [] start_time = time.time() while True: ret, frame = cap.read() if not ret: break batch_frames.append(frame) # 当达到批量大小或视频结束时,进行推理 if len(batch_frames) == batch_size or (ret is False and batch_frames): # 批量推理 results, annotated_frames = batch_inference(model, batch_frames) # 写入处理后的帧 for annotated_frame in annotated_frames: out.write(annotated_frame) frame_count += len(batch_frames) print(f"Processed {frame_count} frames") # 清空批量 batch_frames = [] # 处理剩余的帧(如果有) if batch_frames: results, annotated_frames = batch_inference(model, batch_frames) for annotated_frame in annotated_frames: out.write(annotated_frame) frame_count += len(batch_frames) print(f"Processed {frame_count} frames") # 计算处理时间 end_time = time.time() processing_time = end_time - start_time fps_processing = frame_count / processing_time print(f"Video processing completed in {processing_time:.2f} seconds") print(f"Processing speed: {fps_processing:.2f} fps") print(f"Output saved to {output_path}") # 释放资源 cap.release() out.release() cv2.destroyAllWindows() # 使用示例 input_video = 'path/to/input/video.mp4' output_video = 'path/to/output/video.mp4' process_video_with_batch_inference(input_video, output_video, batch_size=8) 在上面的代码中,我们定义了一个batch_inference函数,它可以同时处理多帧图像。
YOLOv11模型:我们学习了YOLOv11模型的基本架构和特点,以及如何加载和使用预训练模型进行推理。
1.2 YOLOv11视频处理的基本流程 YOLOv11视频处理的基本流程可以概括为以下几个步骤: 视频读取:使用OpenCV等库打开视频文件,获取视频的基本信息(如帧率、分辨率、总帧数等)逐帧处理:循环读取视频的每一帧,将帧转换为YOLOv11可处理的格式目标检测:将每一帧输入到YOLOv11模型中,获取检测结果结果可视化:在原始帧上绘制检测框、类别标签和置信度视频写入:将处理后的帧写入新的视频文件资源释放:处理完成后释放所有资源 这个流程看似简单,但每个步骤都有许多细节需要注意,比如视频编码格式的选择、帧率的调整、内存管理等。
消费者线程的数量可以根据你的CPU核心数和GPU数量来调整。
2.2 必要库的安装与配置 YOLOv11视频处理需要几个关键的Python库,包括OpenCV用于视频读写,PyTorch用于模型推理,以及Ultralytics提供的YOLOv11实现。
6.2 目标跟踪与轨迹可视化 在视频处理中,目标跟踪是一个非常有用的功能,它可以在连续帧之间跟踪同一个目标,并绘制其运动轨迹。
目标跟踪不稳定: 问题:目标跟踪器频繁丢失目标或ID切换。
模型量化:将模型权重从FP32转换为FP16或INT8可以减少内存使用并提高推理速度。
批量大小(batch_size)是一个重要的参数,它影响内存使用和处理速度。
先进的损失函数:使用CIoU损失和Focal Loss的变体,提高难样本的学习效果。
目标跟踪:我们集成了简单的目标跟踪功能,可以在连续帧之间跟踪目标并绘制运动轨迹。
多模态融合:未来的视频处理可能会融合音频、文本等多模态信息,实现更全面的理解。
以下是创建和激活虚拟环境的步骤: # 创建名为yolov11_video的虚拟环境 python -m venv yolov11_video # 激活虚拟环境 (Windows) yolov11_video\Scripts\activate # 激活虚拟环境 (Linux/Mac) source yolov11_video/bin/activate 激活虚拟环境后,你的命令行提示符前会显示(yolov11_video),表示你已经在虚拟环境中。
你可以加载预训练模型,也可以加载自己训练的模型。
解决方案:使用GPU加速,选择更小的模型,减少输入分辨率,或者跳过一些帧。
以下是一个处理实时视频流的示例: import cv2 import numpy as np from ultralytics import YOLO import time import threading import queue class RealTimeVideoProcessor: def __init__(self, model_path='yolov11s.pt', conf_threshold=0.5, iou_threshold=0.45): """ 初始化实时视频处理器 参数: model_path: YOLOv11模型路径 conf_threshold: 置信度阈值 iou_threshold: IOU阈值 """ # 加载YOLOv11模型 self.model = YOLO(model_path) # 设置阈值 self.conf_threshold = conf_threshold self.iou_threshold = iou_threshold # 初始化变量 self.cap = None self.running = False self.frame_queue = queue.Queue(maxsize=1) # 只保存最新帧 self.result_queue = queue.Queue(maxsize=1) # 只保存最新结果 # 统计信息 self.fps = 0 self.frame_count = 0 self.start_time = time.time() def open_camera(self, camera_id=0, width=640, height=480, fps=30): """ 打开摄像头 参数: camera_id: 摄像头ID (通常为0) width: 视频宽度 height: 视频高度 fps: 帧率 返回: bool: 是否成功打开 """ # 打开摄像头 self.cap = cv2.VideoCapture(camera_id) if not self.cap.isOpened(): print(f"Error: Could not open camera {camera_id}") return False # 设置摄像头参数 self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width) self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height) self.cap.set(cv2.CAP_PROP_FPS, fps) # 获取实际参数 actual_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) actual_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) actual_fps = self.cap.get(cv2.CAP_PROP_FPS) print(f"Camera opened: {actual_width}x{actual_height}, {actual_fps} fps") return True def open_video_stream(self, stream_url): """ 打开视频流 参数: stream_url: 视频流URL 返回: bool: 是否成功打开 """ # 打开视频流 self.cap = cv2.VideoCapture(stream_url) if not self.cap.isOpened(): print(f"Error: Could not open video stream {stream_url}") return False # 获取视频流参数 width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) fps = self.cap.get(cv2.CAP_PROP_FPS) print(f"Video stream opened: {width}x{height}, {fps} fps") return True def capture_frames(self): """捕获帧的线程函数""" while self.running: ret, frame = self.cap.read() if not ret: print("Error: Could not read frame") continue # 如果队列已满,清空队列 if self.frame_queue.full(): try: self.frame_queue.get_nowait() except queue.Empty: pass # 将帧放入队列 self.frame_queue.put(frame) def process_frames(self): """处理帧的线程函数""" while self.running: try: # 从队列获取帧 frame = self.frame_queue.get(timeout=1.0) # 进行YOLOv11推理 results = self.model( frame, conf=self.conf_threshold, iou=self.iou_threshold, verbose=False ) # 在帧上绘制检测结果 annotated_frame = results[0].plot() # 如果结果队列已满,清空队列 if self.result_queue.full(): try: self.result_queue.get_nowait() except queue.Empty: pass # 将结果放入队列 self.result_queue.put((frame, annotated_frame, results)) # 更新统计信息 self.frame_count += 1 current_time = time.time() elapsed_time = current_time - self.start_time self.fps = self.frame_count / elapsed_time except queue.Empty: continue def start_processing(self): """开始处理""" if self.cap is None: print("Error: No camera or video stream opened") return False self.running = True # 创建并启动线程 self.capture_thread = threading.Thread(target=self.capture_frames) self.process_thread = threading.Thread(target=self.process_frames) self.capture_thread.start() self.process_thread.start() return True def stop_processing(self): """停止处理""" self.running = False # 等待线程结束 if hasattr(self, 'capture_thread'): self.capture_thread.join() if hasattr(self, 'process_thread'): self.process_thread.join() # 释放资源 if self.cap is not None: self.cap.release() cv2.destroyAllWindows() def get_latest_result(self): """获取最新的处理结果""" try: return self.result_queue.get_nowait() except queue.Empty: return None def display_results(self, window_name="YOLOv11 Real-time Detection"): """显示处理结果""" while self.running: result = self.get_latest_result() if result is not None: frame, annotated_frame, results = result # 在图像上显示FPS cv2.putText( annotated_frame, f"FPS: {self.fps:.1f}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2 ) # 显示结果 cv2.imshow(window_name, annotated_frame) # 按'q'键退出 if cv2.waitKey(1) & 0xFF == ord('q'): self.running = False break def main(): # 创建实时视频处理器 processor = RealTimeVideoProcessor( model_path='yolov11s.pt', conf_threshold=0.5, iou_threshold=0.45 ) # 选择输入源 input_source = input("Choose input source (1: Camera, 2: Video Stream URL): ") if input_source == "1": # 打开摄像头 camera_id = int(input("Enter camera ID (usually 0): ")) width = int(input("Enter width (default 640): ") or "640") height = int(input("Enter height (default 480): ") or "480") fps = int(input("Enter FPS (default 30): ") or "30") if not processor.open_camera(camera_id, width, height, fps): return else: # 打开视频流 stream_url = input("Enter video stream URL: ") if not processor.open_video_stream(stream_url): return # 开始处理 if not processor.start_processing(): return # 显示结果 try: processor.display_results() finally: # 停止处理 processor.stop_processing() if __name__ == "__main__": main() 这个实时视频处理器使用了多线程技术来保持低延迟: 主线程:负责显示结果和处理用户输入捕获线程:负责从摄像头或视频流中捕获帧处理线程:负责对帧进行YOLOv11推理 通过使用队列来传递帧和结果,我们可以确保处理线程总是使用最新的帧,从而减少延迟。
模型剪枝:移除模型中不重要的权重,可以减少模型大小和计算量。
因此,视频处理的核心就是逐帧处理,然后将处理后的帧重新组合成视频。
视频处理脚本:我们构建了一个基础的视频处理脚本,能够读取视频文件,逐帧进行YOLOv11推理,并输出处理后的视频。
在选择模型时,需要根据你的应用场景在精度和速度之间做出权衡。
以下是使用FP16精度进行推理的示例: # 加载模型并使用FP16精度 model = YOLO('yolov11s.pt').to('cuda').half() # 确保输入也是FP16格式 frame = cv2.imread('image.jpg') frame_half = frame.astype(np.float16) / 255.0 # 进行推理 results = model(frame_half, verbose=False) 五、视频处理脚本实现 5.1 基础视频处理脚本框架 现在我们已经了解了YOLOv11模型和视频处理的基础知识,接下来我们将构建一个完整的视频处理脚本。
推理结果包含多种信息: boxes:边界框信息,包括坐标、置信度和类别masks:分割掩码(如果模型支持实例分割)keypoints:关键点信息(如果模型支持姿态估计)probs:分类概率(如果是分类任务)obb:旋转边界框(如果模型支持旋转目标检测) 我们可以通过results[0].plot()方法直接在图像上绘制检测结果,也可以手动提取检测结果并自定义绘制方式。
1.3 视频处理中的关键概念解析 在深入代码实现之前,我们需要理解几个视频处理中的关键概念: 帧率(FPS):每秒显示的帧数,常见的有24fps、30fps、60fps等。
一个视频文件本质上是一系列按特定速率播放的图像帧集合,加上音频轨道(可选)。
跟踪器的工作原理如下: 检测阶段:使用YOLOv11检测每一帧中的目标关联阶段:将当前帧的检测框与前一帧的跟踪对象关联更新阶段:更新跟踪对象的位置和轨迹绘制阶段:在图像上绘制跟踪结果和轨迹 在实际应用中,你可能需要使用更成熟的跟踪算法,如DeepSORT、StrongSORT等。
分辨率:视频的宽度和高度,如1920×1080(1080p)、1280×720(720p)等。
这些算法通常有更好的跟踪性能,特别是在目标遮挡、快速移动等复杂场景下。
然后,我们实现了一个process_video_with_batch_inference函数,它使用批量推理来处理视频。
注册
登录控制台
