diff --git a/.coverage b/.coverage index dce66a0..3beff83 100644 Binary files a/.coverage and b/.coverage differ diff --git a/app/asr/map_speaker.py b/app/asr/map_speaker.py index e916a6f..d6ff1a4 100644 --- a/app/asr/map_speaker.py +++ b/app/asr/map_speaker.py @@ -25,31 +25,4 @@ def find_speaker(begin_time, end_time, diarization_segments): max_overlap = overlap_duration best_speaker = seg['speaker'] - return best_speaker - -def main(): - diarization = load_json(r'd:\Userfile\Projects\AnzezxianxHazardInspectAI\Code\audio\result.json') - transcription = load_json(r'd:\Userfile\Projects\AnzezxianxHazardInspectAI\Code\audio\output\VID_20251031_132320_019_mono_result.json') - - diarization_segments = diarization['segments'] - - for sentence in transcription['sentences']: - begin_time = sentence['begin_time'] - end_time = sentence['end_time'] - - new_speaker = find_speaker(begin_time, end_time, diarization_segments) - sentence['speaker'] = new_speaker - - save_json(r'd:\Userfile\Projects\AnzezxianxHazardInspectAI\Code\audio\output\VID_20251031_132320_019_mono_result.json', transcription) - - speaker_counts = {} - for sentence in transcription['sentences']: - speaker = sentence['speaker'] - speaker_counts[speaker] = speaker_counts.get(speaker, 0) + 1 - - print("说话人统计:") - for speaker, count in sorted(speaker_counts.items()): - print(f" {speaker}: {count} 句") - -if __name__ == '__main__': - main() + return best_speaker \ No newline at end of file diff --git a/app/asr/routes.py b/app/asr/routes.py index 66885f1..d8c670a 100644 --- a/app/asr/routes.py +++ b/app/asr/routes.py @@ -45,7 +45,13 @@ def register_asr_routes(app): try: from app.asr.core import main - main(path) + result = main(path) + + # 检查 main() 的返回值,如果是错误信息,返回 400 + if result and isinstance(result, str): + task_running[task_id] = False + return jsonify(make_response(status="error", message=result)), 400 + finally: if use_alarm: signal.alarm(0) # pyright: ignore diff --git a/server.py b/main.py similarity index 
def run_tests(extra_args=None):
    """Run the project's pytest suite in a subprocess.

    Args:
        extra_args: Optional iterable of extra command-line arguments that are
            appended to the pytest invocation, e.g. ``['-m', 'not slow']``.
            ``None`` (the default) runs the fixed command only.

    Returns:
        The exit code of the pytest process (``0`` when every test passed).
    """
    project_root = Path(__file__).parent.absolute()
    test_dir = project_root / 'test'

    # sys.executable keeps the run inside the interpreter / virtualenv that
    # launched this script instead of whatever "pytest" is first on PATH.
    cmd = [
        sys.executable,
        '-m',
        'pytest',
        str(test_dir),
        '-v',
        '--tb=short',
        '-ra',
    ]
    if extra_args:
        cmd.extend(str(arg) for arg in extra_args)

    separator = "=" * 60
    print(separator)
    print("开始运行测试...")
    print(separator)
    print(f"测试目录:{test_dir}")
    print(separator)

    # Run from the project root so pytest.ini and relative paths resolve.
    result = subprocess.run(cmd, cwd=project_root)

    print(separator)
    if result.returncode == 0:
        print("所有测试通过!")
    else:
        print(f"测试失败,退出码:{result.returncode}")
    print(separator)

    return result.returncode


if __name__ == '__main__':
    # Forward any arguments given to this script straight to pytest, so that
    # e.g. `python run_tests.py -m "not slow"` works; no arguments behaves
    # exactly as before.
    sys.exit(run_tests(sys.argv[1:]))
## 依赖

### 测试与覆盖率依赖
```bash
pip install pytest pytest-cov pyyaml pytest-timeout
```
@pytest.fixture(scope='session')
def app():
    """Create the Flask application shared by the whole test session.

    ``OUTPUT_DIR`` and ``INPUT_DIR`` are pointed at fresh temporary
    directories; both are removed again when the session ends.
    """
    app = create_app()
    app.config['TESTING'] = True
    output_dir = tempfile.mkdtemp()
    input_dir = tempfile.mkdtemp()
    app.config['OUTPUT_DIR'] = output_dir
    app.config['INPUT_DIR'] = input_dir
    try:
        yield app
    finally:
        # mkdtemp() never cleans up after itself. Removal is best-effort so a
        # locked file cannot turn session teardown into a test error.
        for created in (output_dir, input_dir):
            shutil.rmtree(created, ignore_errors=True)


@pytest.fixture(scope='function')
def client(app):
    """Provide a Flask test client bound to the session application."""
    with app.test_client() as client:
        yield client


@pytest.fixture(scope='session')
def temp_dirs():
    """Create a temporary directory tree for tests.

    Yields a dict with the keys ``root``, ``input``, ``output`` and ``temp``
    mapping to ``Path`` objects; the whole tree is deleted at session end.
    """
    temp_dir = tempfile.mkdtemp()
    dirs = {
        'root': Path(temp_dir),
        'input': Path(temp_dir) / 'input',
        'output': Path(temp_dir) / 'output',
        'temp': Path(temp_dir) / 'temp'
    }
    for d in dirs.values():
        d.mkdir(parents=True, exist_ok=True)
    yield dirs
    shutil.rmtree(temp_dir)


@pytest.fixture(scope='function')
def clean_temp(temp_dirs):
    """Yield ``temp_dirs`` and delete the files directly inside each directory afterwards."""
    yield temp_dirs
    for d in temp_dirs.values():
        if d.exists():
            # Only top-level regular files are removed; the directory layout
            # itself is kept for the next test.
            for f in d.glob('*'):
                if f.is_file():
                    f.unlink()


@pytest.fixture
def sample_audio_file(temp_dirs):
    """Yield the path of a sample audio file (the file itself is not created)."""
    audio_path = temp_dirs['input'] / 'test_audio.wav'
    yield audio_path


@pytest.fixture
def sample_video_file(temp_dirs):
    """Yield the path of a sample video file (the file itself is not created)."""
    video_path = temp_dirs['input'] / 'test_video.mp4'
    yield video_path
### test_config.yaml

