SpeechRecognition/test/real/test_real_e2e.py

532 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
真实端到端测试 - 带超时保护
测试完整的语音识别和转码流程,使用真实的视频/音频文件
"""
import pytest
import json
import time
import os
import sys
from pathlib import Path
from datetime import datetime
import yaml
# Project root: the directory two levels above the directory that holds this file.
project_root = Path(__file__).parent.parent.parent.absolute()
# Put the project root first on sys.path before the project imports below run.
sys.path.insert(0, str(project_root))
# Make the project root the working directory.
# NOTE(review): presumably the app resolves relative paths against it - confirm.
os.chdir(project_root)
# Project-local imports; presumably resolved through the sys.path entry added
# above, so they must stay below it.
from main import create_app
from app.settings import config
def load_test_config():
    """Load the test configuration stored next to this module.

    Returns:
        Whatever ``yaml.safe_load`` produces for ``test_config.yaml``.

    Raises:
        FileNotFoundError: if ``test_config.yaml`` is not present beside
            this file.
    """
    yaml_file = Path(__file__).parent / "test_config.yaml"
    with yaml_file.open('r', encoding='utf-8') as stream:
        parsed = yaml.safe_load(stream)
    return parsed
# Loaded once at import time: the pytest.mark.timeout decorators further down
# read TEST_CONFIG while the test classes are being defined.
TEST_CONFIG = load_test_config()
@pytest.fixture(scope="session")
def e2e_config():
    """Expose the module-level ``TEST_CONFIG`` object to tests as a fixture."""
    shared_config = TEST_CONFIG
    return shared_config
@pytest.fixture(scope="session")
def test_video_file(e2e_config):
    """Resolve the video file used by the end-to-end tests.

    The primary video from the configuration is preferred. If it does not
    exist, the first existing backup video is returned instead. When no
    candidate exists the requesting test is skipped.

    Args:
        e2e_config: configuration mapping with a ``test_files`` section.

    Returns:
        Path of the selected video file under the project root.
    """
    primary = project_root / e2e_config['test_files']['primary_video']

    # Happy path first: the primary file is available.
    if primary.exists():
        size_in_mb = primary.stat().st_size / 1024 / 1024
        print(f"\n使用测试视频:{primary.name} ({size_in_mb:.2f} MB)")
        return primary

    # Fall back to the configured backups, in order.
    for relative_backup in e2e_config['test_files']['backup_videos']:
        candidate = project_root / relative_backup
        if candidate.exists():
            print(f"\n使用备用视频文件:{candidate.name}")
            return candidate

    pytest.skip(f"测试视频文件不存在:{primary}")
@pytest.fixture(scope="session")
def test_audio_file(e2e_config):
    """Resolve the audio file used by the end-to-end tests.

    Returns the first configured audio file that exists under the project
    root; skips the requesting test when none of them exists.

    Args:
        e2e_config: configuration mapping with a ``test_files`` section.
    """
    # Lazy generator: candidates are probed one at a time, in configured order.
    candidates = (
        project_root / relative
        for relative in e2e_config['test_files']['audio_files']
    )
    found = next((path for path in candidates if path.exists()), None)
    if found is None:
        pytest.skip("测试音频文件不存在")

    size_in_mb = found.stat().st_size / 1024 / 1024
    print(f"\n使用测试音频:{found.name} ({size_in_mb:.2f} MB)")
    return found
@pytest.fixture(scope="function")
def e2e_app():
    """Yield a fresh application from ``create_app`` configured for testing.

    ``TESTING`` is switched on and ``TASK_TIMEOUT`` is set from the ASR
    timeout in the test configuration.
    """
    application = create_app()
    settings = application.config
    settings['TESTING'] = True
    settings['TASK_TIMEOUT'] = TEST_CONFIG['timeouts']['asr_recognize']
    yield application
class TestVideoFileValidation:
    """Sanity checks on the video file selected for the end-to-end run."""

    def test_primary_video_file_exists(self, test_video_file):
        """The selected video must exist and must not be empty."""
        assert test_video_file.exists(), f"视频文件不存在:{test_video_file}"
        size_bytes = test_video_file.stat().st_size
        assert size_bytes > 0, "视频文件大小为 0"

    def test_video_file_size_within_limit(self, test_video_file, e2e_config):
        """The video must not exceed the configured size limit (in MB)."""
        limit_bytes = e2e_config['environment']['max_file_size_mb'] * 1024 * 1024
        actual_bytes = test_video_file.stat().st_size
        assert actual_bytes <= limit_bytes, (
            f"视频文件过大:{actual_bytes / 1024 / 1024:.2f}MB"
        )

    def test_video_file_format(self, test_video_file):
        """The video's extension (case-insensitive) must be a supported one."""
        accepted = ('.mp4', '.avi', '.mkv', '.mov', '.flv', '.wmv', '.m4v')
        extension = test_video_file.suffix.lower()
        assert extension in accepted, (
            f"不支持的视频格式:{test_video_file.suffix}"
        )
