"""Testable business logic for the defect-analysis page."""

import numpy as np
import pandas as pd


def normalize_date_bounds(start_date, end_date):
    """Convert a date range into half-open ``[start, end)`` timestamp bounds.

    Both inputs are passed through ``pd.Timestamp(...).normalize()`` so any
    time-of-day component is dropped. One day is added to the end bound so
    that every timestamp falling on ``end_date`` is included by a ``<``
    comparison.

    Returns:
        A ``(start_ts, end_exclusive)`` tuple of ``pd.Timestamp``.
    """
    start_ts = pd.Timestamp(start_date).normalize()
    end_exclusive = pd.Timestamp(end_date).normalize() + pd.Timedelta(days=1)
    return start_ts, end_exclusive


def apply_defect_filters(
    df,
    *,
    start_date,
    end_date,
    selected_types,
    selected_batches,
    selected_equipment,
    selected_seats,
    selected_shift="全部",
    selected_severity="全部",
):
    """Apply the page's filter selections and return a copy of matching rows.

    Args:
        df: Frame with ``timestamp``, ``defect_type``, ``batch_id`` and
            ``equipment_id`` columns; ``shift``, ``severity`` and ``seat_id``
            are read only when the corresponding filter is active.
        start_date, end_date: Inclusive date range (the whole end day counts).
        selected_types, selected_batches, selected_equipment: Allowed values;
            an empty selection matches no rows.
        selected_seats: Allowed seat ids; an empty/falsy selection disables
            the seat filter instead of excluding everything.
        selected_shift, selected_severity: A concrete value to match, or the
            sentinel ``"全部"`` to skip that filter.

    Returns:
        A new DataFrame (copy) containing only the rows that pass all filters.
    """
    start_ts, end_exclusive = normalize_date_bounds(start_date, end_date)
    mask = (
        (df["timestamp"] >= start_ts)
        & (df["timestamp"] < end_exclusive)
        & (df["defect_type"].isin(selected_types))
        & (df["batch_id"].isin(selected_batches))
        & (df["equipment_id"].isin(selected_equipment))
    )
    if selected_shift != "全部":
        mask &= df["shift"] == selected_shift
    if selected_severity != "全部":
        mask &= df["severity"] == selected_severity
    if selected_seats:
        mask &= df["seat_id"].isin(selected_seats)
    return df[mask].copy()


def calculate_kpis(source_df, filtered_df):
    """Compute the page KPIs for the current filter result.

    Defect counts come from ``filtered_df``. The inspected-panel population
    comes from ``source_df`` so the yield rate is measured against a baseline
    rather than against the defective panels themselves (which would pin the
    yield at 0 for every non-empty filter).

    Args:
        source_df: Unfiltered frame; its distinct ``panel_id`` values are
            used as the inspected population. May be ``None`` or lack a
            ``panel_id`` column, in which case the filtered panels are used.
        filtered_df: Frame after filtering, with ``panel_id``, ``severity``
            and ``defect_type`` columns.

    Returns:
        Dict with ``total_panels_inspected``, ``defective_panels``,
        ``yield_rate`` (percentage), ``total_defects``, ``critical_defects``
        and ``top_defect_type`` (``"-"`` when it cannot be determined).
    """
    defective_panels = filtered_df["panel_id"].nunique()

    # NOTE(review): this treats every distinct panel in source_df as
    # "inspected" — confirm that matches how the page defines the baseline.
    total_panels_inspected = defective_panels
    if source_df is not None and "panel_id" in source_df.columns:
        # Never let the denominator drop below the numerator, so the yield
        # stays within [0, 100] even if the two frames are unrelated.
        total_panels_inspected = max(
            source_df["panel_id"].nunique(), defective_panels
        )

    total_defects = len(filtered_df)
    critical_defects = (
        int((filtered_df["severity"] == "严重").sum()) if total_defects else 0
    )

    top_defect_type = "-"
    if total_defects:
        # mode() drops NaN, so it can be empty even when rows exist.
        modes = filtered_df["defect_type"].mode()
        if not modes.empty:
            top_defect_type = modes.iloc[0]

    yield_rate = (1 - defective_panels / max(total_panels_inspected, 1)) * 100
    return {
        "total_panels_inspected": int(total_panels_inspected),
        "defective_panels": int(defective_panels),
        "yield_rate": float(yield_rate),
        "total_defects": int(total_defects),
        "critical_defects": int(critical_defects),
        "top_defect_type": top_defect_type,
    }