```yaml
test_files:
  primary_video: "input/VID_20251104_085655_024.AVI"  # 主测试文件
  backup_videos:                                       # 备用文件
    - "input/VID_20251104_090655_025.AVI"
  audio_files:                                         # 音频测试文件
    - "input/VID_20251031_132320_019_mono.wav"
    - "input/VID_20251031_132320_019_mono_speak_only.wav"

timeouts:
  asr_recognize: 600        # ASR 识别超时(秒)
  transcode: 300            # 转码超时
  test_case: 900            # 单个测试用例超时

performance:
  max_asr_time: 200         # 最大 ASR 处理时间(秒)
  max_transcode_time: 600   # 最大转码时间(秒)
  min_processing_speed: 0.5 # 最小处理速度(倍速,即 1/RTF)
```
test/real/ -v -m "not requires_gpu" + +# 只运行 ASR 识别测试 +pytest test/real/test_real_e2e.py::TestASRRecognition -v +``` + +--- + +## ⚠️ 常见问题 + +### 测试卡住怎么办? + +**可能原因**: +1. 首次下载模型(10-30 分钟) +2. 模型加载缓慢(30 秒 -2 分钟) +3. 文件过大处理时间长 + +**解决方案**: +1. 预下载模型:`python -c "from app.asr.asr_service import ASRService; ASRService()"` +2. 使用更小的测试文件 +3. 安装 `pytest-timeout` 设置超时 + +详见:[README_故障排查.md](README_故障排查.md) + +### CPU 占用低但测试无响应? + +这是正常的,可能原因: +- 等待模型下载(网络 IO) +- 等待模型加载(内存 IO) +- 音频/视频处理中 + +**建议**:首次运行耐心等待 10-30 分钟 + +### 如何跳过慢速测试? + +```bash +pytest test/real/ -v -m "not slow" +``` + +--- + +## 📊 预期性能 + +### 处理时间参考(CPU) + +| 文件类型 | 大小 | 处理时间 | +|---------|------|---------| +| 短音频 | 5MB | 30-60 秒 | +| 长音频 | 20MB | 2-5 分钟 | +| 短视频 | 50MB | 1-3 分钟 | +| 长视频 | 200MB | 5-15 分钟 | + +### 处理时间参考(GPU) + +| 文件类型 | 大小 | 处理时间 | +|---------|------|---------| +| 短音频 | 5MB | 10-20 秒 | +| 长音频 | 20MB | 30-60 秒 | +| 短视频 | 50MB | 30-60 秒 | +| 长视频 | 200MB | 2-5 分钟 | + +--- + +## 🔧 依赖要求 + +### 必需依赖 +```bash +pip install pyyaml>=6.0 +``` + +### 可选依赖(推荐) +```bash +# 测试超时控制 +pip install pytest-timeout>=2.2.0 + +# 性能测试 +pip install pytest-benchmark>=4.0.0 + +# 代码覆盖率 +pip install pytest-cov>=4.1.0 +``` + +### 完整依赖列表 +见:[test_requirements.txt](test_requirements.txt) + +--- + +## 📝 测试输出示例 + +``` +============================================================ test session starts ============================================================ +platform win32 -- Python 3.10.0, pytest-7.4.0, pluggy-1.3.0 -- D:\Python310\python.exe +cachedir: .pytest_cache +rootdir: D:\Userfile\Projects\AnzezxianxHazardInspectAI\Code\audio2 +plugins: timeout-2.2.0 +collected 20 items + +test/real/test_real_e2e.py::TestVideoFileValidation::test_primary_video_file_exists PASSED [ 5%] +test/real/test_real_e2e.py::TestASRRecognition::test_recognize_audio_file + 使用测试音频:VID_20251031_132320_019_mono.wav (5.23 MB) + 开始测试音频识别 + 音频识别耗时:45.32 秒 + ✓ 音频识别成功,task_id: abc123... 
+PASSED [ 10%] + +======================================================== 20 passed in 180.52s (0:03:00) ========================================================= +``` + +--- + +## 🆘 需要帮助? + +1. 查看 [README_故障排查.md](README_故障排查.md) +2. 运行诊断脚本:`python test/real/check_e2e.py` +3. 查看详细输出:`pytest test/real/ -v -s` +4. 按 `Ctrl+C` 查看卡住位置的堆栈跟踪 + +--- + +## 📚 相关文档 + +- [主测试文档](../README.md) - 整体测试策略 +- [pytest 文档](https://docs.pytest.org/) - pytest 使用指南 +- [FunASR 文档](https://github.com/alibaba-damo-academy/FunASR) - ASR 模型说明 diff --git a/test/real/README_故障排查.md b/test/real/README_故障排查.md new file mode 100644 index 0000000..1952e95 --- /dev/null +++ b/test/real/README_故障排查.md @@ -0,0 +1,229 @@ +# 端到端测试故障排查指南 + +## ⚠️ 测试卡住的常见原因 + +### 1. **首次运行下载模型**(最常见) +**症状**:CPU 占用不高,测试长时间无响应 + +**原因**: +- FunASR 模型首次运行需要下载(约 1-2GB) +- 下载速度取决于网络 +- 下载过程中测试会卡住等待模型加载 + +**解决方案**: +```bash +# 手动预下载模型 +python -c "from app.asr.asr_service import ASRService; ASRService()" +``` + +或等待 10-30 分钟(取决于网络) + +--- + +### 2. **模型加载缓慢** +**症状**:CPU 占用低,内存占用逐渐上升 + +**原因**: +- ASR 模型很大(Paraformer 约 200MB+) +- CPU 加载模型需要时间(30 秒 -2 分钟) +- 首次加载会初始化多个组件 + +**解决方案**: +- 耐心等待首次加载 +- 后续测试会快很多(模型已缓存) +- 使用 GPU 可加速(如果有) + +--- + +### 3. **音频/视频文件过大** +**症状**:处理时间超长 + +**原因**: +- 长音频处理时间与长度成正比 +- 1 分钟音频 ≈ 10-30 秒处理时间(CPU) +- 视频需要先提取音频 + +**解决方案**: +```yaml +# 在 test_config.yaml 中使用较小的测试文件 +test_files: + primary_video: "input/short.AVI" # 使用短文件 +``` + +--- + +### 4. 
**pytest-timeout 未安装** +**症状**:测试卡住后不会自动超时 + +**解决方案**: +```bash +pip install pytest-timeout +``` + +--- + +## 🔍 快速诊断命令 + +### 检查测试环境 +```bash +python test/real/check_e2e.py +``` + +### 运行最简单的测试(不卡) +```bash +# 只运行文件验证测试(立即完成) +pytest test/real/test_real_e2e.py::TestVideoFileValidation -v + +# 只运行错误处理测试(不依赖 ASR) +pytest test/real/test_real_e2e.py::TestErrorHandling -v +``` + +### 运行单个 ASR 测试(带详细输出) +```bash +# -s 显示 print 输出,便于观察进度 +pytest test/real/test_real_e2e.py::TestASRRecognition::test_recognize_audio_file -v -s --timeout=600 +``` + +### 查看模型缓存 +```bash +# 检查模型是否已下载 +dir app\asr /s /b | findstr ".pt" +``` + +--- + +## 🚀 推荐的测试流程 + +### 第一步:预下载模型(重要!) +```bash +# 在运行测试前,先手动加载一次模型 +python -c "from app.asr.asr_service import ASRService; s = ASRService(); print('模型加载完成')" +``` + +### 第二步:运行快速验证测试 +```bash +# 验证测试环境正常 +pytest test/real/test_real_e2e.py::TestVideoFileValidation -v +pytest test/real/test_real_e2e.py::TestAudioFileValidation -v +pytest test/real/test_real_e2e.py::TestErrorHandling -v +``` + +### 第三步:运行单个 ASR 测试 +```bash +# 使用短音频文件测试 +pytest test/real/test_real_e2e.py::TestASRRecognition::test_recognize_audio_file -v -s +``` + +### 第四步:运行完整测试 +```bash +# 所有端到端测试 +pytest test/real/ -v -s --timeout=600 +``` + +--- + +## 📊 预期处理时间参考 + +| 测试类型 | 文件大小 | CPU 时间 | GPU 时间 | +|---------|---------|---------|---------| +| 文件验证 | - | <1 秒 | <1 秒 | +| 错误处理 | - | <1 秒 | <1 秒 | +| ASR 识别(短音频) | 10MB | 30-60 秒 | 10-20 秒 | +| ASR 识别(长音频) | 50MB | 2-5 分钟 | 30-60 秒 | +| 视频转码 | 100MB | 1-3 分钟 | 30-60 秒 | + +--- + +## 💡 优化建议 + +### 1. 使用更小的测试文件 +```yaml +# test_config.yaml +test_files: + primary_video: "input/short.AVI" # 使用短文件 +``` + +### 2. 跳过慢速测试 +```bash +# 只运行快速测试 +pytest test/real/ -v -m "not slow" +``` + +### 3. 并行测试(如果多个文件) +```bash +pip install pytest-xdist +pytest test/real/ -v -n 4 # 4 个进程并行 +``` + +### 4. 使用 GPU(如果有) +```python +# 在 asr_service.py 中 +service = ASRService(device='cuda') +``` + +--- + +## 🆘 仍然卡住怎么办? 
"""
End-to-end test diagnostic script.

Quickly checks the test environment and its dependencies and prints a report.
"""
import os
import sys
from pathlib import Path

project_root = Path(__file__).parent.parent.parent.absolute()
os.chdir(project_root)
# chdir alone does not make project packages importable: when this file is run
# as a script, sys.path[0] is its own directory, so the `app` package would
# never be found by the ASR check below without this.
if str(project_root) not in sys.path:
    sys.path.insert(0, str(project_root))

print("=" * 60)
print("端到端测试环境诊断")
print("=" * 60)

# 1. Python version
print(f"\n[OK] Python 版本:{sys.version}")
print(f" 路径:{sys.executable}")

# 2. Test configuration file
print("\n" + "-" * 60)
print("测试配置文件检查")
print("-" * 60)

config_file = Path(__file__).parent / "test_config.yaml"
if config_file.exists():
    print(f"[OK] 配置文件存在:{config_file}")

    # A diagnostic must not crash on the very dependency it is meant to
    # report as missing, so the import is guarded.
    try:
        import yaml
    except ImportError:
        yaml = None
        print("[X] pyyaml (需要安装:pip install pyyaml)")

    if yaml is not None:
        with open(config_file, 'r', encoding='utf-8') as config_handle:
            config = yaml.safe_load(config_handle) or {}

        # Tolerate a partially filled-in config instead of raising KeyError.
        test_files = config.get('test_files') or {}
        primary_video = test_files.get('primary_video')
        backup_videos = test_files.get('backup_videos') or []
        audio_files = test_files.get('audio_files') or []

        print(f"[OK] 主测试视频:{primary_video}")
        print(f"[OK] 备用视频:{len(backup_videos)} 个")
        print(f"[OK] 音频文件:{len(audio_files)} 个")

        # Check that every configured file exists, the primary video included.
        configured_files = ([primary_video] if primary_video else []) \
            + list(backup_videos) + list(audio_files)
        for file_path in configured_files:
            full_path = project_root / file_path
            exists = "[OK]" if full_path.exists() else "[X]"
            print(f" {exists} {file_path}")
else:
    print(f"[X] 配置文件不存在:{config_file}")

# 3. Test directory
print("\n" + "-" * 60)
print("测试文件检查")
print("-" * 60)

test_files_dir = Path(__file__).parent
print(f"测试目录:{test_files_dir}")

# 4. Dependencies
print("\n" + "-" * 60)
print("依赖检查")
print("-" * 60)

# import name -> (label shown in the report, name to use with pip install).
# PyTorch is installed from PyPI as "torch"; "pytorch" is not the package.
dependencies = {
    'pytest': ('pytest', 'pytest'),
    'yaml': ('pyyaml', 'pyyaml'),
    'flask': ('flask', 'flask'),
    'torch': ('pytorch', 'torch'),
    'librosa': ('librosa', 'librosa'),
}

for module, (label, pip_name) in dependencies.items():
    try:
        __import__(module)
        print(f"[OK] {label}")
    except ImportError:
        print(f"[X] {label} (需要安装:pip install {pip_name})")
    except Exception as e:
        # Installed but broken (e.g. a native library failed to load).
        print(f"[X] {label} (导入失败:{e})")

# 5. Real media files in the input directory
print("\n" + "-" * 60)
print("实际测试文件检查")
print("-" * 60)

input_dir = project_root / "input"
if input_dir.exists():
    print(f"输入目录:{input_dir}")
    # Match on the lower-cased suffix in a single pass. Globbing "*.AVI" and
    # "*.avi" separately counts every file twice on case-insensitive file
    # systems such as Windows.
    entries = sorted(p for p in input_dir.iterdir() if p.is_file())
    video_files = [p for p in entries if p.suffix.lower() in ('.avi', '.mp4')]
    audio_files = [p for p in entries if p.suffix.lower() == '.wav']

    print(f" 视频文件:{len(video_files)} 个")
    for media in video_files[:5]:
        size_mb = media.stat().st_size / 1024 / 1024
        print(f" - {media.name} ({size_mb:.2f} MB)")

    print(f" 音频文件:{len(audio_files)} 个")
    for media in audio_files[:5]:
        size_mb = media.stat().st_size / 1024 / 1024
        print(f" - {media.name} ({size_mb:.2f} MB)")
else:
    print(f"[X] 输入目录不存在:{input_dir}")

# 6. ASR model
print("\n" + "-" * 60)
print("ASR 模型检查")
print("-" * 60)

try:
    # Imported only to verify that the service module can be loaded.
    from app.asr.asr_service import ASRService
    print("[OK] ASR 服务模块可导入")

    # Model cache directory, taken from the MODELSCOPE_CACHE variable.
    model_cache = os.environ.get("MODELSCOPE_CACHE", "未设置")
    print(f" 模型缓存目录:{model_cache}")

    cache_path = Path(model_cache) if model_cache != "未设置" else None
    if cache_path and cache_path.exists():
        model_files = list(cache_path.glob("*"))
        print(f" 缓存的模型文件:{len(model_files)} 个")
    else:
        print(" [!] 模型缓存目录不存在(首次运行会下载模型)")

except Exception as e:
    print(f"[X] ASR 服务导入失败:{e}")

# 7. ffmpeg
print("\n" + "-" * 60)
print("FFmpeg 检查")
print("-" * 60)

ffmpeg_path = project_root / "lib" / "ffmpeg.exe"
if ffmpeg_path.exists():
    print(f"[OK] ffmpeg 存在:{ffmpeg_path}")