class TestAudioFileValidation:
    """Sanity checks on the audio file selected for the end-to-end run."""

    def test_audio_file_exists(self, test_audio_file):
        """The selected audio file must exist and must not be empty."""
        assert test_audio_file.exists(), f"音频文件不存在:{test_audio_file}"
        size_bytes = test_audio_file.stat().st_size
        assert size_bytes > 0, "音频文件大小为 0"
class TestASRRecognition:
    """End-to-end checks of the ASR recognition and result endpoints."""

    @pytest.mark.real
    @pytest.mark.slow
    @pytest.mark.timeout(TEST_CONFIG['timeouts']['asr_recognize'] + 60)
    def test_recognize_audio_file(self, e2e_app, test_audio_file):
        """Recognize a real audio file and check the success payload.

        The request must finish within the configured ASR timeout, return
        HTTP 200 with ``status == 'success'`` and carry a ``task_id``.
        """
        print(f"\n开始测试音频识别:{test_audio_file.name}")
        started = time.time()
        limit_seconds = TEST_CONFIG['timeouts']['asr_recognize']

        with e2e_app.test_client() as http:
            reply = http.get(f'/api/recognize?path={test_audio_file.name}')
            took = time.time() - started
            print(f"音频识别耗时:{took:.2f}")

            assert took < limit_seconds, (
                f"ASR 识别超时:{took:.2f}秒 > {limit_seconds}"
            )
            assert reply.status_code == 200, (
                f"识别失败:{reply.status_code} - {reply.get_json()}"
            )

            payload = reply.get_json()
            assert payload['status'] == 'success', f"识别返回错误:{payload}"
            assert 'data' in payload
            assert 'task_id' in payload['data']
            print(f"✓ 音频识别成功task_id: {payload['data']['task_id']}")

    @pytest.mark.real
    @pytest.mark.slow
    @pytest.mark.timeout(TEST_CONFIG['timeouts']['asr_recognize'] + 60)
    def test_recognize_video_file(self, e2e_app, test_video_file):
        """Recognize a real video file and check for a success response."""
        print(f"\n开始测试视频识别:{test_video_file.name}")
        started = time.time()
        limit_seconds = TEST_CONFIG['timeouts']['asr_recognize']

        with e2e_app.test_client() as http:
            reply = http.get(f'/api/recognize?path={test_video_file.name}')
            took = time.time() - started
            print(f"视频识别耗时:{took:.2f}")

            assert took < limit_seconds, (
                f"ASR 识别超时:{took:.2f}秒 > {limit_seconds}"
            )
            assert reply.status_code == 200, (
                f"识别失败:{reply.status_code} - {reply.get_json()}"
            )

            payload = reply.get_json()
            assert payload['status'] == 'success'
            print("✓ 视频识别成功")

    @pytest.mark.real
    @pytest.mark.slow
    @pytest.mark.timeout(60)
    def test_get_recognition_result(self, e2e_app, test_audio_file):
        """Fetch the recognition result and validate its sentence structure.

        When the payload contains ``sentences`` it must be a list, and the
        first sentence (if any) must carry speaker, text and both timestamps.
        """
        print(f"\n开始获取识别结果:{test_audio_file.name}")
        limit_seconds = 60

        with e2e_app.test_client() as http:
            started = time.time()
            reply = http.get(f'/api/result?path={test_audio_file.name}')
            took = time.time() - started
            print(f"获取结果耗时:{took:.2f}")

            assert took < limit_seconds, f"获取结果超时:{took:.2f}"
            assert reply.status_code == 200, (
                f"获取结果失败:{reply.status_code} - {reply.get_json()}"
            )

            payload = reply.get_json()
            assert payload['status'] == 'success'
            assert 'data' in payload

            # Sentence details are optional in the payload.
            if 'sentences' not in payload['data']:
                return

            sentences = payload['data']['sentences']
            print(f"✓ 识别到 {len(sentences)} 个句子")
            assert isinstance(sentences, list)
            if not sentences:
                return

            first = sentences[0]
            print(f" 第一句:[{first.get('speaker', 'N/A')}] {first.get('text', '')[:50]}")
            assert 'speaker' in first
            assert 'text' in first
            assert 'begin_time' in first
            assert 'end_time' in first
