import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path

import cv2
import numpy as np
import supervision as sv


def _open_capture_with_fallback(input_video_path: str) -> "cv2.VideoCapture":
    """Open *input_video_path*, trying several OpenCV backends in order.

    Backends are tried in the order FFmpeg, DirectShow, default. A backend
    that fails to open the file is released before the next one is tried.

    :raises RuntimeError: if no backend can open the file.
    """
    backends = [
        (cv2.CAP_FFMPEG, 'FFmpeg'),
        (cv2.CAP_DSHOW, 'DirectShow'),
        (cv2.CAP_ANY, 'Default'),
    ]

    for backend, backend_name in backends:
        cap = None
        try:
            cap = cv2.VideoCapture(input_video_path, backend)
            if cap.isOpened():
                if backend == cv2.CAP_FFMPEG:
                    # Hardware acceleration is best-effort only; failing to
                    # enable it must not prevent the file from being used.
                    try:
                        cap.set(cv2.CAP_PROP_HW_ACCELERATION, cv2.VIDEO_ACCELERATION_ANY)
                        hw_accel = cap.get(cv2.CAP_PROP_HW_ACCELERATION)
                        print(f"FFmpeg硬件加速: {'已启用' if hw_accel > 0 else '未启用'}")
                    except Exception as e:
                        print(f"设置硬件加速失败: {e}")
                print(f"使用后端: {backend_name}")
                return cap
        except Exception as e:
            # Deliberately broad: any failure just means "try the next backend".
            print(f"尝试{backend_name}后端失败: {e}")

        if cap is not None:
            cap.release()

    raise RuntimeError(f"无法打开视频文件: {input_video_path}")


def _open_writer(output_path, fps, frame_size, codecs):
    """Return an opened ``cv2.VideoWriter`` using the first codec that works.

    *codecs* is a sequence of four-character codes tried in order. Writers
    that fail to open are released. Returns ``None`` if every codec fails.
    """
    for codec in codecs:
        try:
            fourcc = cv2.VideoWriter_fourcc(*codec)  # type: ignore
            writer = cv2.VideoWriter(output_path, fourcc, fps, frame_size)
        except cv2.error:
            continue
        if writer.isOpened():
            return writer
        writer.release()
    return None


def _read_frame(cap, frame_id, expected_pos):
    """Read frame *frame_id* from *cap*, seeking only when necessary.

    *expected_pos* is the index the capture is believed to be positioned at
    (``None`` when unknown). Returns ``(ok, frame, new_expected_pos)``; after
    a failed read the position is reported as unknown so that the next call
    seeks explicitly.
    """
    if frame_id != expected_pos:
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_id)
    ok, frame = cap.read()
    return ok, frame, (frame_id + 1 if ok else None)


def _clip_frame_range(start_frame, end_frame, extra_frames, min_frames, total_frames):
    """Compute the inclusive source-frame range for one object clip.

    The object's range is padded by *extra_frames* on both sides and then
    extended to at least *min_frames* frames. When the extension would run
    past the end of the video, the range is clamped to the last frame and the
    start is moved earlier instead, so the clip still reaches the minimum
    length whenever the video is long enough. A non-positive *total_frames*
    is treated as "unknown" and no upper clamp is applied.
    """
    frame_count_known = total_frames > 0
    first = max(0, start_frame - extra_frames)
    last = end_frame + extra_frames
    if frame_count_known:
        last = min(total_frames - 1, last)

    if last - first + 1 < min_frames:
        last = first + min_frames - 1
        if frame_count_known and last > total_frames - 1:
            last = total_frames - 1
            first = max(0, last - min_frames + 1)
    return first, last


def _mask_outside_box(frame, xyxy, frame_width, frame_height):
    """Return a black canvas containing only the *xyxy* region of *frame*.

    The region keeps its original position. Coordinates are clamped to both
    the declared output size and the actual size of *frame* so the paste can
    never fail with a shape mismatch. An empty box yields an all-black canvas.
    """
    canvas = np.zeros((frame_height, frame_width, 3), dtype=np.uint8)
    max_x = min(frame_width, frame.shape[1])
    max_y = min(frame_height, frame.shape[0])

    x1, y1, x2, y2 = map(int, xyxy)
    x1 = max(0, min(x1, max_x))
    y1 = max(0, min(y1, max_y))
    x2 = max(0, min(x2, max_x))
    y2 = max(0, min(y2, max_y))

    if x2 > x1 and y2 > y1:
        canvas[y1:y2, x1:x2] = frame[y1:y2, x1:x2]
    return canvas


