"""
Real end-to-end tests with timeout protection.

Exercises the complete speech-recognition and transcoding flow through the
Flask test client, using real video/audio files listed in ``test_config.yaml``.
"""
import pytest
import json
import time
import os
import sys
from pathlib import Path
from datetime import datetime
import yaml

# The project root is three directory levels above this file. It is put on
# sys.path *before* the project imports below, and also made the working
# directory (presumably so relative paths used by the app resolve there).
project_root = Path(__file__).parent.parent.parent.absolute()
sys.path.insert(0, str(project_root))
os.chdir(project_root)

from main import create_app
from app.settings import config


def load_test_config():
    """Load ``test_config.yaml`` located next to this file and return it parsed."""
    config_path = Path(__file__).parent / "test_config.yaml"
    with open(config_path, 'r', encoding='utf-8') as f:
        return yaml.safe_load(f)


# Loaded at import time because the ``timeout`` marks below need the values
# while the test classes are being defined.
TEST_CONFIG = load_test_config()


def _timed_get(client, endpoint, path=None):
    """Send a GET request and measure how long it takes.

    Args:
        client: A Flask test client.
        endpoint: URL path of the endpoint, without a query string.
        path: Optional value for the ``path`` query parameter. It is passed
            via ``query_string`` so the client URL-encodes it; file names
            containing spaces, ``&``, ``#`` or non-ASCII characters would
            otherwise be corrupted.

    Returns:
        A ``(response, elapsed_seconds)`` tuple.
    """
    params = None if path is None else {'path': path}
    # perf_counter is monotonic, so the measurement is immune to clock changes.
    started = time.perf_counter()
    response = client.get(endpoint, query_string=params)
    return response, time.perf_counter() - started


def _json_or_skip(response):
    """Return the JSON body of a 200 response, or skip the running test.

    Skipping (rather than returning silently) keeps a test from being
    reported as passed when nothing was actually verified.
    """
    if response.status_code != 200:
        pytest.skip(f"接口未返回成功结果:{response.status_code}")
    return response.get_json()


def _sentences_or_skip(response):
    """Return the non-empty ``data.sentences`` list, or skip the running test."""
    payload = _json_or_skip(response)
    result = payload.get('data') if isinstance(payload, dict) else None
    sentences = result.get('sentences') if isinstance(result, dict) else None
    if not sentences:
        pytest.skip("识别结果中没有句子")
    return sentences


@pytest.fixture(scope='session')
def e2e_config():
    """Expose the parsed end-to-end test configuration."""
    return TEST_CONFIG


@pytest.fixture(scope='session')
def test_video_file(e2e_config):
    """Resolve the test video path.

    Uses the primary video when it exists, otherwise the first existing
    backup video; skips the test when none of them exists.
    """
    video_path = project_root / e2e_config['test_files']['primary_video']
    if not video_path.exists():
        for backup in e2e_config['test_files']['backup_videos']:
            backup_path = project_root / backup
            if backup_path.exists():
                print(f"\n使用备用视频文件:{backup_path.name}")
                return backup_path
        pytest.skip(f"测试视频文件不存在:{video_path}")

    file_size_mb = video_path.stat().st_size / 1024 / 1024
    print(f"\n使用测试视频:{video_path.name} ({file_size_mb:.2f} MB)")
    return video_path


@pytest.fixture(scope='session')
def test_audio_file(e2e_config):
    """Resolve the first configured audio file that exists; skip if none does."""
    for audio_path_str in e2e_config['test_files']['audio_files']:
        audio_path = project_root / audio_path_str
        if audio_path.exists():
            file_size_mb = audio_path.stat().st_size / 1024 / 1024
            print(f"\n使用测试音频:{audio_path.name} ({file_size_mb:.2f} MB)")
            return audio_path
    pytest.skip("测试音频文件不存在")


