"""Defect key-factor discovery."""
- import numpy as np
- import pandas as pd
- from defect_analysis.schemas import normalize_defect_schema
# Column names examined by default when searching for key factors.
# Order is preserved from the original definition; columns missing from the
# analysed frame are skipped by the consumer.
DEFAULT_FACTOR_DIMENSIONS = [
    # generic equipment / seat columns
    "equipment_id", "seat_id",
    # lam_* columns
    "lam_equipment_id", "lam_seat_id", "lam_fixture_id", "lam_jig_id", "lam_nozzle_id",
    # material_lot_* columns
    "material_lot_oca", "material_lot_glass", "material_lot_polarizer",
    # clean_* columns
    "clean_equipment_id", "clean_slot_id",
    # bond_* columns
    "bond_equipment_id", "bond_head_id",
    # recipe, shift and geometry columns
    "recipe_id", "shift", "defect_geometry_type",
]
def _build_target_mask(df, target_defect_type=None, target_severity=None):
    """Return a boolean Series flagging the rows that count as the target.

    Selection priority:
      1. a truthy ``target_defect_type`` is matched against ``defect_type``;
      2. otherwise a truthy ``target_severity`` is matched against ``severity``;
      3. otherwise rows whose ``severity`` equals "严重" are the target.

    Only the column actually used for the comparison is read from ``df``.
    """
    if target_defect_type:
        column, wanted = "defect_type", target_defect_type
    elif target_severity:
        column, wanted = "severity", target_severity
    else:
        column, wanted = "severity", "严重"
    return df[column] == wanted
# Output schema of find_key_factors; shared by every return path so callers
# always receive the same columns, even for an empty result.
_KEY_FACTOR_COLUMNS = [
    "维度",
    "因子值",
    "样本数",
    "目标数",
    "目标占比",
    "基线占比",
    "异常倍数",
    "支持度",
    "关键因子得分",
]


def find_key_factors(
    df,
    *,
    target_defect_type=None,
    target_severity=None,
    dimensions=None,
    min_count=3,
    min_lift=1.1,
    top_n=20,
):
    """Find the factor values most strongly associated with the target defect.

    The implementation is an interpretable statistical ranking: for every
    value of every dimension it computes the target rate, the lift over the
    overall baseline rate and the support, then ranks by a combined score.
    It is meant as a baseline for ML features and root-cause candidates.

    Args:
        df: Raw defect records; passed through ``normalize_defect_schema``.
        target_defect_type: If truthy, rows with this ``defect_type`` are the
            target (takes priority over ``target_severity``).
        target_severity: If truthy (and no defect type is given), rows with
            this ``severity`` are the target. With neither, severity "严重"
            is used.
        dimensions: Column names to analyse; defaults to
            ``DEFAULT_FACTOR_DIMENSIONS``. Columns that are absent are skipped.
        min_count: Minimum number of rows a factor value needs to be kept.
        min_lift: Minimum ratio of the value's target rate to the baseline.
        top_n: Maximum number of rows returned.

    Returns:
        A DataFrame with the columns in ``_KEY_FACTOR_COLUMNS``, sorted by
        score, target count and lift (all descending). It is empty, but still
        carries those columns, when there is no data, no target row, or no
        factor value passes the thresholds.
    """
    normalized = normalize_defect_schema(df)
    if normalized.empty:
        # Keep the schema consistent with the other empty-result paths.
        return pd.DataFrame(columns=_KEY_FACTOR_COLUMNS)

    if dimensions is None:
        dimensions = DEFAULT_FACTOR_DIMENSIONS

    target_mask = _build_target_mask(normalized, target_defect_type, target_severity)
    baseline_rate = float(target_mask.mean())
    if baseline_rate <= 0:
        return pd.DataFrame(columns=_KEY_FACTOR_COLUMNS)

    total = len(normalized)
    # Positional 0/1 flags, computed once; positional access avoids label
    # lookups that misbehave when the frame's index contains duplicates.
    target_flags = target_mask.to_numpy().astype(int)

    frames = []
    for dimension in dimensions:
        if dimension not in normalized.columns:
            continue

        # Missing values and empty strings are not treated as factor values.
        values = normalized[dimension].fillna("").astype(str)
        has_value = (values != "").to_numpy(dtype=bool)
        if not has_value.any():
            continue

        # Only the two columns needed for aggregation are materialised,
        # instead of copying the whole frame for every dimension.
        observed = pd.DataFrame(
            {
                "_value": values.to_numpy()[has_value],
                "_target": target_flags[has_value],
            }
        )
        # Count rows via the always-populated flag column so that the sample
        # count, the target count, the baseline and the support all refer to
        # the same set of rows.
        stats = observed.groupby("_value").agg(
            count=("_target", "count"),
            target_count=("_target", "sum"),
        )
        stats = stats[(stats["count"] >= min_count) & (stats["target_count"] > 0)]
        if stats.empty:
            continue

        target_rate = stats["target_count"] / stats["count"]
        lift = target_rate / baseline_rate
        keep = lift >= min_lift
        if not keep.any():
            continue
        stats, target_rate, lift = stats[keep], target_rate[keep], lift[keep]

        support = stats["count"] / total
        # Weighted blend of excess lift, target rate, target volume (square
        # root dampened) and support.
        score = (
            (lift - 1).clip(lower=0) * 45
            + target_rate * 30
            + np.sqrt(stats["target_count"]) * 8
            + support * 17
        )

        frames.append(
            pd.DataFrame(
                {
                    "维度": dimension,
                    "因子值": stats.index.to_numpy(),
                    "样本数": stats["count"].to_numpy().astype(int),
                    "目标数": stats["target_count"].to_numpy().astype(int),
                    "目标占比": target_rate.round(4).to_numpy(),
                    "基线占比": round(baseline_rate, 4),
                    "异常倍数": lift.round(2).to_numpy(),
                    "支持度": support.round(4).to_numpy(),
                    "关键因子得分": score.round(2).to_numpy(),
                },
                columns=_KEY_FACTOR_COLUMNS,
            )
        )

    if not frames:
        return pd.DataFrame(columns=_KEY_FACTOR_COLUMNS)

    result = pd.concat(frames, ignore_index=True)
    return (
        result.sort_values(["关键因子得分", "目标数", "异常倍数"], ascending=False)
        .head(top_n)
        .reset_index(drop=True)
    )
|