def generate_video_to_objects(
    obj_dict: dict[str, dict],
    input_video_path: str,
    output_dir: str,
) -> None:
    """Cut one clip per object out of the source video.

    For every entry of *obj_dict* the frames from ``start_frame`` to
    ``end_frame`` (padded by a few frames on each side and extended to a
    minimum duration) are written to ``obj_<id>.mp4`` inside *output_dir*.
    The source video is decoded once; each frame is written to every clip
    whose range contains it.

    :param obj_dict: mapping of track id to object info, each value shaped like
        ``{"class_id": 4, "start_frame": 551, "end_frame": 597}``. The key
        must be convertible with ``int()`` because it is used in the file name.
    :param input_video_path: path of the source video.
    :param output_dir: directory that receives the clips (created if missing).
    :raises RuntimeError: if the source video cannot be opened or a clip
        writer cannot be created.
    """
    # Minimum clip duration in seconds.
    min_seconds = 2
    # Extra frames kept before and after the object's own range.
    extra_frames = 5

    print(f"开始抽取物体视频: {input_video_path}")
    print(f"输出目录: {output_dir}")

    Path(output_dir).mkdir(parents=True, exist_ok=True)

    # 1. Open the source video.
    cap = _open_capture_with_fallback(input_video_path)

    writers = {}         # track id -> cv2.VideoWriter
    obj_ranges = {}      # track id -> (first frame, last frame), inclusive
    frames_written = {}  # track id -> number of frames actually written

    try:
        # 2. Read the video parameters.
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        min_frames = int(min_seconds * fps)

        # 3. Compute every object's frame range and open its writer.
        total_objects = len(obj_dict)
        print(f"总共需要处理 {total_objects} 个物体")

        for track_id, track_data in obj_dict.items():
            start_idx, end_idx = _clip_frame_range(
                track_data["start_frame"],
                track_data["end_frame"],
                extra_frames,
                min_frames,
                total_frames,
            )

            output_path = os.path.join(output_dir, f"obj_{int(track_id):03d}.mp4")
            out = _open_writer(output_path, fps, (width, height), ('avc1',))
            if out is None:
                raise RuntimeError(f"无法初始化视频写入器: {output_path}")

            writers[track_id] = out
            obj_ranges[track_id] = (start_idx, end_idx)
            frames_written[track_id] = 0
            print(f"物体 {track_id}: 帧 {start_idx} ~ {end_idx} -> {output_path}")

        # 4. Single pass over the video. Decoding stops after the last frame
        #    any clip needs instead of running to the end of the file.
        last_needed = max((end for _, end in obj_ranges.values()), default=-1)
        frame_idx = 0
        print_interval = 50

        while frame_idx <= last_needed:
            ret, frame = cap.read()
            if not ret:
                break

            for track_id, (start, end) in obj_ranges.items():
                if start <= frame_idx <= end:
                    writers[track_id].write(frame)
                    frames_written[track_id] += 1

            if frame_idx % print_interval == 0:
                # The container may report a frame count of 0; avoid dividing by it.
                progress = (frame_idx / total_frames) * 100 if total_frames > 0 else 0.0
                print(f"处理帧: {frame_idx}/{total_frames} ({progress:.1f}%)")

            frame_idx += 1
    finally:
        # 5./6. Always release writers and the capture, even on failure.
        for out in writers.values():
            out.release()
        cap.release()

    for track_id, written in frames_written.items():
        print(f"物体 {track_id} 完成 | 总帧数: {written}")

    # Kept from the original for compatibility. OpenCV builds without HighGUI
    # (e.g. headless packages) may raise here; that is not an error for us.
    try:
        cv2.destroyAllWindows()
    except cv2.error:
        pass

    print(f"\n✅ 所有物体视频生成完成!目录: {output_dir}")


