""" 测试分阶段处理逻辑 """ from pathlib import Path import sys # 导入 main.py 中的函数 from main import ( VIDEO_DIR, OUTPUT_DIR, TEMP_DIR, process_batch_diarization, process_batch_asr, get_video_list ) def test_staged_processing(): """测试分阶段处理""" print("=" * 60) print("分阶段处理测试") print("=" * 60) # 获取视频列表(只取前 2 个进行测试) video_paths = get_video_list(VIDEO_DIR) if not video_paths: print("✗ 未找到视频文件") return # 只测试前 2 个 test_videos = video_paths[:2] print(f"测试视频:{len(test_videos)}") for v in test_videos: print(f" - {v.name}") print() # 阶段 1: 说话人分离 print("=" * 60) print("阶段 1: 说话人分离") print("=" * 60) diar_results = process_batch_diarization(test_videos, max_workers=1) print(f"\n阶段 1 结果:{len(diar_results)}/{len(test_videos)} 成功") for video, result_path in diar_results.items(): status = "✓" if result_path else "✗" print(f" {status} {video.name}: {result_path}") print() # 阶段 2: ASR + 合并 print("=" * 60) print("阶段 2: ASR + 合并") print("=" * 60) results = process_batch_asr(test_videos, diar_results, max_workers=1) print(f"\n阶段 2 结果:{len(results)}/{len(test_videos)} 完成") for result in results: status = "✓" if result.get("success") else "✗" print(f" {status} {Path(result['video']).name}") if result.get("speaker_counts"): print(f" 说话人:{result['speaker_counts']}") print() print("=" * 60) print("✓ 测试完成!") print("=" * 60) if __name__ == "__main__": test_staged_processing()