class TestSpeakerDiarization:
    """Checks on the speaker-diarization data returned by ``/api/result``.

    Both tests need recognized sentences. When the result endpoint does not
    return them, the tests are skipped with a reason rather than passing
    without having verified anything.
    """

    @staticmethod
    def _fetch_sentences(app, audio_file):
        """Return the non-empty ``sentences`` list for *audio_file*, or skip.

        Args:
            app: application object providing ``test_client()``.
            audio_file: path whose ``name`` is sent as the ``path`` parameter.

        Returns:
            The ``data['sentences']`` value of the JSON response.
        """
        with app.test_client() as client:
            response = client.get(f'/api/result?path={audio_file.name}')
            if response.status_code != 200:
                pytest.skip(f"无法获取识别结果:{response.status_code}")
            data = response.get_json()

        if not data or 'data' not in data or 'sentences' not in data['data']:
            pytest.skip("识别结果中没有句子数据")
        sentences = data['data']['sentences']
        if not sentences:
            pytest.skip("识别结果中没有句子数据")
        return sentences

    @pytest.mark.real
    @pytest.mark.slow
    @pytest.mark.timeout(60)
    def test_speaker_diarization_result(self, e2e_app, test_audio_file):
        """The number of distinct speakers must lie within the configured range."""
        print(f"\n测试说话人分离:{test_audio_file.name}")
        sentences = self._fetch_sentences(e2e_app, test_audio_file)

        # Sentences without a 'speaker' key are ignored, as before.
        speakers = {
            sentence['speaker']
            for sentence in sentences
            if 'speaker' in sentence
        }
        print(f"✓ 识别到 {len(speakers)} 个说话人:{speakers}")

        min_speakers = TEST_CONFIG['validation']['min_speakers']
        max_speakers = TEST_CONFIG['validation']['max_speakers']
        assert len(speakers) >= min_speakers, \
            f"说话人数量过少:{len(speakers)}"
        assert len(speakers) <= max_speakers, \
            f"说话人数量过多:{len(speakers)}"

    @pytest.mark.real
    @pytest.mark.slow
    @pytest.mark.timeout(60)
    def test_speaker_timestamp_accuracy(self, e2e_app, test_audio_file):
        """Every sentence with both timestamps must describe a valid interval.

        A valid interval starts at a non-negative time, ends after it starts
        and lasts at least the configured ``timestamp_precision``.
        """
        print(f"\n测试时间戳准确性:{test_audio_file.name}")
        sentences = self._fetch_sentences(e2e_app, test_audio_file)

        # Loop-invariant: read the threshold once instead of per sentence.
        precision = TEST_CONFIG['validation']['timestamp_precision']
        valid_count = 0
        for sentence in sentences:
            # Only sentences that carry both timestamps are checked.
            if 'begin_time' not in sentence or 'end_time' not in sentence:
                continue
            begin = sentence['begin_time']
            end = sentence['end_time']
            assert begin >= 0, f"开始时间为负:{begin}"
            assert end > begin, f"结束时间无效begin={begin}, end={end}"
            duration = end - begin
            assert duration >= precision, \
                f"时间戳精度不足:{duration}"
            valid_count += 1
        print(f"✓ 验证 {valid_count} 个有效时间戳")
