Просмотр исходного кода

新增:Case 管理 UI — 创建/流转/列表/审计全流程

在 Streamlit 中新增 📋 Case 管理 Tab,包含三个子页面:
1. Case 列表:状态筛选、统计卡片、状态更新(含状态机校验)
2. 创建 Case:从根因候选维度手动新建异常追踪
3. 审计日志:查看 CREATE_CASE / UPDATE_STATUS 操作记录

侧边栏新增数据库路径配置(默认 defect_analysis.db),
工程师和管理者角色可见,操作员不可见。
leod 4 дней назад
Родитель
Сommit
aa30dd442d
1 измененных файлов с 182 добавлено и 1 удалено
  1. 182 1
      app.py

+ 182 - 1
app.py

@@ -18,6 +18,14 @@ from sklearn.cluster import DBSCAN
 from sklearn.decomposition import PCA
 from sklearn.preprocessing import StandardScaler
 from defect_analysis.data_quality import build_data_quality_report
+from defect_analysis.cases import (
+    VALID_CASE_STATUSES,
+    VALID_CASE_TRANSITIONS,
+    create_root_cause_case,
+    get_audit_logs,
+    list_cases,
+    update_case_status,
+)
 from app_utils import (
     apply_defect_filters,
     build_diagnostic_dashboard,
@@ -126,6 +134,15 @@ else:
 if df is None:
     st.stop()
 
+# --- 数据库路径 ---
+st.sidebar.divider()
+st.sidebar.subheader("🗄️ 数据库")
+db_path = st.sidebar.text_input(
+    "数据库路径",
+    value="defect_analysis.db",
+    help="Case 管理和数据持久化使用的 SQLite 数据库路径",
+)
+
 # --- 角色视图 ---
 st.sidebar.divider()
 st.sidebar.subheader("👤 视图模式")
@@ -151,7 +168,7 @@ tab_visibility = {
     },
     "管理者": {
         "tabs": ["🚨 SPC 控制图与预警", "🔬 缺陷模式识别", "💚 设备健康与共性分析",
-                 "📊 类型集中性 (帕累托)", "📈 时间集中性", "🧭 诊断驾驶舱"],
+                 "📊 类型集中性 (帕累托)", "📈 时间集中性", "🧭 诊断驾驶舱", "📋 Case 管理"],
         "show_kpi": True,
         "show_export": True,
     },
