重构:优化项目结构,以适配更多模块

This commit is contained in:
yueliuli 2026-05-13 09:43:29 +08:00
parent ffcf785353
commit 5b1e03f809
13 changed files with 210 additions and 324 deletions

BIN
.coverage Normal file

Binary file not shown.

View File

@ -20,9 +20,9 @@ from pathlib import Path
from typing import Dict, List, Optional
# 路径配置
BASE_DIR = Path(__file__).parent.absolute()
TEMP_DIR = BASE_DIR / "temp"
OUTPUT_DIR = BASE_DIR / "output"
BASE_DIR = os.getcwd()
TEMP_DIR = Path(BASE_DIR) / "temp"
OUTPUT_DIR = Path(BASE_DIR) / "output/SpeechRecognition"
@ -155,7 +155,7 @@ def process_batch_diarization(video_paths, max_workers=1):
# 加载说话人分离模型(只加载一次)
print("加载说话人分离模型...")
from diarization_service import DiarizationService
from app.asr.diarization_service import DiarizationService
diar_service = DiarizationService(
embedding_model="eres2netv2",
@ -206,7 +206,7 @@ def process_batch_diarization(video_paths, max_workers=1):
diar_result = {
"segments": [seg.to_dict() for seg in diar_segments]
}
from map_speaker import save_json
from app.asr.map_speaker import save_json
save_json(temp_diar_path, diar_result)
video_process_time = time.time() - video_start_time
@ -269,7 +269,7 @@ def process_batch_asr(video_paths, diar_results, max_workers=1):
# 加载 ASR 模型(只加载一次)
print("加载 ASR 模型...")
from asr_service import ASRService
from app.asr.asr_service import ASRService
asr_service = ASRService(
model_name="paraformer-zh",
@ -342,7 +342,7 @@ def process_batch_asr(video_paths, diar_results, max_workers=1):
continue
# 2. 加载说话人分离结果
from map_speaker import load_json
from app.asr.map_speaker import load_json
diar_result = load_json(diar_path)
# 3. 执行 ASR 识别(不使用 ASR 自带的说话人)
@ -498,7 +498,7 @@ def main(path: str):
"""主函数"""
import torch
# 拼接根目录
path = os.path.join(os.path.dirname(__file__), "input", path)
path = os.path.join(BASE_DIR, "input", path)
print(f"开始处理路径:{path}")
print("\n" + "=" * 60)
@ -593,10 +593,10 @@ def main(path: str):
print(f"输出目录:{OUTPUT_DIR}")
if __name__ == "__main__":
# if __name__ == "__main__":
# 视频文件夹路径(全局变量)
# PATH = r"D:\Userfile\Projects\AnzezxianxHazardInspectAI\Code\audio2\input\宁波北仑区鼎邦杰西雅服饰有限公司"
# PATH = r"D:\Userfile\Projects\AnzezxianxHazardInspectAI\Code\audio2\input\temp"
# PATH = r"D:\Userfile\Projects\AnzezxianxHazardInspectAI\Code\audio2\input\temp\VID_20251104_085655_024.AVI"
PATH = r"VID_20251104_085655_024.AVI"
main(PATH)
# PATH = r"VID_20251104_085655_024.AVI"
# main(PATH)

70
app/asr/routes.py Normal file
View File

@ -0,0 +1,70 @@
from flask import request, jsonify
from app.utils import make_response
from pathlib import Path
import uuid
import threading
import signal
import os
import json
# Module-internal global state shared by the route handlers registered below.
# task_running maps a task id (UUID string created per recognize request) to
# True while that request is being processed and False once it has ended.
task_running = {}
# NOTE(review): the service handles, "loaded" flags and locks below are not
# referenced anywhere in the visible part of this module; presumably they are
# meant for caching loaded models across requests — confirm before relying on
# them or remove them if unused.
GLOBAL_ASR_SERVICE = None
GLOBAL_DIAR_SERVICE = None
ASR_MODEL_LOADED = False
DIAR_MODEL_LOADED = False
ASR_MODEL_LOCK = threading.Lock()
DIAR_MODEL_LOCK = threading.Lock()
def register_asr_routes(app):
    """Register the speech-recognition routes on an existing Flask app.

    This is the module's only entry point: it attaches routes to ``app`` and
    never creates an application itself.

    Routes:
        GET /api/recognize?path=...  Run ``app.asr.core.main`` synchronously.
        GET /api/result?path=...     Return the stored JSON result for a file.

    Args:
        app: The Flask application; its config must provide ``TASK_TIMEOUT``
            (seconds) and ``OUTPUT_DIR``.
    """

    @app.route('/api/recognize', methods=['GET'])
    def recognize():
        """Run recognition for the ``path`` query parameter and report the outcome."""
        # Validate first, so a rejected request never leaves a task entry
        # stuck in the "running" state.
        path = request.args.get('path', '')
        if not path:
            return jsonify(make_response(status="error", message="缺少path参数")), 400

        task_id = str(uuid.uuid4())
        task_running[task_id] = True
        use_alarm = False
        try:
            # The pipeline resolves its directories from the working directory,
            # so switch to the project root (three levels above this file).
            server_dir = Path(__file__).parent.parent.parent.absolute()
            os.chdir(server_dir)

            def timeout_handler(signum, frame):
                task_running[task_id] = False
                raise TimeoutError(f"超时 {app.config['TASK_TIMEOUT']}s")

            try:
                signal.signal(signal.SIGALRM, timeout_handler)  # pyright: ignore
                signal.alarm(app.config['TASK_TIMEOUT'])  # pyright: ignore
                use_alarm = True
            except (AttributeError, ValueError):
                # Best effort only: SIGALRM does not exist on Windows
                # (AttributeError) and handlers can only be installed from the
                # main thread (ValueError). In both cases run without a timeout.
                pass

            # Imported lazily so the heavy pipeline module is only loaded when
            # a recognition request actually arrives.
            from app.asr.core import main
            main(path)
            return jsonify(make_response(message="推理完成", data={"task_id": task_id, "path": path}))
        except TimeoutError as e:
            # Report timeouts as 504 so clients can tell them from failures.
            return jsonify(make_response(status="error", message=str(e))), 504
        except Exception as e:
            # Top-level boundary: convert any failure into a JSON error body.
            return jsonify(make_response(status="error", message=str(e))), 500
        finally:
            if use_alarm:
                signal.alarm(0)  # pyright: ignore
            # NOTE(review): entries are kept (as False) and never removed, so
            # this dict grows by one key per request.
            task_running[task_id] = False

    @app.route('/api/result', methods=['GET'])
    def result():
        """Return the saved recognition result for the ``path`` query parameter."""
        path = request.args.get('path', '')
        if not path:
            return jsonify(make_response(status="error", message="缺少path")), 400

        # Only the stem of the requested name is used to locate the result file.
        result_file = Path(app.config['OUTPUT_DIR']) / "SpeechRecognition" / f"{Path(path).stem}_result.json"
        if not result_file.exists():
            return jsonify(make_response(status="error", message="结果不存在")), 404

        try:
            with open(result_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            # Unreadable or malformed result file (JSONDecodeError is a
            # ValueError): answer in the same JSON envelope as other errors.
            return jsonify(make_response(status="error", message=str(e))), 500
        return jsonify(make_response(data=data))

10
app/settings.py Normal file
View File

@ -0,0 +1,10 @@
# Global configuration, loaded into the Flask app and read by the launcher.
_MIB = 1024 * 1024

config = dict(
    # Flask settings.
    MAX_CONTENT_LENGTH=500 * _MIB,
    SEND_FILE_MAX_AGE_DEFAULT=300,
    # Directories used by the route modules.
    INPUT_DIR='input',
    OUTPUT_DIR='output',
    # Limit, in seconds, applied to a recognition task.
    TASK_TIMEOUT=600,
    # Listening ports: API server and video file server.
    API_PORT=5000,
    VIDEO_PORT=8086,
)

View File

@ -2,7 +2,7 @@ import subprocess
from pathlib import Path
def convert_to_h264(input_path: str, output_path: str = None) -> str:
def convert_to_h264(input_root: Path, vid_full_name: str, output_root: Path) -> str:
"""
将视频文件转码为 H.264 编码格式
@ -17,17 +17,14 @@ def convert_to_h264(input_path: str, output_path: str = None) -> str:
FileNotFoundError: 输入文件不存在
subprocess.CalledProcessError: FFmpeg 转码失败
"""
input_path_all = Path("input/" + input_path).resolve()
output_path_all = Path("vid_h264/" + input_path).resolve()
input_path_all = Path(input_root,vid_full_name).resolve()
output_vid_full_name = f"{Path(vid_full_name).stem}_h264.mp4"
output_path_all = Path(output_root, output_vid_full_name).resolve()
if not input_path_all.exists():
raise FileNotFoundError(f"输入文件不存在:{input_path}")
raise FileNotFoundError(f"输入文件不存在:{input_path_all}")
print(f"开始转码:{input_path_all}")
# if output_path is None:
output_path = str(output_path_all.with_name(f"{input_path_all.stem}_h264.mp4"))
# else:
# output_path = Path(output_path).resolve()
cmd = [
"ffmpeg",
@ -36,7 +33,7 @@ def convert_to_h264(input_path: str, output_path: str = None) -> str:
"-c:a", "aac",
"-b:a", "128k",
"-y",
str(output_path)
str(output_path_all)
]
result = subprocess.run(cmd, capture_output=True, text=True)
@ -48,9 +45,5 @@ def convert_to_h264(input_path: str, output_path: str = None) -> str:
result.stdout,
result.stderr
)
print(f"转码完成:{output_path}")
return str(output_path)
if __name__ == "__main__":
convert_to_h264("VID_20251104_085655_024.AVI", "vid_h264/VID_20251104_085655_024_h264.mp4")
# convert_to_h264("input/short.AVI", "vid_h264/short_h264.mp4")
print(f"转码完成:{output_path_all}")
return str(output_path_all)

40
app/transcode/routes.py Normal file
View File

@ -0,0 +1,40 @@
from flask import request, jsonify
from app.utils import make_response
from app.transcode.core import convert_to_h264
from pathlib import Path
def register_transcode_routes(app):
    """Register the video transcoding routes on an existing Flask app.

    Creates ``<OUTPUT_DIR>/vid_h264`` if needed, then attaches:

        GET /api/convert?path=...    Transcode a file from INPUT_DIR to H.264.
        GET /api/getVidUrl?path=...  Return the URL of an already transcoded file.

    Args:
        app: The Flask application; its config must provide ``INPUT_DIR`` and
            ``OUTPUT_DIR``. ``VIDEO_PORT`` is used for the returned URL and
            falls back to 8086 when absent.
    """
    # Make sure the transcode output directory exists before serving requests.
    transcode_output_dir = Path(app.config['OUTPUT_DIR'], "vid_h264")
    transcode_output_dir.mkdir(parents=True, exist_ok=True)

    @app.route('/api/convert', methods=['GET'])
    def convert():
        """Transcode the file named by the ``path`` query parameter."""
        path = request.args.get('path', '')
        if not path:
            return jsonify(make_response(status="error", message="缺少 path")), 400

        # NOTE(review): ``path`` comes straight from the client and is joined
        # onto INPUT_DIR by convert_to_h264; an absolute path or ".." segments
        # would escape INPUT_DIR — consider validating it here.
        try:
            out = convert_to_h264(app.config['INPUT_DIR'], path, transcode_output_dir)
            return jsonify(make_response(message="转码完成", data={"path": out}))
        except FileNotFoundError as e:
            return jsonify(make_response(status="error", message=str(e))), 404
        except Exception as e:
            return jsonify(make_response(status="error", message=f"转码失败:{str(e)}")), 500

    @app.route('/api/getVidUrl', methods=['GET'])
    def getVidUrl():
        """Return the file-server URL of the transcoded video for ``path``."""
        # Check the raw string: a Path object is always truthy (Path('') is
        # Path('.')), so testing the Path would never detect a missing value.
        raw_path = request.args.get('path', '')
        if not raw_path:
            return jsonify(make_response(status="error", message="缺少path")), 400

        stem = Path(raw_path).stem
        vid_path = Path(transcode_output_dir, f"{stem}_h264.mp4")
        if not vid_path.exists():
            return jsonify(make_response(status="error", message="视频不存在")), 404

        # Follow the configured video-server port instead of a fixed 8086.
        video_port = app.config.get('VIDEO_PORT', 8086)
        url = f"http://localhost:{video_port}/{stem}_h264.mp4"
        return jsonify(make_response(data={"url": url}))

30
app/utils.py Normal file
View File

@ -0,0 +1,30 @@
from datetime import datetime, timezone
from flask import request, jsonify
def make_response(status="success", data=None, errors=None, message=None, extra=None):
    """Build the uniform API response envelope.

    Args:
        status: Outcome marker, ``"success"`` or ``"error"``.
        data: Payload to return; only ``None`` is replaced by ``{}``, so falsy
            payloads such as ``[]``, ``0`` or ``False`` are passed through.
        errors: Error details; only ``None`` is replaced by ``[]``.
        message: Human-readable message; when empty, a default is chosen from
            ``status``.
        extra: Optional mapping merged into the top level of the envelope
            (it may override the standard keys).

    Returns:
        A dict with ``status``, ``data``, ``errors``, ``message`` and a UTC
        ``timestamp`` formatted as ``YYYY-MM-DDTHH:MM:SSZ``.
    """
    if message:
        text = message
    else:
        text = "操作成功" if status == "success" else "操作失败"

    response = {
        "status": status,
        # Compare against None explicitly: ``data or {}`` would silently turn
        # an empty list or a zero result into an empty dict.
        "data": data if data is not None else {},
        "errors": errors if errors is not None else [],
        "message": text,
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
    }
    if extra:
        response.update(extra)
    return response
def register_cors(app):
    """Install the shared CORS handling on a Flask app.

    Adds an ``after_request`` hook that sets the CORS response headers on
    every response, and an explicit ``OPTIONS /config`` route that answers
    preflight requests with an empty 200.

    Args:
        app: The Flask application to configure.
    """

    @app.after_request
    def after_request(response):
        """Attach the CORS headers to ``response`` and return it."""
        # Echo the caller's origin (falling back to "*" when none was sent).
        origin = request.headers.get('Origin', '*')
        response.headers['Access-Control-Allow-Origin'] = origin
        response.headers['Access-Control-Allow-Credentials'] = 'true'
        response.headers['Access-Control-Allow-Headers'] = 'Content-Type,Authorization'
        response.headers['Access-Control-Allow-Methods'] = 'GET,PUT,POST,DELETE,OPTIONS'
        # The allowed origin depends on the request's Origin header, so caches
        # must key on it; otherwise one origin's response may be replayed to
        # another origin.
        response.headers.add('Vary', 'Origin')
        # NOTE(review): every origin is accepted together with credentials;
        # consider restricting this to a list of trusted origins.
        return response

    @app.route('/config', methods=['OPTIONS'])
    def config_options():
        """Answer the preflight request for ``/config``."""
        return '', 200

8
lib/caddy/run.py Normal file
View File

@ -0,0 +1,8 @@
import os
import subprocess
def run_caddy(port: int = 8086):
    """Start the bundled Caddy file server in the background.

    Caddy serves ``<current working directory>/output/vid_h264`` (with
    directory browsing enabled) on the given port. The call does not wait for
    the server to exit.

    Args:
        port: TCP port the file server listens on.

    Returns:
        The ``subprocess.Popen`` handle of the started Caddy process, so the
        caller can monitor or terminate it if desired.

    Raises:
        OSError: If the Caddy executable cannot be started (for example when
            it is missing next to this module).
    """
    caddy_dir = os.path.join(os.getcwd(), "output", "vid_h264")
    # Absolute path: the child runs with a different working directory, so a
    # relative executable path could be resolved against the wrong folder.
    caddy_exe = os.path.join(
        os.path.abspath(os.path.dirname(__file__)), "caddy_windows_amd64.exe"
    )
    # Popen fails if ``cwd`` does not exist, so create the directory up front.
    os.makedirs(caddy_dir, exist_ok=True)
    # Pass the argument list directly: no shell is needed, and combining a
    # list with shell=True would hand the extra items to the shell itself on
    # POSIX systems instead of to Caddy.
    return subprocess.Popen(
        [caddy_exe, "file-server", "--listen", f":{port}", "--browse"],
        cwd=caddy_dir,
    )

331
server.py
View File

@ -1,311 +1,46 @@
"""
Web API Server for ASR and Speaker Diarization
提供语音识别和说话人分离的 REST API 服务
"""
from flask import Flask
from app.settings import config
from app.utils import register_cors, make_response
from app.asr.routes import register_asr_routes
from app.transcode.routes import register_transcode_routes
from waitress import serve
import os
import subprocess
import sys
import gc
import json
import shutil
import signal
import threading
from pathlib import Path
from flask import Flask, request, jsonify
from werkzeug.utils import secure_filename
import uuid
from datetime import datetime, timezone
from lib.caddy.run import run_caddy
from lib.convert import convert_to_h264
def create_app():
"""【统一创建 Flask 服务】唯一创建 app 的地方"""
app = Flask(__name__)
# 加载全局配置
app.config.update(config)
def make_response(status="success", data=None, errors=None, message=None, extra=None):
"""
统一 API 响应格式
# 注册全局工具
register_cors(app)
Args:
status: 状态 ("success" "error")
data: 返回的数据
errors: 错误列表
message: 消息
extra: 其他额外字段
# 注册各个业务模块路由(模块只提供入口)
register_asr_routes(app)
register_transcode_routes(app)
Returns:
统一格式的 JSON 响应
"""
response = {
"status": status,
"data": data if data is not None else {},
"errors": errors if errors is not None else [],
"message": message or ("操作成功" if status == "success" else "操作失败"),
"timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
}
# 根路由
@app.route('/')
def index():
return make_response(message="ASR & Speaker Diarization API 服务运行中")
if extra:
response.update(extra)
return response
app = Flask(__name__)
app.config['MAX_CONTENT_LENGTH'] = 500 * 1024 * 1024
app.config['UPLOAD_FOLDER'] = 'uploads'
app.config['RESULT_FOLDER'] = 'results'
# 增加请求超时时间(秒),支持长时间运行的任务
app.config['SEND_FILE_MAX_AGE_DEFAULT'] = 300 # 5
OUTPUT_DIR = Path('output')
# 手动添加 CORS 响应头
@app.after_request
def after_request(response):
# 获取请求的 Origin
origin = request.headers.get('Origin', '*')
# 设置允许的来源(不使用通配符)
response.headers.add('Access-Control-Allow-Origin', origin)
response.headers.add('Access-Control-Allow-Credentials', 'true')
response.headers.add('Access-Control-Allow-Headers', 'Content-Type,Authorization')
response.headers.add('Access-Control-Allow-Methods', 'GET,PUT,POST,DELETE,OPTIONS')
return response
# 处理 OPTIONS 预检请求
@app.route('/config', methods=['OPTIONS'])
def config_options():
"""处理 /config 接口的 OPTIONS 预检请求"""
return '', 200
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
os.makedirs(app.config['RESULT_FOLDER'], exist_ok=True)
GLOBAL_ASR_SERVICE = None
GLOBAL_DIAR_SERVICE = None
ASR_MODEL_LOADED = False
DIAR_MODEL_LOADED = False
ASR_MODEL_LOCK = threading.Lock()
DIAR_MODEL_LOCK = threading.Lock()
# 全局变量用于控制任务执行
task_running = {}
task_timeout = 600 # 10 分钟超时(秒)
@app.route('/api/recognize', methods=['GET'])
def recognize():
"""文件推理 - 调用 main.py 的 main 函数"""
task_id = str(uuid.uuid4())
task_running[task_id] = True
try:
# 从请求参数获取路径(只接受文件名或 input 目录下的相对路径)
path = request.args.get('path', '')
if not path:
return jsonify(make_response(
status="error",
message="请提供文件路径",
errors=["缺少必要参数path"]
)), 400
print(f"\n{'='*60}")
print(f"API 收到请求path={path}, task_id={task_id}")
print(f"{'='*60}")
print(f"开始调用 main 函数...")
# 切换到 server.py 所在目录
server_dir = Path(__file__).parent.absolute()
os.chdir(server_dir)
print(f"当前工作目录:{os.getcwd()}")
# 设置超时处理
def timeout_handler(signum, frame):
task_running[task_id] = False
raise TimeoutError(f"任务执行超时 ({task_timeout}秒)")
# 注册信号处理器(仅在 Unix/Linux/Mac 有效Windows 下会被忽略)
try:
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(task_timeout)
use_alarm = True
except (AttributeError, ValueError):
# Windows 不支持 SIGALRM
use_alarm = False
print("注意Windows 系统,使用超时检测线程")
try:
# 调用 main.py 的 main 函数
from main import main
main(path)
# 取消超时
if use_alarm:
signal.alarm(0)
print(f"main 函数执行完成")
return jsonify(make_response(
status="success",
message="文件推理完成",
data={"path": path, "task_id": task_id}
)), 200
except TimeoutError as e:
print(f"任务超时:{e}")
return jsonify(make_response(
status="error",
message=str(e),
errors=[str(e)]
)), 504 # Gateway Timeout
finally:
# 清理信号处理器
if use_alarm:
signal.alarm(0)
task_running[task_id] = False
except Exception as e:
import traceback
traceback.print_exc()
task_running[task_id] = False
return jsonify(make_response(
status="error",
message=str(e),
errors=[str(e)]
)), 500
@app.route('/api/result', methods=['GET'])
def result():
"""获取文件推理结果"""
try:
# 从请求参数获取路径
path = request.args.get('path', '')
if not path:
return jsonify(make_response(
status="error",
message="请提供文件路径",
errors=["缺少必要参数path"]
)), 400
# 读取结果
print(Path(path).stem)
result_file = OUTPUT_DIR / f"{Path(path).stem}_result.json"
if result_file.exists():
with open(result_file, 'r', encoding='utf-8') as f:
result_data = json.load(f)
return jsonify(make_response(
status="success",
message="获取成功",
data=result_data
)), 200
else:
return jsonify(make_response(
status="error",
message="处理完成但未找到结果文件",
errors=["结果文件不存在"]
)), 404
except Exception as e:
import traceback
traceback.print_exc()
return jsonify(make_response(
status="error",
message=str(e),
errors=[str(e)]
)), 500
@app.route('/api/convert', methods=['GET'])
def convert():
"""视频文件转码"""
try:
# 从请求参数获取路径
path = request.args.get('path', '')
if not path:
return jsonify(make_response(
status="error",
message="请提供文件路径",
errors=["缺少必要参数path"]
)), 400
# 转码视频文件
output_path = convert_to_h264(path)
return jsonify(make_response(
status="success",
message="视频文件转码完成",
data={"path": output_path}
)), 200
except Exception as e:
import traceback
traceback.print_exc()
return jsonify(make_response(
status="error",
message=str(e),
errors=[str(e)]
)), 500
@app.route('/api/getVidUrl', methods=['GET'])
def getVidUrl():
"""获取视频文件URL"""
try:
# 从请求参数获取路径
path = request.args.get('path', '')
if not path:
return jsonify(make_response(
status="error",
message="请提供文件路径",
errors=["缺少必要参数path"]
)), 400
# 检查视频文件是否存在
if not Path(f"vid_h264/{Path(path).stem}_h264.mp4").exists():
return jsonify(make_response(
status="error",
message="视频文件不存在",
errors=["视频文件不存在"]
)), 404
# 生成视频文件URL
url = f"http://localhost:8086/{Path(path).stem}_h264.mp4"
print(url)
return jsonify(make_response(
status="success",
message="获取成功",
data={"url": url}
)), 200
except Exception as e:
import traceback
traceback.print_exc()
return jsonify(make_response(
status="error",
message=str(e),
errors=[str(e)]
)), 500
return app
if __name__ == '__main__':
api_port = config['API_PORT']
video_port = config['VIDEO_PORT']
app = create_app()
print("=" * 60)
print(" ASR & Speaker Diarization API Server")
print("=" * 60)
print("\nAPI 接口:")
print(" GET /api/recognize - 文件推理")
print(" GET /api/result - 获取文件推理结果")
print(" GET /api/convert - 转码视频文件")
print(" GET /api/getVidUrl - 获取视频文件URL")
print("\n" + "=" * 60)
print("启动服务http://localhost:5000")
print("使用 Waitress WSGI 服务器(无超时限制)")
print(f" API Server (模块化架构) 监听端口:{api_port}")
print(f" 视频文件服务监听端口:{video_port}")
print("=" * 60)
# 启动 Caddy 服务(后台运行)
caddy_dir = os.path.join(os.path.dirname(__file__), "vid_h264")
caddy_exe = os.path.join(os.path.dirname(__file__), "lib", "caddy_windows_amd64.exe")
subprocess.Popen([caddy_exe, "file-server", "--listen", ":8086", "--browse"], cwd=caddy_dir, shell=True)
# 启动视频文件服务
run_caddy(port=video_port)
from waitress import serve
serve(app, host='0.0.0.0', port=5000, threads=4, connection_limit=100)
# 启动服务
serve(app, host='0.0.0.0', port=api_port, threads=4, connection_limit=100)