@pytest.fixture(scope='function')
def e2e_app():
    """Create a Flask application configured for end-to-end testing."""
    app = create_app()
    app.config['TESTING'] = True
    app.config['TASK_TIMEOUT'] = TEST_CONFIG['timeouts']['asr_recognize']
    yield app


class TestVideoFileValidation:
    """Sanity checks on the test video file itself."""

    def test_primary_video_file_exists(self, test_video_file):
        """The video file exists and is not empty."""
        assert test_video_file.exists(), f"视频文件不存在:{test_video_file}"
        assert test_video_file.stat().st_size > 0, "视频文件大小为 0"

    def test_video_file_size_within_limit(self, test_video_file, e2e_config):
        """The video file does not exceed the configured size limit."""
        max_size = e2e_config['environment']['max_file_size_mb'] * 1024 * 1024
        file_size = test_video_file.stat().st_size
        assert file_size <= max_size, f"视频文件过大:{file_size / 1024 / 1024:.2f}MB"

    def test_video_file_format(self, test_video_file):
        """The video file has one of the supported extensions."""
        supported_formats = ['.mp4', '.avi', '.mkv', '.mov', '.flv', '.wmv', '.m4v']
        assert test_video_file.suffix.lower() in supported_formats, \
            f"不支持的视频格式:{test_video_file.suffix}"


class TestAudioFileValidation:
    """Sanity checks on the test audio file itself."""

    def test_audio_file_exists(self, test_audio_file):
        """The audio file exists and is not empty."""
        assert test_audio_file.exists(), f"音频文件不存在:{test_audio_file}"
        assert test_audio_file.stat().st_size > 0, "音频文件大小为 0"


class TestASRRecognition:
    """Full ASR speech-recognition flow."""

    @pytest.mark.real
    @pytest.mark.slow
    @pytest.mark.timeout(TEST_CONFIG['timeouts']['asr_recognize'] + 60)
    def test_recognize_audio_file(self, e2e_app, test_audio_file):
        """Recognize a real audio file and expect a task id in the response."""
        print(f"\n开始测试音频识别:{test_audio_file.name}")
        timeout = TEST_CONFIG['timeouts']['asr_recognize']

        with e2e_app.test_client() as client:
            response, elapsed = _timed_get(
                client, '/api/recognize', test_audio_file.name
            )
            print(f"音频识别耗时:{elapsed:.2f}秒")

            assert elapsed < timeout, f"ASR 识别超时:{elapsed:.2f}秒 > {timeout}秒"
            assert response.status_code == 200, \
                f"识别失败:{response.status_code} - {response.get_json()}"

            data = response.get_json()
            assert data['status'] == 'success', \
                f"识别返回错误:{data}"
            assert 'data' in data
            assert 'task_id' in data['data']
            print(f"✓ 音频识别成功,task_id: {data['data']['task_id']}")

    @pytest.mark.real
    @pytest.mark.slow
    @pytest.mark.timeout(TEST_CONFIG['timeouts']['asr_recognize'] + 60)
    def test_recognize_video_file(self, e2e_app, test_video_file):
        """Recognize speech from a real video file."""
        print(f"\n开始测试视频识别:{test_video_file.name}")
        timeout = TEST_CONFIG['timeouts']['asr_recognize']

        with e2e_app.test_client() as client:
            response, elapsed = _timed_get(
                client, '/api/recognize', test_video_file.name
            )
            print(f"视频识别耗时:{elapsed:.2f}秒")

            assert elapsed < timeout, f"ASR 识别超时:{elapsed:.2f}秒 > {timeout}秒"
            assert response.status_code == 200, \
                f"识别失败:{response.status_code} - {response.get_json()}"

            data = response.get_json()
            assert data['status'] == 'success'
            print("✓ 视频识别成功")

    @pytest.mark.real
    @pytest.mark.slow
    @pytest.mark.timeout(60)
    def test_get_recognition_result(self, e2e_app, test_audio_file):
        """Fetch the recognition result and validate the sentence structure."""
        print(f"\n开始获取识别结果:{test_audio_file.name}")
        timeout = 60

        with e2e_app.test_client() as client:
            response, elapsed = _timed_get(
                client, '/api/result', test_audio_file.name
            )
            print(f"获取结果耗时:{elapsed:.2f}秒")

            assert elapsed < timeout, f"获取结果超时:{elapsed:.2f}秒"
            assert response.status_code == 200, \
                f"获取结果失败:{response.status_code} - {response.get_json()}"

            data = response.get_json()
            assert data['status'] == 'success'
            assert 'data' in data

            if 'sentences' in data['data']:
                sentences = data['data']['sentences']
                print(f"✓ 识别到 {len(sentences)} 个句子")
                assert isinstance(sentences, list)

                if sentences:
                    first_sentence = sentences[0]
                    print(f" 第一句:[{first_sentence.get('speaker', 'N/A')}] {first_sentence.get('text', '')[:50]}")
                    assert 'speaker' in first_sentence
                    assert 'text' in first_sentence
                    assert 'begin_time' in first_sentence
                    assert 'end_time' in first_sentence


