2 Commits 354df30582 ... c1cd04531e

Auteur SHA1 Message Date
  leod c1cd04531e 重构:向量化 key_factors + 拆分 ML 因子分析独立 Tab il y a 4 jours
  leod ba090c608f 优化:修复 Timestamp.utcnow 弃用警告 + CSV 编码统一为 utf-8-sig il y a 4 jours
4 fichiers modifiés avec 120 ajouts et 56 suppressions
  1. 43 0
      analyze_key_factors.py
  2. 34 21
      app.py
  3. 1 1
      defect_analysis/cases.py
  4. 42 34
      defect_analysis/ml/key_factors.py

+ 43 - 0
analyze_key_factors.py

@@ -0,0 +1,43 @@
+"""查找不良关键因子。"""
+
+import argparse
+
+import pandas as pd
+
+from defect_analysis.ml.key_factors import find_key_factors
+from defect_analysis.schemas import normalize_defect_schema
+
+
+def analyze_csv_key_factors(csv_path, *, target_defect_type=None, target_severity=None, top_n=20):  # Load a defect CSV, normalize it, and return the find_key_factors() result for the given target filters.
+    df = pd.read_csv(csv_path, parse_dates=["timestamp"], encoding="utf-8-sig")  # utf-8-sig tolerates a leading BOM; the CSV is expected to carry a "timestamp" column
+    df = normalize_defect_schema(df)  # project helper from defect_analysis.schemas; presumably maps columns to the canonical schema — TODO confirm
+    return find_key_factors(  # returns a DataFrame (main() relies on .empty / .to_string on it)
+        df,
+        target_defect_type=target_defect_type,  # None is forwarded as-is when no defect type is requested
+        target_severity=target_severity,  # None is forwarded as-is when no severity is requested
+        top_n=top_n,  # forwarded unchanged to find_key_factors
+    )
+
+
+def main():  # CLI entry point: parse arguments, run the CSV analysis, then print the result table or a "nothing found" hint.
+    parser = argparse.ArgumentParser(description="查找不良关键因子")
+    parser.add_argument("--csv", default="defect_data.csv", help="缺陷 CSV 文件路径")  # input file; falls back to defect_data.csv
+    parser.add_argument("--target-defect-type", help="目标缺陷类型,例如 气泡/划痕/漏光")  # optional; None when omitted
+    parser.add_argument("--target-severity", help="目标严重等级,例如 严重")  # optional; None when omitted
+    parser.add_argument("--top-n", type=int, default=20)  # row cap passed through as top_n
+    args = parser.parse_args()
+
+    result = analyze_csv_key_factors(
+        args.csv,
+        target_defect_type=args.target_defect_type,
+        target_severity=args.target_severity,
+        top_n=args.top_n,
+    )
+    if result.empty:  # no factor rows came back for the requested target
+        print("未找到显著关键因子,请检查目标条件或增加样本量。")
+    else:
+        print(result.to_string(index=False))  # print the whole table without the positional index column
+
+
+if __name__ == "__main__":  # run the CLI only when executed as a script, not when imported
+    main()

+ 34 - 21
app.py

