HazardInspector/lib/qwen_fun_vid.py

491 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
from pathlib import Path
import numpy as np
import supervision as sv
import cv2
from concurrent.futures import ThreadPoolExecutor, as_completed
def generate_video_with_boxes(
    boxes_data: list[dict],
    input_video_path: str,
    output_video_path: str,
    frame_annotation_interval: int = 1,
) -> None:
    """Render the supplied box annotations onto a video and save the result.

    Video frame ``i`` is drawn with the entry whose ``frame_id`` equals
    ``i // frame_annotation_interval``, so one annotation entry covers
    ``frame_annotation_interval`` consecutive frames. Frames without a
    matching entry, or whose entry contains no usable box, are copied to the
    output unchanged.

    :param boxes_data: list of annotation entries, each shaped like
        ``{"frame_id": int, "boxes": [(x1, y1, x2, y2, label), ...]}``.
        Boxes that do not have exactly five elements are ignored.
    :param input_video_path: path of the video to read.
    :param output_video_path: path of the video to write (``mp4v`` codec);
        missing parent directories are created.
    :param frame_annotation_interval: number of video frames covered by one
        annotation entry; ``1`` means every frame has its own entry.
    :raises ValueError: if ``frame_annotation_interval`` is smaller than 1 or
        the output video cannot be created.
    :raises FileNotFoundError: if the input video cannot be opened.
    """
    # Reject a non-positive interval up front: 0 would otherwise fail with a
    # ZeroDivisionError in the middle of rendering.
    if frame_annotation_interval < 1:
        raise ValueError(
            f"frame_annotation_interval 必须为正整数: {frame_annotation_interval}"
        )

    # frame_id -> well-formed boxes; malformed boxes are silently dropped.
    frame_to_boxes: dict[int, list[tuple[int, int, int, int, str]]] = {
        entry["frame_id"]: [box for box in entry["boxes"] if len(box) == 5]
        for entry in boxes_data
    }

    cap = cv2.VideoCapture(input_video_path)
    if not cap.isOpened():
        raise FileNotFoundError(f"无法打开视频文件或流: {input_video_path}")

    out = None
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        video_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        video_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        Path(os.path.dirname(output_video_path)).mkdir(parents=True, exist_ok=True)
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # type: ignore
        out = cv2.VideoWriter(output_video_path, fourcc, fps, (video_width, video_height))
        if not out.isOpened():
            # Without this check every write() would be a silent no-op and the
            # function would still report success.
            raise ValueError(f"无法创建视频文件: {output_video_path}")

        box_annotator = sv.BoxAnnotator()
        # NOTE(review): the font path is Windows-specific; confirm what the
        # annotator does on other platforms where this file does not exist.
        label_annotator = sv.RichLabelAnnotator(
            font_path="C:/Windows/Fonts/simhei.ttf",
            text_color=sv.Color.WHITE,
            text_padding=5,
            font_size=20
        )

        frame_idx = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            boxes_info = frame_to_boxes.get(frame_idx // frame_annotation_interval)
            frame_idx += 1

            # No entry, or an entry without boxes: pass the frame through.
            # An empty list must not reach sv.Detections because the resulting
            # array would not have the (N, 4) shape it is built from below.
            if not boxes_info:
                out.write(frame)
                continue

            xyxy = np.array([box[:4] for box in boxes_info], dtype=np.float64)
            labels = [box[4] for box in boxes_info]
            box_count = len(boxes_info)

            # Confidence and class id are placeholders; only the boxes and the
            # label texts are actually drawn.
            detections = sv.Detections(
                xyxy=xyxy,
                confidence=np.ones(box_count),
                class_id=np.zeros(box_count, dtype=int),
                tracker_id=np.arange(box_count, dtype=int),
            )

            annotated_frame = box_annotator.annotate(scene=frame.copy(), detections=detections)
            annotated_frame = label_annotator.annotate(
                scene=annotated_frame,
                detections=detections,
                labels=labels
            )
            out.write(annotated_frame)
    finally:
        # Always release the capture and the writer, even when rendering fails.
        cap.release()
        if out is not None:
            out.release()

    print(f"视频渲染完成,已保存至: {output_video_path}")
def process_track_id(
    track_id: int,
    frame_list: list[tuple[int, list[int]]],
    input_video_path: str,
    output_video_root: str,
    frame_width: int,
    frame_height: int,
    target_fps: int,
    frame_interval: int
) -> str:
    """Generate the per-object video for a single ``track_id``.

    The k-th entry of ``frame_list`` is placed at output frame
    ``k * frame_interval``: the referenced source frame is read, everything
    outside the entry's bounding box is blacked out, and the box content stays
    at its original position. All other output frames are black. The output
    has at least two seconds worth of frames at ``target_fps``.

    :param track_id: identifier of the tracked object; used as the output
        file name (``<track_id>.mp4``).
    :param frame_list: ``(source_frame_id, [x1, y1, x2, y2])`` pairs.
    :param input_video_path: path of the source video; opened inside this
        function so each call uses its own capture object.
    :param output_video_root: directory the output file is written to.
    :param frame_width: width of the output frames in pixels.
    :param frame_height: height of the output frames in pixels.
    :param target_fps: frame rate of the output video.
    :param frame_interval: distance, in output frames, between two
        consecutive object frames.
    :return: a status string starting with ``成功`` on success or ``失败``
        when the output or the input video cannot be opened.
    :raises ValueError: if ``frame_interval`` is smaller than 1.
    """
    # Validate before touching the file system so a bad interval does not
    # leave an empty output file behind.
    if frame_interval < 1:
        raise ValueError(f"frame_interval 必须为正整数: {frame_interval}")

    # Cover every object frame, but never produce less than two seconds.
    min_frames_for_2s = max(1, round(2 * target_fps))
    max_output_frame = max(len(frame_list) * frame_interval, min_frames_for_2s)

    output_path = os.path.join(output_video_root, f"{track_id}.mp4")

    # Prefer the 'h264' FOURCC and fall back to 'mp4v' when the writer cannot
    # be created or opened with it.
    out = None
    for codec in ('h264', 'mp4v'):
        try:
            fourcc = cv2.VideoWriter_fourcc(*codec)  # type: ignore
            candidate = cv2.VideoWriter(
                output_path, fourcc, target_fps, (frame_width, frame_height)
            )
        except cv2.error:
            continue
        if candidate.isOpened():
            out = candidate
            break
        candidate.release()

    if out is None:
        print(f"无法创建视频文件: {output_path}")
        return f"失败: {output_path}"

    cap = cv2.VideoCapture(input_video_path)
    if not cap.isOpened():
        out.release()
        return f"失败: 无法打开视频 {input_video_path}"

    # One shared black frame is enough: it is only ever read by the writer.
    black_frame = np.zeros((frame_height, frame_width, 3), dtype=np.uint8)

    try:
        obj_frame_idx = 0
        for current_output_frame in range(max_output_frame):
            output_frame = black_frame

            is_object_slot = current_output_frame % frame_interval == 0
            if is_object_slot and obj_frame_idx < len(frame_list):
                original_frame_id, xyxy = frame_list[obj_frame_idx]
                obj_frame_idx += 1

                cap.set(cv2.CAP_PROP_POS_FRAMES, original_frame_id)
                ret, frame = cap.read()
                # A failed read keeps the black frame for this slot.
                if ret:
                    # Clamp to both the output size and the decoded frame size
                    # so the two slices below always have the same shape.
                    max_x = min(frame_width, frame.shape[1])
                    max_y = min(frame_height, frame.shape[0])
                    x1, y1, x2, y2 = map(int, xyxy)
                    x1 = max(0, min(x1, max_x))
                    y1 = max(0, min(y1, max_y))
                    x2 = max(0, min(x2, max_x))
                    y2 = max(0, min(y2, max_y))

                    if x2 > x1 and y2 > y1:
                        # Fresh canvas: the shared black frame must stay black.
                        output_frame = np.zeros_like(black_frame)
                        output_frame[y1:y2, x1:x2] = frame[y1:y2, x1:x2]

            out.write(output_frame)
    finally:
        # Release both handles even if reading or writing fails.
        out.release()
        cap.release()

    print(f"已生成视频: {output_path}, 共 {max_output_frame}")
    return f"成功: {output_path}"
def frame_all_to_obj_vid(
    json_data: dict,
    input_video_path: str,
    output_video_root: str,
) -> None:
    """Cut every tracked object out of the source video into its own video.

    Detections are grouped by ``track_id`` and each group is handed to
    ``process_track_id`` on a thread pool, producing one video per track
    inside ``output_video_root``.

    :param json_data: annotation data mapping a frame id (anything ``int()``
        accepts) to a list of detection dicts. Each detection may carry
        ``"track_id"`` (defaults to ``-1``) and ``"xyxy"`` (defaults to
        ``[0, 0, 0, 0]``).
    :param input_video_path: path of the source video.
    :param output_video_root: directory for the generated videos; created if
        it does not exist.
    :raises ValueError: if there is at least one detection and the source
        video cannot be opened.
    """
    os.makedirs(output_video_root, exist_ok=True)

    # 1. Group detections by track id, keeping the order of json_data.
    track_dict: dict[int, list[tuple[int, list[int]]]] = {}
    for frame_id_str, detections in json_data.items():
        frame_id = int(frame_id_str)
        for det in detections:
            track_id = det.get("track_id", -1)
            xyxy = det.get("xyxy", [0, 0, 0, 0])
            track_dict.setdefault(track_id, []).append((frame_id, xyxy))

    # Nothing to render. Returning here also avoids creating a thread pool
    # with max_workers=0, which ThreadPoolExecutor rejects with a ValueError.
    if not track_dict:
        print("标注数据中没有检测结果,跳过视频生成")
        return

    # 2. Read the frame size once; every worker reuses it.
    temp_cap = cv2.VideoCapture(input_video_path)
    try:
        if not temp_cap.isOpened():
            raise ValueError(f"无法打开视频: {input_video_path}")
        frame_width = int(temp_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(temp_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    finally:
        temp_cap.release()

    # Output runs at 25 fps with one object frame every 5 output frames
    # (positions 0, 5, 10, ...).
    target_fps = 25
    frame_interval = 5

    # 3. One task per track id, bounded by the CPU count.
    max_workers = min(os.cpu_count() or 4, len(track_dict))
    print(f"使用 {max_workers} 个线程并行处理")

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_track = {
            executor.submit(
                process_track_id,
                track_id,
                frame_list,
                input_video_path,
                output_video_root,
                frame_width,
                frame_height,
                target_fps,
                frame_interval
            ): track_id
            for track_id, frame_list in track_dict.items()
        }

        # Report each track as soon as it finishes; a failing track must not
        # stop the remaining ones.
        for future in as_completed(future_to_track):
            track_id = future_to_track[future]
            try:
                result = future.result()
                print(f"Track ID {track_id}: {result}")
            except Exception as e:
                print(f"Track ID {track_id} 处理失败: {str(e)}")
# def frame_all_to_obj_vid(
# json_data: dict,
# input_video_path: str,
# output_video_root: str,
# ) -> None:
# """
# 根据标注数据从原视频中截取物体生成ai读取专用视频
# 参数:
# json_data: 标注数据
# input_video_path: 原视频路径
# output_video_root: 输出视频根目录
# """
# # 确保输出目录存在
# os.makedirs(output_video_root, exist_ok=True)
# # 1. 从 json_data 中提取数据,按 track_id 组织
# track_dict: dict[int, list[tuple[int, list[int]]]] = {}
# # 遍历每一帧
# for frame_id_str, detections in json_data.items():
# frame_id = int(frame_id_str)
# for det in detections:
# track_id = det.get("track_id", -1)
# xyxy = det.get("xyxy", [0, 0, 0, 0])
# if track_id not in track_dict:
# track_dict[track_id] = []
# track_dict[track_id].append((frame_id, xyxy))
# # 2. 打开原视频
# cap = cv2.VideoCapture(input_video_path)
# if not cap.isOpened():
# raise ValueError(f"无法打开视频: {input_video_path}")
# # 获取原视频信息
# original_fps = cap.get(cv2.CAP_PROP_FPS)
# total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
# frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
# frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# # 目标fps为25每5帧取一帧0, 5, 10...
# target_fps = 25
# frame_interval = 5 # 每隔5帧取一帧
# # 为每个 track_id 生成视频
# for track_id, frame_list in track_dict.items():
# # 计算需要生成的总帧数(确保覆盖所有物体帧且不少于两秒)
# min_frames_for_2s = 25 * 2 # 2秒 @ 25fps
# object_based_frames = len(frame_list) * frame_interval
# max_output_frame = max(object_based_frames, min_frames_for_2s)
# # 创建输出视频路径
# output_path = os.path.join(output_video_root, f"{track_id}.mp4")
# # 创建视频写入器
# fourcc = cv2.VideoWriter_fourcc(*'mp4v') # type: ignore
# out = cv2.VideoWriter(output_path, fourcc, target_fps, (frame_width, frame_height))
# if not out.isOpened():
# print(f"无法创建视频文件: {output_path}")
# continue
# # 生成视频帧
# current_output_frame = 0
# obj_frame_idx = 0
# while current_output_frame < max_output_frame:
# # 检查当前输出帧是否是5的倍数
# if current_output_frame % frame_interval == 0 and obj_frame_idx < len(frame_list):
# # 这是需要放置物体帧的位置
# original_frame_id, xyxy = frame_list[obj_frame_idx]
# # 设置原视频读取位置
# cap.set(cv2.CAP_PROP_POS_FRAMES, original_frame_id)
# ret, frame = cap.read()
# if not ret:
# # 读取失败,使用黑色帧
# output_frame = np.zeros((frame_height, frame_width, 3), dtype=np.uint8)
# else:
# # 有数据,截取对应区域
# x1, y1, x2, y2 = map(int, xyxy)
# # 确保坐标在有效范围内
# x1 = max(0, min(x1, frame_width))
# y1 = max(0, min(y1, frame_height))
# x2 = max(0, min(x2, frame_width))
# y2 = max(0, min(y2, frame_height))
# # 创建黑色背景
# output_frame = np.zeros((frame_height, frame_width, 3), dtype=np.uint8)
# # 将截取的区域放到输出帧中(保持原位置)
# if x2 > x1 and y2 > y1:
# cropped = frame[y1:y2, x1:x2]
# output_frame[y1:y2, x1:x2] = cropped
# # 移到下一个物体帧
# obj_frame_idx += 1
# else:
# # 剩余帧留黑
# output_frame = np.zeros((frame_height, frame_width, 3), dtype=np.uint8)
# # 写入帧
# out.write(output_frame)
# current_output_frame += 1
# # 释放视频写入器
# out.release()
# print(f"已生成视频: {output_path}, 共 {current_output_frame} 帧")
# # 释放原视频
# cap.release()
def create_mian_vid_for_ai(
    input_video_path: str,
    output_folder: str
) -> str:
    """Spread the source frames over a 25 fps video, one every five frames.

    Source frames 0, 1, 2, ... are written to output frames 0, 5, 10, ...;
    every other output frame is black. The output always contains at least
    two seconds worth of frames and is saved as ``mian_vid_ai.mp4`` inside
    ``output_folder``.

    :param input_video_path: path of the source video.
    :param output_folder: directory for the generated video; created if it
        does not exist.
    :return: path of the generated video.
    :raises ValueError: if the source video cannot be opened or the output
        video cannot be created.
    """
    os.makedirs(output_folder, exist_ok=True)
    output_video_path = os.path.join(output_folder, "mian_vid_ai.mp4")

    cap = cv2.VideoCapture(input_video_path)
    if not cap.isOpened():
        raise ValueError(f"无法打开视频: {input_video_path}")

    out = None
    try:
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Output runs at 25 fps with one source frame every 5 output frames.
        target_fps = 25
        frame_interval = 5

        # Cover every source frame, but never produce less than two seconds.
        min_frames_for_2s = target_fps * 2
        max_output_frame = max(total_frames * frame_interval, min_frames_for_2s)

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # type: ignore
        out = cv2.VideoWriter(output_video_path, fourcc, target_fps, (frame_width, frame_height))
        if not out.isOpened():
            raise ValueError(f"无法创建视频文件: {output_video_path}")

        # One shared black frame is enough: it is only ever read by the writer.
        black_frame = np.zeros((frame_height, frame_width, 3), dtype=np.uint8)

        original_frame_idx = 0
        for current_output_frame in range(max_output_frame):
            output_frame = black_frame

            is_source_slot = current_output_frame % frame_interval == 0
            if is_source_slot and original_frame_idx < total_frames:
                # Source frames are consumed strictly in order, so a plain
                # sequential read is used instead of seeking before each one.
                ret, frame = cap.read()
                if ret:
                    output_frame = frame
                # A failed read keeps the black frame but still uses the slot.
                original_frame_idx += 1

            out.write(output_frame)
    finally:
        # Release both handles even if the writer could not be created.
        cap.release()
        if out is not None:
            out.release()

    print(f"已生成视频: {output_video_path}, 共 {max_output_frame}")
    return output_video_path