class TestSpeakerDiarization:
    """Speaker diarization checks on the recognition result."""

    @pytest.mark.real
    @pytest.mark.slow
    @pytest.mark.timeout(60)
    def test_speaker_diarization_result(self, e2e_app, test_audio_file):
        """The number of distinct speakers lies within the configured bounds.

        Skipped (instead of silently passing) when no sentences are available.
        """
        print(f"\n测试说话人分离:{test_audio_file.name}")

        with e2e_app.test_client() as client:
            response, _ = _timed_get(client, '/api/result', test_audio_file.name)
            sentences = _sentences_or_skip(response)

            speakers = {s['speaker'] for s in sentences if 'speaker' in s}
            print(f"✓ 识别到 {len(speakers)} 个说话人:{speakers}")

            min_speakers = TEST_CONFIG['validation']['min_speakers']
            max_speakers = TEST_CONFIG['validation']['max_speakers']
            assert len(speakers) >= min_speakers, \
                f"说话人数量过少:{len(speakers)}"
            assert len(speakers) <= max_speakers, \
                f"说话人数量过多:{len(speakers)}"

    @pytest.mark.real
    @pytest.mark.slow
    @pytest.mark.timeout(60)
    def test_speaker_timestamp_accuracy(self, e2e_app, test_audio_file):
        """Every timestamped sentence has a sane, sufficiently long interval.

        Skipped (instead of silently passing) when no sentences are available.
        Sentences lacking ``begin_time``/``end_time`` are ignored.
        """
        print(f"\n测试时间戳准确性:{test_audio_file.name}")
        # Loop-invariant, so read it once up front.
        precision = TEST_CONFIG['validation']['timestamp_precision']

        with e2e_app.test_client() as client:
            response, _ = _timed_get(client, '/api/result', test_audio_file.name)
            sentences = _sentences_or_skip(response)

            valid_count = 0
            for sentence in sentences:
                if 'begin_time' not in sentence or 'end_time' not in sentence:
                    continue
                begin = sentence['begin_time']
                end = sentence['end_time']

                assert begin >= 0, f"开始时间为负:{begin}"
                assert end > begin, f"结束时间无效:begin={begin}, end={end}"

                duration = end - begin
                assert duration >= precision, \
                    f"时间戳精度不足:{duration}"
                valid_count += 1

            print(f"✓ 验证 {valid_count} 个有效时间戳")