def process_track_id(
    track_id: int,
    frame_list: list[tuple[int, list[int]]],
    input_video_path: str,
    output_video_root: str,
    frame_width: int,
    frame_height: int,
    target_fps: int,
    frame_interval: int
) -> str:
    """Render the video for a single track id.

    Each entry of *frame_list* is ``(source_frame_id, xyxy)``. Entry ``i`` is
    placed at output frame ``i * frame_interval``: the source frame is read,
    everything outside the box is blacked out, and the result is written.
    All other output frames are black. The output is padded with black frames
    to at least two seconds at *target_fps*.

    The source video is opened inside this function so that each worker
    thread uses its own ``cv2.VideoCapture``.

    :param track_id: track id; also used as the output file name.
    :param frame_list: detections of this track in output order.
    :param input_video_path: path of the source video.
    :param output_video_root: directory receiving ``<track_id>.mp4``.
    :param frame_width: output frame width in pixels.
    :param frame_height: output frame height in pixels.
    :param target_fps: frame rate of the output video.
    :param frame_interval: distance, in output frames, between object frames.
    :return: a status string starting with ``成功`` or ``失败``.
    """
    # Cover every object frame, but never produce less than two seconds.
    min_frames_for_2s = int(target_fps * 2)
    object_based_frames = len(frame_list) * frame_interval
    max_output_frame = max(object_based_frames, min_frames_for_2s)

    output_path = os.path.join(output_video_root, f"{track_id}.mp4")

    # Prefer H.264; fall back to MPEG-4 when that encoder is unavailable.
    out = _open_writer(
        output_path, target_fps, (frame_width, frame_height), ('h264', 'mp4v')
    )
    if out is None:
        print(f"无法创建视频文件: {output_path}")
        return f"失败: {output_path}"

    cap = cv2.VideoCapture(input_video_path)
    if not cap.isOpened():
        cap.release()
        out.release()
        return f"失败: 无法打开视频 {input_video_path}"

    # One shared black frame for all filler positions; it is never modified.
    black_frame = np.zeros((frame_height, frame_width, 3), dtype=np.uint8)

    current_output_frame = 0
    obj_frame_idx = 0
    next_read_pos = None  # unknown until the first explicit seek

    try:
        while current_output_frame < max_output_frame:
            is_object_slot = (
                current_output_frame % frame_interval == 0
                and obj_frame_idx < len(frame_list)
            )
            if is_object_slot:
                original_frame_id, xyxy = frame_list[obj_frame_idx]
                obj_frame_idx += 1

                # Consecutive source frames are read sequentially; a seek is
                # only issued when the wanted frame is not the next one.
                ret, frame, next_read_pos = _read_frame(
                    cap, original_frame_id, next_read_pos
                )
                if ret:
                    output_frame = _mask_outside_box(
                        frame, xyxy, frame_width, frame_height
                    )
                else:
                    # Read failed: emit a black frame in this slot.
                    output_frame = black_frame
            else:
                output_frame = black_frame

            out.write(output_frame)
            current_output_frame += 1
    finally:
        out.release()
        cap.release()

    print(f"已生成视频: {output_path}, 共 {current_output_frame} 帧")
    return f"成功: {output_path}"


