"""Key-factor discovery for defect records."""

import numpy as np
import pandas as pd

from defect_analysis.schemas import normalize_defect_schema

# Columns examined by default when the caller does not pass ``dimensions``.
DEFAULT_FACTOR_DIMENSIONS = [
    "equipment_id",
    "seat_id",
    "lam_equipment_id",
    "lam_seat_id",
    "lam_fixture_id",
    "lam_jig_id",
    "lam_nozzle_id",
    "material_lot_oca",
    "material_lot_glass",
    "material_lot_polarizer",
    "clean_equipment_id",
    "clean_slot_id",
    "bond_equipment_id",
    "bond_head_id",
    "recipe_id",
    "shift",
    "defect_geometry_type",
]

# Output schema shared by every return path of ``find_key_factors`` so that
# callers can index result columns even when no factor qualifies.
_RESULT_COLUMNS = [
    "维度",
    "因子值",
    "样本数",
    "目标数",
    "目标占比",
    "基线占比",
    "异常倍数",
    "支持度",
    "关键因子得分",
]


def _build_target_mask(df, target_defect_type=None, target_severity=None):
    """Return a boolean Series marking the rows that count as "target".

    Precedence: a truthy ``target_defect_type`` wins and matches on the
    ``defect_type`` column; otherwise a truthy ``target_severity`` matches on
    the ``severity`` column; with neither given, rows whose ``severity``
    equals "严重" are the target.
    """
    if target_defect_type:
        return df["defect_type"] == target_defect_type
    if target_severity:
        return df["severity"] == target_severity
    return df["severity"] == "严重"


def _empty_result() -> pd.DataFrame:
    """Return an empty frame carrying the full result schema."""
    return pd.DataFrame(columns=_RESULT_COLUMNS)


def _score_dimension(
    values,
    target_flags,
    *,
    dimension,
    baseline_rate,
    total,
    min_count,
    min_lift,
):
    """Score every value of one dimension; return a result frame or ``None``.

    ``values`` is the raw column for ``dimension`` and ``target_flags`` is a
    0/1 NumPy array in the same row order. ``None`` is returned when no value
    survives the ``min_count`` / target-count / ``min_lift`` filters.
    """
    # Missing values become "" and are excluded from the analysis.
    labels = values.fillna("").astype(str)
    keep = (labels != "").to_numpy()
    if not keep.any():
        return None

    # Work positionally on a two-column frame: this avoids copying the whole
    # input per dimension and does not depend on the input index being unique.
    frame = pd.DataFrame(
        {
            "_value": labels.to_numpy()[keep],
            "_target": target_flags[keep],
        }
    )
    stats = frame.groupby("_value").agg(
        # Group size, so the sample count never depends on another column
        # being non-null and stays consistent with ``total``.
        count=("_target", "size"),
        target_count=("_target", "sum"),
    )

    stats = stats[(stats["count"] >= min_count) & (stats["target_count"] > 0)].copy()
    if stats.empty:
        return None

    stats["target_rate"] = stats["target_count"] / stats["count"]
    stats["lift"] = stats["target_rate"] / baseline_rate
    stats = stats[stats["lift"] >= min_lift].copy()
    if stats.empty:
        return None

    stats["support"] = stats["count"] / total
    # Composite score: excess lift, target rate, sqrt of target count and
    # support, combined with fixed weights 45 / 30 / 8 / 17.
    stats["score"] = (
        (stats["lift"] - 1).clip(lower=0) * 45
        + stats["target_rate"] * 30
        + np.sqrt(stats["target_count"]) * 8
        + stats["support"] * 17
    )

    stats = stats.reset_index().rename(columns={"_value": "因子值"})
    stats["维度"] = dimension
    stats["样本数"] = stats["count"].astype(int)
    stats["目标数"] = stats["target_count"].astype(int)
    stats["目标占比"] = stats["target_rate"].round(4)
    stats["基线占比"] = round(baseline_rate, 4)
    stats["异常倍数"] = stats["lift"].round(2)
    stats["支持度"] = stats["support"].round(4)
    stats["关键因子得分"] = stats["score"].round(2)
    return stats[_RESULT_COLUMNS]


def find_key_factors(
    df,
    *,
    target_defect_type=None,
    target_severity=None,
    dimensions=None,
    min_count=3,
    min_lift=1.1,
    top_n=20,
) -> pd.DataFrame:
    """Find factor values that are over-represented among target defects.

    This is an interpretable statistical ranking: for every value of every
    dimension it computes the target rate, the lift over the overall baseline
    rate and the support, then ranks by a composite score. It is meant as a
    baseline for ML features and root-cause candidates.

    Args:
        df: Defect records; passed through ``normalize_defect_schema`` first.
            NOTE(review): the columns that helper guarantees are not visible
            here; ``defect_type`` / ``severity`` are read from its output.
        target_defect_type: If truthy, target rows are those with this
            ``defect_type``.
        target_severity: If truthy (and no defect type is given), target rows
            are those with this ``severity``. With neither, severity "严重"
            is the target.
        dimensions: Column names to analyse; defaults to
            ``DEFAULT_FACTOR_DIMENSIONS``. Names absent from the data are
            skipped.
        min_count: Minimum number of rows a factor value needs to be kept.
        min_lift: Minimum ratio of the value's target rate to the baseline.
        top_n: Number of top-ranked rows returned.

    Returns:
        A DataFrame with columns ``_RESULT_COLUMNS``, sorted by score, target
        count and lift in descending order. It is empty (but keeps the same
        columns) when the input is empty, when no row is a target, or when no
        factor value passes the filters.
    """
    normalized = normalize_defect_schema(df)
    if normalized.empty:
        return _empty_result()

    if dimensions is None:
        dimensions = DEFAULT_FACTOR_DIMENSIONS

    target_mask = _build_target_mask(normalized, target_defect_type, target_severity)
    baseline_rate = float(target_mask.mean())
    if baseline_rate <= 0:
        # No target rows at all: lift would be undefined.
        return _empty_result()

    total = len(normalized)
    # Positional 0/1 flags, computed once and reused for every dimension.
    target_flags = target_mask.astype(int).to_numpy()

    frames = []
    for dimension in dimensions:
        if dimension not in normalized.columns:
            continue
        scored = _score_dimension(
            normalized[dimension],
            target_flags,
            dimension=dimension,
            baseline_rate=baseline_rate,
            total=total,
            min_count=min_count,
            min_lift=min_lift,
        )
        if scored is not None:
            frames.append(scored)

    if not frames:
        return _empty_result()

    result = pd.concat(frames, ignore_index=True)
    return (
        result.sort_values(["关键因子得分", "目标数", "异常倍数"], ascending=False)
        .head(top_n)
        .reset_index(drop=True)
    )