@@ -279,6 +296,7 @@ if filtered_df.empty:
 ALL_TABS = [
     "🧭 诊断驾驶舱",
     "🔬 ML 因子分析",
+    "📋 Case 管理",
     "🗺️ 空间集中性",
     "📊 类型集中性 (帕累托)",
     "📈 时间集中性",
@@ -693,6 +711,169 @@ if _t:
         else:
             st.info("当前数据未找到显著关键因子,可放宽筛选条件或增加样本量。")
 
+# ========== Tab: Case 管理 ==========
+_t = get_tab("📋 Case 管理")
+if _t:
+    with _t:
+        st.header("异常 Case 闭环管理")
+        st.markdown("从根因分析发现异常,创建 Case 追踪改善过程,直至关闭并审计。")
+
+        from defect_analysis.database import init_database
+        init_database(db_path)
+
+        # 子 Tab
+        case_list_tab, case_create_tab, case_audit_tab = st.tabs(["Case 列表", "创建 Case", "审计日志"])
+
+        # ---- Case 列表 ----
+        with case_list_tab:
+            status_filter = st.selectbox(
+                "状态筛选",
+                options=["全部"] + sorted(VALID_CASE_STATUSES),
+                index=0,
+                label_visibility="collapsed",
+            )
+            all_cases = list_cases(
+                db_path,
+                status=None if status_filter == "全部" else status_filter,
+            )
+
+            if all_cases.empty:
+                st.info("暂无 Case 记录,请先在「创建 Case」中新建异常追踪。")
+            else:
+                status_counts = all_cases["status"].value_counts()
+                st_cols = st.columns(len(status_counts))
+                for idx, (status, count) in enumerate(status_counts.items()):
+                    st_cols[idx].metric(status, count)
+
+                display = all_cases.copy()
+                display["created_at"] = pd.to_datetime(display["created_at"]).dt.strftime("%Y-%m-%d %H:%M")
+                display["updated_at"] = pd.to_datetime(display["updated_at"]).dt.strftime("%Y-%m-%d %H:%M")
+                st.dataframe(
+                    display[["case_id", "title", "status", "candidate_type", "candidate_value",
+                             "defect_type", "panel_zone", "owner", "created_by", "created_at", "updated_at"]],
+                    use_container_width=True,
+                    hide_index=True,
+                )
+
+                # 状态更新
+                st.subheader("更新 Case 状态")
+                upd_col1, upd_col2, upd_col3, upd_col4 = st.columns([1, 2, 1, 2])
+                with upd_col1:
+                    sel_case_id = st.number_input("Case ID", min_value=1, step=1, key="upd_case_id")
+                with upd_col2:
+                    current_row = all_cases[all_cases["case_id"] == int(sel_case_id)]
+                    if not current_row.empty:
+                        current_status = current_row.iloc[0]["status"]
+                        allowed = VALID_CASE_TRANSITIONS.get(current_status, set())
+                        if allowed:
+                            st.selectbox(
+                                f"当前: {current_status} → 目标状态",
+                                options=sorted(allowed),
+                                key="upd_target",
+                            )
+                        else:
+                            st.warning(f"当前状态 {current_status} 不可流转,Case 已终态。")
+                    else:
+                        st.warning("请选择有效的 Case ID")
+                with upd_col3:
+                    actor = st.text_input("操作人", value="engineer", key="upd_actor")
+                with upd_col4:
+                    note = st.text_input("备注", value="", key="upd_note")
+
+                can_update = not current_row.empty and bool(VALID_CASE_TRANSITIONS.get(current_row.iloc[0]["status"], set()))
+                if st.button("确认更新状态", key="upd_submit", disabled=not can_update):
+                    try:
+                        target_status = st.session_state.get("upd_target", "")
+                        if target_status:
+                            update_case_status(
+                                db_path,
+                                case_id=int(sel_case_id),
+                                status=target_status,
+                                actor=actor or "system",
+                                note=note,
+                            )
+                            st.success(f"Case {sel_case_id} 已更新至 {target_status}")
+                            st.rerun()
+                    except ValueError as e:
+                        st.error(str(e))
+
+        # ---- 创建 Case ----
+        with case_create_tab:
+            st.subheader("新建异常 Case")
+            cr_col1, cr_col2 = st.columns(2)
+            with cr_col1:
+                cr_title = st.text_input("Case 标题", key="cr_title")
+                cr_candidate_type = st.selectbox(
+                    "候选维度",
+                    options=[
+                        "lam_fixture_id", "lam_jig_id", "lam_nozzle_id",
+                        "material_lot_oca", "material_lot_glass", "material_lot_polarizer",
+                        "clean_equipment_id", "clean_slot_id", "bond_equipment_id", "bond_head_id",
+                        "equipment_id", "seat_id", "shift", "recipe_id",
+                    ],
+                    key="cr_type",
+                )
+                cr_candidate_value = st.text_input("候选值", key="cr_value")
+            with cr_col2:
+                cr_defect_type = st.selectbox(
+                    "缺陷类型",
+                    options=sorted(df["defect_type"].dropna().unique()),
+                    key="cr_defect",
+                )
+                cr_panel_zone = st.text_input("面板区域", value="", key="cr_zone")
+                cr_owner = st.text_input("责任人", value="", key="cr_owner")
+            cr_created_by = st.text_input("创建人", value="engineer", key="cr_creator")
+            cr_recommendation = st.text_area("改善建议", value="", key="cr_recommendation", height=80)
+
+            if st.button("创建 Case", key="cr_submit"):
+                if not cr_title or not cr_candidate_value:
+                    st.error("标题和候选值不能为空")
+                else:
+                    case_id = create_root_cause_case(
+                        db_path,
+                        title=cr_title,
+                        candidate_type=cr_candidate_type,
+                        candidate_value=cr_candidate_value,
+                        defect_type=cr_defect_type,
+                        panel_zone=cr_panel_zone or "未指定",
+                        owner=cr_owner or cr_created_by,
+                        created_by=cr_created_by,
+                        recommendation=cr_recommendation or "待分析",
+                    )
+                    st.success(f"Case #{case_id} 已创建")
+                    st.rerun()
+
+        # ---- 审计日志 ----
+        with case_audit_tab:
+            st.subheader("操作审计日志")
+            audit_filter = st.selectbox(
+                "实体筛选",
+                options=["全部", "case"],
+                index=1,
+                key="audit_entity_filter",
+            )
+            audit_entity_id = st.number_input(
+                "实体 ID(留空查全部)",
+                min_value=0,
+                value=0,
+                step=1,
+                key="audit_entity_id_input",
+            )
+            logs = get_audit_logs(
+                db_path,
+                entity_type="case" if audit_filter == "case" else None,
+                entity_id=audit_entity_id if audit_entity_id > 0 else None,
+            )
+            if logs.empty:
+                st.info("暂无审计日志")
+            else:
+                logs["created_at"] = pd.to_datetime(logs["created_at"]).dt.strftime("%Y-%m-%d %H:%M:%S")
+                st.dataframe(
+                    logs[["audit_id", "entity_type", "entity_id", "action", "actor", "details", "created_at"]],
+                    use_container_width=True,
+                    hide_index=True,
+                )
+
 # ========== Tab 1: 空间集中性 ==========
 _t = get_tab("🗺️ 空间集中性")
 if _t: