HazardInspector/lib/qwen_fun_vid.py

498 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
from pathlib import Path
import numpy as np
import supervision as sv
import cv2
from concurrent.futures import ThreadPoolExecutor, as_completed
import os
import cv2
from pathlib import Path
def generate_video_to_objects(
    obj_dict: dict[str, dict],
    input_video_path: str,
    output_dir: str,
) -> None:
    """Cut one clip per tracked object out of the source video.

    For every entry of ``obj_dict`` the frame window
    ``[start_frame - 5, end_frame + 5]`` is copied into
    ``<output_dir>/obj_<track id, zero padded to 3 digits>.mp4``. Windows
    shorter than two seconds (at the source frame rate) are widened to two
    seconds; when that would run past the end of the video the window is
    shifted backwards instead of being silently truncated.

    :param obj_dict: mapping of track id -> object info. Each value must
        provide ``"start_frame"`` and ``"end_frame"``, e.g.
        ``"0": {"class_id": 4, "start_frame": 551, "end_frame": 597}``.
        The key must be convertible with ``int()``.
    :param input_video_path: path of the source video.
    :param output_dir: directory receiving the clips (created if missing).
    :raises Exception: if the source video cannot be opened by any backend,
        or if a clip writer cannot be initialised.
    """
    # Minimum clip length in seconds.
    min_seconds = 2
    # Extra context frames kept before and after the object's own range.
    extra_frames = 5
    print(f"开始抽取物体视频: {input_video_path}")
    print(f"输出目录: {output_dir}")
    Path(output_dir).mkdir(parents=True, exist_ok=True)

    # 1. Open the source video, trying the backends in order of preference.
    backends = [
        (cv2.CAP_FFMPEG, 'FFmpeg'),
        (cv2.CAP_DSHOW, 'DirectShow'),
        (cv2.CAP_ANY, 'Default'),
    ]
    cap = None
    for backend, backend_name in backends:
        candidate = None
        try:
            candidate = cv2.VideoCapture(input_video_path, backend)
            if candidate.isOpened():
                if backend == cv2.CAP_FFMPEG:
                    try:
                        # NOTE(review): OpenCV documents CAP_PROP_HW_ACCELERATION
                        # as an open-only property, so this post-open set() may
                        # be a no-op — confirm. Kept as best effort.
                        candidate.set(cv2.CAP_PROP_HW_ACCELERATION, cv2.VIDEO_ACCELERATION_ANY)
                        hw_accel = candidate.get(cv2.CAP_PROP_HW_ACCELERATION)
                        print(f"FFmpeg硬件加速: {'已启用' if hw_accel > 0 else '未启用'}")
                    except Exception as e:
                        print(f"设置硬件加速失败: {e}")
                print(f"使用后端: {backend_name}")
                cap = candidate
                break
        except Exception as e:
            print(f"尝试{backend_name}后端失败: {e}")
        # Only reached when this backend was not selected: free its handle.
        if candidate is not None:
            candidate.release()
    if cap is None:
        raise Exception(f"无法打开视频文件: {input_video_path}")

    writers = {}     # track id -> cv2.VideoWriter
    obj_ranges = {}  # track id -> (first frame, last frame), inclusive
    written = {}     # track id -> number of frames actually written
    try:
        # 2. Read the source video parameters.
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        min_frames = int(min_seconds * fps)
        fourcc = cv2.VideoWriter_fourcc(*'avc1')  # type: ignore
        # The container may not report a frame count; in that case the end
        # of the window cannot be clamped.
        last_frame = total_frames - 1 if total_frames > 0 else None

        # 3. Compute every object's frame window and open its writer.
        total_objects = len(obj_dict)
        print(f"总共需要处理 {total_objects} 个物体")
        for track_id, track_data in obj_dict.items():
            start_idx = max(0, track_data["start_frame"] - extra_frames)
            end_idx = track_data["end_frame"] + extra_frames
            if last_frame is not None:
                end_idx = min(last_frame, end_idx)
            # Enforce the minimum clip length.
            if end_idx - start_idx + 1 < min_frames:
                end_idx = start_idx + min_frames - 1
                if last_frame is not None and end_idx > last_frame:
                    # Not enough frames after the object: take them from
                    # before it instead of producing a short clip.
                    end_idx = last_frame
                    start_idx = max(0, end_idx - min_frames + 1)
            output_path = os.path.join(output_dir, f"obj_{int(track_id):03d}.mp4")
            out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
            if not out.isOpened():
                out.release()
                raise Exception(f"无法初始化视频写入器: {output_path}")
            writers[track_id] = out
            obj_ranges[track_id] = (start_idx, end_idx)
            written[track_id] = 0
            print(f"物体 {track_id}: 帧 {start_idx} ~ {end_idx} -> {output_path}")

        # 4. Single sequential pass over the video, feeding every writer
        #    whose window contains the current frame. Decoding stops after
        #    the last frame any object needs.
        last_needed = max((end for _, end in obj_ranges.values()), default=-1)
        frame_idx = 0
        print_interval = 50
        while frame_idx <= last_needed:
            ret, frame = cap.read()
            if not ret:
                break
            for track_id, (start, end) in obj_ranges.items():
                if start <= frame_idx <= end:
                    writers[track_id].write(frame)
                    written[track_id] += 1
            if frame_idx % print_interval == 0:
                if total_frames > 0:
                    progress = (frame_idx / total_frames) * 100
                    print(f"处理帧: {frame_idx}/{total_frames} ({progress:.1f}%)")
                else:
                    # Unknown frame count: a percentage cannot be computed.
                    print(f"处理帧: {frame_idx}")
            frame_idx += 1
    finally:
        # 5. Always release writers and the capture, even on failure.
        for out in writers.values():
            out.release()
        cap.release()

    for track_id in writers:
        print(f"物体 {track_id} 完成 | 总帧数: {written[track_id]}")
    try:
        cv2.destroyAllWindows()
    except cv2.error:
        # OpenCV builds without GUI support raise here; no window was
        # created by this function, so there is nothing to clean up.
        pass
    print(f"\n✅ 所有物体视频生成完成!目录: {output_dir}")
