diff --git a/.gitignore b/.gitignore index 5d381cc..f9ebc34 100644 --- a/.gitignore +++ b/.gitignore @@ -1,162 +1,59 @@ -# ---> Python -# Byte-compiled / optimized / DLL files -__pycache__/ -*.py[cod] -*$py.class - -# C extensions -*.so - -# Distribution / packaging -.Python -build/ -develop-eggs/ -dist/ -downloads/ -eggs/ -.eggs/ -lib/ -lib64/ -parts/ -sdist/ -var/ -wheels/ -share/python-wheels/ -*.egg-info/ -.installed.cfg -*.egg -MANIFEST - -# PyInstaller -# Usually these files are written by a python script from a template -# before PyInstaller builds the exe, so as to inject date/other infos into it. -*.manifest -*.spec - -# Installer logs -pip-log.txt -pip-delete-this-directory.txt - -# Unit test / coverage reports -htmlcov/ -.tox/ -.nox/ -.coverage -.coverage.* -.cache -nosetests.xml -coverage.xml -*.cover -*.py,cover -.hypothesis/ -.pytest_cache/ -cover/ - -# Translations -*.mo -*.pot - -# Django stuff: -*.log -local_settings.py -db.sqlite3 -db.sqlite3-journal - -# Flask stuff: -instance/ -.webassets-cache - -# Scrapy stuff: -.scrapy - -# Sphinx documentation -docs/_build/ - -# PyBuilder -.pybuilder/ -target/ - -# Jupyter Notebook -.ipynb_checkpoints - -# IPython -profile_default/ -ipython_config.py - -# pyenv -# For a library or package, you might want to ignore these files since the code is -# intended to run in multiple environments; otherwise, check them in: -# .python-version - -# pipenv -# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. -# However, in case of collaboration, if having platform-specific dependencies or dependencies -# having no cross-platform support, pipenv may install dependencies that don't work, or not -# install all needed dependencies. -#Pipfile.lock - -# poetry -# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. -# This is especially recommended for binary packages to ensure reproducibility, and is more -# commonly ignored for libraries. -# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control -#poetry.lock - -# pdm -# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. -#pdm.lock -# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it -# in version control. -# https://pdm.fming.dev/#use-with-ide -.pdm.toml - -# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm -__pypackages__/ - -# Celery stuff -celerybeat-schedule -celerybeat.pid - -# SageMath parsed files -*.sage.py - -# Environments -.env -.venv -env/ -venv/ -ENV/ -env.bak/ -venv.bak/ - -# Spyder project settings -.spyderproject -.spyproject - -# Rope project settings -.ropeproject - -# mkdocs documentation -/site - -# mypy -.mypy_cache/ -.dmypy.json -dmypy.json - -# Pyre type checker -.pyre/ - -# pytype static type analyzer -.pytype/ - -# Cython debug symbols -cython_debug/ - -# PyCharm -# JetBrains specific template is maintained in a separate JetBrains.gitignore that can -# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore -# and can be added to the global gitignore or merged into this file. For a more nuclear -# option (not recommended) you can uncomment the following to ignore the entire idea folder. -#.idea/ - +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Virtual Environment +venv/ +env/ +ENV/ +.venv + +# IDE +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# Output (运行生成的结果) +output/ +results/ +logs/ +*.log +runs/ + +# Model files (大文件模型) +*.pth +*.pt +*.onnx +*.h5 +models/ + +# OS +.DS_Store +Thumbs.db + +# Temporary files +*.tmp +temp/ + +# link +input +model diff --git a/Qwen_app.py b/Qwen_app.py new file mode 100644 index 0000000..c3b062c --- /dev/null +++ b/Qwen_app.py @@ -0,0 +1,463 @@ +from lib.json_fun import f_detections_to_objects, load_json_data +from lib.qwen_fun import get_annnotated_frame_for_ai_without_xyxy, hazard_inspection, merge_conflict_inspection_data, report_generator, search_knowledge_base +from encodings.punycode import T + +from lib.qwen_fun_vid import generate_video_to_objects +""" +测试 在给定标注框与类别,原始视频经过转换之后,AI能否准确识别物体特征 +""" + +from tkinter import N +from datetime import datetime +import json +import os +import re +import sys +import subprocess +import importlib.util +from typing import Any +from lib.qwen_fun import save_json_to_file, upload_files_and_get_urls_concurrently +from lib.sam3 import SAM3 +import cv2 +import json +import numpy as np +from pathlib import Path +import gradio as gr + +fps = 0 +VIDEO_FOLDER: str = "input" + +def load_file_list() -> list[str]: + """ + 加载指定文件夹下的所有文件名称(不含子目录) + 返回:文件名列表 + """ + file_names: list[str] = [] + + if not os.path.isdir(VIDEO_FOLDER): + return file_names + + for item in os.listdir(VIDEO_FOLDER): + item_path: str = os.path.join(VIDEO_FOLDER, item) + if os.path.isfile(item_path): + file_names.append(item) + + return file_names + +def reload_files(): + """ + 刷新文件列表并设置默认值 + """ + file_list = load_file_list() + default_value = file_list[0] if file_list else None + return gr.update(choices=file_list, value=default_value) + +def get_full_vid_path(vid_file: str) -> str: + """ + 获取视频绝对路径 + """ + full_path: str = os.path.join(os.getcwd(), VIDEO_FOLDER, vid_file) + gr.Info(full_path) + return full_path + +def update_preview(frame_idx: int, vid_file: str): + vid_name: str = Path(vid_file).stem + output_dir: str = f"output/{vid_name}" + global fps + idx = int(frame_idx // fps) + img_path = f"{output_dir}/boxes/frame_{idx:04d}.jpg" + + with open(f"{output_dir}/hazard_inspection.json", "r", encoding="utf-8") as f: + result = f.read() + + dic = json.loads(result) + class_tag = get_class_tag_by_frame(dic, frame_idx, fps) + + return img_path, class_tag + +def run(vid_file: str, run_sam3: bool = True, run_inspection: bool = True, gen_report: bool = True): + """ + 把 batch_run_videos_then_analyze_images.py 的全部功能整合到这一个函数里。 + 针对 Gradio 选中的单个视频,执行:1.py 生成 track 图片 -> analyze_track_images_with_llm.py 检测图片。 + + 参数(保持与原 run 一致,方便 Gradio 直接复用): + vid_file: input/ 目录下的视频文件名 + run_sam3: 是否运行 1.py 生成 track 图片目录(沿用原变量名做开关) + run_inspection: 是否运行 analyze 图片检测脚本 + gen_report: 占位;analyze 脚本会自行生成 Word/JSON 报告 + """ + + # ============================================================================ + # 【配置区域】(来自 batch_run_videos_then_analyze_images.py) + # ============================================================================ + VIDEO_DIR = r"" + VIDEO_PATHS: list[str] = [r"C:\factory-inspection\videos\MOV00001_20260213_103217_fixed.mp4"] + YOLO_MODEL_PATH = r"C:\factory-inspection\yolo\best.pt" + TRACK_SCRIPT_PATH = r"C:\factory-inspection\1.py" + ANALYZE_SCRIPT_PATH = r"C:\factory-inspection\scripts\analyze_track_images_with_llm.py" + OUTPUT_ROOT = r"C:\factory-inspection\batch_video_image_reports" + VIDEO_EXTS = {".mp4", ".avi", ".mov", ".mkv", ".MP4", ".AVI", ".MOV", ".MKV"} + # ============================================================================ + + def _safe_name(path: Path) -> str: + return re.sub(r"[^0-9A-Za-z\u4e00-\u9fff_.-]+", "_", path.stem or "video") + + def _load_module(script_path: Path) -> Any: + spec = importlib.util.spec_from_file_location("factory_video_tracker", script_path) + if spec is None or spec.loader is None: + raise RuntimeError(f"无法加载脚本:{script_path}") + module = importlib.util.module_from_spec(spec) + sys.modules["factory_video_tracker"] = module + spec.loader.exec_module(module) + return module + + def _collect_videos(video_dir: str, video_paths: list[str]) -> list[Path]: + videos: list[Path] = [] + for raw in video_paths: + path = Path(raw) + if path.is_file(): + videos.append(path) + + if video_dir: + root = Path(video_dir) + if root.is_dir(): + for path in sorted(root.rglob("*")): + if path.suffix in VIDEO_EXTS and path.is_file(): + videos.append(path) + + # 去重但保持顺序 + result: list[Path] = [] + seen: set[str] = set() + for video in videos: + key = str(video.resolve()).lower() + if key not in seen: + seen.add(key) + result.append(video) + return result + + def _parse_class_id_and_name(value: Any) -> tuple[int | None, str]: + text = str(value or "").strip() + match = re.match(r"^\s*(\d+)\s+(.+?)\s*$", text) + if match: + return int(match.group(1)), match.group(2).strip() + match = re.match(r"^\s*(\d+)\s*$", text) + if match: + class_id = int(match.group(1)) + return class_id, f"class_{class_id}" + return None, text or "未知目标" + + def _parse_bbox(value: Any) -> list[float] | None: + if isinstance(value, list) and len(value) == 4: + return [float(x) for x in value] + if isinstance(value, str): + parts = [x.strip() for x in value.split(",") if x.strip()] + if len(parts) == 4: + return [float(x) for x in parts] + return None + + def _track_id_from_path(json_path: Path) -> int | None: + match = re.search(r"track_(\d+)", json_path.stem) or re.search(r"track_(\d+)", json_path.parent.name) + return int(match.group(1)) if match else None + + def _normalize_1py_jsons(track_dir: Path) -> None: + """1.py 可能输出单个 dict;analyze 脚本常用的是 list。这里做格式转换。""" + for json_path in track_dir.rglob("*.json"): + try: + data = json.loads(json_path.read_text(encoding="utf-8")) + except Exception as exc: + print(f"跳过异常 JSON:{json_path},原因:{exc}") + continue + if isinstance(data, list): + continue + if not isinstance(data, dict): + continue + + class_id, class_name = _parse_class_id_and_name(data.get("class_id") or data.get("class_name")) + normalized = [{ + "xyxy": _parse_bbox(data.get("xyxy") or data.get("bbox") or data.get("location")), + "confidence": data.get("confidence", data.get("conf", 0.0)), + "track_id": data.get("track_id", _track_id_from_path(json_path)), + "class_id": class_id, + "class_str": class_name, + "start_frame": data.get("start_frame"), + "end_frame": data.get("end_frame"), + "start_sec": data.get("start_sec"), + }] + json_path.write_text(json.dumps(normalized, ensure_ascii=False, indent=2), encoding="utf-8") + + def _video_to_tracks(video_path: Path, yolo_model_path: Path, output_dir: Path) -> Path: + module = _load_module(Path(TRACK_SCRIPT_PATH)) + config = module.InferenceConfig() + + # 覆盖 1.py 里的硬编码路径 + config.VIDEO_PATH = str(video_path) + config.MODEL_PATH = str(yolo_model_path) + + tracks_dir = output_dir / "tracks" + config.BEST_FRAME_DIR = str(tracks_dir) + config.OUTPUT_DIR = str(output_dir / "tracker_output") + + tracker = module.VideoTracker(config, output_path=str(output_dir / "tracked_video.mp4")) + tracker.track_video() + _normalize_1py_jsons(tracks_dir) + return tracks_dir + + def _run_image_analyze(track_dir: Path, report_dir: Path) -> None: + cmd = [ + sys.executable, + str(Path(ANALYZE_SCRIPT_PATH)), + "--input", + str(track_dir), + "--output", + str(report_dir), + ] + env = os.environ.copy() + env.setdefault("PYTHONUTF8", "1") + env.setdefault("PYTHONIOENCODING", "utf-8") + # 防止子进程出现 is_available=True 但 device_count=0 的诡异情况 + env.pop("CUDA_VISIBLE_DEVICES", None) + print("执行图片检测:", " ".join(f'"{x}"' if " " in x else x for x in cmd)) + code = subprocess.call(cmd, env=env) + if code != 0: + raise RuntimeError(f"图片检测失败,退出码:{code}") + + def _process_one_video(video_path: Path, yolo_model_path: Path, output_root: Path) -> tuple[Path, Path]: + run_name = f"{_safe_name(video_path)}_{datetime.now().strftime('%Y%m%d_%H%M%S')}" + run_dir = output_root / run_name + report_dir = run_dir / "report" + run_dir.mkdir(parents=True, exist_ok=True) + report_dir.mkdir(parents=True, exist_ok=True) + + print("=" * 60) + print(f"处理视频:{video_path}") + + if run_sam3: + print("第 1 步:调用 1.py 生成 track 图片目录") + tracks_dir = _video_to_tracks(video_path, yolo_model_path, run_dir) + print(f"track 图片目录:{tracks_dir}") + else: + tracks_dir = run_dir / "tracks" + print(f"跳过 1.py,使用已存在的 track 图片目录:{tracks_dir}") + + if run_inspection or gen_report: + print("第 2 步:调用 analyze_track_images_with_llm.py 检测图片") + _run_image_analyze(tracks_dir, report_dir) + print(f"报告目录:{report_dir}") + else: + print("跳过图片检测") + + print("=" * 60) + return tracks_dir, report_dir + + # ===== 主体逻辑:单视频走单视频分支,多视频/批量收集逻辑也保留可用 ===== + selected_video: Path = Path(VIDEO_FOLDER) / vid_file if vid_file else Path() + + if not selected_video.is_file(): + # Gradio 没选到时,退回 batch 脚本配置的 VIDEO_DIR / VIDEO_PATHS + videos = _collect_videos(VIDEO_DIR, VIDEO_PATHS) + if not videos: + raise gr.Error( + f"找不到视频:{selected_video};且 VIDEO_DIR / VIDEO_PATHS 都为空。" + ) + selected_video = videos[0] + + yolo_model = Path(str(YOLO_MODEL_PATH).strip().strip('"')) + if not yolo_model.is_file(): + raise gr.Error(f"YOLO 权重不存在:{yolo_model},请设置 YOLO_MODEL_PATH。") + + output_root = Path(OUTPUT_ROOT) + output_root.mkdir(parents=True, exist_ok=True) + + tracks_dir, report_dir = _process_one_video(selected_video, yolo_model, output_root) + print(f"视频处理完成:{selected_video}") + + # ===== 给 Gradio 返回 4 元组(沿用原 run 的输出结构) ===== + # 取一张 track 图片作为预览 + preview_path: str = "" + for img in tracks_dir.rglob("*.jpg"): + preview_path = str(img) + break + if not preview_path: + for img in tracks_dir.rglob("*.png"): + preview_path = str(img) + break + + # 尝试读取 analyze 输出的 JSON 报告 + json_result: dict | list = {} + for jp in sorted(report_dir.rglob("*.json")): + try: + json_result = json.loads(Path(jp).read_text(encoding="utf-8")) + break + except Exception: + continue + + status_text = ( + f"处理完成。\n" + f"视频:{selected_video}\n" + f"Track 目录:{tracks_dir}\n" + f"报告目录:{report_dir}" + ) + + return gr.update(maximum=0, value=0, step=1), preview_path, status_text, json_result + + + +def get_vid_dict(vid_dir: str, obj_dict: dict, video_url_file: str, use_url_cache: bool) -> dict: + """获取视频字典 + 参数: + vid_dir: 物体视频存放目录 + obj_dict: 物品字典,键为物品TrackID,值为物品信息 + video_url_file: 视频URL文件路径 + use_url_cache: 是否使用缓存的URL + 返回: + 视频字典,键为物品TrackID,值为视频本地地址和视频URL + """ + + vid_dict = {} + + # 检查URL是否已存在 + if use_url_cache: + if os.path.exists(video_url_file): + try: + with open(video_url_file, "r", encoding="utf-8") as f: + loaded_data = json.load(f) + # 验证数据结构:确保值是字典且包含必要键 + for track_id, value in loaded_data.items(): + if isinstance(value, dict) and "vid_path" in value and "vid_url" in value: + vid_dict[track_id] = value + else: + print(f"跳过无效的视频信息: {track_id} -> {value}") + except Exception as e: + print(f"读取URL文件失败: {e}") + + # 初始化未存在的track_id + for track_id in obj_dict.keys(): + if track_id in vid_dict: + continue + try: + vid_path = f"{vid_dir}/obj_{int(track_id):03d}.mp4" + vid_dict[track_id] = {"vid_path": vid_path, "vid_url": None} + except (ValueError, TypeError): + print(f"无效的track_id: {track_id},跳过") + + # 找到未上传的视频 + unuploaded_vids = [] + for track_id, vid_info in vid_dict.items(): + if isinstance(vid_info, dict) and vid_info.get("vid_url") is None: + unuploaded_vids.append(vid_info["vid_path"]) + + # 上传未上传的视频并获取 URL + uploaded_urls = upload_files_and_get_urls_concurrently( + file_path_list=unuploaded_vids, + max_workers=8 + ) + + # 更新视频URL + for track_id, vid_info in vid_dict.items(): + if isinstance(vid_info, dict) and vid_info.get("vid_url") is None: + if vid_info["vid_path"] in unuploaded_vids: + idx = unuploaded_vids.index(vid_info["vid_path"]) + if idx < len(uploaded_urls): + vid_info["vid_url"] = uploaded_urls[idx] + + # 保存字典为 JSON 文件 + with open(video_url_file, "w", encoding="utf-8") as f: + json.dump(vid_dict, f, ensure_ascii=False, indent=4) + + return vid_dict + + +# 创建 Gradio 页面 +with gr.Blocks() as demo: + gr.Markdown("# 📸 隐患排查系统 (sam3 + qwen3.5-27b)") + + with gr.Row(): + with gr.Column(): + + initial_files: list[str] = load_file_list() + default_video = initial_files[0] if initial_files else None + vid_file = gr.Dropdown( + label="视频", + choices=initial_files, + value=default_video, # 默认选中第一个 + interactive=True, + ) + reload_file_list_button = gr.Button("刷新视频列表") + reload_file_list_button.click(fn=reload_files, inputs=[], outputs=[vid_file]) + + run_sam3 = gr.Checkbox(label="1. 运行 SAM3 模型", value=True) + run_inspection = gr.Checkbox(label="2. 运行隐患排查", value=True) + gen_report = gr.Checkbox(label="3. 生成报告", value=True) + audio_recognition = gr.Checkbox(label="4. 运行音频识别", value=False) + + run_button = gr.Button("运行", variant="primary") + + get_vid_path_btn = gr.Button("获取视频路径") + full_path_text = gr.Textbox(visible=False) + + with gr.Column(): + preview = gr.Image(label="预览", scale=2) + img_slider = gr.Slider(label="帧索引", minimum=0, maximum=0, value=0, step=1) + textbox = gr.Textbox(label="隐患结果", lines=10) + jsonbox = gr.JSON(label="隐患结果json") + + + + run_button.click(fn=run, inputs=[vid_file, run_sam3, run_inspection, gen_report], outputs=[img_slider, preview, textbox, jsonbox]) + img_slider.change(fn=update_preview, inputs=[img_slider, vid_file], outputs=[preview, textbox], show_progress="hidden") + get_vid_path_btn.click(fn=get_full_vid_path, inputs=vid_file, outputs=full_path_text) + + +def get_class_tag_by_frame(data, idx, fps): + """ + 根据给定的帧索引 (idx),返回在该帧范围内的所有对象的 class:tag 信息。 + + 参数: + data (dict): 包含 'tag', 'objects' 的字典数据(hazard_inspection.json格式) + idx (int): 需要查询的帧索引 + + 返回: + str: 符合条件的 class:tag 列表,多个结果之间用换行符分隔。如果没有匹配项,返回空字符串。 + """ + # 参数检查 + if not isinstance(data, dict): + raise ValueError("数据必须是字典类型") + if 'tag' not in data or 'objects' not in data: + raise ValueError("数据必须包含 'tag' 和 'objects' 键") + + tag_list = data['tag'] + objects = data['objects'] + + interval = fps + idx = int(idx / interval) #转换 + + # 用于存储符合条件的 class:tag 字符串 + result = [] + all_class_tag = [] + + # 遍历每个物体 + for obj in objects: + # 根据 tag_id 获取对应的隐患标签字符串 + tag_id = obj.get('tag_id', 0) + class_id = obj.get('class_id', 0) + tag_str = tag_list[tag_id] if tag_id < len(tag_list) else f"未知标签({tag_id})" + location = obj.get('location', '') + start_frame = obj.get('start_frame', 0) + level = obj.get('level', '') + + # 检查帧范围是否包含 idx + if start_frame == idx: + result.append(f"{tag_str} | 等级:{level} | 位置: {location}") + all_class_tag.append(f"{tag_str} | class_id:{class_id} | 等级:{level} | 开始帧:{start_frame} | 位置:{location}") + + # 使用换行符连接所有结果 + output = f"当前帧隐患:\n"+"\n".join(result)+"\n\n"+"所有隐患对象信息:\n"+"\n".join(all_class_tag) + return output + + +# 启动应用 +if __name__ == "__main__": + demo.launch( + debug=True, + allowed_paths=[VIDEO_FOLDER] + ) \ No newline at end of file diff --git a/Qwen_cmd test.py b/Qwen_cmd test.py new file mode 100644 index 0000000..dac400a --- /dev/null +++ b/Qwen_cmd test.py @@ -0,0 +1,135 @@ +import os + +from lib.json_fun import f_detections_to_objects +from lib.qwen_fun import chat, hazard_inspection, load_json_data, search_knowledge_base, upload_files_and_get_urls_concurrently +from encodings.punycode import T +""" +测试 在给定标注框与类别,原始视频经过转换之后,AI能否准确识别物体特征 +""" + +from tkinter import N +from datetime import datetime +import json +from lib.qwen_fun_vid import generate_video_to_objects +from lib.sam3 import SAM3 +from ultralytics.models.sam import SAM3VideoSemanticPredictor +import json +import numpy as np +from pathlib import Path + +def run(): + vid_name = "santai5" # 视频名称(不含后缀) + vid_end = ".mp4" # 视频后缀 + use_url_cache = True # 是否使用 URL 缓存,避免重复上传视频 + enable_thinking = False # 是否启用思考模式 + + run_vid_process = False # 是否运行视频处理流程(提取物体视频) + + input_video_path = f"input/{vid_name}{vid_end}" + output_dir = f"output/{vid_name}" + frame_detections_path = f"{output_dir}/frame_detections.json" + objects_json_path = f"{output_dir}/objects.json" + vid_dir = f"{output_dir}/obj_vids" + obj = json.load(open(objects_json_path, "r", encoding="utf-8")) + class_list = obj["class_list"] + obj_dict = obj["track_id_list"] + rule_dict: dict = load_json_data('知识库/rule.json') + video_url_file = f"{output_dir}/video_url.json" + + # 提取物体信息 + f_detections_to_objects( + frame_detections_path, + objects_json_path + ) + + if run_vid_process: + # 生成物体视频 + generate_video_to_objects( + obj_dict, + input_video_path, + output_dir=vid_dir, + ) + + # 上传视频并获取 URL + vid_dict = get_vid_dict(vid_dir, obj_dict, video_url_file, use_url_cache) + + hazard_inspection( + output_dir, + obj_dict, + rule_dict, + class_list, + vid_dict, + fps=30, + enable_thinking=enable_thinking + ) + +def get_vid_dict(vid_dir: str, obj_dict: dict, video_url_file: str, use_url_cache: bool) -> dict: + """获取视频字典 + 参数: + vid_dir: 物体视频存放目录 + obj_dict: 物品字典,键为物品TrackID,值为物品信息 + video_url_file: 视频URL文件路径 + use_url_cache: 是否使用缓存的URL + 返回: + 视频字典,键为物品TrackID,值为视频本地地址和视频URL + """ + + vid_dict = {} + + # 检查URL是否已存在 + if use_url_cache: + if os.path.exists(video_url_file): + try: + with open(video_url_file, "r", encoding="utf-8") as f: + vid_dict = json.load(f) + except Exception as e: + print(f"读取URL文件失败: {e}") + for track_id in obj_dict.keys(): + if track_id in vid_dict: + continue + vid_path = f"{vid_dir}/obj_{int(track_id):03d}.mp4" + vid_dict[track_id] = {"vid_path": vid_path, "vid_url": None} + + # 找到未上传的视频 + unuploaded_vids = [] + for track_id, vid_info in vid_dict.items(): + if vid_info["vid_url"] is None: + unuploaded_vids.append(vid_info["vid_path"]) + + # 上传未上传的视频并获取 URL + uploaded_urls = upload_files_and_get_urls_concurrently( + file_path_list=unuploaded_vids, + max_workers=8 + ) + + # 更新视频URL + for track_id, vid_info in vid_dict.items(): + if vid_info["vid_url"] is None: + idx = unuploaded_vids.index(vid_info["vid_path"]) + vid_info["vid_url"] = uploaded_urls[idx] + + # 保存字典为 JSON 文件 + with open(video_url_file, "w", encoding="utf-8") as f: + json.dump(vid_dict, f, ensure_ascii=False, indent=4) + + return vid_dict + + +# 启动应用 +if __name__ == "__main__": + # objects_json_path = "output/santai5/objects.json" + # input_video_path = "input/santai5.mp4" + # obj_dict = json.load(open(objects_json_path, "r", encoding="utf-8"))["track_id_list"] + + # # 记录开始时间 + # start_time = datetime.now() + # generate_video_to_objects( + # obj_dict, + # input_video_path, + # output_dir="output/santai5/obj_vids", + # ) + + run() + + + diff --git a/app_test.py b/app_test.py new file mode 100644 index 0000000..d8d6d04 --- /dev/null +++ b/app_test.py @@ -0,0 +1,10 @@ +from pathlib import Path + +import gradio as gr + +gr.set_static_paths(paths=[Path.cwd().absolute()/"input"]) + +with gr.Blocks() as demo: + gr.HTML("