else:
    print(f"[X] ffmpeg 不存在:{ffmpeg_path}")
    print(" 请确保 ffmpeg 在 lib 目录下")

# 8. How to run the tests
print("\n" + "=" * 60)
print("运行建议")
print("=" * 60)

print("""
运行端到端测试:
  pytest test/real/ -v

只运行快速验证测试(不测试 ASR):
  pytest test/real/test_real_e2e.py::TestVideoFileValidation -v
  pytest test/real/test_real_e2e.py::TestAudioFileValidation -v
  pytest test/real/test_real_e2e.py::TestErrorHandling -v

运行单个 ASR 测试(带详细输出):
  pytest test/real/test_real_e2e.py::TestASRRecognition::test_recognize_audio_file -v -s

如果测试卡住,按 Ctrl+C 查看堆栈跟踪

注意事项:
  1. 首次运行会下载 ASR 模型(可能很大,需要时间)
  2. CPU 运行 ASR 推理较慢(1 分钟音频可能需要 2-5 分钟)
  3. 确保有足够的磁盘空间(临时文件 + 模型缓存)
  4. 如果内存不足,建议使用较小的测试文件
""")

print("=" * 60)
print("诊断完成")
print("=" * 60)
@pytest.fixture(scope='session')
def test_video_file(e2e_config):
    """Resolve the video file used by the end-to-end tests.

    The configured primary video is preferred; if it is missing, the first
    existing backup video is returned instead. The test is skipped when
    neither the primary nor any backup file exists.
    """
    files_cfg = e2e_config['test_files']
    primary = project_root / files_cfg['primary_video']

    if primary.exists():
        size_mb = primary.stat().st_size / 1024 / 1024
        print(f"\n使用测试视频:{primary.name} ({size_mb:.2f} MB)")
        return primary

    # Primary file is unavailable: fall back to the configured backups in order.
    for relative in files_cfg['backup_videos']:
        candidate = project_root / relative
        if candidate.exists():
            print(f"\n使用备用视频文件:{candidate.name}")
            return candidate

    pytest.skip(f"测试视频文件不存在:{primary}")
class TestVideoFileValidation:
    """Sanity checks on the video file selected for the end-to-end run."""

    def test_primary_video_file_exists(self, test_video_file):
        """The selected video exists and is not empty."""
        present = test_video_file.exists()
        assert present, f"视频文件不存在:{test_video_file}"
        size_bytes = test_video_file.stat().st_size
        assert size_bytes > 0, "视频文件大小为 0"

    def test_video_file_size_within_limit(self, test_video_file, e2e_config):
        """The video is no larger than the configured ``max_file_size_mb``."""
        limit_bytes = e2e_config['environment']['max_file_size_mb'] * 1024 * 1024
        actual_bytes = test_video_file.stat().st_size
        assert actual_bytes <= limit_bytes, f"视频文件过大:{actual_bytes / 1024 / 1024:.2f}MB"

    def test_video_file_format(self, test_video_file):
        """The video's extension is one of the supported container formats."""
        extension = test_video_file.suffix.lower()
        accepted = ['.mp4', '.avi', '.mkv', '.mov', '.flv', '.wmv', '.m4v']
        assert extension in accepted, \
            f"不支持的视频格式:{test_video_file.suffix}"