class TestVideoTranscoding:
    """End-to-end checks of the video transcoding endpoints."""

    @pytest.mark.real
    @pytest.mark.slow
    @pytest.mark.timeout(TEST_CONFIG['timeouts']['transcode'] + 60)
    @pytest.mark.requires_ffmpeg
    def test_transcode_video(self, e2e_app, test_video_file):
        """Request a transcode of a real video within the configured timeout.

        HTTP 200, 404 and 500 are all accepted; the payload is only checked
        for a 200 response.
        """
        print(f"\n测试视频转码:{test_video_file.name}")
        started = time.time()
        limit_seconds = TEST_CONFIG['timeouts']['transcode']

        with e2e_app.test_client() as http:
            reply = http.get(f'/api/convert?path={test_video_file.name}')
            took = time.time() - started
            print(f"转码耗时:{took:.2f}")

            assert took < limit_seconds, (
                f"转码超时:{took:.2f}秒 > {limit_seconds}"
            )
            assert reply.status_code in (200, 404, 500), (
                f"转码请求失败:{reply.status_code}"
            )
            if reply.status_code != 200:
                return

            payload = reply.get_json()
            assert payload['status'] == 'success'
            print("✓ 转码成功")

    @pytest.mark.real
    @pytest.mark.slow
    @pytest.mark.timeout(60)
    @pytest.mark.requires_ffmpeg
    def test_get_transcoded_video_url(self, e2e_app, test_video_file):
        """Fetch the transcoded video URL; 404 is accepted as "not available"."""
        print(f"\n测试获取转码视频 URL{test_video_file.name}")

        with e2e_app.test_client() as http:
            reply = http.get(f'/api/getVidUrl?path={test_video_file.name}')
            assert reply.status_code in (200, 404), (
                f"获取 URL 失败:{reply.status_code}"
            )
            if reply.status_code != 200:
                return

            payload = reply.get_json()
            assert payload['status'] == 'success'
            assert 'data' in payload
            assert 'url' in payload['data']
            print(f"✓ 获取 URL 成功:{payload['data']['url'][:50]}...")
class TestPerformance:
    """Performance checks for the recognition endpoint."""

    @pytest.mark.real
    @pytest.mark.performance
    @pytest.mark.slow
    @pytest.mark.timeout(300)
    def test_asr_processing_time(self, e2e_app, test_audio_file):
        """Recognition must succeed within the configured ``max_asr_time``."""
        print("\n性能测试ASR 处理时间")
        max_time = TEST_CONFIG['performance']['max_asr_time']
        with e2e_app.test_client() as client:
            start_time = time.time()
            response = client.get(
                f'/api/recognize?path={test_audio_file.name}'
            )
            elapsed = time.time() - start_time
            print(f"ASR 处理时间:{elapsed:.2f}秒(最大允许:{max_time}秒)")
            assert response.status_code == 200
            assert elapsed < max_time, \
                f"ASR 处理时间过长:{elapsed:.2f}秒 > {max_time}"

    @pytest.mark.real
    @pytest.mark.performance
    @pytest.mark.slow
    @pytest.mark.timeout(300)
    def test_realtime_factor(self, e2e_app, test_audio_file):
        """Processing speed (audio duration / wall time) must reach the minimum.

        The test is skipped, rather than silently passing, when recognition
        does not return HTTP 200 or when the audio duration cannot be
        determined, because no speed can be computed in either case.
        """
        print("\n性能测试:实时处理率")
        min_speed = TEST_CONFIG['performance']['min_processing_speed']
        with e2e_app.test_client() as client:
            start_time = time.time()
            response = client.get(
                f'/api/recognize?path={test_audio_file.name}'
            )
            elapsed = time.time() - start_time

            if response.status_code != 200:
                pytest.skip(f"识别请求失败:{response.status_code}")

            audio_duration = self._get_audio_duration(test_audio_file)
            if audio_duration <= 0:
                pytest.skip("无法获取音频时长")

            # RTF = processing time / audio duration; speed is its inverse.
            rtf = elapsed / audio_duration
            processing_speed = 1 / rtf if rtf > 0 else float('inf')
            print(f"音频时长:{audio_duration:.2f}秒,处理时间:{elapsed:.2f}")
            print(f"实时处理率:{processing_speed:.2f}x (RTF: {rtf:.2f})")
            assert processing_speed >= min_speed, \
                f"处理速度过慢:{processing_speed:.2f}x < {min_speed}x"

    def _get_audio_duration(self, audio_path):
        """Return the duration of *audio_path* in seconds, or 0 on failure.

        The duration is computed as sample count divided by the sample rate
        reported by ``librosa.load``. Any failure (including librosa not
        being importable) is reported on stdout and mapped to 0 so the
        caller can decide what to do.
        """
        try:
            # Imported lazily so the module still loads without librosa.
            import librosa
            samples, sample_rate = librosa.load(str(audio_path), sr=None)
            return len(samples) / sample_rate
        except Exception as e:  # deliberate best-effort: see docstring
            print(f"无法获取音频时长:{e}")
            return 0