def calculate_spc_metrics(df):
    """Compute the data needed for the SPC (p-chart style) view.

    The number of inspected panels per day is not read from the data; it is
    estimated, then floored at the day's defective-panel count and the rate
    is clipped to ``[0, 1]`` so the simulated denominator can never yield an
    invalid proportion.

    Args:
        df: Frame with ``day``, ``defect_id``, ``panel_id`` and ``timestamp``
            columns.

    Returns:
        Dict with the per-day frame under ``"daily"`` and the scalars
        ``p_bar``, ``ucl``, ``lcl``, ``uwl``, ``lwl`` and ``sigma_p``. When
        fewer than two days are present all scalars are ``0.0``.
    """
    daily = df.groupby("day").agg(
        total_defects=("defect_id", "count"),
        panels_with_defects=("panel_id", "nunique"),
    ).reset_index()
    daily["day"] = pd.to_datetime(daily["day"])
    daily = daily.sort_values("day").reset_index(drop=True)

    # Control limits are not computed from fewer than two daily points.
    if len(daily) < 2:
        return {
            "daily": daily,
            "p_bar": 0.0,
            "ucl": 0.0,
            "lcl": 0.0,
            "uwl": 0.0,
            "lwl": 0.0,
            "sigma_p": 0.0,
        }

    total_days = (df["timestamp"].max() - df["timestamp"].min()).days + 1
    total_unique_panels = df["panel_id"].nunique()
    # Estimated sample size: unique panels divided by the number of whole
    # weeks in the span (at least 1), never below 1.
    estimated = max(total_unique_panels // max(total_days // 7, 1), 1)
    daily["estimated_inspected"] = np.maximum(
        estimated, daily["panels_with_defects"]
    )
    daily["defect_rate"] = (
        daily["panels_with_defects"] / daily["estimated_inspected"]
    ).clip(lower=0, upper=1)

    p_bar = float(np.clip(daily["defect_rate"].mean(), 0, 1))
    n_avg = float(daily["estimated_inspected"].mean())
    sigma_p = (
        float(np.sqrt(max(p_bar * (1 - p_bar), 0) / n_avg)) if n_avg > 0 else 0.0
    )
    return {
        "daily": daily,
        "p_bar": p_bar,
        # 3-sigma control limits and 2-sigma warning limits, kept in [0, 1].
        "ucl": min(1.0, p_bar + 3 * sigma_p),
        "lcl": max(0.0, p_bar - 3 * sigma_p),
        "uwl": min(1.0, p_bar + 2 * sigma_p),
        "lwl": max(0.0, p_bar - 2 * sigma_p),
        "sigma_p": sigma_p,
    }


def build_diagnostic_dashboard(df):
    """Build the summary, root-cause candidates and trends for the dashboard.

    Args:
        df: Filtered defect frame with ``defect_type``, ``severity``,
            ``equipment_id``, ``seat_id``, ``defect_id``, ``panel_id`` and
            ``day`` columns. An empty frame short-circuits to a neutral
            result.

    Returns:
        Dict with ``severity_level``, ``top_defect_type``,
        ``top_defect_share``, ``serious_share``, ``root_causes`` (top 8
        equipment/seat combinations by risk score), ``daily_trend``,
        ``pareto`` and ``primary_recommendation``.
    """
    total_defects = len(df)
    if total_defects == 0:
        return {
            "severity_level": "正常",
            "top_defect_type": "-",
            "top_defect_share": 0.0,
            "serious_share": 0.0,
            "root_causes": pd.DataFrame(),
            "daily_trend": pd.DataFrame(),
            "pareto": pd.DataFrame(),
            "primary_recommendation": "当前筛选条件下没有缺陷记录。",
        }

    type_counts = df["defect_type"].value_counts()
    top_defect_type = type_counts.index[0]
    top_defect_share = float(type_counts.iloc[0] / total_defects)
    serious_share = float((df["severity"] == "严重").sum() / total_defects)

    # One row per equipment/seat combination.
    root_causes = (
        df.groupby(["equipment_id", "seat_id"])
        .agg(
            缺陷数=("defect_id", "count"),
            涉及面板=("panel_id", "nunique"),
            主要缺陷=("defect_type", lambda s: s.mode().iloc[0]),
            严重数=("severity", lambda s: int((s == "严重").sum())),
        )
        .reset_index()
    )
    root_causes["根因候选"] = (
        root_causes["equipment_id"] + " / " + root_causes["seat_id"]
    )
    root_causes["占比"] = root_causes["缺陷数"] / total_defects
    root_causes["严重占比"] = (
        root_causes["严重数"] / root_causes["缺陷数"].clip(lower=1)
    )

    # Expected count per seat = the equipment's defects spread evenly over
    # its seats; the lift ("异常倍数") is observed / expected.
    equipment_totals = df.groupby("equipment_id")["defect_id"].count()
    equipment_seat_counts = (
        df.groupby("equipment_id")["seat_id"].nunique().clip(lower=1)
    )
    root_causes["期望缺陷数"] = root_causes["equipment_id"].map(
        equipment_totals / equipment_seat_counts
    ).clip(lower=0.001)
    root_causes["异常倍数"] = (
        root_causes["缺陷数"] / root_causes["期望缺陷数"]
    ).round(2)

    # Weighted risk score: volume 55, lift 25 (capped at 3x), severity 15,
    # panel coverage 5.
    count_score = root_causes["缺陷数"] / root_causes["缺陷数"].max()
    panel_score = root_causes["涉及面板"] / df["panel_id"].nunique()
    lift_score = (root_causes["异常倍数"] / 3).clip(upper=1)
    root_causes["风险分"] = (
        count_score * 55
        + lift_score * 25
        + root_causes["严重占比"] * 15
        + panel_score * 5
    ).round(1)
    root_causes = root_causes.sort_values(
        ["风险分", "缺陷数"], ascending=False
    ).head(8)
    root_causes = root_causes[
        ["根因候选", "缺陷数", "占比", "异常倍数", "涉及面板", "主要缺陷", "严重占比", "风险分"]
    ].reset_index(drop=True)

    pareto = type_counts.rename_axis("缺陷类型").reset_index(name="缺陷数")
    pareto["占比"] = pareto["缺陷数"] / total_defects
    pareto["累计占比"] = pareto["占比"].cumsum()

    daily_trend = df.groupby("day").size().rename("缺陷数").reset_index()
    daily_trend["day"] = pd.to_datetime(daily_trend["day"])
    daily_trend = daily_trend.sort_values("day")

    if serious_share >= 0.2 or (
        len(root_causes) > 0 and root_causes.iloc[0]["占比"] >= 0.15
    ):
        severity_level = "严重"
    elif serious_share >= 0.1 or top_defect_share >= 0.35:
        severity_level = "关注"
    else:
        severity_level = "正常"

    if len(root_causes) > 0:
        top_root = root_causes.iloc[0]
        primary_recommendation = (
            f"优先排查 {top_root['根因候选']},该组合贡献 {top_root['占比']:.1%} "
            f"缺陷,异常倍数 {top_root['异常倍数']:.2f}x,主要类型为 {top_root['主要缺陷']}。"
        )
    else:
        primary_recommendation = f"优先排查 {top_defect_type} 相关工艺参数。"

    return {
        "severity_level": severity_level,
        "top_defect_type": top_defect_type,
        "top_defect_share": top_defect_share,
        "serious_share": serious_share,
        "root_causes": root_causes,
        "daily_trend": daily_trend,
        "pareto": pareto,
        "primary_recommendation": primary_recommendation,
    }