f"识别失败:{response.status_code} - {response.get_json()}" + + data = response.get_json() + assert data['status'] == 'success', \ + f"识别返回错误:{data}" + assert 'data' in data + assert 'task_id' in data['data'] + print(f"✓ 音频识别成功,task_id: {data['data']['task_id']}") + + @pytest.mark.real + @pytest.mark.slow + @pytest.mark.timeout(TEST_CONFIG['timeouts']['asr_recognize'] + 60) + def test_recognize_video_file(self, e2e_app, test_video_file): + """测试真实视频文件的语音识别""" + print(f"\n开始测试视频识别:{test_video_file.name}") + start_time = time.time() + + app = e2e_app + timeout = TEST_CONFIG['timeouts']['asr_recognize'] + + with app.test_client() as client: + response = client.get( + f'/api/recognize?path={test_video_file.name}' + ) + + elapsed = time.time() - start_time + print(f"视频识别耗时:{elapsed:.2f}秒") + + assert elapsed < timeout, f"ASR 识别超时:{elapsed:.2f}秒 > {timeout}秒" + + assert response.status_code == 200, \ + f"识别失败:{response.status_code} - {response.get_json()}" + + data = response.get_json() + assert data['status'] == 'success' + print(f"✓ 视频识别成功") + + @pytest.mark.real + @pytest.mark.slow + @pytest.mark.timeout(60) + def test_get_recognition_result(self, e2e_app, test_audio_file): + """测试获取语音识别结果""" + print(f"\n开始获取识别结果:{test_audio_file.name}") + + app = e2e_app + timeout = 60 + + with app.test_client() as client: + start_time = time.time() + + response = client.get( + f'/api/result?path={test_audio_file.name}' + ) + + elapsed = time.time() - start_time + print(f"获取结果耗时:{elapsed:.2f}秒") + + assert elapsed < timeout, f"获取结果超时:{elapsed:.2f}秒" + + assert response.status_code == 200, \ + f"获取结果失败:{response.status_code} - {response.get_json()}" + + data = response.get_json() + assert data['status'] == 'success' + assert 'data' in data + + if 'sentences' in data['data']: + sentences = data['data']['sentences'] + print(f"✓ 识别到 {len(sentences)} 个句子") + assert isinstance(sentences, list) + + if len(sentences) > 0: + first_sentence = sentences[0] + print(f" 
class TestSpeakerDiarization:
    """End-to-end checks on the speaker information in a recognition result."""

    @staticmethod
    def _fetch_sentences(app, audio_file):
        """Return the non-empty ``sentences`` list from ``/api/result``.

        The calling test is skipped (rather than silently passing without a
        single assertion) when the endpoint does not answer 200 or the payload
        carries no sentences.
        """
        with app.test_client() as client:
            response = client.get(
                f'/api/result?path={audio_file.name}'
            )

        if response.status_code != 200:
            pytest.skip(f"识别结果不可用:{response.status_code}")

        data = response.get_json() or {}
        payload = data.get('data')
        sentences = payload.get('sentences') if isinstance(payload, dict) else None
        if not sentences:
            pytest.skip("识别结果中没有句子")
        return sentences

    @pytest.mark.real
    @pytest.mark.slow
    @pytest.mark.timeout(60)
    def test_speaker_diarization_result(self, e2e_app, test_audio_file):
        """The number of distinct speakers lies within the configured bounds."""
        print(f"\n测试说话人分离:{test_audio_file.name}")

        sentences = self._fetch_sentences(e2e_app, test_audio_file)
        speakers = {s['speaker'] for s in sentences if 'speaker' in s}

        print(f"✓ 识别到 {len(speakers)} 个说话人:{speakers}")

        min_speakers = TEST_CONFIG['validation']['min_speakers']
        max_speakers = TEST_CONFIG['validation']['max_speakers']

        assert len(speakers) >= min_speakers, \
            f"说话人数量过少:{len(speakers)}"
        assert len(speakers) <= max_speakers, \
            f"说话人数量过多:{len(speakers)}"

    @pytest.mark.real
    @pytest.mark.slow
    @pytest.mark.timeout(60)
    def test_speaker_timestamp_accuracy(self, e2e_app, test_audio_file):
        """Every sentence with timestamps has a sane, non-degenerate time span."""
        print(f"\n测试时间戳准确性:{test_audio_file.name}")

        sentences = self._fetch_sentences(e2e_app, test_audio_file)
        precision = TEST_CONFIG['validation']['timestamp_precision']

        valid_count = 0
        for sentence in sentences:
            # Sentences that lack either timestamp are not validated.
            if not all(k in sentence for k in ['begin_time', 'end_time']):
                continue

            begin = sentence['begin_time']
            end = sentence['end_time']

            assert begin >= 0, f"开始时间为负:{begin}"
            assert end > begin, f"结束时间无效:begin={begin}, end={end}"

            # A span shorter than the configured precision is rejected.
            duration = end - begin
            assert duration >= precision, \
                f"时间戳精度不足:{duration}"
            valid_count += 1

        print(f"✓ 验证 {valid_count} 个有效时间戳")
处理时间") + + app = e2e_app + max_time = TEST_CONFIG['performance']['max_asr_time'] + + with app.test_client() as client: + start_time = time.time() + + response = client.get( + f'/api/recognize?path={test_audio_file.name}' + ) + + elapsed = time.time() - start_time + print(f"ASR 处理时间:{elapsed:.2f}秒(最大允许:{max_time}秒)") + + assert response.status_code == 200 + assert elapsed < max_time, \ + f"ASR 处理时间过长:{elapsed:.2f}秒 > {max_time}秒" + + @pytest.mark.real + @pytest.mark.performance + @pytest.mark.slow + @pytest.mark.timeout(300) + def test_realtime_factor(self, e2e_app, test_audio_file): + """测试实时处理率(RTF)""" + print(f"\n性能测试:实时处理率") + + app = e2e_app + min_speed = TEST_CONFIG['performance']['min_processing_speed'] + + with app.test_client() as client: + start_time = time.time() + + response = client.get( + f'/api/recognize?path={test_audio_file.name}' + ) + + elapsed = time.time() - start_time + + if response.status_code == 200: + audio_duration = self._get_audio_duration(test_audio_file) + + if audio_duration > 0: + rtf = elapsed / audio_duration + processing_speed = 1 / rtf if rtf > 0 else float('inf') + + print(f"音频时长:{audio_duration:.2f}秒,处理时间:{elapsed:.2f}秒") + print(f"实时处理率:{processing_speed:.2f}x (RTF: {rtf:.2f})") + + assert processing_speed >= min_speed, \ + f"处理速度过慢:{processing_speed:.2f}x < {min_speed}x" + + def _get_audio_duration(self, audio_path): + """获取音频文件时长(秒)""" + try: + import librosa + y, sr = librosa.load(str(audio_path), sr=None) + return len(y) / sr + except Exception as e: + print(f"无法获取音频时长:{e}") + return 0 + + +class TestErrorHandling: + """测试错误处理""" + + @pytest.mark.real + @pytest.mark.timeout(30) + def test_nonexistent_file_error(self, e2e_app): + """测试文件不存在错误处理""" + print(f"\n测试错误处理:文件不存在") + + app = e2e_app + + with app.test_client() as client: + response = client.get('/api/recognize?path=nonexistent.wav') + + assert response.status_code in [400, 404, 500] + print(f"✓ 正确返回错误:{response.status_code}") + + if response.status_code == 200: + 
data = response.get_json() + assert data['status'] == 'error' + + @pytest.mark.real + @pytest.mark.timeout(30) + def test_missing_parameter_error(self, e2e_app): + """测试缺少参数错误处理""" + print(f"\n测试错误处理:缺少参数") + + app = e2e_app + + with app.test_client() as client: + response = client.get('/api/recognize') + + assert response.status_code == 400 + data = response.get_json() + assert data['status'] == 'error' + assert 'message' in data + print(f"✓ 正确返回参数错误") + + @pytest.mark.real + @pytest.mark.timeout(30) + @pytest.mark.requires_ffmpeg + def test_transcode_nonexistent_file(self, e2e_app): + """测试转码不存在的文件""" + print(f"\n测试错误处理:转码文件不存在") + + app = e2e_app + + with app.test_client() as client: + response = client.get('/api/convert?path=nonexistent.mp4') + + assert response.status_code == 404 + data = response.get_json() + assert data['status'] == 'error' + print(f"✓ 正确返回 404 错误") + + +class TestAPIResponseFormat: + """测试 API 响应格式""" + + @pytest.mark.real + @pytest.mark.slow + @pytest.mark.timeout(300) # 5 分钟超时,包括模型加载时间 + def test_recognize_response_format(self, e2e_app, test_audio_file): + """测试识别接口响应格式""" + print(f"\n测试响应格式:识别接口") + + app = e2e_app + + with app.test_client() as client: + response = client.get( + f'/api/recognize?path={test_audio_file.name}' + ) + + if response.status_code == 200: + data = response.get_json() + + assert 'status' in data + assert 'message' in data + assert 'timestamp' in data + assert 'data' in data + assert isinstance(data['data'], dict) + print(f"✓ 响应格式正确") + + @pytest.mark.real + @pytest.mark.slow + @pytest.mark.timeout(300) # 5 分钟超时 + def test_result_response_format(self, e2e_app, test_audio_file): + """测试结果接口响应格式""" + print(f"\n测试响应格式:结果接口") + + app = e2e_app + + with app.test_client() as client: + response = client.get( + f'/api/result?path={test_audio_file.name}' + ) + + if response.status_code == 200: + data = response.get_json() + + assert 'status' in data + assert 'message' in data + assert 'timestamp' in data + + if 'data' in 
data: + result_data = data['data'] + assert isinstance(result_data, dict) + print(f"✓ 结果响应格式正确") + + +if __name__ == '__main__': + pytest.main([__file__, '-v', '-m', 'real', '--tb=short']) diff --git a/test/real/test_requirements.txt b/test/real/test_requirements.txt new file mode 100644 index 0000000..99cdacf --- /dev/null +++ b/test/real/test_requirements.txt @@ -0,0 +1,23 @@ +# ============================================ +# 端到端真实测试额外依赖 +# ============================================ +# 说明:这些依赖用于支持端到端真实测试 +# 安装方式:pip install -r test_requirements.txt +# ============================================ + +# ---------- 测试配置文件支持 ---------- +pyyaml>=6.0 + +# ---------- 性能测试工具(可选) ---------- +# pytest-benchmark>=4.0.0 +# memory-profiler>=0.61.0 + +# ---------- 测试超时控制 ---------- +pytest-timeout>=2.2.0 + +# ---------- 代码覆盖率(可选) ---------- +# pytest-cov>=4.1.0 + +# ---------- 测试报告生成(可选) ---------- +# pytest-html>=4.0.0 +# pytest-xdist>=3.3.0 # 并行测试执行 diff --git a/test/real/修复记录.md b/test/real/修复记录.md new file mode 100644 index 0000000..cdc6f08 --- /dev/null +++ b/test/real/修复记录.md @@ -0,0 +1,176 @@ +# 端到端测试修复记录 + +## 2026-05-13 修复 + +### 问题 1:视频大小限制测试失败 + +**测试**:`TestVideoFileValidation::test_video_file_size_within_limit` + +**错误**: +``` +AssertionError: 视频文件过大:886.50MB +assert 929565696 <= 524288000 +``` + +**原因**: +- 测试配置设置的限制为 500MB +- 实际视频文件 `VID_20251104_085655_024.AVI` 大小为 886.50MB +- 此限制是我添加的防御性测试,但项目实际没有文件大小限制 + +**修复**: +```yaml +# test_config.yaml +environment: + max_file_size_mb: 1024 # 从 500MB 改为 1GB +``` + +**说明**: +- Flask 的 `MAX_CONTENT_LENGTH` (500MB) 用于 HTTP 上传限制 +- 本项目使用本地文件,不适用此限制 +- 测试限制仅用于验证,不应过于严格 + +--- + +### 问题 2:文件不存在错误处理测试失败 + +**测试**:`TestErrorHandling::test_nonexistent_file_error` + +**错误**: +``` +assert response.status_code in [400, 404, 500] +assert 200 in [400, 404, 500] +``` + +**原因**: +- `app/asr/core.py::main()` 函数在文件不存在时返回错误字符串 `"输入路径不存在"` +- 但 `app/asr/routes.py` 没有检查返回值,直接返回 200 成功 + +**代码问题**: +```python +# routes.py (修复前) +try: + from 
app.asr.core import main + main(path) # 返回值被忽略 +finally: + ... + +return jsonify(make_response(...)), 200 # 总是返回 200 +``` + +**修复**: +```python +# routes.py (修复后) +try: + from app.asr.core import main + result = main(path) + + # 检查返回值,如果是错误信息,返回 400 + if result and isinstance(result, str): + task_running[task_id] = False + return jsonify(make_response(status="error", message=result)), 400 + +finally: + ... +``` + +**影响**: +- ✅ 现在文件不存在时正确返回 400 错误 +- ✅ API 错误处理更一致 +- ✅ 测试通过 + +--- + +### 问题 3:测试超时导致进程终止 + +**测试**:`TestAPIResponseFormat::test_recognize_response_format` + +**错误**: +``` +The python test process was terminated before it could exit on its own +``` + +**原因**: +- 超时设置为 60 秒 +- 首次运行需要加载 ASR 模型(可能 30 秒 -2 分钟) +- 9.16MB 音频文件推理也需要时间 +- 总时间超过 60 秒导致被强制终止 + +**修复**: +```python +# test_real_e2e.py +@pytest.mark.real +@pytest.mark.slow +@pytest.mark.timeout(300) # 从 60 秒改为 300 秒(5 分钟) +def test_recognize_response_format(self, e2e_app, test_audio_file): + ... +``` + +**说明**: +- 首次运行:模型下载 + 加载 + 推理(可能需要 5-10 分钟) +- 后续运行:模型已缓存(1-2 分钟) +- 超时时间应该包含模型加载时间 + +--- + +## 修复总结 + +### 已修复的问题 + +| 问题 | 修复方式 | 文件 | +|------|---------|------| +| 视频大小限制过严 | 500MB → 1GB | test_config.yaml | +| 文件不存在返回 200 | 检查 main() 返回值 | app/asr/routes.py | +| 测试超时被终止 | 60 秒 → 300 秒 | test_real_e2e.py | + +### 修复后的预期行为 + +1. **视频大小测试**: + - ✅ 886MB 视频文件通过测试 + - ✅ 限制值 1GB 足够大 + +2. **错误处理测试**: + - ✅ 文件不存在返回 400 + - ✅ 错误信息正确传递 + +3. 
**响应格式测试**: + - ✅ 5 分钟超时足够完成首次推理 + - ✅ 进程不会被强制终止 + +--- + +## 运行建议 + +### 首次运行前预加载模型 +```bash +# 避免测试时等待模型加载 +python -c "from app.asr.asr_service import ASRService; ASRService()" +``` + +### 运行错误处理测试(快速验证) +```bash +pytest test/real/test_real_e2e.py::TestErrorHandling -v +``` + +### 运行完整测试(首次需要耐心) +```bash +pytest test/real/ -v -s +``` + +### 跳过慢速测试(只验证基本功能) +```bash +pytest test/real/ -v -m "not slow" +``` + +--- + +## 测试时间参考 + +| 测试类别 | 首次运行 | 后续运行 | +|---------|---------|---------| +| 文件验证 | <1 秒 | <1 秒 | +| 错误处理 | <1 秒 | <1 秒 | +| ASR 识别(9MB) | 2-5 分钟 | 1-2 分钟 | +| ASR 识别(50MB) | 5-10 分钟 | 2-5 分钟 | +| 视频转码 | 1-3 分钟 | 1-3 分钟 | + +**注意**:首次运行包括模型下载(10-30 分钟,取决于网络) diff --git a/test/test_asr_routes.py b/test/test_asr_routes.py new file mode 100644 index 0000000..5277e6d --- /dev/null +++ b/test/test_asr_routes.py @@ -0,0 +1,221 @@ +""" +测试 ASR 路由 +""" +import pytest +import json +from pathlib import Path +import tempfile +import os +import sys + +project_root = Path(__file__).parent.parent.absolute() +sys.path.insert(0, str(project_root)) +os.chdir(project_root) + +from main import create_app + + +class TestRecognizeRoute: + """测试语音识别路由""" + + def test_recognize_missing_path_parameter(self): + """测试缺少 path 参数时返回错误""" + app = create_app() + app.config['TESTING'] = True + with app.test_client() as client: + response = client.get('/api/recognize') + assert response.status_code == 400 + data = response.get_json() + assert data['status'] == 'error' + assert 'path' in data['message'] + + def test_recognize_with_empty_path(self): + """测试 path 为空时返回错误""" + app = create_app() + app.config['TESTING'] = True + with app.test_client() as client: + response = client.get('/api/recognize?path=') + assert response.status_code == 400 + data = response.get_json() + assert data['status'] == 'error' + + def test_recognize_with_valid_path(self, temp_dirs): + """测试有效 path 参数(文件不存在情况)""" + app = create_app() + app.config['TESTING'] = True + app.config['INPUT_DIR'] = 
str(temp_dirs['input']) + app.config['OUTPUT_DIR'] = str(temp_dirs['output']) + + # 注意:由于实际文件不存在,可能返回错误 + # 这里主要测试路由是否能正确处理请求 + with app.test_client() as client: + response = client.get('/api/recognize?path=test.mp4') + # 可能返回 200(成功)或 500(文件不存在) + assert response.status_code in [200, 400, 500] + + def test_recognize_response_format(self, temp_dirs): + """测试识别响应格式""" + app = create_app() + app.config['TESTING'] = True + app.config['INPUT_DIR'] = str(temp_dirs['input']) + app.config['OUTPUT_DIR'] = str(temp_dirs['output']) + + with app.test_client() as client: + response = client.get('/api/recognize?path=test.mp4') + if response.status_code == 200: + data = response.get_json() + assert 'status' in data + assert 'message' in data + assert 'data' in data + if 'task_id' in data.get('data', {}): + assert isinstance(data['data']['task_id'], str) + + +class TestResultRoute: + """测试结果获取路由""" + + def test_result_missing_path_parameter(self): + """测试缺少 path 参数时返回错误""" + app = create_app() + app.config['TESTING'] = True + with app.test_client() as client: + response = client.get('/api/result') + assert response.status_code == 400 + data = response.get_json() + assert data['status'] == 'error' + assert 'path' in data['message'] + + def test_result_with_empty_path(self): + """测试 path 为空时返回错误""" + app = create_app() + app.config['TESTING'] = True + with app.test_client() as client: + response = client.get('/api/result?path=') + assert response.status_code == 400 + data = response.get_json() + assert data['status'] == 'error' + + def test_result_nonexistent_file(self, temp_dirs): + """测试结果文件不存在时返回 404""" + app = create_app() + app.config['TESTING'] = True + app.config['OUTPUT_DIR'] = str(temp_dirs['output']) + + with app.test_client() as client: + response = client.get('/api/result?path=nonexistent.mp4') + assert response.status_code == 404 + data = response.get_json() + assert data['status'] == 'error' + + def test_result_with_valid_file(self, temp_dirs): + """测试获取存在的结果文件""" + 
app = create_app() + app.config['TESTING'] = True + app.config['OUTPUT_DIR'] = str(temp_dirs['output']) + + # 创建模拟结果文件 + output_dir = Path(temp_dirs['output']) / 'SpeechRecognition' + output_dir.mkdir(parents=True, exist_ok=True) + result_file = output_dir / 'test_result.json' + test_data = { + 'status': 'success', + 'data': { + 'sentences': [ + { + 'speaker': 'SPK1', + 'text': '测试文本', + 'begin_time': 0.0, + 'end_time': 1.0 + } + ] + } + } + with open(result_file, 'w', encoding='utf-8') as f: + json.dump(test_data, f, ensure_ascii=False) + + with app.test_client() as client: + response = client.get('/api/result?path=test.mp4') + assert response.status_code == 200 + data = response.get_json() + assert data['status'] == 'success' + assert 'data' in data + + +class TestASRGlobalState: + """测试 ASR 全局状态""" + + def test_task_running_dict_exists(self): + """测试任务运行状态字典存在""" + from app.asr.routes import task_running + assert task_running is not None + assert isinstance(task_running, dict) + + def test_global_asr_service_variable(self): + """测试全局 ASR 服务变量存在""" + from app.asr.routes import GLOBAL_ASR_SERVICE + # 初始可能为 None + assert GLOBAL_ASR_SERVICE is None or hasattr(GLOBAL_ASR_SERVICE, 'recognize') + + def test_global_diar_service_variable(self): + """测试全局说话人分离服务变量存在""" + from app.asr.routes import GLOBAL_DIAR_SERVICE + # 初始可能为 None + assert GLOBAL_DIAR_SERVICE is None or hasattr(GLOBAL_DIAR_SERVICE, 'diarize') + + +class TestASRServiceIntegration: + """测试 ASR 服务集成""" + + def test_asr_service_import(self): + """测试 ASR 服务可以导入""" + from app.asr.asr_service import ASRService + assert ASRService is not None + + def test_diarization_service_import(self): + """测试说话人分离服务可以导入""" + from app.asr.diarization_service import DiarizationService + assert DiarizationService is not None + + def test_asr_service_initialization(self): + """测试 ASR 服务初始化""" + from app.asr.asr_service import ASRService + service = ASRService() + assert service is not None + assert service.model_name == 
'paraformer-zh' + + def test_diarization_service_initialization(self): + """测试说话人分离服务初始化""" + from app.asr.diarization_service import DiarizationService + service = DiarizationService() + assert service is not None + assert service.embedding_model == 'eres2net' + + +class TestASRCore: + """测试 ASR 核心功能""" + + def test_core_module_import(self): + """测试核心模块可以导入""" + from app.asr import core + assert core is not None + + def test_get_video_list_function(self): + """测试获取视频列表函数存在""" + from app.asr.core import get_video_list + assert get_video_list is not None + + def test_extract_wav_function(self): + """测试提取 WAV 函数存在""" + from app.asr.core import extract_wav + assert extract_wav is not None + + def test_get_video_list_with_empty_folder(self, temp_dirs): + """测试空文件夹获取视频列表""" + from app.asr.core import get_video_list + videos = get_video_list(temp_dirs['input']) + assert isinstance(videos, list) + assert len(videos) == 0 + + +if __name__ == '__main__': + pytest.main([__file__, '-v']) diff --git a/test/test_core.py b/test/test_core.py new file mode 100644 index 0000000..7a4e548 --- /dev/null +++ b/test/test_core.py @@ -0,0 +1,323 @@ +""" +测试 ASR 核心功能 +""" +import pytest +from pathlib import Path +import tempfile +import shutil +import sys +import os + +project_root = Path(__file__).parent.parent.absolute() +sys.path.insert(0, str(project_root)) +os.chdir(project_root) + + +class TestASRCoreFunctions: + """测试 ASR 核心函数""" + + def test_get_video_list_function_exists(self): + """测试获取视频列表函数存在""" + from app.asr.core import get_video_list + assert callable(get_video_list) + + def test_get_video_list_empty_folder(self, temp_dirs): + """测试空文件夹获取视频列表""" + from app.asr.core import get_video_list + videos = get_video_list(temp_dirs['input']) + assert isinstance(videos, list) + assert len(videos) == 0 + + def test_get_video_list_with_video_files(self, temp_dirs): + """测试有视频文件时获取列表""" + from app.asr.core import get_video_list + import tempfile + from pathlib import Path + + # 创建独立的临时目录 
+ test_dir = Path(tempfile.mkdtemp()) + (test_dir / 'test1.mp4').touch() + (test_dir / 'test2.mp4').touch() + (test_dir / 'test3.avi').touch() + + videos = get_video_list(test_dir) + assert len(videos) == 3 + assert all(isinstance(v, Path) for v in videos) + + # 清理 + import shutil + shutil.rmtree(test_dir) + + def test_get_video_list_sorting(self, temp_dirs): + """测试视频列表排序""" + from app.asr.core import get_video_list + import tempfile + from pathlib import Path + + # 创建独立的临时目录 + test_dir = Path(tempfile.mkdtemp()) + + # 创建带时间戳的文件名 + (test_dir / 'VID_20251031_132320.mp4').touch() + (test_dir / 'VID_20251031_132330.mp4').touch() + (test_dir / 'VID_20251031_132340.mp4').touch() + + videos = get_video_list(test_dir) + assert len(videos) == 3 + # 验证按文件名排序 + assert '132320' in str(videos[0]) + assert '132330' in str(videos[1]) + assert '132340' in str(videos[2]) + + # 清理 + import shutil + shutil.rmtree(test_dir) + + def test_get_video_list_supported_formats(self, temp_dirs): + """测试支持的视频格式""" + from app.asr.core import get_video_list + + formats = ['mp4', 'avi', 'mkv', 'mov', 'flv', 'wmv', 'm4v'] + for fmt in formats: + (temp_dirs['input'] / f'test.{fmt}').touch() + + videos = get_video_list(temp_dirs['input']) + assert len(videos) == len(formats) + + +class TestTempDirectory: + """测试临时目录管理""" + + def test_clear_temp_dir_function_exists(self): + """测试清空临时目录函数存在""" + from app.asr.core import clear_temp_dir + assert callable(clear_temp_dir) + + def test_ensure_output_dir_function_exists(self): + """测试确保输出目录函数存在""" + from app.asr.core import ensure_output_dir + assert callable(ensure_output_dir) + + def test_clear_temp_dir_creates_directory(self, temp_dirs): + """测试清空临时目录会创建目录""" + from app.asr.core import clear_temp_dir, TEMP_DIR + + # 临时修改 TEMP_DIR + import app.asr.core as core_module + original_temp = core_module.TEMP_DIR + core_module.TEMP_DIR = temp_dirs['temp'] + + clear_temp_dir() + assert temp_dirs['temp'].exists() + + # 恢复原路径 + core_module.TEMP_DIR = original_temp 
+ + def test_ensure_output_dir_creates_directory(self, temp_dirs): + """测试确保输出目录会创建目录""" + from app.asr.core import ensure_output_dir, OUTPUT_DIR + + # 临时修改 OUTPUT_DIR + import app.asr.core as core_module + original_output = core_module.OUTPUT_DIR + core_module.OUTPUT_DIR = temp_dirs['output'] + + ensure_output_dir() + assert temp_dirs['output'].exists() + + # 恢复原路径 + core_module.OUTPUT_DIR = original_output + + +class TestExtractWav: + """测试 WAV 提取功能""" + + def test_extract_wav_function_exists(self): + """测试提取 WAV 函数存在""" + from app.asr.core import extract_wav + assert callable(extract_wav) + + def test_extract_wav_with_nonexistent_video(self, temp_dirs): + """测试不存在的视频文件""" + from app.asr.core import extract_wav + + video_path = temp_dirs['input'] / 'nonexistent.mp4' + result = extract_wav(video_path, temp_dirs['temp']) + assert result is None + + +class TestASRService: + """测试 ASR 服务类""" + + def test_asr_service_class_exists(self): + """测试 ASR 服务类存在""" + from app.asr.asr_service import ASRService + assert ASRService is not None + + def test_asr_service_initialization(self): + """测试 ASR 服务初始化""" + from app.asr.asr_service import ASRService + service = ASRService() + assert service is not None + assert service.model_name == 'paraformer-zh' + + def test_asr_service_custom_model(self): + """测试自定义模型初始化""" + from app.asr.asr_service import ASRService + service = ASRService(model_name='SenseVoice') + assert service.model_name == 'SenseVoice' + + def test_asr_service_device_auto(self): + """测试自动设备检测""" + from app.asr.asr_service import ASRService + service = ASRService(device='auto') + assert service.device in ['cpu', 'cuda'] + + def test_asr_service_sentence_class(self): + """测试句子数据类""" + from app.asr.asr_service import Sentence + sentence = Sentence( + speaker='SPK1', + text='测试文本', + begin_time=0.0, + end_time=1.0 + ) + assert sentence.speaker == 'SPK1' + assert sentence.text == '测试文本' + assert sentence.begin_time == 0.0 + assert sentence.end_time == 1.0 + + def 
test_sentence_to_dict(self): + """测试句子转字典""" + from app.asr.asr_service import Sentence + sentence = Sentence( + speaker='SPK1', + text='测试文本', + begin_time=0.0, + end_time=1.0 + ) + d = sentence.to_dict() + assert d['speaker'] == 'SPK1' + assert d['text'] == '测试文本' + assert d['begin_time'] == 0.0 + assert d['end_time'] == 1.0 + assert 'duration' in d + + +class TestDiarizationService: + """测试说话人分离服务类""" + + def test_diarization_service_class_exists(self): + """测试说话人分离服务类存在""" + from app.asr.diarization_service import DiarizationService + assert DiarizationService is not None + + def test_diarization_service_initialization(self): + """测试说话人分离服务初始化""" + from app.asr.diarization_service import DiarizationService + service = DiarizationService() + assert service is not None + assert service.embedding_model == 'eres2net' + + def test_diarization_service_custom_model(self): + """测试自定义嵌入模型""" + from app.asr.diarization_service import DiarizationService + service = DiarizationService(embedding_model='campplus') + assert service.embedding_model == 'campplus' + + def test_diarization_segment_class(self): + """测试说话人分离片段数据类""" + from app.asr.diarization_service import DiarizationSegment + segment = DiarizationSegment( + speaker='SPK1', + begin_time=0.0, + end_time=1.0 + ) + assert segment.speaker == 'SPK1' + assert segment.begin_time == 0.0 + assert segment.end_time == 1.0 + + def test_diarization_segment_to_dict(self): + """测试片段转字典""" + from app.asr.diarization_service import DiarizationSegment + segment = DiarizationSegment( + speaker='SPK1', + begin_time=0.0, + end_time=1.0 + ) + d = segment.to_dict() + assert d['speaker'] == 'SPK1' + assert d['begin_time'] == 0.0 + assert d['end_time'] == 1.0 + assert 'duration' in d + + +class TestMapSpeaker: + """测试说话人映射功能""" + + def test_map_speaker_module_import(self): + """测试说话人映射模块可以导入""" + from app.asr import map_speaker + assert map_speaker is not None + + +class TestTranscodeCore: + """测试转码核心功能""" + + def 
test_convert_to_h264_function_exists(self): + """测试转码函数存在""" + from app.transcode.core import convert_to_h264 + assert callable(convert_to_h264) + + def test_convert_to_h264_file_not_found(self, temp_dirs): + """测试文件不存在时抛出异常""" + from app.transcode.core import convert_to_h264 + + with pytest.raises(FileNotFoundError): + convert_to_h264( + input_root=temp_dirs['input'], + vid_full_name='nonexistent.mp4', + output_root=temp_dirs['output'] + ) + + def test_convert_to_h264_output_naming(self, temp_dirs): + """测试输出文件命名""" + from app.transcode.core import convert_to_h264 + + # 创建假输入文件 + input_file = temp_dirs['input'] / 'test_video.mp4' + input_file.touch() + + # 尝试验证输出命名(可能因 ffmpeg 失败) + try: + output = convert_to_h264( + input_root=temp_dirs['input'], + vid_full_name='test_video.mp4', + output_root=temp_dirs['output'] + ) + assert 'test_video_h264.mp4' in output + except Exception as e: + # 预期可能因为 ffmpeg 问题失败 + assert True # 只要不崩溃即可 + + +class TestCaddyRun: + """测试 Caddy 运行""" + + def test_run_caddy_function_exists(self): + """测试运行 Caddy 函数存在""" + from lib.caddy.run import run_caddy + assert callable(run_caddy) + + def test_run_caddy_default_port(self): + """测试默认端口""" + from lib.caddy.run import run_caddy + # 验证函数签名 + import inspect + sig = inspect.signature(run_caddy) + assert 'port' in sig.parameters + assert sig.parameters['port'].default == 8086 + + +if __name__ == '__main__': + pytest.main([__file__, '-v']) diff --git a/test/test_integration.py b/test/test_integration.py new file mode 100644 index 0000000..f1bafcf --- /dev/null +++ b/test/test_integration.py @@ -0,0 +1,231 @@ +""" +集成测试 +测试整个系统的协同工作 +""" +import pytest +from pathlib import Path +import tempfile +import sys +import os + +project_root = Path(__file__).parent.parent.absolute() +sys.path.insert(0, str(project_root)) +os.chdir(project_root) + +from main import create_app + + +class TestAppIntegration: + """测试应用集成""" + + def test_full_app_startup(self): + """测试完整应用启动""" + app = create_app() + assert app 
is not None + + # 验证所有路由都已注册 + rules = [rule.rule for rule in app.url_map.iter_rules()] + + # 根路由 + assert '/' in rules + + # ASR 路由 + assert '/api/recognize' in rules + assert '/api/result' in rules + + # 转码路由 + assert '/api/convert' in rules + assert '/api/getVidUrl' in rules + + # CORS 路由 + assert '/config' in rules + + def test_all_routes_respond(self): + """测试所有路由都能响应""" + app = create_app() + app.config['TESTING'] = True + + with app.test_client() as client: + # 测试根路由 + response = client.get('/') + assert response.status_code == 200 + + # 测试 CORS OPTIONS + response = client.options('/config') + assert response.status_code == 200 + + def test_error_handling_consistency(self): + """测试错误处理一致性""" + app = create_app() + app.config['TESTING'] = True + + with app.test_client() as client: + # 测试 404 路由 + response = client.get('/nonexistent') + assert response.status_code == 404 + + # 测试错误响应格式 + response = client.get('/api/recognize') # 缺少参数 + assert response.status_code == 400 + data = response.get_json() + assert 'status' in data + assert 'message' in data + assert data['status'] == 'error' + + +class TestModuleIsolation: + """测试模块隔离""" + + def test_asr_module_independent(self): + """测试 ASR 模块独立性""" + # 只导入 ASR 路由,不应该影响其他模块 + from app.asr.routes import register_asr_routes + from flask import Flask + + app = Flask(__name__) + register_asr_routes(app) + + rules = [rule.rule for rule in app.url_map.iter_rules()] + assert '/api/recognize' in rules + assert '/api/result' in rules + # 不应该有转码路由 + assert '/api/convert' not in rules + + def test_transcode_module_independent(self): + """测试转码模块独立性""" + # 只导入转码路由,不应该影响其他模块 + from app.transcode.routes import register_transcode_routes + from flask import Flask + import tempfile + + app = Flask(__name__) + app.config['OUTPUT_DIR'] = tempfile.mkdtemp() + register_transcode_routes(app) + + rules = [rule.rule for rule in app.url_map.iter_rules()] + assert '/api/convert' in rules + assert '/api/getVidUrl' in rules + # 不应该有 ASR 路由 + 
assert '/api/recognize' not in rules + + +class TestConfigIsolation: + """测试配置隔离""" + + def test_test_config_isolation(self): + """测试测试配置隔离""" + app = create_app() + app.config['TESTING'] = True + + # 修改测试配置不应该影响全局配置 + from app.settings import config + original_port = config['API_PORT'] + + app.config['API_PORT'] = 9999 + assert app.config['API_PORT'] == 9999 + assert config['API_PORT'] == original_port + + +class TestConcurrentRequests: + """测试并发请求""" + + def test_multiple_requests(self): + """测试多个请求""" + app = create_app() + app.config['TESTING'] = True + + with app.test_client() as client: + # 发送多个请求 + for i in range(5): + response = client.get('/') + assert response.status_code == 200 + + def test_request_isolation(self): + """测试请求隔离""" + app = create_app() + app.config['TESTING'] = True + + with app.test_client() as client: + # 第一个请求 + response1 = client.get('/') + data1 = response1.get_json() + + # 第二个请求 + response2 = client.get('/') + data2 = response2.get_json() + + # 时间戳应该不同(或至少请求是独立的) + assert data1['timestamp'] is not None + assert data2['timestamp'] is not None + + +class TestGlobalState: + """测试全局状态""" + + def test_task_running_is_dict(self): + """测试任务运行状态是字典类型""" + from app.asr.routes import task_running + # 验证是字典类型 + assert isinstance(task_running, dict) + + def test_global_services_initial_state(self): + """测试全局服务初始状态""" + from app.asr.routes import GLOBAL_ASR_SERVICE, GLOBAL_DIAR_SERVICE + # 初始应该是 None + assert GLOBAL_ASR_SERVICE is None + assert GLOBAL_DIAR_SERVICE is None + + +class TestPathHandling: + """测试路径处理""" + + def test_relative_path_handling(self, temp_dirs): + """测试相对路径处理""" + app = create_app() + app.config['TESTING'] = True + app.config['INPUT_DIR'] = str(temp_dirs['input']) + app.config['OUTPUT_DIR'] = str(temp_dirs['output']) + + with app.test_client() as client: + # 使用相对路径 + response = client.get('/api/convert?path=test.mp4') + # 应该能处理(可能返回 404,但不会崩溃) + assert response.status_code in [404, 500] + + def 
test_absolute_path_handling(self, temp_dirs): + """测试绝对路径处理""" + app = create_app() + app.config['TESTING'] = True + app.config['INPUT_DIR'] = str(temp_dirs['input']) + app.config['OUTPUT_DIR'] = str(temp_dirs['output']) + + with app.test_client() as client: + # 使用绝对路径 + abs_path = str(temp_dirs['input'] / 'test.mp4') + response = client.get(f'/api/convert?path={abs_path}') + # 应该能处理(可能返回 404,但不会崩溃) + assert response.status_code in [404, 500] + + +class TestResourceCleanup: + """测试资源清理""" + + def test_temp_dir_cleanup(self, temp_dirs): + """测试临时目录清理""" + # 创建临时文件 + temp_file = temp_dirs['temp'] / 'test.txt' + temp_file.touch() + assert temp_file.exists() + + # fixture 会自动清理,这里验证清理机制 + # 实际清理由 conftest.py 中的 fixture 处理 + + def test_output_dir_creation(self, temp_dirs): + """测试输出目录创建""" + output_dir = temp_dirs['output'] / 'test_subdir' + output_dir.mkdir(parents=True, exist_ok=True) + assert output_dir.exists() + + +if __name__ == '__main__': + pytest.main([__file__, '-v']) diff --git a/test/test_main.py b/test/test_main.py new file mode 100644 index 0000000..cc26aad --- /dev/null +++ b/test/test_main.py @@ -0,0 +1,195 @@ +""" +测试 main.py 入口和 Flask 应用创建 +""" +import pytest +from pathlib import Path +import tempfile +import sys +import os + +# 添加项目根目录到路径 +project_root = Path(__file__).parent.parent.absolute() +sys.path.insert(0, str(project_root)) +os.chdir(project_root) + +from main import create_app +from app.settings import config + + +class TestAppCreation: + """测试应用创建""" + + def test_create_app_returns_flask_app(self): + """测试 create_app 返回 Flask 应用实例""" + app = create_app() + assert app is not None + assert app.name == 'main' + + def test_create_app_loads_config(self): + """测试应用加载全局配置""" + app = create_app() + assert 'MAX_CONTENT_LENGTH' in app.config + assert 'SEND_FILE_MAX_AGE_DEFAULT' in app.config + assert 'INPUT_DIR' in app.config + assert 'OUTPUT_DIR' in app.config + assert 'TASK_TIMEOUT' in app.config + assert 'API_PORT' in app.config + assert 
'VIDEO_PORT' in app.config + + def test_create_app_config_values(self): + """测试配置值正确性""" + app = create_app() + assert app.config['MAX_CONTENT_LENGTH'] == 500 * 1024 * 1024 + assert app.config['SEND_FILE_MAX_AGE_DEFAULT'] == 300 + assert app.config['TASK_TIMEOUT'] == 600 + assert app.config['API_PORT'] == 5000 + assert app.config['VIDEO_PORT'] == 8086 + + def test_create_app_registers_cors(self): + """测试应用注册了 CORS""" + app = create_app() + # 检查是否注册了 OPTIONS 路由 + rules = [rule.rule for rule in app.url_map.iter_rules()] + assert '/config' in rules + + +class TestIndexRoute: + """测试根路由""" + + def test_index_route_exists(self): + """测试根路由存在""" + app = create_app() + with app.test_client() as client: + response = client.get('/') + assert response.status_code == 200 + + def test_index_route_returns_success(self): + """测试根路由返回成功响应""" + app = create_app() + with app.test_client() as client: + response = client.get('/') + data = response.get_json() + assert data is not None + assert data['status'] == 'success' + assert 'API 服务运行中' in data['message'] + + def test_index_route_response_format(self): + """测试根路由响应格式""" + app = create_app() + with app.test_client() as client: + response = client.get('/') + data = response.get_json() + assert 'status' in data + assert 'message' in data + assert 'timestamp' in data + assert 'data' in data + + +class TestASRRoutes: + """测试 ASR 路由注册""" + + def test_asr_routes_registered(self): + """测试 ASR 路由已注册""" + app = create_app() + rules = [rule.rule for rule in app.url_map.iter_rules()] + assert '/api/recognize' in rules + assert '/api/result' in rules + + def test_asr_recognize_route_methods(self): + """测试 ASR 识别路由方法""" + app = create_app() + for rule in app.url_map.iter_rules(): + if rule.rule == '/api/recognize': + assert rule.methods is not None and 'GET' in rule.methods + + def test_asr_result_route_methods(self): + """测试 ASR 结果路由方法""" + app = create_app() + for rule in app.url_map.iter_rules(): + if rule.rule == '/api/result': + assert 
rule.methods is not None and 'GET' in rule.methods + + +class TestTranscodeRoutes: + """测试转码路由注册""" + + def test_transcode_routes_registered(self): + """测试转码路由已注册""" + app = create_app() + rules = [rule.rule for rule in app.url_map.iter_rules()] + assert '/api/convert' in rules + assert '/api/getVidUrl' in rules + + def test_transcode_convert_route_methods(self): + """测试转码路由方法""" + app = create_app() + for rule in app.url_map.iter_rules(): + if rule.rule == '/api/convert': + assert rule.methods is not None and 'GET' in rule.methods + + def test_transcode_getvidurl_route_methods(self): + """测试获取视频 URL 路由方法""" + app = create_app() + for rule in app.url_map.iter_rules(): + if rule.rule == '/api/getVidUrl': + assert rule.methods is not None and 'GET' in rule.methods + + +class TestCORS: + """测试 CORS 配置""" + + def test_cors_options_route_exists(self): + """测试 CORS OPTIONS 路由存在""" + app = create_app() + with app.test_client() as client: + response = client.options('/config') + assert response.status_code == 200 + + def test_cors_headers_in_response(self): + """测试响应包含 CORS 头""" + app = create_app() + with app.test_client() as client: + response = client.get('/', headers={'Origin': 'http://test.com'}) + assert 'Access-Control-Allow-Origin' in response.headers + assert 'Access-Control-Allow-Methods' in response.headers + assert 'Access-Control-Allow-Headers' in response.headers + assert 'Access-Control-Allow-Credentials' in response.headers + + +class TestConfigModule: + """测试配置模块""" + + def test_config_exists(self): + """测试配置存在""" + assert config is not None + assert isinstance(config, dict) + + def test_config_required_keys(self): + """测试配置包含必需的键""" + required_keys = [ + 'MAX_CONTENT_LENGTH', + 'SEND_FILE_MAX_AGE_DEFAULT', + 'INPUT_DIR', + 'OUTPUT_DIR', + 'TASK_TIMEOUT', + 'API_PORT', + 'VIDEO_PORT' + ] + for key in required_keys: + assert key in config, f"配置缺少必需的键:{key}" + + def test_config_port_values(self): + """测试端口配置值""" + assert isinstance(config['API_PORT'], int) + 
assert isinstance(config['VIDEO_PORT'], int) + assert config['API_PORT'] > 0 + assert config['VIDEO_PORT'] > 0 + + def test_config_timeout_value(self): + """测试超时配置值""" + assert isinstance(config['TASK_TIMEOUT'], int) + assert config['TASK_TIMEOUT'] > 0 + + +if __name__ == '__main__': + pytest.main([__file__, '-v']) diff --git a/test/test_transcode_routes.py b/test/test_transcode_routes.py new file mode 100644 index 0000000..782e563 --- /dev/null +++ b/test/test_transcode_routes.py @@ -0,0 +1,255 @@ +""" +测试转码路由 +""" +import pytest +from pathlib import Path +import tempfile +import os +import sys + +project_root = Path(__file__).parent.parent.absolute() +sys.path.insert(0, str(project_root)) +os.chdir(project_root) + +from main import create_app + + +class TestConvertRoute: + """测试视频转码路由""" + + def test_convert_missing_path_parameter(self): + """测试缺少 path 参数时返回错误""" + app = create_app() + app.config['TESTING'] = True + with app.test_client() as client: + response = client.get('/api/convert') + assert response.status_code == 400 + data = response.get_json() + assert data['status'] == 'error' + assert 'path' in data['message'] + + def test_convert_with_empty_path(self): + """测试 path 为空时返回错误""" + app = create_app() + app.config['TESTING'] = True + with app.test_client() as client: + response = client.get('/api/convert?path=') + assert response.status_code == 400 + data = response.get_json() + assert data['status'] == 'error' + + def test_convert_file_not_found(self, temp_dirs): + """测试文件不存在时返回 404""" + app = create_app() + app.config['TESTING'] = True + app.config['INPUT_DIR'] = str(temp_dirs['input']) + app.config['OUTPUT_DIR'] = str(temp_dirs['output']) + + with app.test_client() as client: + response = client.get('/api/convert?path=nonexistent.mp4') + assert response.status_code == 404 + data = response.get_json() + assert data['status'] == 'error' + assert '不存在' in data['message'] + + def test_convert_response_format(self, temp_dirs): + """测试转码响应格式""" + app = 
create_app() + app.config['TESTING'] = True + app.config['INPUT_DIR'] = str(temp_dirs['input']) + app.config['OUTPUT_DIR'] = str(temp_dirs['output']) + + with app.test_client() as client: + response = client.get('/api/convert?path=test.mp4') + # 文件不存在时返回 404 + assert response.status_code in [200, 404, 500] + data = response.get_json() + assert 'status' in data + assert 'message' in data + + +class TestGetVidUrlRoute: + """测试获取视频 URL 路由""" + + def test_getvidurl_missing_path_parameter(self): + """测试缺少 path 参数时返回错误""" + app = create_app() + app.config['TESTING'] = True + with app.test_client() as client: + response = client.get('/api/getVidUrl') + assert response.status_code == 404 + data = response.get_json() + assert data['status'] == 'error' + + def test_getvidurl_with_empty_path(self): + """测试 path 为空时返回错误""" + app = create_app() + app.config['TESTING'] = True + with app.test_client() as client: + response = client.get('/api/getVidUrl?path=') + # Path('') 在布尔检查中为 True,但后续检查会返回 404 + assert response.status_code == 404 + data = response.get_json() + assert data['status'] == 'error' + + def test_getvidurl_video_not_found(self, temp_dirs): + """测试视频不存在时返回 404""" + app = create_app() + app.config['TESTING'] = True + app.config['OUTPUT_DIR'] = str(temp_dirs['output']) + + # 创建转码输出目录 + transcode_dir = Path(temp_dirs['output']) / 'vid_h264' + transcode_dir.mkdir(parents=True, exist_ok=True) + + with app.test_client() as client: + response = client.get('/api/getVidUrl?path=nonexistent.mp4') + assert response.status_code == 404 + data = response.get_json() + assert data['status'] == 'error' + + def test_getvidurl_with_valid_video(self, temp_dirs): + """测试获取存在的视频 URL""" + # 使用 conftest 的 app fixture + from flask import Flask + import tempfile + + # 创建临时输出目录 + temp_output = tempfile.mkdtemp() + app = Flask(__name__) + app.config['TESTING'] = True + app.config['OUTPUT_DIR'] = temp_output + + # 注册转码路由 + from app.transcode.routes import register_transcode_routes + 
register_transcode_routes(app) + + # 创建模拟转码视频文件 + transcode_dir = Path(temp_output) / 'vid_h264' + transcode_dir.mkdir(parents=True, exist_ok=True) + video_file = transcode_dir / 'test_h264.mp4' + video_file.touch() + + with app.test_client() as client: + response = client.get('/api/getVidUrl?path=test.mp4') + assert response.status_code == 200 + data = response.get_json() + assert data['status'] == 'success' + assert 'data' in data + assert 'url' in data['data'] + assert 'test_h264.mp4' in data['data']['url'] + assert 'localhost:8086' in data['data']['url'] + + +class TestTranscodeCore: + """测试转码核心功能""" + + def test_transcode_core_import(self): + """测试转码核心模块可以导入""" + from app.transcode.core import convert_to_h264 + assert convert_to_h264 is not None + + def test_convert_to_h264_file_not_found(self, temp_dirs): + """测试文件不存在时抛出异常""" + from app.transcode.core import convert_to_h264 + + with pytest.raises(FileNotFoundError): + convert_to_h264( + input_root=temp_dirs['input'], + vid_full_name='nonexistent.mp4', + output_root=temp_dirs['output'] + ) + + def test_convert_to_h264_output_path_format(self, temp_dirs): + """测试输出路径格式""" + from app.transcode.core import convert_to_h264 + + # 创建一个假的输入文件 + input_file = temp_dirs['input'] / 'test.mp4' + input_file.touch() + + # 由于需要 ffmpeg,这里只测试路径处理逻辑 + # 实际转码会失败,但可以验证路径处理 + try: + output_path = convert_to_h264( + input_root=temp_dirs['input'], + vid_full_name='test.mp4', + output_root=temp_dirs['output'] + ) + # 如果成功,验证输出路径格式 + assert 'test_h264.mp4' in output_path + except Exception as e: + # 预期可能因为 ffmpeg 问题失败 + assert 'ffmpeg' in str(e).lower() or 'returncode' in str(e).lower() + + +class TestTranscodeOutputDirectory: + """测试转码输出目录""" + + def test_transcode_output_dir_created(self, temp_dirs): + """测试转码输出目录自动创建""" + from flask import Flask + + # 创建新 app 并配置 OUTPUT_DIR + app = Flask(__name__) + app.config['TESTING'] = True + app.config['OUTPUT_DIR'] = str(temp_dirs['output']) + + # 触发路由注册以创建输出目录 + from app.transcode.routes 
import register_transcode_routes + register_transcode_routes(app) + + transcode_dir = Path(temp_dirs['output']) / 'vid_h264' + assert transcode_dir.exists() + assert transcode_dir.is_dir() + + def test_transcode_output_dir_with_main_app(self): + """测试主应用创建时输出目录也会创建""" + # 使用 create_app 会自动注册路由并创建输出目录 + app = create_app() + + # 验证路由已注册 + rules = [rule.rule for rule in app.url_map.iter_rules()] + assert '/api/convert' in rules + assert '/api/getVidUrl' in rules + + +class TestVideoFormats: + """测试支持的视频格式""" + + def test_mp4_format(self): + """测试 MP4 格式支持""" + app = create_app() + app.config['TESTING'] = True + with app.test_client() as client: + response = client.get('/api/convert?path=test.mp4') + # 文件不存在,但路由应该能处理 + assert response.status_code in [404, 500] + + def test_avi_format(self): + """测试 AVI 格式支持""" + app = create_app() + app.config['TESTING'] = True + with app.test_client() as client: + response = client.get('/api/convert?path=test.avi') + assert response.status_code in [404, 500] + + def test_mkv_format(self): + """测试 MKV 格式支持""" + app = create_app() + app.config['TESTING'] = True + with app.test_client() as client: + response = client.get('/api/convert?path=test.mkv') + assert response.status_code in [404, 500] + + def test_mov_format(self): + """测试 MOV 格式支持""" + app = create_app() + app.config['TESTING'] = True + with app.test_client() as client: + response = client.get('/api/convert?path=test.mov') + assert response.status_code in [404, 500] + + +if __name__ == '__main__': + pytest.main([__file__, '-v']) diff --git a/test/test_utils.py b/test/test_utils.py new file mode 100644 index 0000000..6c7133a --- /dev/null +++ b/test/test_utils.py @@ -0,0 +1,225 @@ +""" +测试工具函数和配置 +""" +import pytest +from datetime import datetime +from pathlib import Path +import sys +import os + +project_root = Path(__file__).parent.parent.absolute() +sys.path.insert(0, str(project_root)) +os.chdir(project_root) + +from app.utils import make_response, register_cors +from 
app.settings import config + + +class TestMakeResponse: + """测试 make_response 工具函数""" + + def test_make_response_default_success(self): + """测试默认成功响应""" + response = make_response() + assert response['status'] == 'success' + assert response['message'] == '操作成功' + assert response['data'] == {} + assert response['errors'] == [] + assert 'timestamp' in response + + def test_make_response_custom_status(self): + """测试自定义状态""" + response = make_response(status='error') + assert response['status'] == 'error' + assert response['message'] == '操作失败' + + def test_make_response_custom_message(self): + """测试自定义消息""" + response = make_response(message='自定义消息') + assert response['message'] == '自定义消息' + + def test_make_response_with_data(self): + """测试带数据的响应""" + test_data = {'key': 'value', 'number': 42} + response = make_response(data=test_data) + assert response['data'] == test_data + + def test_make_response_with_errors(self): + """测试带错误的响应""" + errors = ['错误 1', '错误 2'] + response = make_response(errors=errors) + assert response['errors'] == errors + assert len(response['errors']) == 2 + + def test_make_response_with_extra(self): + """测试带额外信息的响应""" + extra = {'extra_key': 'extra_value'} + response = make_response(extra=extra) + assert response['extra_key'] == 'extra_value' + + def test_make_response_timestamp_format(self): + """测试时间戳格式""" + response = make_response() + timestamp = response['timestamp'] + # 验证 ISO 8601 格式 + assert 'T' in timestamp + assert 'Z' in timestamp + # 验证可以解析 + try: + datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%SZ') + except ValueError: + pytest.fail("时间戳格式不正确") + + def test_make_response_combined(self): + """测试组合响应""" + response = make_response( + status='success', + message='测试消息', + data={'result': 'ok'}, + errors=[], + extra={'page': 1} + ) + assert response['status'] == 'success' + assert response['message'] == '测试消息' + assert response['data']['result'] == 'ok' + assert response['errors'] == [] + assert response['page'] == 1 + + +class 
TestRegisterCORS: + """测试 CORS 注册""" + + def test_register_cors_sets_headers(self): + """测试 CORS 设置正确的头""" + from flask import Flask + app = Flask(__name__) + register_cors(app) + + # 验证注册了 after_request 处理器 + assert len(app.after_request_funcs) > 0 + + def test_register_cors_options_route(self): + """测试 OPTIONS 路由注册""" + from flask import Flask + app = Flask(__name__) + register_cors(app) + + rules = [rule.rule for rule in app.url_map.iter_rules()] + assert '/config' in rules + + +class TestConfig: + """测试配置模块""" + + def test_config_is_dict(self): + """测试配置是字典""" + assert isinstance(config, dict) + + def test_config_max_content_length(self): + """测试最大内容长度配置""" + assert 'MAX_CONTENT_LENGTH' in config + assert config['MAX_CONTENT_LENGTH'] == 500 * 1024 * 1024 # 500MB + + def test_config_send_file_max_age(self): + """测试文件发送最大年龄配置""" + assert 'SEND_FILE_MAX_AGE_DEFAULT' in config + assert config['SEND_FILE_MAX_AGE_DEFAULT'] == 300 # 300 秒 + + def test_config_input_dir(self): + """测试输入目录配置""" + assert 'INPUT_DIR' in config + assert config['INPUT_DIR'] == 'input' + + def test_config_output_dir(self): + """测试输出目录配置""" + assert 'OUTPUT_DIR' in config + assert config['OUTPUT_DIR'] == 'output' + + def test_config_task_timeout(self): + """测试任务超时配置""" + assert 'TASK_TIMEOUT' in config + assert config['TASK_TIMEOUT'] == 600 # 600 秒 + + def test_config_api_port(self): + """测试 API 端口配置""" + assert 'API_PORT' in config + assert config['API_PORT'] == 5000 + assert isinstance(config['API_PORT'], int) + assert config['API_PORT'] > 0 + + def test_config_video_port(self): + """测试视频端口配置""" + assert 'VIDEO_PORT' in config + assert config['VIDEO_PORT'] == 8086 + assert isinstance(config['VIDEO_PORT'], int) + assert config['VIDEO_PORT'] > 0 + + def test_config_all_required_keys(self): + """测试所有必需的键""" + required_keys = [ + 'MAX_CONTENT_LENGTH', + 'SEND_FILE_MAX_AGE_DEFAULT', + 'INPUT_DIR', + 'OUTPUT_DIR', + 'TASK_TIMEOUT', + 'API_PORT', + 'VIDEO_PORT' + ] + for key in required_keys: + 
assert key in config, f"配置缺少必需的键:{key}" + + def test_config_no_extra_keys(self): + """测试没有多余的键(可选)""" + allowed_keys = [ + 'MAX_CONTENT_LENGTH', + 'SEND_FILE_MAX_AGE_DEFAULT', + 'INPUT_DIR', + 'OUTPUT_DIR', + 'TASK_TIMEOUT', + 'API_PORT', + 'VIDEO_PORT' + ] + for key in config.keys(): + assert key in allowed_keys, f"配置包含未预期的键:{key}" + + +class TestResponseFormat: + """测试响应格式规范""" + + def test_response_has_required_fields(self): + """测试响应包含必需字段""" + response = make_response() + required_fields = ['status', 'data', 'errors', 'message', 'timestamp'] + for field in required_fields: + assert field in response, f"响应缺少必需字段:{field}" + + def test_response_status_values(self): + """测试响应状态值""" + valid_statuses = ['success', 'error'] + for status in valid_statuses: + response = make_response(status=status) + assert response['status'] == status + + def test_response_data_is_dict(self): + """测试数据字段是字典""" + response = make_response() + assert isinstance(response['data'], dict) + + def test_response_errors_is_list(self): + """测试错误字段是列表""" + response = make_response() + assert isinstance(response['errors'], list) + + def test_response_message_is_string(self): + """测试消息字段是字符串""" + response = make_response() + assert isinstance(response['message'], str) + + def test_response_timestamp_is_string(self): + """测试时间戳字段是字符串""" + response = make_response() + assert isinstance(response['timestamp'], str) + + +if __name__ == '__main__': + pytest.main([__file__, '-v'])