def frame_all_to_obj_vid(
    json_data: dict,
    input_video_path: str,
    output_video_root: str,
) -> None:
    """Generate one masked, AI-oriented video per tracked object.

    *json_data* maps a frame id (convertible with ``int()``) to a list of
    detections; each detection is a dict read via the keys ``"track_id"``
    (default ``-1``) and ``"xyxy"`` (default ``[0, 0, 0, 0]``). Detections are
    grouped by track id and every track is rendered by ``process_track_id``
    in a thread pool.

    :param json_data: annotation data as described above.
    :param input_video_path: path of the source video.
    :param output_video_root: directory receiving the videos (created if missing).
    :raises ValueError: if the source video cannot be opened.
    """
    os.makedirs(output_video_root, exist_ok=True)

    # 1. Group detections by track id, keeping the iteration order of json_data.
    track_dict: dict[int, list[tuple[int, list[int]]]] = {}
    for frame_id_str, detections in json_data.items():
        frame_id = int(frame_id_str)
        for det in detections:
            track_id = det.get("track_id", -1)
            xyxy = det.get("xyxy", [0, 0, 0, 0])
            track_dict.setdefault(track_id, []).append((frame_id, xyxy))

    # 2. Probe the video size once; workers open their own captures.
    temp_cap = cv2.VideoCapture(input_video_path)
    try:
        if not temp_cap.isOpened():
            raise ValueError(f"无法打开视频: {input_video_path}")
        frame_width = int(temp_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(temp_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    finally:
        temp_cap.release()

    # Without any track there is nothing to render, and ThreadPoolExecutor
    # rejects max_workers=0 with a ValueError.
    if not track_dict:
        print("没有检测到任何物体,跳过视频生成")
        return

    # Output runs at 25 fps with one object frame every 5 frames (0, 5, 10, ...).
    target_fps = 25
    frame_interval = 5

    # 3. Render the tracks in parallel, at most one thread per CPU core.
    max_workers = min(os.cpu_count() or 4, len(track_dict))
    print(f"使用 {max_workers} 个线程并行处理")

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_track = {
            executor.submit(
                process_track_id,
                track_id,
                frame_list,
                input_video_path,
                output_video_root,
                frame_width,
                frame_height,
                target_fps,
                frame_interval
            ): track_id
            for track_id, frame_list in track_dict.items()
        }

        # A failing track is reported but does not abort the others.
        for future in as_completed(future_to_track):
            track_id = future_to_track[future]
            try:
                result = future.result()
                print(f"Track ID {track_id}: {result}")
            except Exception as e:
                print(f"Track ID {track_id} 处理失败: {str(e)}")


# NOTE: the commented-out single-threaded predecessor of frame_all_to_obj_vid
# that used to live here was removed; it duplicated the logic now split
# between frame_all_to_obj_vid and process_track_id.


def create_mian_vid_for_ai(
    input_video_path: str,
    output_folder: str
) -> str:
    """Spread the source frames over a 25 fps video with black filler frames.

    Source frame ``i`` is written at output frame ``i * 5``; every other
    output frame is black. The output has at least two seconds of frames and
    is saved as ``mian_vid_ai.mp4`` inside *output_folder* (the file name is
    kept as-is because other code may look it up).

    :param input_video_path: path of the source video.
    :param output_folder: directory receiving the output (created if missing).
    :return: path of the generated video.
    :raises ValueError: if the source video cannot be opened or the output
        file cannot be created.
    """
    os.makedirs(output_folder, exist_ok=True)
    output_video_path = os.path.join(output_folder, "mian_vid_ai.mp4")

    cap = cv2.VideoCapture(input_video_path)
    try:
        if not cap.isOpened():
            raise ValueError(f"无法打开视频: {input_video_path}")

        # A negative frame count is treated as "no frames".
        total_frames = max(0, int(cap.get(cv2.CAP_PROP_FRAME_COUNT)))
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Output runs at 25 fps with one source frame every 5 frames (0, 5, 10, ...).
        target_fps = 25
        frame_interval = 5

        # Cover every source frame, but never produce less than two seconds.
        min_frames_for_2s = target_fps * 2
        max_output_frame = max(total_frames * frame_interval, min_frames_for_2s)

        out = _open_writer(
            output_video_path, target_fps, (frame_width, frame_height), ('mp4v',)
        )
        if out is None:
            raise ValueError(f"无法创建视频文件: {output_video_path}")

        # One shared black frame for all filler positions; it is never modified.
        black_frame = np.zeros((frame_height, frame_width, 3), dtype=np.uint8)

        current_output_frame = 0
        original_frame_idx = 0
        next_read_pos = None  # unknown until the first explicit seek

        try:
            while current_output_frame < max_output_frame:
                is_source_slot = (
                    current_output_frame % frame_interval == 0
                    and original_frame_idx < total_frames
                )
                if is_source_slot:
                    # Frames are consumed in order, so after the first seek
                    # the capture is read sequentially.
                    ret, frame, next_read_pos = _read_frame(
                        cap, original_frame_idx, next_read_pos
                    )
                    # Read failed: emit a black frame in this slot.
                    output_frame = frame if ret else black_frame
                    original_frame_idx += 1
                else:
                    output_frame = black_frame

                out.write(output_frame)
                current_output_frame += 1
        finally:
            out.release()
    finally:
        cap.release()

    print(f"已生成视频: {output_video_path}, 共 {current_output_frame} 帧")
    return output_video_path