# SpeechRecognition/test_staged.py
#
# 70 lines
# 1.8 KiB
# Python

"""
Test the staged processing logic.
"""
from pathlib import Path
import sys
# Import the functions and path constants from main.py
from main import (
VIDEO_DIR,
OUTPUT_DIR,
TEMP_DIR,
process_batch_diarization,
process_batch_asr,
get_video_list
)
def _print_banner(title):
    """Print *title* framed above and below by a 60-character '=' rule."""
    rule = "=" * 60
    print(rule)
    print(title)
    print(rule)


def test_staged_processing():
    """Run the two-stage pipeline on at most two videos and print a report.

    Stage 1 calls ``process_batch_diarization`` on the selected videos;
    stage 2 passes the same videos plus the stage-1 results to
    ``process_batch_asr``.  Both stages run with ``max_workers=1``.

    Returns:
        None.  Returns early, after printing a message, when
        ``get_video_list(VIDEO_DIR)`` yields nothing.
    """
    _print_banner("分阶段处理测试")

    video_paths = get_video_list(VIDEO_DIR)
    if not video_paths:
        print("✗ 未找到视频文件")
        return

    # Keep the run short: only the first two videos are exercised.
    test_videos = video_paths[:2]
    print(f"测试视频:{len(test_videos)}")
    for v in test_videos:
        print(f" - {v.name}")
    print()

    # Stage 1: speaker diarization.
    _print_banner("阶段 1: 说话人分离")
    diar_results = process_batch_diarization(test_videos, max_workers=1)

    # Count only truthy results: the loop below treats a falsy result path as
    # a failure, so len(diar_results) alone could over-report successes.
    # NOTE(review): assumes failed videos may appear with a falsy value --
    # confirm against process_batch_diarization.
    diar_ok = sum(1 for result_path in diar_results.values() if result_path)
    print(f"\n阶段 1 结果:{diar_ok}/{len(test_videos)} 成功")
    for video, result_path in diar_results.items():
        # Both branches used to be "", which made success and failure
        # indistinguishable; use the same markers as the other messages here.
        status = "✓" if result_path else "✗"
        print(f" {status} {video.name}: {result_path}")
    print()

    # Stage 2: ASR + merge, fed with the stage-1 results.
    _print_banner("阶段 2: ASR + 合并")
    results = process_batch_asr(test_videos, diar_results, max_workers=1)
    print(f"\n阶段 2 结果:{len(results)}/{len(test_videos)} 完成")
    for result in results:
        status = "✓" if result.get("success") else "✗"
        print(f" {status} {Path(result['video']).name}")
        # Speaker counts are optional; print them only when present and truthy.
        if result.get("speaker_counts"):
            print(f" 说话人:{result['speaker_counts']}")
    print()

    _print_banner("✓ 测试完成!")
if __name__ == "__main__":
    # Script entry point: run the staged-processing check when this file is
    # executed directly rather than imported.
    test_staged_processing()