Browse Source

修复:key_factors.py 空列表 dimensions 回退默认值问题

将 dimensions or DEFAULT 改为 is None 判断,与 root_cause.py 保持一致。
leod 4 ngày trước cách đây
mục cha
commit
385dd625d7
1 tập tin đã thay đổi với 113 bổ sung0 xóa
  1. 113 0
      defect_analysis/ml/key_factors.py

+ 113 - 0
defect_analysis/ml/key_factors.py

@@ -0,0 +1,113 @@
+"""不良关键因子发现。"""
+
+import numpy as np
+import pandas as pd
+
+from defect_analysis.schemas import normalize_defect_schema
+
+
+DEFAULT_FACTOR_DIMENSIONS = [
+    "equipment_id",
+    "seat_id",
+    "lam_equipment_id",
+    "lam_seat_id",
+    "lam_fixture_id",
+    "lam_jig_id",
+    "lam_nozzle_id",
+    "material_lot_oca",
+    "material_lot_glass",
+    "material_lot_polarizer",
+    "clean_equipment_id",
+    "clean_slot_id",
+    "bond_equipment_id",
+    "bond_head_id",
+    "recipe_id",
+    "shift",
+    "defect_geometry_type",
+]
+
+
+def _build_target_mask(df, target_defect_type=None, target_severity=None):
+    if target_defect_type:
+        return df["defect_type"] == target_defect_type
+    if target_severity:
+        return df["severity"] == target_severity
+    return df["severity"] == "严重"
+
+
+def find_key_factors(
+    df,
+    *,
+    target_defect_type=None,
+    target_severity=None,
+    dimensions=None,
+    min_count=3,
+    min_lift=1.1,
+    top_n=20,
+):
+    """查找与目标不良显著相关的关键因子。
+
+    当前实现是可解释统计排序:按每个维度取值计算目标占比、异常倍数和支持度,
+    用综合得分排序。它适合生产早期作为 ML 特征与根因候选的基线。
+    """
+    normalized = normalize_defect_schema(df)
+    if normalized.empty:
+        return pd.DataFrame()
+
+    dimensions = DEFAULT_FACTOR_DIMENSIONS if dimensions is None else dimensions
+    target_mask = _build_target_mask(normalized, target_defect_type, target_severity)
+    baseline_rate = float(target_mask.mean())
+    if baseline_rate <= 0:
+        return pd.DataFrame(
+            columns=["维度", "因子值", "样本数", "目标数", "目标占比", "基线占比", "异常倍数", "支持度", "关键因子得分"]
+        )
+
+    rows = []
+    total = len(normalized)
+    for dimension in dimensions:
+        if dimension not in normalized.columns:
+            continue
+        values = normalized[dimension].fillna("").astype(str)
+        valid = normalized[values != ""].copy()
+        if valid.empty:
+            continue
+        valid_target = target_mask.loc[valid.index]
+        grouped = valid.assign(_target=valid_target.astype(int)).groupby(dimension)
+        for value, group in grouped:
+            count = len(group)
+            if count < min_count:
+                continue
+            target_count = int(group["_target"].sum())
+            if target_count == 0:
+                continue
+            target_rate = target_count / count
+            lift = target_rate / baseline_rate
+            if lift < min_lift:
+                continue
+            support = count / total
+            score = (max(lift - 1, 0) * 45) + (target_rate * 30) + (np.sqrt(target_count) * 8) + (support * 17)
+            rows.append(
+                {
+                    "维度": dimension,
+                    "因子值": str(value),
+                    "样本数": int(count),
+                    "目标数": target_count,
+                    "目标占比": round(float(target_rate), 4),
+                    "基线占比": round(float(baseline_rate), 4),
+                    "异常倍数": round(float(lift), 2),
+                    "支持度": round(float(support), 4),
+                    "关键因子得分": round(float(score), 2),
+                }
+            )
+
+    if not rows:
+        return pd.DataFrame(
+            columns=["维度", "因子值", "样本数", "目标数", "目标占比", "基线占比", "异常倍数", "支持度", "关键因子得分"]
+        )
+
+    result = pd.DataFrame(rows)
+    return (
+        result.sort_values(["关键因子得分", "目标数", "异常倍数"], ascending=False)
+        .head(top_n)
+        .reset_index(drop=True)
+    )