@@ -267,6 +267,7 @@ if filtered_df.empty:
 # --- Tab 布局 (按角色动态) ---
 ALL_TABS = [
     "🧭 诊断驾驶舱",
+    "🔬 ML 因子分析",
     "🗺️ 空间集中性",
     "📊 类型集中性 (帕累托)",
     "📈 时间集中性",
@@ -299,7 +300,6 @@ if _t:
         dashboard = build_diagnostic_dashboard(filtered_df)
         industry_diagnosis = generate_industry_diagnosis(filtered_df, dashboard)
         quality_report = build_data_quality_report(filtered_df)
-        key_factors = find_key_factors(filtered_df, target_defect_type=dashboard["top_defect_type"], top_n=10)
         level_colors = {
             "严重": ("#7f1d1d", "#fee2e2"),
             "关注": ("#92400e", "#fef3c7"),
@@ -551,26 +551,6 @@ if _t:
             st.dataframe(root_table, use_container_width=True, hide_index=True)
             st.caption("风险分 = 贡献规模 + 异常倍数 + 严重占比 + 涉及面板数。先查高贡献且高偏离的组合。")
 
-            extended_root_causes = dashboard.get("extended_root_causes")
-            if extended_root_causes is not None and not extended_root_causes.empty:
-                st.subheader("扩展根因候选")
-                extended_table = extended_root_causes.copy()
-                extended_table["占比"] = extended_table["占比"].map(lambda v: f"{v:.1%}")
-                extended_table["异常倍数"] = extended_table["异常倍数"].map(lambda v: f"{v:.2f}x")
-                extended_table["严重占比"] = extended_table["严重占比"].map(lambda v: f"{v:.1%}")
-                st.dataframe(extended_table, use_container_width=True, hide_index=True)
-                st.caption("覆盖治具、吸嘴、材料批次、清洗/绑定等维度,用于多前制程链路追溯。")
-
-            if not key_factors.empty:
-                st.subheader(f"关键因子分析:{dashboard['top_defect_type']}")
-                key_factor_table = key_factors.copy()
-                key_factor_table["目标占比"] = key_factor_table["目标占比"].map(lambda v: f"{v:.1%}")
-                key_factor_table["基线占比"] = key_factor_table["基线占比"].map(lambda v: f"{v:.1%}")
-                key_factor_table["异常倍数"] = key_factor_table["异常倍数"].map(lambda v: f"{v:.2f}x")
-                key_factor_table["支持度"] = key_factor_table["支持度"].map(lambda v: f"{v:.1%}")
-                st.dataframe(key_factor_table, use_container_width=True, hide_index=True)
-                st.caption("关键因子按目标缺陷占比、异常倍数、样本数和支持度综合排序。")
-
         trend_col, pareto_col = st.columns([1, 1])
         with trend_col:
             st.subheader("每日缺陷走势")
@@ -620,6 +600,39 @@ if _t:
             )
             st.plotly_chart(fig_pareto_dash, use_container_width=True)
 
+# ========== Tab 0.5: ML 因子分析 ==========
+_t = get_tab("🔬 ML 因子分析")  # falsy when this tab is not available; the guard below then skips the whole section
+if _t:
+    with _t:
+        dashboard = build_diagnostic_dashboard(filtered_df)  # NOTE(review): the dashboard tab also builds this from the same filtered_df — consider computing it once and sharing it
+        key_factors = find_key_factors(filtered_df, target_defect_type=dashboard["top_defect_type"], top_n=10)  # factors for the dashboard's top defect type, capped at 10 rows
+        extended_root_causes = dashboard.get("extended_root_causes")  # .get(): the key may be absent, handled by the None check below
+
+        st.header("根因与关键因子分析")
+        st.markdown("综合规则评分、统计分析与行业维度,输出可解释的异常候选。")
+        st.divider()
+
+        if extended_root_causes is not None and not extended_root_causes.empty:  # section is silently omitted when there is nothing to show
+            st.subheader("扩展根因候选")
+            extended_table = extended_root_causes.copy()  # copy so display formatting does not mutate the dashboard's frame
+            extended_table["占比"] = extended_table["占比"].map(lambda v: f"{v:.1%}")  # fraction -> percent string, one decimal
+            extended_table["异常倍数"] = extended_table["异常倍数"].map(lambda v: f"{v:.2f}x")  # lift -> "N.NNx" string
+            extended_table["严重占比"] = extended_table["严重占比"].map(lambda v: f"{v:.1%}")  # fraction -> percent string, one decimal
+            st.dataframe(extended_table, use_container_width=True, hide_index=True)
+            st.caption("覆盖治具、吸嘴、材料批次、清洗/绑定等维度,用于多前制程链路追溯。")
+
+        if not key_factors.empty:
+            st.subheader(f"关键因子分析:{dashboard['top_defect_type']}")
+            key_factor_table = key_factors.copy()  # format a copy; key_factors itself keeps its numeric columns
+            key_factor_table["目标占比"] = key_factor_table["目标占比"].map(lambda v: f"{v:.1%}")  # fraction -> percent string
+            key_factor_table["基线占比"] = key_factor_table["基线占比"].map(lambda v: f"{v:.1%}")  # fraction -> percent string
+            key_factor_table["异常倍数"] = key_factor_table["异常倍数"].map(lambda v: f"{v:.2f}x")  # lift -> "N.NNx" string
+            key_factor_table["支持度"] = key_factor_table["支持度"].map(lambda v: f"{v:.1%}")  # fraction -> percent string
+            st.dataframe(key_factor_table, use_container_width=True, hide_index=True)
+            st.caption("关键因子按目标缺陷占比、异常倍数、样本数和支持度综合排序。")
+        else:  # only the key-factor table has an empty-state message; an empty extended table shows nothing
+            st.info("当前数据未找到显著关键因子,可放宽筛选条件或增加样本量。")
+
 # ========== Tab 1: 空间集中性 ==========
 _t = get_tab("🗺️ 空间集中性")
 if _t:

+ 1 - 1
defect_analysis/cases.py

@@ -98,7 +98,7 @@ def update_case_status(db_path, *, case_id, status, actor, note=""):
         current_status = current["status"]
         if status not in VALID_CASE_TRANSITIONS.get(current_status, set()):
             raise ValueError(f"不允许的 Case 状态流转: {current_status} -> {status}")
-        closed_at = pd.Timestamp.utcnow().strftime("%Y-%m-%d %H:%M:%S") if status == "CLOSED" else None
+        closed_at = pd.Timestamp.now('UTC').strftime("%Y-%m-%d %H:%M:%S") if status == "CLOSED" else None
         conn.execute(
             """
             UPDATE root_cause_cases

+ 42 - 34
defect_analysis/ml/key_factors.py

@@ -62,50 +62,58 @@ def find_key_factors(
             columns=["维度", "因子值", "样本数", "目标数", "目标占比", "基线占比", "异常倍数", "支持度", "关键因子得分"]
         )
 
-    rows = []
     total = len(normalized)
+    all_rows = []
     for dimension in dimensions:
         if dimension not in normalized.columns:
             continue
-        values = normalized[dimension].fillna("").astype(str)
-        valid = normalized[values != ""].copy()
-        if valid.empty:
+        series = normalized[dimension].fillna("").astype(str)
+        valid_idx = series != ""
+        if not valid_idx.any():
             continue
-        valid_target = target_mask.loc[valid.index]
-        grouped = valid.assign(_target=valid_target.astype(int)).groupby(dimension)
-        for value, group in grouped:
-            count = len(group)
-            if count < min_count:
-                continue
-            target_count = int(group["_target"].sum())
-            if target_count == 0:
-                continue
-            target_rate = target_count / count
-            lift = target_rate / baseline_rate
-            if lift < min_lift:
-                continue
-            support = count / total
-            score = (max(lift - 1, 0) * 45) + (target_rate * 30) + (np.sqrt(target_count) * 8) + (support * 17)
-            rows.append(
-                {
-                    "维度": dimension,
-                    "因子值": str(value),
-                    "样本数": int(count),
-                    "目标数": target_count,
-                    "目标占比": round(float(target_rate), 4),
-                    "基线占比": round(float(baseline_rate), 4),
-                    "异常倍数": round(float(lift), 2),
-                    "支持度": round(float(support), 4),
-                    "关键因子得分": round(float(score), 2),
-                }
-            )
+        valid = normalized.loc[valid_idx].copy()
+        valid["_target"] = target_mask.loc[valid.index].astype(int)
+        valid["_value"] = series.loc[valid_idx]
+        grouped = valid.groupby("_value").agg(
+            count=("defect_id", "count"),
+            target_count=("_target", "sum"),
+        )
+        grouped = grouped[grouped["count"] >= min_count]
+        grouped = grouped[grouped["target_count"] > 0]
+        if grouped.empty:
+            continue
+        grouped["target_rate"] = grouped["target_count"] / grouped["count"]
+        grouped["lift"] = grouped["target_rate"] / baseline_rate
+        grouped = grouped[grouped["lift"] >= min_lift]
+        if grouped.empty:
+            continue
+        grouped["support"] = grouped["count"] / total
+        grouped["score"] = (
+            (grouped["lift"] - 1).clip(lower=0) * 45
+            + grouped["target_rate"] * 30
+            + np.sqrt(grouped["target_count"]) * 8
+            + grouped["support"] * 17
+        )
+        grouped = grouped.reset_index()
+        grouped = grouped.rename(columns={"_value": "因子值"})
+        grouped["维度"] = dimension
+        grouped["样本数"] = grouped["count"].astype(int)
+        grouped["目标数"] = grouped["target_count"].astype(int)
+        grouped["目标占比"] = grouped["target_rate"].round(4)
+        grouped["基线占比"] = round(baseline_rate, 4)
+        grouped["异常倍数"] = grouped["lift"].round(2)
+        grouped["支持度"] = grouped["support"].round(4)
+        grouped["关键因子得分"] = grouped["score"].round(2)
+        all_rows.append(
+            grouped[["维度", "因子值", "样本数", "目标数", "目标占比", "基线占比", "异常倍数", "支持度", "关键因子得分"]]
+        )
 
-    if not rows:
+    if not all_rows:
         return pd.DataFrame(
             columns=["维度", "因子值", "样本数", "目标数", "目标占比", "基线占比", "异常倍数", "支持度", "关键因子得分"]
         )
 
-    result = pd.DataFrame(rows)
+    result = pd.concat(all_rows, ignore_index=True)
     return (
         result.sort_values(["关键因子得分", "目标数", "异常倍数"], ascending=False)
         .head(top_n)