class TestErrorHandling:
    """Checks that the API reports errors for bad requests."""

    @pytest.mark.real
    @pytest.mark.timeout(30)
    def test_nonexistent_file_error(self, e2e_app):
        """Recognizing a missing file must yield an error response.

        The status code must be 400, 404 or 500. If the response carries a
        JSON object with a ``status`` field, that field must be ``'error'``.
        """
        print("\n测试错误处理:文件不存在")
        with e2e_app.test_client() as client:
            response = client.get('/api/recognize?path=nonexistent.wav')
            assert response.status_code in (400, 404, 500)
            print(f"✓ 正确返回错误:{response.status_code}")

            # The previous body check was guarded by `status_code == 200`,
            # which can never hold after the assertion above. Check the body
            # of the error response itself; non-JSON bodies are tolerated.
            body = response.get_json(silent=True)
            if isinstance(body, dict) and 'status' in body:
                assert body['status'] == 'error'

    @pytest.mark.real
    @pytest.mark.timeout(30)
    def test_missing_parameter_error(self, e2e_app):
        """Omitting the ``path`` parameter must yield HTTP 400 with a message."""
        print("\n测试错误处理:缺少参数")
        with e2e_app.test_client() as client:
            response = client.get('/api/recognize')
            assert response.status_code == 400
            data = response.get_json()
            assert data['status'] == 'error'
            assert 'message' in data
            print("✓ 正确返回参数错误")

    @pytest.mark.real
    @pytest.mark.timeout(30)
    @pytest.mark.requires_ffmpeg
    def test_transcode_nonexistent_file(self, e2e_app):
        """Transcoding a missing file must yield HTTP 404 with an error status."""
        print("\n测试错误处理:转码文件不存在")
        with e2e_app.test_client() as client:
            response = client.get('/api/convert?path=nonexistent.mp4')
            assert response.status_code == 404
            data = response.get_json()
            assert data['status'] == 'error'
            print("✓ 正确返回 404 错误")
class TestAPIResponseFormat:
    """Checks the envelope format of successful API responses.

    The format can only be validated on an HTTP 200 response, so each test
    is skipped with a reason when the endpoint returns anything else,
    rather than passing without having checked anything.
    """

    @pytest.mark.real
    @pytest.mark.slow
    @pytest.mark.timeout(300)  # 5-minute timeout, including model loading time
    def test_recognize_response_format(self, e2e_app, test_audio_file):
        """A recognize response must carry status, message, timestamp and a dict ``data``."""
        print("\n测试响应格式:识别接口")
        with e2e_app.test_client() as client:
            response = client.get(
                f'/api/recognize?path={test_audio_file.name}'
            )
            if response.status_code != 200:
                pytest.skip(f"识别接口返回 {response.status_code}")

            data = response.get_json()
            for field in ('status', 'message', 'timestamp', 'data'):
                assert field in data, f"响应缺少字段:{field}"
            assert isinstance(data['data'], dict)
            print("✓ 响应格式正确")

    @pytest.mark.real
    @pytest.mark.slow
    @pytest.mark.timeout(300)  # 5-minute timeout
    def test_result_response_format(self, e2e_app, test_audio_file):
        """A result response must carry status, message and timestamp.

        ``data`` is optional here; when present it must be a dict.
        """
        print("\n测试响应格式:结果接口")
        with e2e_app.test_client() as client:
            response = client.get(
                f'/api/result?path={test_audio_file.name}'
            )
            if response.status_code != 200:
                pytest.skip(f"结果接口返回 {response.status_code}")

            data = response.get_json()
            for field in ('status', 'message', 'timestamp'):
                assert field in data, f"响应缺少字段:{field}"
            if 'data' in data:
                assert isinstance(data['data'], dict)
            print("✓ 结果响应格式正确")
if __name__ == '__main__':
    # Run only the tests marked `real`. Propagate pytest's exit status so that
    # running this file directly reports failures to the calling shell or CI
    # instead of always exiting with 0.
    raise SystemExit(pytest.main([__file__, '-v', '-m', 'real', '--tb=short']))