"""缺陷分析页面的可测试业务逻辑。""" import numpy as np import pandas as pd from defect_analysis.ml.model_bundle import create_model_bundle from defect_analysis.ml.predict import predict_key_factors from defect_analysis.root_cause import EXTENDED_ROOT_CAUSE_DIMENSIONS, build_extended_root_causes from defect_analysis.schemas import ( CORE_REQUIRED_COLUMNS, INDUSTRY_OPTIONAL_COLUMNS, TEMPLATE_COLUMNS, get_missing_required_columns, normalize_defect_schema, ) DEFECT_SOP_RECOMMENDATIONS = { "划痕": ["检查搬运轨道、吸嘴和治具接触面", "复核清洗滚刷与擦拭工位是否有硬质颗粒"], "气泡": ["检查贴合压力、真空度、OCA 状态和贴合速度", "复核贴合前清洁与材料开封时长"], "漏光": ["检查边缘贴合、背光组装、框胶和压合均匀性", "复核四角/边缘区应力与夹持状态"], "色差": ["检查背光、偏光片批次、贴合应力和老化条件", "对比同批材料与相邻工艺参数"], "异物": ["检查洁净度、清洗段、静电控制和材料暴露时间", "追溯同批材料与工位环境记录"], "亮点": ["复核点灯/AOI 判定、TFT 像素缺陷和异物压伤", "抽查高发区域是否存在压接或污染"], "暗点": ["复核点灯/AOI 判定、TFT 像素缺陷和异物压伤", "检查绑定/驱动相关区域异常"], "裂纹": ["立即检查切割、搬运、夹持和跌落冲击风险", "对同批面板执行 Hold 与复检"], } def normalize_date_bounds(start_date, end_date): """把日期范围转换成左闭右开的时间边界,确保结束日期整天被包含。""" start_ts = pd.Timestamp(start_date).normalize() end_exclusive = pd.Timestamp(end_date).normalize() + pd.Timedelta(days=1) return start_ts, end_exclusive def apply_defect_filters( df, *, start_date, end_date, selected_types, selected_batches, selected_equipment, selected_seats, selected_shift="全部", selected_severity="全部", ): """应用页面筛选条件。""" start_ts, end_exclusive = normalize_date_bounds(start_date, end_date) mask = ( (df["timestamp"] >= start_ts) & (df["timestamp"] < end_exclusive) & (df["defect_type"].isin(selected_types)) & (df["batch_id"].isin(selected_batches)) & (df["equipment_id"].isin(selected_equipment)) ) if selected_shift != "全部": mask &= df["shift"] == selected_shift if selected_severity != "全部": mask &= df["severity"] == selected_severity if selected_seats: mask &= df["seat_id"].isin(selected_seats) return df[mask].copy() def classify_panel_zone(df): """按 3C 面板行业常用语义把坐标映射到关键区域。""" width = df.get("panel_width_mm", pd.Series(155.0, index=df.index)).replace(0, np.nan) height = df.get("panel_height_mm", pd.Series(340.0, index=df.index)).replace(0, np.nan) x = df.get("x_mm", width * 0.5) y = df.get("y_mm", height * 0.5) x_norm = x / width y_norm = y / height zones = [] for x, y in zip(x_norm.fillna(0.5), y_norm.fillna(0.5)): labels = [] if x <= 0.1: labels.append("左边缘区") if x >= 0.9: labels.append("右边缘区") if y <= 0.1: labels.append("下边缘区") if y >= 0.9: labels.append("上边缘区") if (x <= 0.12 or x >= 0.88) and (y <= 0.12 or y >= 0.88): labels.append("角落区") if 0.68 <= y <= 0.88 and 0.25 <= x <= 0.75: labels.append("FPC/绑定区") if not labels: labels.append("显示中心区") zones.append(" / ".join(labels)) return pd.Series(zones, index=df.index, name="panel_zone") def calculate_kpis(source_df, filtered_df): """基于当前筛选结果计算页面 KPI。""" total_panels_inspected = filtered_df["panel_id"].nunique() defective_panels = filtered_df["panel_id"].nunique() total_defects = len(filtered_df) critical_defects = int((filtered_df["severity"] == "严重").sum()) if total_defects else 0 top_defect_type = filtered_df["defect_type"].mode().iloc[0] if total_defects else "-" yield_rate = (1 - defective_panels / max(total_panels_inspected, 1)) * 100 return { "total_panels_inspected": int(total_panels_inspected), "defective_panels": int(defective_panels), "yield_rate": float(yield_rate), "total_defects": int(total_defects), "critical_defects": int(critical_defects), "top_defect_type": top_defect_type, } def calculate_spc_metrics(df): """计算 SPC 所需数据,防止模拟分母造成非法概率。""" daily = df.groupby("day").agg( total_defects=("defect_id", "count"), panels_with_defects=("panel_id", "nunique"), ).reset_index() daily["day"] = pd.to_datetime(daily["day"]) daily = daily.sort_values("day").reset_index(drop=True) if len(daily) < 2: return { "daily": daily, "p_bar": 0.0, "ucl": 0.0, "lcl": 0.0, "uwl": 0.0, "lwl": 0.0, "sigma_p": 0.0, } total_days = (df["timestamp"].max() - df["timestamp"].min()).days + 1 total_unique_panels = df["panel_id"].nunique() estimated = max(total_unique_panels // max(total_days // 7, 1), 1) daily["estimated_inspected"] = np.maximum(estimated, daily["panels_with_defects"]) daily["defect_rate"] = ( daily["panels_with_defects"] / daily["estimated_inspected"] ).clip(lower=0, upper=1) p_bar = float(np.clip(daily["defect_rate"].mean(), 0, 1)) n_avg = float(daily["estimated_inspected"].mean()) sigma_p = float(np.sqrt(max(p_bar * (1 - p_bar), 0) / n_avg)) if n_avg > 0 else 0.0 return { "daily": daily, "p_bar": p_bar, "ucl": min(1.0, p_bar + 3 * sigma_p), "lcl": max(0.0, p_bar - 3 * sigma_p), "uwl": min(1.0, p_bar + 2 * sigma_p), "lwl": max(0.0, p_bar - 2 * sigma_p), "sigma_p": sigma_p, } def build_diagnostic_dashboard(df): """生成诊断驾驶舱需要的摘要、根因候选和趋势数据。""" total_defects = len(df) if total_defects == 0: return { "severity_level": "正常", "top_defect_type": "-", "top_defect_share": 0.0, "serious_share": 0.0, "root_causes": pd.DataFrame(), "extended_root_causes": pd.DataFrame(), "daily_trend": pd.DataFrame(), "pareto": pd.DataFrame(), "primary_recommendation": "当前筛选条件下没有缺陷记录。", } type_counts = df["defect_type"].value_counts() zones = classify_panel_zone(df) zone_counts = zones.value_counts() top_defect_type = type_counts.index[0] top_defect_share = float(type_counts.iloc[0] / total_defects) top_zone = zone_counts.index[0] top_zone_share = float(zone_counts.iloc[0] / total_defects) serious_share = float((df["severity"] == "严重").sum() / total_defects) root_causes = ( df.groupby(["equipment_id", "seat_id"]) .agg( 缺陷数=("defect_id", "count"), 涉及面板=("panel_id", "nunique"), 主要缺陷=("defect_type", lambda s: s.mode().iloc[0]), 严重数=("severity", lambda s: int((s == "严重").sum())), ) .reset_index() ) root_causes["根因候选"] = root_causes["equipment_id"] + " / " + root_causes["seat_id"] root_causes["占比"] = root_causes["缺陷数"] / total_defects root_causes["严重占比"] = root_causes["严重数"] / root_causes["缺陷数"].clip(lower=1) equipment_totals = df.groupby("equipment_id")["defect_id"].count() equipment_seat_counts = df.groupby("equipment_id")["seat_id"].nunique().clip(lower=1) root_causes["期望缺陷数"] = root_causes["equipment_id"].map( equipment_totals / equipment_seat_counts ).clip(lower=0.001) root_causes["异常倍数"] = (root_causes["缺陷数"] / root_causes["期望缺陷数"]).round(2) count_score = root_causes["缺陷数"] / root_causes["缺陷数"].max() panel_score = root_causes["涉及面板"] / df["panel_id"].nunique() lift_score = (root_causes["异常倍数"] / 3).clip(upper=1) root_causes["风险分"] = ( count_score * 55 + lift_score * 25 + root_causes["严重占比"] * 15 + panel_score * 5 ).round(1) root_causes = root_causes.sort_values(["风险分", "缺陷数"], ascending=False).head(8) root_causes = root_causes[ ["根因候选", "缺陷数", "占比", "异常倍数", "涉及面板", "主要缺陷", "严重占比", "风险分"] ].reset_index(drop=True) pareto = type_counts.rename_axis("缺陷类型").reset_index(name="缺陷数") pareto["占比"] = pareto["缺陷数"] / total_defects pareto["累计占比"] = pareto["占比"].cumsum() daily_trend = df.groupby("day").size().rename("缺陷数").reset_index() daily_trend["day"] = pd.to_datetime(daily_trend["day"]) daily_trend = daily_trend.sort_values("day") extended_root_causes = build_extended_root_causes(df) if serious_share >= 0.2 or (len(root_causes) > 0 and root_causes.iloc[0]["占比"] >= 0.15): severity_level = "严重" elif serious_share >= 0.1 or top_defect_share >= 0.35: severity_level = "关注" else: severity_level = "正常" if len(root_causes) > 0: top_root = root_causes.iloc[0] primary_recommendation = ( f"优先排查 {top_root['根因候选']},该组合贡献 {top_root['占比']:.1%} " f"缺陷,异常倍数 {top_root['异常倍数']:.2f}x,主要类型为 {top_root['主要缺陷']}。" ) else: primary_recommendation = f"优先排查 {top_defect_type} 相关工艺参数。" return { "severity_level": severity_level, "top_defect_type": top_defect_type, "top_defect_share": top_defect_share, "top_zone": top_zone, "top_zone_share": top_zone_share, "zone_distribution": zone_counts.rename_axis("区域").reset_index(name="缺陷数"), "serious_share": serious_share, "root_causes": root_causes, "extended_root_causes": extended_root_causes, "daily_trend": daily_trend, "pareto": pareto, "primary_recommendation": primary_recommendation, } def detect_industry_patterns(df): """识别面板行业常见缺陷模式。""" if df.empty: return [] patterns = [] zones = classify_panel_zone(df) zone_share = zones.value_counts(normalize=True) if any(idx != "显示中心区" and share >= 0.35 for idx, share in zone_share.items()): patterns.append(f"区域集中: {zone_share.index[0]} 占比 {zone_share.iloc[0]:.1%}") coord_df = df.copy() coord_df["x_bin"] = (coord_df["x_mm"] // 5).astype(int) coord_df["y_bin"] = (coord_df["y_mm"] // 5).astype(int) repeat = coord_df.groupby(["x_bin", "y_bin"])["panel_id"].nunique().max() if repeat >= min(3, max(2, df["panel_id"].nunique())): patterns.append("跨面板重复坐标: 疑似治具、吸嘴、压头或固定接触点异常") if df["x_mm"].nunique() >= 3 and df["y_mm"].nunique() >= 3 and len(df) >= 6: corr = abs(pd.Series(df["x_mm"]).corr(pd.Series(df["y_mm"]))) if pd.notna(corr) and corr >= 0.85: patterns.append("线状分布: 疑似搬运划伤、滚轮轨迹或线性压伤") batch_share = df["batch_id"].value_counts(normalize=True).iloc[0] if batch_share >= 0.5 and df["batch_id"].nunique() > 1: patterns.append(f"批次集中: {df['batch_id'].value_counts().index[0]} 占比 {batch_share:.1%}") return patterns or ["随机点状分布: 更偏向材料、环境尘埃或偶发检出"] def generate_industry_diagnosis(df, dashboard): """生成 3C 面板行业化诊断结论和排查建议。""" if df.empty: return { "headline": "当前筛选条件下没有可诊断缺陷。", "patterns": [], "recommendations": ["放宽筛选条件或上传更多检测记录后再诊断。"], } top_type = dashboard["top_defect_type"] top_zone = dashboard.get("top_zone", classify_panel_zone(df).value_counts().index[0]) top_root = dashboard["root_causes"].iloc[0]["根因候选"] if len(dashboard["root_causes"]) else "当前筛选范围" patterns = detect_industry_patterns(df) recommendations = [] if top_type in DEFECT_SOP_RECOMMENDATIONS: recommendations.extend(DEFECT_SOP_RECOMMENDATIONS[top_type]) if "边缘" in top_zone or "角落" in top_zone: recommendations.append("优先复核边缘贴合、切割/搬运夹持、吸附接触面和四角应力状态") if "FPC" in top_zone or "绑定" in top_zone: recommendations.append("重点检查绑定压力、FPC/COF 区域异物、压接参数和 AOI 复判样本") if any("跨面板重复" in p for p in patterns): recommendations.append("对高发座号对应治具、吸嘴、压头做点检,并抽查同坐标复现样本") if dashboard["serious_share"] >= 0.2: recommendations.append("严重缺陷占比较高,建议对相关批次执行 Hold、复检或加严抽样") deduped = [] for item in recommendations: if item not in deduped: deduped.append(item) headline = ( f"{top_zone} 的 {top_type} 最突出,首要候选为 {top_root}。" f"建议按工序链路优先排查材料、贴合/搬运接触面和对应治具状态。" ) return { "headline": headline, "patterns": patterns, "recommendations": deduped[:5], } def build_ml_factor_insights( df, *, target_defect_type=None, target_severity=None, model_name="random_forest", top_n=10, ): """构建页面可展示的 ML 关键因子、验证指标和特征解释。""" normalized = normalize_defect_schema(df) resolved_target_type = target_defect_type if resolved_target_type is None and not normalized.empty: resolved_target_type = normalized["defect_type"].mode().iloc[0] base = { "target_defect_type": resolved_target_type, "target_severity": target_severity, "model_name": model_name, "key_factors": pd.DataFrame(), "metrics": {}, "validation_metrics": {}, "feature_importance": [], "error": None, } if normalized.empty: base["error"] = "当前筛选条件下没有可训练数据。" return base try: base["key_factors"] = predict_key_factors( normalized, target_defect_type=resolved_target_type, target_severity=target_severity, model_name=model_name, top_n=top_n, ) bundle = create_model_bundle( normalized, model_name=model_name, target_defect_type=resolved_target_type, target_severity=target_severity, ) except (RuntimeError, ValueError) as exc: base["error"] = str(exc) return base base["metrics"] = bundle["metrics"] base["validation_metrics"] = bundle["validation_metrics"] base["feature_importance"] = bundle["feature_importance"] return base