"""不良关键因子发现。""" import numpy as np import pandas as pd from defect_analysis.schemas import normalize_defect_schema DEFAULT_FACTOR_DIMENSIONS = [ "equipment_id", "seat_id", "lam_equipment_id", "lam_seat_id", "lam_fixture_id", "lam_jig_id", "lam_nozzle_id", "material_lot_oca", "material_lot_glass", "material_lot_polarizer", "clean_equipment_id", "clean_slot_id", "bond_equipment_id", "bond_head_id", "recipe_id", "shift", "defect_geometry_type", ] def _build_target_mask(df, target_defect_type=None, target_severity=None): if target_defect_type: return df["defect_type"] == target_defect_type if target_severity: return df["severity"] == target_severity return df["severity"] == "严重" def find_key_factors( df, *, target_defect_type=None, target_severity=None, dimensions=None, min_count=3, min_lift=1.1, top_n=20, ): """查找与目标不良显著相关的关键因子。 当前实现是可解释统计排序:按每个维度取值计算目标占比、异常倍数和支持度, 用综合得分排序。它适合生产早期作为 ML 特征与根因候选的基线。 """ normalized = normalize_defect_schema(df) if normalized.empty: return pd.DataFrame() dimensions = DEFAULT_FACTOR_DIMENSIONS if dimensions is None else dimensions target_mask = _build_target_mask(normalized, target_defect_type, target_severity) baseline_rate = float(target_mask.mean()) if baseline_rate <= 0: return pd.DataFrame( columns=["维度", "因子值", "样本数", "目标数", "目标占比", "基线占比", "异常倍数", "支持度", "关键因子得分"] ) rows = [] total = len(normalized) for dimension in dimensions: if dimension not in normalized.columns: continue values = normalized[dimension].fillna("").astype(str) valid = normalized[values != ""].copy() if valid.empty: continue valid_target = target_mask.loc[valid.index] grouped = valid.assign(_target=valid_target.astype(int)).groupby(dimension) for value, group in grouped: count = len(group) if count < min_count: continue target_count = int(group["_target"].sum()) if target_count == 0: continue target_rate = target_count / count lift = target_rate / baseline_rate if lift < min_lift: continue support = count / total score = (max(lift - 1, 0) * 45) + (target_rate * 30) + (np.sqrt(target_count) * 8) + (support * 17) rows.append( { "维度": dimension, "因子值": str(value), "样本数": int(count), "目标数": target_count, "目标占比": round(float(target_rate), 4), "基线占比": round(float(baseline_rate), 4), "异常倍数": round(float(lift), 2), "支持度": round(float(support), 4), "关键因子得分": round(float(score), 2), } ) if not rows: return pd.DataFrame( columns=["维度", "因子值", "样本数", "目标数", "目标占比", "基线占比", "异常倍数", "支持度", "关键因子得分"] ) result = pd.DataFrame(rows) return ( result.sort_values(["关键因子得分", "目标数", "异常倍数"], ascending=False) .head(top_n) .reset_index(drop=True) )