def process_track_id(
    track_id: int,
    frame_list: list[tuple[int, list[int]]],
    input_video_path: str,
    output_video_root: str,
    frame_width: int,
    frame_height: int,
    target_fps: int,
    frame_interval: int
) -> str:
    """Render the video of a single track id.

    Every entry of ``frame_list`` is placed on output frames
    ``0, frame_interval, 2 * frame_interval, ...``: the object's bounding box
    is copied from the source frame onto a black canvas at its original
    position. All other output frames are black. The output is at least two
    seconds long at ``target_fps``.

    :param track_id: track identifier, used as the output file name
        (``<output_video_root>/<track_id>.mp4``).
    :param frame_list: ``(source frame index, [x1, y1, x2, y2])`` pairs, in
        the order in which they should appear in the output.
    :param input_video_path: path of the source video. It is opened inside
        this function so that each worker thread owns its own capture.
    :param output_video_root: directory receiving the output file.
    :param frame_width: output (and source) frame width in pixels.
    :param frame_height: output (and source) frame height in pixels.
    :param target_fps: frame rate of the output video.
    :param frame_interval: spacing, in output frames, between object frames.
    :return: ``"成功: <path>"`` on success, otherwise a string starting with
        ``"失败: "``.
    """
    # Total output length: cover every object frame, but never less than
    # two seconds at the output frame rate.
    min_frames_for_2s = int(target_fps * 2)
    object_based_frames = len(frame_list) * frame_interval
    max_output_frame = max(object_based_frames, min_frames_for_2s)
    output_path = os.path.join(output_video_root, f"{track_id}.mp4")
    frame_size = (frame_width, frame_height)

    # Prefer the 'h264' fourcc and fall back to 'mp4v' when it is unavailable.
    out = None
    try:
        fourcc = cv2.VideoWriter_fourcc(*'h264')  # type: ignore
        out = cv2.VideoWriter(output_path, fourcc, target_fps, frame_size)
    except Exception:
        out = None
    if out is None or not out.isOpened():
        if out is not None:
            # Free the failed writer before replacing it.
            out.release()
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # type: ignore
        out = cv2.VideoWriter(output_path, fourcc, target_fps, frame_size)
    if not out.isOpened():
        out.release()
        print(f"无法创建视频文件: {output_path}")
        return f"失败: {output_path}"

    # Each thread opens its own capture to avoid sharing decoder state.
    cap = cv2.VideoCapture(input_video_path)
    written_frames = 0
    try:
        if not cap.isOpened():
            return f"失败: 无法打开视频 {input_video_path}"

        # Shared filler frame; it is only read by the writer, never modified.
        black_frame = np.zeros((frame_height, frame_width, 3), dtype=np.uint8)
        obj_frame_idx = 0
        # Index of the frame the next cap.read() will return, or -1 when it
        # is unknown. Lets consecutive source frames be read without a seek.
        next_read_pos = 0
        for current_output_frame in range(max_output_frame):
            is_object_slot = (
                current_output_frame % frame_interval == 0
                and obj_frame_idx < len(frame_list)
            )
            if is_object_slot:
                original_frame_id, xyxy = frame_list[obj_frame_idx]
                obj_frame_idx += 1
                if original_frame_id != next_read_pos:
                    cap.set(cv2.CAP_PROP_POS_FRAMES, original_frame_id)
                ret, frame = cap.read()
                if not ret:
                    # Read failed: emit a black frame and force a seek for
                    # the next object frame.
                    next_read_pos = -1
                    output_frame = black_frame
                else:
                    next_read_pos = original_frame_id + 1
                    x1, y1, x2, y2 = map(int, xyxy)
                    # Clamp the box to the frame.
                    x1 = max(0, min(x1, frame_width))
                    y1 = max(0, min(y1, frame_height))
                    x2 = max(0, min(x2, frame_width))
                    y2 = max(0, min(y2, frame_height))
                    # Black canvas with the object pasted at its original
                    # position.
                    output_frame = black_frame.copy()
                    if x2 > x1 and y2 > y1:
                        output_frame[y1:y2, x1:x2] = frame[y1:y2, x1:x2]
            else:
                # Slots between object frames (and padding) stay black.
                output_frame = black_frame
            out.write(output_frame)
            written_frames += 1
    finally:
        # Release both handles on every exit path.
        out.release()
        cap.release()
    print(f"已生成视频: {output_path}, 共 {written_frames}")
    return f"成功: {output_path}"
