| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113 |
- """不良关键因子发现。"""
- import numpy as np
- import pandas as pd
- from defect_analysis.schemas import normalize_defect_schema
- DEFAULT_FACTOR_DIMENSIONS = [
- "equipment_id",
- "seat_id",
- "lam_equipment_id",
- "lam_seat_id",
- "lam_fixture_id",
- "lam_jig_id",
- "lam_nozzle_id",
- "material_lot_oca",
- "material_lot_glass",
- "material_lot_polarizer",
- "clean_equipment_id",
- "clean_slot_id",
- "bond_equipment_id",
- "bond_head_id",
- "recipe_id",
- "shift",
- "defect_geometry_type",
- ]
- def _build_target_mask(df, target_defect_type=None, target_severity=None):
- if target_defect_type:
- return df["defect_type"] == target_defect_type
- if target_severity:
- return df["severity"] == target_severity
- return df["severity"] == "严重"
- def find_key_factors(
- df,
- *,
- target_defect_type=None,
- target_severity=None,
- dimensions=None,
- min_count=3,
- min_lift=1.1,
- top_n=20,
- ):
- """查找与目标不良显著相关的关键因子。
- 当前实现是可解释统计排序:按每个维度取值计算目标占比、异常倍数和支持度,
- 用综合得分排序。它适合生产早期作为 ML 特征与根因候选的基线。
- """
- normalized = normalize_defect_schema(df)
- if normalized.empty:
- return pd.DataFrame()
- dimensions = DEFAULT_FACTOR_DIMENSIONS if dimensions is None else dimensions
- target_mask = _build_target_mask(normalized, target_defect_type, target_severity)
- baseline_rate = float(target_mask.mean())
- if baseline_rate <= 0:
- return pd.DataFrame(
- columns=["维度", "因子值", "样本数", "目标数", "目标占比", "基线占比", "异常倍数", "支持度", "关键因子得分"]
- )
- rows = []
- total = len(normalized)
- for dimension in dimensions:
- if dimension not in normalized.columns:
- continue
- values = normalized[dimension].fillna("").astype(str)
- valid = normalized[values != ""].copy()
- if valid.empty:
- continue
- valid_target = target_mask.loc[valid.index]
- grouped = valid.assign(_target=valid_target.astype(int)).groupby(dimension)
- for value, group in grouped:
- count = len(group)
- if count < min_count:
- continue
- target_count = int(group["_target"].sum())
- if target_count == 0:
- continue
- target_rate = target_count / count
- lift = target_rate / baseline_rate
- if lift < min_lift:
- continue
- support = count / total
- score = (max(lift - 1, 0) * 45) + (target_rate * 30) + (np.sqrt(target_count) * 8) + (support * 17)
- rows.append(
- {
- "维度": dimension,
- "因子值": str(value),
- "样本数": int(count),
- "目标数": target_count,
- "目标占比": round(float(target_rate), 4),
- "基线占比": round(float(baseline_rate), 4),
- "异常倍数": round(float(lift), 2),
- "支持度": round(float(support), 4),
- "关键因子得分": round(float(score), 2),
- }
- )
- if not rows:
- return pd.DataFrame(
- columns=["维度", "因子值", "样本数", "目标数", "目标占比", "基线占比", "异常倍数", "支持度", "关键因子得分"]
- )
- result = pd.DataFrame(rows)
- return (
- result.sort_values(["关键因子得分", "目标数", "异常倍数"], ascending=False)
- .head(top_n)
- .reset_index(drop=True)
- )
|