"""Root-cause candidate analysis."""

import pandas as pd

# Columns of the defect frame that are inspected as potential root-cause
# dimensions when the caller does not supply an explicit list.
EXTENDED_ROOT_CAUSE_DIMENSIONS = [
    "lam_fixture_id",
    "lam_jig_id",
    "lam_nozzle_id",
    "material_lot_oca",
    "material_lot_glass",
    "material_lot_polarizer",
    "clean_equipment_id",
    "clean_slot_id",
    "bond_equipment_id",
    "bond_head_id",
    "recipe_id",
]

# Column order of the frame returned by build_extended_root_causes.
_RESULT_COLUMNS = [
    "维度",
    "候选值",
    "缺陷数",
    "占比",
    "异常倍数",
    "涉及面板",
    "主要缺陷",
    "严重占比",
    "风险分",
]


def _dominant_value(values):
    """Return the most frequent entry of *values*, or "-" when there is none."""
    modes = values.mode()
    return "-" if modes.empty else modes.iloc[0]


def _severe_count(values):
    """Count the entries of *values* that equal the "严重" severity label."""
    return int((values == "严重").sum())


def build_extended_root_causes(df, dimensions=None, *, top_n=12):
    """Rank candidate root causes across fixture, nozzle, material-lot, etc. dimensions.

    For every dimension column present in ``df``, rows whose value is missing
    or stringifies to ``""`` are ignored; the remaining rows are grouped by
    the dimension value and each group is scored:

    * ``缺陷数``   - number of non-null ``defect_id`` entries in the group
    * ``涉及面板`` - number of distinct ``panel_id`` values
    * ``主要缺陷`` - most frequent ``defect_type`` (``"-"`` if none)
    * ``占比``     - ``缺陷数`` divided by the total row count of ``df``
    * ``严重占比`` - share of the group's rows whose ``severity`` is ``"严重"``
    * ``异常倍数`` - ``缺陷数`` relative to an even split of the valid rows
      across the distinct values of the dimension, rounded to 2 decimals
    * ``风险分``   - ``55 * (缺陷数 / largest 缺陷数 in the dimension)
      + 30 * min(异常倍数 / 3, 1) + 15 * 严重占比``, rounded to 1 decimal

    Args:
        df: Defect records. Must provide ``defect_id``, ``panel_id``,
            ``defect_type`` and ``severity`` columns whenever at least one
            requested dimension column is present with usable values.
        dimensions: Column names to analyse. Defaults to
            ``EXTENDED_ROOT_CAUSE_DIMENSIONS``. Names absent from ``df`` are
            skipped silently.
        top_n: Maximum number of candidates to return (default 12). Pass
            ``None`` to return every candidate.

    Returns:
        A ``DataFrame`` with the columns ``维度, 候选值, 缺陷数, 占比, 异常倍数,
        涉及面板, 主要缺陷, 严重占比, 风险分``, sorted by ``风险分`` then ``缺陷数``
        in descending order and re-indexed from 0. The frame is empty (same
        columns) when no dimension yields any candidate.
    """
    if dimensions is None:
        dimensions = EXTENDED_ROOT_CAUSE_DIMENSIONS
    # Guard against an empty frame so the share computation never divides by 0.
    total_defects = max(len(df), 1)

    frames = []
    for dimension in dimensions:
        if dimension not in df.columns:
            continue

        # Missing values and values that stringify to "" carry no information.
        has_value = df[dimension].fillna("").astype(str) != ""
        valid = df[has_value]
        if valid.empty:
            continue

        counts = (
            valid.groupby(dimension)
            .agg(
                缺陷数=("defect_id", "count"),
                涉及面板=("panel_id", "nunique"),
                主要缺陷=("defect_type", _dominant_value),
                严重数=("severity", _severe_count),
            )
            .reset_index()
        )

        # Defects each value would carry if they were spread evenly.
        expected = len(valid) / max(valid[dimension].nunique(), 1)

        counts["维度"] = dimension
        counts["候选值"] = counts[dimension].astype(str)
        counts["占比"] = counts["缺陷数"] / total_defects
        counts["严重占比"] = counts["严重数"] / counts["缺陷数"].clip(lower=1)
        counts["异常倍数"] = (counts["缺陷数"] / max(expected, 0.001)).round(2)

        # "count" ignores null defect_id values, so the largest group can be
        # 0; clamp the denominator to avoid 0/0 producing NaN risk scores.
        largest_group = max(counts["缺陷数"].max(), 1)
        count_score = counts["缺陷数"] / largest_group
        # A lift of 3x or more earns the full lift contribution.
        lift_score = (counts["异常倍数"] / 3).clip(upper=1)
        counts["风险分"] = (
            count_score * 55 + lift_score * 30 + counts["严重占比"] * 15
        ).round(1)

        frames.append(counts[_RESULT_COLUMNS])

    if not frames:
        return pd.DataFrame(columns=_RESULT_COLUMNS)

    ranked = pd.concat(frames, ignore_index=True).sort_values(
        ["风险分", "缺陷数"], ascending=False
    )
    if top_n is not None:
        ranked = ranked.head(top_n)
    return ranked.reset_index(drop=True)