def frame_all_to_obj_vid(
    json_data: dict,
    input_video_path: str,
    output_video_root: str,
) -> None:
    """Generate one AI-input video per tracked object from annotation data.

    Detections are grouped by ``track_id`` and each group is rendered by
    ``process_track_id`` in a thread pool. Per-track results and failures are
    printed; a failing track does not abort the others.

    Args:
        json_data: mapping of frame id (convertible with ``int()``) to a list
            of detection dicts. Each detection is read through
            ``det.get("track_id", -1)`` and ``det.get("xyxy", [0, 0, 0, 0])``.
        input_video_path: path of the source video.
        output_video_root: directory receiving the videos (created if
            missing).

    Raises:
        ValueError: if the source video cannot be opened.
    """
    os.makedirs(output_video_root, exist_ok=True)

    # 1. Group the detections by track id, keeping the iteration order of
    #    json_data within each track.
    track_dict: dict[int, list[tuple[int, list[int]]]] = {}
    for frame_id_str, detections in json_data.items():
        frame_id = int(frame_id_str)
        for det in detections:
            track_id = det.get("track_id", -1)
            xyxy = det.get("xyxy", [0, 0, 0, 0])
            track_dict.setdefault(track_id, []).append((frame_id, xyxy))

    # 2. Probe the video size once; workers open their own captures.
    temp_cap = cv2.VideoCapture(input_video_path)
    try:
        if not temp_cap.isOpened():
            raise ValueError(f"无法打开视频: {input_video_path}")
        frame_width = int(temp_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(temp_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    finally:
        temp_cap.release()

    # ThreadPoolExecutor rejects max_workers=0, so there is nothing to do
    # when the annotations contain no detection at all.
    if not track_dict:
        print("没有可处理的 track_id,跳过视频生成")
        return

    # Output runs at 25 fps with one object frame every 5 output frames
    # (0, 5, 10, ...).
    target_fps = 25
    frame_interval = 5

    # 3. Render the tracks in parallel, at most one thread per track.
    max_workers = min(os.cpu_count() or 4, len(track_dict))
    print(f"使用 {max_workers} 个线程并行处理")
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_track = {
            executor.submit(
                process_track_id,
                track_id,
                frame_list,
                input_video_path,
                output_video_root,
                frame_width,
                frame_height,
                target_fps,
                frame_interval,
            ): track_id
            for track_id, frame_list in track_dict.items()
        }
        # Report each track as soon as it finishes.
        for future in as_completed(future_to_track):
            track_id = future_to_track[future]
            try:
                result = future.result()
                print(f"Track ID {track_id}: {result}")
            except Exception as e:
                print(f"Track ID {track_id} 处理失败: {str(e)}")
# def frame_all_to_obj_vid(
# json_data: dict,
# input_video_path: str,
# output_video_root: str,
# ) -> None:
# """
# 根据标注数据从原视频中截取物体生成ai读取专用视频
# 参数:
# json_data: 标注数据
# input_video_path: 原视频路径
# output_video_root: 输出视频根目录
# """
# # 确保输出目录存在
# os.makedirs(output_video_root, exist_ok=True)
# # 1. 从 json_data 中提取数据,按 track_id 组织
# track_dict: dict[int, list[tuple[int, list[int]]]] = {}
# # 遍历每一帧
# for frame_id_str, detections in json_data.items():
# frame_id = int(frame_id_str)
# for det in detections:
# track_id = det.get("track_id", -1)
# xyxy = det.get("xyxy", [0, 0, 0, 0])
# if track_id not in track_dict:
# track_dict[track_id] = []
# track_dict[track_id].append((frame_id, xyxy))
# # 2. 打开原视频
# cap = cv2.VideoCapture(input_video_path)
# if not cap.isOpened():
# raise ValueError(f"无法打开视频: {input_video_path}")
# # 获取原视频信息
# original_fps = cap.get(cv2.CAP_PROP_FPS)
# total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
# frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
# frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# # 目标fps为25每5帧取一帧0, 5, 10...
# target_fps = 25
# frame_interval = 5 # 每隔5帧取一帧
# # 为每个 track_id 生成视频
# for track_id, frame_list in track_dict.items():
# # 计算需要生成的总帧数(确保覆盖所有物体帧且不少于两秒)
# min_frames_for_2s = 25 * 2 # 2秒 @ 25fps
# object_based_frames = len(frame_list) * frame_interval
# max_output_frame = max(object_based_frames, min_frames_for_2s)
# # 创建输出视频路径
# output_path = os.path.join(output_video_root, f"{track_id}.mp4")
# # 创建视频写入器
# fourcc = cv2.VideoWriter_fourcc(*'mp4v') # type: ignore
# out = cv2.VideoWriter(output_path, fourcc, target_fps, (frame_width, frame_height))
# if not out.isOpened():
# print(f"无法创建视频文件: {output_path}")
# continue
# # 生成视频帧
# current_output_frame = 0
# obj_frame_idx = 0
# while current_output_frame < max_output_frame:
# # 检查当前输出帧是否是5的倍数
# if current_output_frame % frame_interval == 0 and obj_frame_idx < len(frame_list):
# # 这是需要放置物体帧的位置
# original_frame_id, xyxy = frame_list[obj_frame_idx]
# # 设置原视频读取位置
# cap.set(cv2.CAP_PROP_POS_FRAMES, original_frame_id)
# ret, frame = cap.read()
# if not ret:
# # 读取失败,使用黑色帧
# output_frame = np.zeros((frame_height, frame_width, 3), dtype=np.uint8)
# else:
# # 有数据,截取对应区域
# x1, y1, x2, y2 = map(int, xyxy)
# # 确保坐标在有效范围内
# x1 = max(0, min(x1, frame_width))
# y1 = max(0, min(y1, frame_height))
# x2 = max(0, min(x2, frame_width))
# y2 = max(0, min(y2, frame_height))
# # 创建黑色背景
# output_frame = np.zeros((frame_height, frame_width, 3), dtype=np.uint8)
# # 将截取的区域放到输出帧中(保持原位置)
# if x2 > x1 and y2 > y1:
# cropped = frame[y1:y2, x1:x2]
# output_frame[y1:y2, x1:x2] = cropped
# # 移到下一个物体帧
# obj_frame_idx += 1
# else:
# # 剩余帧留黑
# output_frame = np.zeros((frame_height, frame_width, 3), dtype=np.uint8)
# # 写入帧
# out.write(output_frame)
# current_output_frame += 1
# # 释放视频写入器
# out.release()
# print(f"已生成视频: {output_path}, 共 {current_output_frame} 帧")
# # 释放原视频
# cap.release()
def create_mian_vid_for_ai(
    input_video_path: str,
    output_folder: str
) -> str:
    """Spread the source frames over a 25 fps video, one every 5 frames.

    Source frames 0, 1, 2, ... are written to output frames 0, 5, 10, ...;
    every other output frame is black. The output is written to
    ``<output_folder>/mian_vid_ai.mp4`` with the 'mp4v' fourcc and is at
    least two seconds (50 frames) long.

    Args:
        input_video_path: path of the source video.
        output_folder: directory receiving the output (created if missing).

    Returns:
        str: path of the generated video.

    Raises:
        ValueError: if the source video cannot be opened or the output video
            cannot be created.
    """
    os.makedirs(output_folder, exist_ok=True)
    output_video_path = os.path.join(output_folder, "mian_vid_ai.mp4")

    # Output layout: 25 fps, one source frame every 5 output frames.
    target_fps = 25
    frame_interval = 5
    min_frames_for_2s = target_fps * 2

    cap = cv2.VideoCapture(input_video_path)
    out = None
    written_frames = 0
    try:
        if not cap.isOpened():
            raise ValueError(f"无法打开视频: {input_video_path}")
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Cover every source frame, but never less than two seconds.
        max_output_frame = max(total_frames * frame_interval, min_frames_for_2s)

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # type: ignore
        out = cv2.VideoWriter(output_video_path, fourcc, target_fps, (frame_width, frame_height))
        if not out.isOpened():
            raise ValueError(f"无法创建视频文件: {output_video_path}")

        # Shared filler frame; it is only read by the writer, never modified.
        black_frame = np.zeros((frame_height, frame_width, 3), dtype=np.uint8)
        original_frame_idx = 0
        # Index of the frame the next cap.read() will return, or -1 when it
        # is unknown. Source frames are consumed in order, so a seek is only
        # needed after a failed read.
        next_read_pos = 0
        for current_output_frame in range(max_output_frame):
            is_source_slot = (
                current_output_frame % frame_interval == 0
                and original_frame_idx < total_frames
            )
            if is_source_slot:
                if original_frame_idx != next_read_pos:
                    cap.set(cv2.CAP_PROP_POS_FRAMES, original_frame_idx)
                ret, frame = cap.read()
                if not ret:
                    # Read failed: emit a black frame and resynchronise on
                    # the next source frame.
                    next_read_pos = -1
                    output_frame = black_frame
                else:
                    next_read_pos = original_frame_idx + 1
                    output_frame = frame
                original_frame_idx += 1
            else:
                # Slots between source frames (and padding) stay black.
                output_frame = black_frame
            out.write(output_frame)
            written_frames += 1
    finally:
        # Release both handles on every exit path, including the error
        # paths above.
        cap.release()
        if out is not None:
            out.release()
    print(f"已生成视频: {output_video_path}, 共 {written_frames}")
    return output_video_path