class TestVideoTranscoding:
    """Video transcoding endpoints."""

    @pytest.mark.real
    @pytest.mark.slow
    @pytest.mark.timeout(TEST_CONFIG['timeouts']['transcode'] + 60)
    @pytest.mark.requires_ffmpeg
    def test_transcode_video(self, e2e_app, test_video_file):
        """Transcode a real video within the configured time budget.

        Status codes 200, 404 and 500 are all accepted; the payload is only
        validated on 200.
        """
        print(f"\n测试视频转码:{test_video_file.name}")
        timeout = TEST_CONFIG['timeouts']['transcode']

        with e2e_app.test_client() as client:
            response, elapsed = _timed_get(
                client, '/api/convert', test_video_file.name
            )
            print(f"转码耗时:{elapsed:.2f}秒")

            assert elapsed < timeout, f"转码超时:{elapsed:.2f}秒 > {timeout}秒"
            assert response.status_code in [200, 404, 500], \
                f"转码请求失败:{response.status_code}"

            if response.status_code == 200:
                data = response.get_json()
                assert data['status'] == 'success'
                print("✓ 转码成功")

    @pytest.mark.real
    @pytest.mark.slow
    @pytest.mark.timeout(60)
    @pytest.mark.requires_ffmpeg
    def test_get_transcoded_video_url(self, e2e_app, test_video_file):
        """Fetch the transcoded video URL; 404 is accepted as 'not available'."""
        print(f"\n测试获取转码视频 URL:{test_video_file.name}")

        with e2e_app.test_client() as client:
            response, _ = _timed_get(
                client, '/api/getVidUrl', test_video_file.name
            )

            assert response.status_code in [200, 404], \
                f"获取 URL 失败:{response.status_code}"

            if response.status_code == 200:
                data = response.get_json()
                assert data['status'] == 'success'
                assert 'data' in data
                assert 'url' in data['data']
                print(f"✓ 获取 URL 成功:{data['data']['url'][:50]}...")


class TestPerformance:
    """Performance metrics."""

    @pytest.mark.real
    @pytest.mark.performance
    @pytest.mark.slow
    @pytest.mark.timeout(300)
    def test_asr_processing_time(self, e2e_app, test_audio_file):
        """The recognize request finishes within ``performance.max_asr_time``."""
        print("\n性能测试:ASR 处理时间")
        max_time = TEST_CONFIG['performance']['max_asr_time']

        with e2e_app.test_client() as client:
            response, elapsed = _timed_get(
                client, '/api/recognize', test_audio_file.name
            )
            print(f"ASR 处理时间:{elapsed:.2f}秒(最大允许:{max_time}秒)")

            assert response.status_code == 200
            assert elapsed < max_time, \
                f"ASR 处理时间过长:{elapsed:.2f}秒 > {max_time}秒"

    @pytest.mark.real
    @pytest.mark.performance
    @pytest.mark.slow
    @pytest.mark.timeout(300)
    def test_realtime_factor(self, e2e_app, test_audio_file):
        """Processing speed (1 / RTF) meets ``performance.min_processing_speed``.

        Skipped (instead of silently passing) when the request does not
        succeed or the audio duration cannot be determined.
        """
        print("\n性能测试:实时处理率")
        min_speed = TEST_CONFIG['performance']['min_processing_speed']

        with e2e_app.test_client() as client:
            response, elapsed = _timed_get(
                client, '/api/recognize', test_audio_file.name
            )
            if response.status_code != 200:
                pytest.skip(f"接口未返回成功结果:{response.status_code}")

            audio_duration = self._get_audio_duration(test_audio_file)
            if audio_duration <= 0:
                pytest.skip("无法获取音频时长")

            # RTF = processing time / audio duration; speed is its inverse.
            rtf = elapsed / audio_duration
            processing_speed = 1 / rtf if rtf > 0 else float('inf')

            print(f"音频时长:{audio_duration:.2f}秒,处理时间:{elapsed:.2f}秒")
            print(f"实时处理率:{processing_speed:.2f}x (RTF: {rtf:.2f})")

            assert processing_speed >= min_speed, \
                f"处理速度过慢:{processing_speed:.2f}x < {min_speed}x"

    def _get_audio_duration(self, audio_path):
        """Return the audio duration in seconds, or 0 when it cannot be read.

        Best effort: any failure (including ``librosa`` not being installed)
        is printed and reported as 0 rather than raised.
        """
        try:
            import librosa
            y, sr = librosa.load(str(audio_path), sr=None)
            return len(y) / sr
        except Exception as e:
            print(f"无法获取音频时长:{e}")
            return 0


