532 lines
18 KiB
Python
532 lines
18 KiB
Python
"""
|
||
真实端到端测试 - 带超时保护
|
||
测试完整的语音识别和转码流程,使用真实的视频/音频文件
|
||
"""
|
||
import pytest
|
||
import json
|
||
import time
|
||
import os
|
||
import sys
|
||
from pathlib import Path
|
||
from datetime import datetime
|
||
import yaml
|
||
|
||
project_root = Path(__file__).parent.parent.parent.absolute()
|
||
sys.path.insert(0, str(project_root))
|
||
os.chdir(project_root)
|
||
|
||
from main import create_app
|
||
from app.settings import config
|
||
|
||
|
||
def load_test_config():
|
||
"""加载测试配置文件"""
|
||
config_path = Path(__file__).parent / "test_config.yaml"
|
||
with open(config_path, 'r', encoding='utf-8') as f:
|
||
return yaml.safe_load(f)
|
||
|
||
|
||
TEST_CONFIG = load_test_config()
|
||
|
||
|
||
@pytest.fixture(scope='session')
|
||
def e2e_config():
|
||
"""端到端测试配置"""
|
||
return TEST_CONFIG
|
||
|
||
|
||
@pytest.fixture(scope='session')
|
||
def test_video_file(e2e_config):
|
||
"""获取测试视频文件路径"""
|
||
video_path = project_root / e2e_config['test_files']['primary_video']
|
||
|
||
if not video_path.exists():
|
||
for backup in e2e_config['test_files']['backup_videos']:
|
||
backup_path = project_root / backup
|
||
if backup_path.exists():
|
||
print(f"\n使用备用视频文件:{backup_path.name}")
|
||
return backup_path
|
||
pytest.skip(f"测试视频文件不存在:{video_path}")
|
||
|
||
file_size_mb = video_path.stat().st_size / 1024 / 1024
|
||
print(f"\n使用测试视频:{video_path.name} ({file_size_mb:.2f} MB)")
|
||
return video_path
|
||
|
||
|
||
@pytest.fixture(scope='session')
|
||
def test_audio_file(e2e_config):
|
||
"""获取测试音频文件路径"""
|
||
for audio_path_str in e2e_config['test_files']['audio_files']:
|
||
audio_path = project_root / audio_path_str
|
||
if audio_path.exists():
|
||
file_size_mb = audio_path.stat().st_size / 1024 / 1024
|
||
print(f"\n使用测试音频:{audio_path.name} ({file_size_mb:.2f} MB)")
|
||
return audio_path
|
||
pytest.skip("测试音频文件不存在")
|
||
|
||
|
||
@pytest.fixture(scope='function')
|
||
def e2e_app():
|
||
"""创建用于端到端测试的 Flask 应用"""
|
||
app = create_app()
|
||
app.config['TESTING'] = True
|
||
app.config['TASK_TIMEOUT'] = TEST_CONFIG['timeouts']['asr_recognize']
|
||
yield app
|
||
|
||
|
||
class TestVideoFileValidation:
|
||
"""测试视频文件验证"""
|
||
|
||
def test_primary_video_file_exists(self, test_video_file):
|
||
"""测试主视频文件存在"""
|
||
assert test_video_file.exists(), f"视频文件不存在:{test_video_file}"
|
||
assert test_video_file.stat().st_size > 0, "视频文件大小为 0"
|
||
|
||
def test_video_file_size_within_limit(self, test_video_file, e2e_config):
|
||
"""测试视频文件大小在限制范围内"""
|
||
max_size = e2e_config['environment']['max_file_size_mb'] * 1024 * 1024
|
||
file_size = test_video_file.stat().st_size
|
||
assert file_size <= max_size, f"视频文件过大:{file_size / 1024 / 1024:.2f}MB"
|
||
|
||
def test_video_file_format(self, test_video_file):
|
||
"""测试视频文件格式"""
|
||
supported_formats = ['.mp4', '.avi', '.mkv', '.mov', '.flv', '.wmv', '.m4v']
|
||
assert test_video_file.suffix.lower() in supported_formats, \
|
||
f"不支持的视频格式:{test_video_file.suffix}"
|
||
|
||
|
||
class TestAudioFileValidation:
|
||
"""测试音频文件验证"""
|
||
|
||
def test_audio_file_exists(self, test_audio_file):
|
||
"""测试音频文件存在"""
|
||
assert test_audio_file.exists(), f"音频文件不存在:{test_audio_file}"
|
||
assert test_audio_file.stat().st_size > 0, "音频文件大小为 0"
|
||
|
||
|
||
class TestASRRecognition:
|
||
"""测试 ASR 语音识别完整流程"""
|
||
|
||
@pytest.mark.real
|
||
@pytest.mark.slow
|
||
@pytest.mark.timeout(TEST_CONFIG['timeouts']['asr_recognize'] + 60)
|
||
def test_recognize_audio_file(self, e2e_app, test_audio_file):
|
||
"""测试真实音频文件的语音识别"""
|
||
print(f"\n开始测试音频识别:{test_audio_file.name}")
|
||
start_time = time.time()
|
||
|
||
app = e2e_app
|
||
timeout = TEST_CONFIG['timeouts']['asr_recognize']
|
||
|
||
with app.test_client() as client:
|
||
response = client.get(
|
||
f'/api/recognize?path={test_audio_file.name}'
|
||
)
|
||
|
||
elapsed = time.time() - start_time
|
||
print(f"音频识别耗时:{elapsed:.2f}秒")
|
||
|
||
assert elapsed < timeout, f"ASR 识别超时:{elapsed:.2f}秒 > {timeout}秒"
|
||
|
||
assert response.status_code == 200, \
|
||
f"识别失败:{response.status_code} - {response.get_json()}"
|
||
|
||
data = response.get_json()
|
||
assert data['status'] == 'success', \
|
||
f"识别返回错误:{data}"
|
||
assert 'data' in data
|
||
assert 'task_id' in data['data']
|
||
print(f"✓ 音频识别成功,task_id: {data['data']['task_id']}")
|
||
|
||
@pytest.mark.real
|
||
@pytest.mark.slow
|
||
@pytest.mark.timeout(TEST_CONFIG['timeouts']['asr_recognize'] + 60)
|
||
def test_recognize_video_file(self, e2e_app, test_video_file):
|
||
"""测试真实视频文件的语音识别"""
|
||
print(f"\n开始测试视频识别:{test_video_file.name}")
|
||
start_time = time.time()
|
||
|
||
app = e2e_app
|
||
timeout = TEST_CONFIG['timeouts']['asr_recognize']
|
||
|
||
with app.test_client() as client:
|
||
response = client.get(
|
||
f'/api/recognize?path={test_video_file.name}'
|
||
)
|
||
|
||
elapsed = time.time() - start_time
|
||
print(f"视频识别耗时:{elapsed:.2f}秒")
|
||
|
||
assert elapsed < timeout, f"ASR 识别超时:{elapsed:.2f}秒 > {timeout}秒"
|
||
|
||
assert response.status_code == 200, \
|
||
f"识别失败:{response.status_code} - {response.get_json()}"
|
||
|
||
data = response.get_json()
|
||
assert data['status'] == 'success'
|
||
print(f"✓ 视频识别成功")
|
||
|
||
@pytest.mark.real
|
||
@pytest.mark.slow
|
||
@pytest.mark.timeout(60)
|
||
def test_get_recognition_result(self, e2e_app, test_audio_file):
|
||
"""测试获取语音识别结果"""
|
||
print(f"\n开始获取识别结果:{test_audio_file.name}")
|
||
|
||
app = e2e_app
|
||
timeout = 60
|
||
|
||
with app.test_client() as client:
|
||
start_time = time.time()
|
||
|
||
response = client.get(
|
||
f'/api/result?path={test_audio_file.name}'
|
||
)
|
||
|
||
elapsed = time.time() - start_time
|
||
print(f"获取结果耗时:{elapsed:.2f}秒")
|
||
|
||
assert elapsed < timeout, f"获取结果超时:{elapsed:.2f}秒"
|
||
|
||
assert response.status_code == 200, \
|
||
f"获取结果失败:{response.status_code} - {response.get_json()}"
|
||
|
||
data = response.get_json()
|
||
assert data['status'] == 'success'
|
||
assert 'data' in data
|
||
|
||
if 'sentences' in data['data']:
|
||
sentences = data['data']['sentences']
|
||
print(f"✓ 识别到 {len(sentences)} 个句子")
|
||
assert isinstance(sentences, list)
|
||
|
||
if len(sentences) > 0:
|
||
first_sentence = sentences[0]
|
||
print(f" 第一句:[{first_sentence.get('speaker', 'N/A')}] {first_sentence.get('text', '')[:50]}")
|
||
assert 'speaker' in first_sentence
|
||
assert 'text' in first_sentence
|
||
assert 'begin_time' in first_sentence
|
||
assert 'end_time' in first_sentence
|
||
|
||
|
||
class TestSpeakerDiarization:
|
||
"""测试说话人分离功能"""
|
||
|
||
@pytest.mark.real
|
||
@pytest.mark.slow
|
||
@pytest.mark.timeout(60)
|
||
def test_speaker_diarization_result(self, e2e_app, test_audio_file):
|
||
"""测试说话人分离结果"""
|
||
print(f"\n测试说话人分离:{test_audio_file.name}")
|
||
|
||
app = e2e_app
|
||
|
||
with app.test_client() as client:
|
||
response = client.get(
|
||
f'/api/result?path={test_audio_file.name}'
|
||
)
|
||
|
||
if response.status_code == 200:
|
||
data = response.get_json()
|
||
|
||
if 'data' in data and 'sentences' in data['data']:
|
||
sentences = data['data']['sentences']
|
||
|
||
if len(sentences) > 0:
|
||
speakers = set()
|
||
for sentence in sentences:
|
||
if 'speaker' in sentence:
|
||
speakers.add(sentence['speaker'])
|
||
|
||
print(f"✓ 识别到 {len(speakers)} 个说话人:{speakers}")
|
||
|
||
min_speakers = TEST_CONFIG['validation']['min_speakers']
|
||
max_speakers = TEST_CONFIG['validation']['max_speakers']
|
||
|
||
assert len(speakers) >= min_speakers, \
|
||
f"说话人数量过少:{len(speakers)}"
|
||
assert len(speakers) <= max_speakers, \
|
||
f"说话人数量过多:{len(speakers)}"
|
||
|
||
@pytest.mark.real
|
||
@pytest.mark.slow
|
||
@pytest.mark.timeout(60)
|
||
def test_speaker_timestamp_accuracy(self, e2e_app, test_audio_file):
|
||
"""测试说话人时间戳准确性"""
|
||
print(f"\n测试时间戳准确性:{test_audio_file.name}")
|
||
|
||
app = e2e_app
|
||
|
||
with app.test_client() as client:
|
||
response = client.get(
|
||
f'/api/result?path={test_audio_file.name}'
|
||
)
|
||
|
||
if response.status_code == 200:
|
||
data = response.get_json()
|
||
|
||
if 'data' in data and 'sentences' in data['data']:
|
||
sentences = data['data']['sentences']
|
||
|
||
valid_count = 0
|
||
for sentence in sentences:
|
||
if all(k in sentence for k in ['begin_time', 'end_time']):
|
||
begin = sentence['begin_time']
|
||
end = sentence['end_time']
|
||
|
||
assert begin >= 0, f"开始时间为负:{begin}"
|
||
assert end > begin, f"结束时间无效:begin={begin}, end={end}"
|
||
|
||
precision = TEST_CONFIG['validation']['timestamp_precision']
|
||
duration = end - begin
|
||
assert duration >= precision, \
|
||
f"时间戳精度不足:{duration}"
|
||
valid_count += 1
|
||
|
||
print(f"✓ 验证 {valid_count} 个有效时间戳")
|
||
|
||
|
||
class TestVideoTranscoding:
|
||
"""测试视频转码功能"""
|
||
|
||
@pytest.mark.real
|
||
@pytest.mark.slow
|
||
@pytest.mark.timeout(TEST_CONFIG['timeouts']['transcode'] + 60)
|
||
@pytest.mark.requires_ffmpeg
|
||
def test_transcode_video(self, e2e_app, test_video_file):
|
||
"""测试真实视频转码"""
|
||
print(f"\n测试视频转码:{test_video_file.name}")
|
||
start_time = time.time()
|
||
|
||
app = e2e_app
|
||
timeout = TEST_CONFIG['timeouts']['transcode']
|
||
|
||
with app.test_client() as client:
|
||
response = client.get(
|
||
f'/api/convert?path={test_video_file.name}'
|
||
)
|
||
|
||
elapsed = time.time() - start_time
|
||
print(f"转码耗时:{elapsed:.2f}秒")
|
||
|
||
assert elapsed < timeout, f"转码超时:{elapsed:.2f}秒 > {timeout}秒"
|
||
|
||
assert response.status_code in [200, 404, 500], \
|
||
f"转码请求失败:{response.status_code}"
|
||
|
||
if response.status_code == 200:
|
||
data = response.get_json()
|
||
assert data['status'] == 'success'
|
||
print(f"✓ 转码成功")
|
||
|
||
@pytest.mark.real
|
||
@pytest.mark.slow
|
||
@pytest.mark.timeout(60)
|
||
@pytest.mark.requires_ffmpeg
|
||
def test_get_transcoded_video_url(self, e2e_app, test_video_file):
|
||
"""测试获取转码后视频 URL"""
|
||
print(f"\n测试获取转码视频 URL:{test_video_file.name}")
|
||
|
||
app = e2e_app
|
||
|
||
with app.test_client() as client:
|
||
response = client.get(
|
||
f'/api/getVidUrl?path={test_video_file.name}'
|
||
)
|
||
|
||
assert response.status_code in [200, 404], \
|
||
f"获取 URL 失败:{response.status_code}"
|
||
|
||
if response.status_code == 200:
|
||
data = response.get_json()
|
||
assert data['status'] == 'success'
|
||
assert 'data' in data
|
||
assert 'url' in data['data']
|
||
print(f"✓ 获取 URL 成功:{data['data']['url'][:50]}...")
|
||
|
||
|
||
class TestPerformance:
|
||
"""测试性能指标"""
|
||
|
||
@pytest.mark.real
|
||
@pytest.mark.performance
|
||
@pytest.mark.slow
|
||
@pytest.mark.timeout(300)
|
||
def test_asr_processing_time(self, e2e_app, test_audio_file):
|
||
"""测试 ASR 处理时间"""
|
||
print(f"\n性能测试:ASR 处理时间")
|
||
|
||
app = e2e_app
|
||
max_time = TEST_CONFIG['performance']['max_asr_time']
|
||
|
||
with app.test_client() as client:
|
||
start_time = time.time()
|
||
|
||
response = client.get(
|
||
f'/api/recognize?path={test_audio_file.name}'
|
||
)
|
||
|
||
elapsed = time.time() - start_time
|
||
print(f"ASR 处理时间:{elapsed:.2f}秒(最大允许:{max_time}秒)")
|
||
|
||
assert response.status_code == 200
|
||
assert elapsed < max_time, \
|
||
f"ASR 处理时间过长:{elapsed:.2f}秒 > {max_time}秒"
|
||
|
||
@pytest.mark.real
|
||
@pytest.mark.performance
|
||
@pytest.mark.slow
|
||
@pytest.mark.timeout(300)
|
||
def test_realtime_factor(self, e2e_app, test_audio_file):
|
||
"""测试实时处理率(RTF)"""
|
||
print(f"\n性能测试:实时处理率")
|
||
|
||
app = e2e_app
|
||
min_speed = TEST_CONFIG['performance']['min_processing_speed']
|
||
|
||
with app.test_client() as client:
|
||
start_time = time.time()
|
||
|
||
response = client.get(
|
||
f'/api/recognize?path={test_audio_file.name}'
|
||
)
|
||
|
||
elapsed = time.time() - start_time
|
||
|
||
if response.status_code == 200:
|
||
audio_duration = self._get_audio_duration(test_audio_file)
|
||
|
||
if audio_duration > 0:
|
||
rtf = elapsed / audio_duration
|
||
processing_speed = 1 / rtf if rtf > 0 else float('inf')
|
||
|
||
print(f"音频时长:{audio_duration:.2f}秒,处理时间:{elapsed:.2f}秒")
|
||
print(f"实时处理率:{processing_speed:.2f}x (RTF: {rtf:.2f})")
|
||
|
||
assert processing_speed >= min_speed, \
|
||
f"处理速度过慢:{processing_speed:.2f}x < {min_speed}x"
|
||
|
||
def _get_audio_duration(self, audio_path):
|
||
"""获取音频文件时长(秒)"""
|
||
try:
|
||
import librosa
|
||
y, sr = librosa.load(str(audio_path), sr=None)
|
||
return len(y) / sr
|
||
except Exception as e:
|
||
print(f"无法获取音频时长:{e}")
|
||
return 0
|
||
|
||
|
||
class TestErrorHandling:
|
||
"""测试错误处理"""
|
||
|
||
@pytest.mark.real
|
||
@pytest.mark.timeout(30)
|
||
def test_nonexistent_file_error(self, e2e_app):
|
||
"""测试文件不存在错误处理"""
|
||
print(f"\n测试错误处理:文件不存在")
|
||
|
||
app = e2e_app
|
||
|
||
with app.test_client() as client:
|
||
response = client.get('/api/recognize?path=nonexistent.wav')
|
||
|
||
assert response.status_code in [400, 404, 500]
|
||
print(f"✓ 正确返回错误:{response.status_code}")
|
||
|
||
if response.status_code == 200:
|
||
data = response.get_json()
|
||
assert data['status'] == 'error'
|
||
|
||
@pytest.mark.real
|
||
@pytest.mark.timeout(30)
|
||
def test_missing_parameter_error(self, e2e_app):
|
||
"""测试缺少参数错误处理"""
|
||
print(f"\n测试错误处理:缺少参数")
|
||
|
||
app = e2e_app
|
||
|
||
with app.test_client() as client:
|
||
response = client.get('/api/recognize')
|
||
|
||
assert response.status_code == 400
|
||
data = response.get_json()
|
||
assert data['status'] == 'error'
|
||
assert 'message' in data
|
||
print(f"✓ 正确返回参数错误")
|
||
|
||
@pytest.mark.real
|
||
@pytest.mark.timeout(30)
|
||
@pytest.mark.requires_ffmpeg
|
||
def test_transcode_nonexistent_file(self, e2e_app):
|
||
"""测试转码不存在的文件"""
|
||
print(f"\n测试错误处理:转码文件不存在")
|
||
|
||
app = e2e_app
|
||
|
||
with app.test_client() as client:
|
||
response = client.get('/api/convert?path=nonexistent.mp4')
|
||
|
||
assert response.status_code == 404
|
||
data = response.get_json()
|
||
assert data['status'] == 'error'
|
||
print(f"✓ 正确返回 404 错误")
|
||
|
||
|
||
class TestAPIResponseFormat:
|
||
"""测试 API 响应格式"""
|
||
|
||
@pytest.mark.real
|
||
@pytest.mark.slow
|
||
@pytest.mark.timeout(300) # 5 分钟超时,包括模型加载时间
|
||
def test_recognize_response_format(self, e2e_app, test_audio_file):
|
||
"""测试识别接口响应格式"""
|
||
print(f"\n测试响应格式:识别接口")
|
||
|
||
app = e2e_app
|
||
|
||
with app.test_client() as client:
|
||
response = client.get(
|
||
f'/api/recognize?path={test_audio_file.name}'
|
||
)
|
||
|
||
if response.status_code == 200:
|
||
data = response.get_json()
|
||
|
||
assert 'status' in data
|
||
assert 'message' in data
|
||
assert 'timestamp' in data
|
||
assert 'data' in data
|
||
assert isinstance(data['data'], dict)
|
||
print(f"✓ 响应格式正确")
|
||
|
||
@pytest.mark.real
|
||
@pytest.mark.slow
|
||
@pytest.mark.timeout(300) # 5 分钟超时
|
||
def test_result_response_format(self, e2e_app, test_audio_file):
|
||
"""测试结果接口响应格式"""
|
||
print(f"\n测试响应格式:结果接口")
|
||
|
||
app = e2e_app
|
||
|
||
with app.test_client() as client:
|
||
response = client.get(
|
||
f'/api/result?path={test_audio_file.name}'
|
||
)
|
||
|
||
if response.status_code == 200:
|
||
data = response.get_json()
|
||
|
||
assert 'status' in data
|
||
assert 'message' in data
|
||
assert 'timestamp' in data
|
||
|
||
if 'data' in data:
|
||
result_data = data['data']
|
||
assert isinstance(result_data, dict)
|
||
print(f"✓ 结果响应格式正确")
|
||
|
||
|
||
if __name__ == '__main__':
|
||
pytest.main([__file__, '-v', '-m', 'real', '--tb=short'])
|