class TestErrorHandling:
    """Error handling of the API."""

    @pytest.mark.real
    @pytest.mark.timeout(30)
    def test_nonexistent_file_error(self, e2e_app):
        """A nonexistent file yields a 400/404/500 error response."""
        print("\n测试错误处理:文件不存在")

        with e2e_app.test_client() as client:
            response = client.get('/api/recognize?path=nonexistent.wav')

            assert response.status_code in [400, 404, 500]
            print(f"✓ 正确返回错误:{response.status_code}")

            # The previous check was guarded by ``status_code == 200`` and so
            # could never run. Validate the error payload whenever the body
            # is a JSON object carrying a status field.
            payload = response.get_json(silent=True)
            if isinstance(payload, dict) and 'status' in payload:
                assert payload['status'] == 'error'

    @pytest.mark.real
    @pytest.mark.timeout(30)
    def test_missing_parameter_error(self, e2e_app):
        """A request without the ``path`` parameter yields a 400 error."""
        print("\n测试错误处理:缺少参数")

        with e2e_app.test_client() as client:
            response = client.get('/api/recognize')

            assert response.status_code == 400
            data = response.get_json()
            assert data['status'] == 'error'
            assert 'message' in data
            print("✓ 正确返回参数错误")

    @pytest.mark.real
    @pytest.mark.timeout(30)
    @pytest.mark.requires_ffmpeg
    def test_transcode_nonexistent_file(self, e2e_app):
        """Transcoding a nonexistent file yields a 404 error."""
        print("\n测试错误处理:转码文件不存在")

        with e2e_app.test_client() as client:
            response = client.get('/api/convert?path=nonexistent.mp4')

            assert response.status_code == 404
            data = response.get_json()
            assert data['status'] == 'error'
            print("✓ 正确返回 404 错误")


class TestAPIResponseFormat:
    """Shape of the API response envelopes."""

    @pytest.mark.real
    @pytest.mark.slow
    @pytest.mark.timeout(300)  # 5-minute timeout, including model loading time
    def test_recognize_response_format(self, e2e_app, test_audio_file):
        """The recognize response carries status/message/timestamp/data.

        Skipped (instead of silently passing) when the request does not
        return 200.
        """
        print("\n测试响应格式:识别接口")

        with e2e_app.test_client() as client:
            response, _ = _timed_get(
                client, '/api/recognize', test_audio_file.name
            )
            data = _json_or_skip(response)

            assert 'status' in data
            assert 'message' in data
            assert 'timestamp' in data
            assert 'data' in data
            assert isinstance(data['data'], dict)
            print("✓ 响应格式正确")

    @pytest.mark.real
    @pytest.mark.slow
    @pytest.mark.timeout(300)  # 5-minute timeout
    def test_result_response_format(self, e2e_app, test_audio_file):
        """The result response carries status/message/timestamp.

        ``data`` is optional, but must be a dict when present. Skipped
        (instead of silently passing) when the request does not return 200.
        """
        print("\n测试响应格式:结果接口")

        with e2e_app.test_client() as client:
            response, _ = _timed_get(client, '/api/result', test_audio_file.name)
            data = _json_or_skip(response)

            assert 'status' in data
            assert 'message' in data
            assert 'timestamp' in data

            if 'data' in data:
                assert isinstance(data['data'], dict)
            print("✓ 结果响应格式正确")


if __name__ == '__main__':
    # Propagate pytest's exit code so a failing run does not exit with 0.
    sys.exit(pytest.main([__file__, '-v', '-m', 'real', '--tb=short']))