""" 缺陷集中性分析 - Streamlit 交互式可视化页面 """ import pandas as pd import numpy as np import matplotlib matplotlib.use("Agg") import matplotlib.pyplot as plt import matplotlib.font_manager as fm import seaborn as sns import streamlit as st import plotly.express as px import plotly.graph_objects as go import os from datetime import datetime from sklearn.cluster import DBSCAN from sklearn.decomposition import PCA from sklearn.preprocessing import StandardScaler from app_utils import ( apply_defect_filters, build_diagnostic_dashboard, calculate_kpis, calculate_spc_metrics, generate_industry_diagnosis, ) # --- 中文字体设置 --- def setup_chinese_font(): """设置中文字体""" font_paths = [ r"C:\Windows\Fonts\msyh.ttc", # 微软雅黑 r"C:\Windows\Fonts\simhei.ttf", # 黑体 r"C:\Windows\Fonts\simsun.ttc", # 宋体 r"C:\Windows\Fonts\malgun.ttf", # Malgun Gothic ] for fp in font_paths: if os.path.exists(fp): font_prop = fm.FontProperties(fname=fp) plt.rcParams["font.family"] = font_prop.get_name() plt.rcParams["axes.unicode_minus"] = False return font_prop # fallback plt.rcParams["font.sans-serif"] = ["SimHei", "Microsoft YaHei", "Arial Unicode MS"] plt.rcParams["axes.unicode_minus"] = False return None setup_chinese_font() # --- 页面配置 --- st.set_page_config( page_title="屏幕缺陷集中性分析", page_icon="🔍", layout="wide", initial_sidebar_state="expanded" ) # --- 侧边栏 --- st.sidebar.title("🔍 筛选条件") # --- 数据源切换 --- st.sidebar.divider() st.sidebar.subheader("📂 数据源") data_source = st.sidebar.radio("选择数据源", ["内置模拟数据", "上传CSV文件"], label_visibility="collapsed") REQUIRED_COLUMNS = [ "defect_id", "panel_id", "batch_id", "equipment_id", "seat_id", "inspection_station", "timestamp", "defect_type", "severity", "x_mm", "y_mm", "panel_width_mm", "panel_height_mm", "hour", "shift", "day", ] uploaded_df = None if data_source == "上传CSV文件": uploaded_file = st.sidebar.file_uploader("上传CSV文件", type=["csv"], accept_multiple_files=False) if uploaded_file is not None: try: uploaded_df = pd.read_csv(uploaded_file, parse_dates=["timestamp"]) uploaded_df["timestamp"] = pd.to_datetime(uploaded_df["timestamp"]) missing = [c for c in REQUIRED_COLUMNS if c not in uploaded_df.columns] if missing: st.sidebar.error(f"缺少字段: {', '.join(missing)}") uploaded_df = None else: st.sidebar.success(f"已加载 {len(uploaded_df)} 条记录") # 下载模板 template_df = pd.DataFrame(columns=REQUIRED_COLUMNS) csv_template = template_df.to_csv(index=False, encoding="utf-8-sig") st.sidebar.download_button( label="📋 下载数据格式模板", data=csv_template, file_name="defect_data_template.csv", mime="text/csv" ) except Exception as e: st.sidebar.error(f"CSV解析失败: {e}") uploaded_df = None else: st.sidebar.info("请选择一个CSV文件上传") # --- 加载数据 --- @st.cache_data(ttl=300) def load_data_from_csv(): """加载内置模拟数据""" if not os.path.exists("defect_data.csv"): st.error("未找到 defect_data.csv,请先运行 generate_data.py 生成数据") return None df = pd.read_csv("defect_data.csv", parse_dates=["timestamp"]) df["timestamp"] = pd.to_datetime(df["timestamp"]) return df if data_source == "上传CSV文件" and uploaded_df is not None: df = uploaded_df else: df = load_data_from_csv() if df is None: st.stop() # --- 角色视图 --- st.sidebar.divider() st.sidebar.subheader("👤 视图模式") view_mode = st.sidebar.selectbox( "选择视图模式", options=["操作员", "工程师", "管理者"], index=1, help="操作员: 基础分析 | 工程师: 全部功能 | 管理者: KPI+SPC+健康评分" ) # 各角色可见的 Tab tab_visibility = { "操作员": { "tabs": ["🗺️ 空间集中性", "📊 类型集中性 (帕累托)", "📈 时间集中性", "🏗️ 设备座号集中性", "🔬 缺陷模式识别", "🧭 诊断驾驶舱"], "show_kpi": True, "show_export": True, }, "工程师": { "tabs": "all", "show_kpi": True, "show_export": True, }, "管理者": { "tabs": ["🚨 SPC 控制图与预警", "🔬 缺陷模式识别", "💚 设备健康与共性分析", "📊 类型集中性 (帕累托)", "📈 时间集中性", "🧭 诊断驾驶舱"], "show_kpi": True, "show_export": True, }, } # 应用 Tab 可见性 current_config = tab_visibility[view_mode] # --- 筛选条件 --- # 日期范围 min_date = df["timestamp"].min().date() max_date = df["timestamp"].max().date() date_range = st.sidebar.date_input( "日期范围", value=[min_date, max_date], min_value=min_date, max_value=max_date ) if len(date_range) == 2: start_date, end_date = pd.Timestamp(date_range[0]), pd.Timestamp(date_range[1]) else: start_date, end_date = pd.Timestamp(min_date), pd.Timestamp(max_date) # 缺陷类型 all_types = sorted(df["defect_type"].unique()) selected_types = st.sidebar.multiselect("缺陷类型", options=all_types, default=all_types) # 班次 shift_options = ["全部", "白班", "夜班"] selected_shift = st.sidebar.radio("班次", options=shift_options) # 批次 all_batches = sorted(df["batch_id"].unique()) selected_batches = st.sidebar.multiselect("批次", options=all_batches, default=all_batches) # 严重程度 all_severities = ["全部", "轻微", "中等", "严重"] selected_severity = st.sidebar.selectbox("严重程度", options=all_severities) # 设备 all_equipment = sorted(df["equipment_id"].unique()) selected_equipment = st.sidebar.multiselect("前贴附设备", options=all_equipment, default=all_equipment) # 座号(随设备联动) if selected_equipment: eq_seats = sorted(df[df["equipment_id"].isin(selected_equipment)]["seat_id"].unique()) selected_seats = st.sidebar.multiselect("座号", options=eq_seats, default=eq_seats) else: selected_seats = [] filtered_df = apply_defect_filters( df, start_date=start_date, end_date=end_date, selected_types=selected_types, selected_batches=selected_batches, selected_equipment=selected_equipment, selected_seats=selected_seats, selected_shift=selected_shift, selected_severity=selected_severity, ) # ========== KPI 看板 ========== kpis = calculate_kpis(df, filtered_df) total_panels_inspected = kpis["total_panels_inspected"] defective_panels = kpis["defective_panels"] yield_rate = kpis["yield_rate"] total_defects = kpis["total_defects"] critical_defects = kpis["critical_defects"] top_defect_type = kpis["top_defect_type"] kpi1, kpi2, kpi3, kpi4, kpi5, kpi6 = st.columns(6) kpi1.metric("检测面板数", f"{total_panels_inspected} 块") kpi2.metric("不良面板数", f"{defective_panels} 块", delta=f"{defective_panels/total_panels_inspected*100:.1f}%" if total_panels_inspected > 0 else "0%") kpi3.metric("综合良率", f"{yield_rate:.1f}%", delta=f"{yield_rate - 95:.1f}%", delta_color="normal" if yield_rate >= 95 else "inverse") kpi4.metric("缺陷总数", f"{total_defects} 个") kpi5.metric("严重缺陷", f"{critical_defects} 个", delta=f"{critical_defects/max(total_defects,1)*100:.1f}%" if total_defects > 0 else "0%") kpi6.metric("主要缺陷类型", top_defect_type) # 第二排 KPI eq_concentrated = False if "equipment_id" in filtered_df.columns: eq_stats = filtered_df.groupby("equipment_id").size() top_eq = eq_stats.idxmax() if len(eq_stats) > 0 else "-" top_eq_count = eq_stats.max() if len(eq_stats) > 0 else 0 else: top_eq, top_eq_count = "-", 0 seat_concentrated = False if "seat_id" in filtered_df.columns and len(filtered_df) > 0: seat_stats = filtered_df.groupby("seat_id").size() if len(seat_stats) > 0: top_seat = seat_stats.idxmax() top_seat_count = seat_stats.max() avg_seat_count = seat_stats.mean() if top_seat_count > avg_seat_count * 2: seat_concentrated = True else: top_seat, top_seat_count = "-", 0 else: top_seat, top_seat_count = "-", 0 kpi7, kpi8, kpi9 = st.columns(3) kpi7.metric("最高缺陷设备", str(top_eq), f"{top_eq_count} 个缺陷") kpi8.metric("最高缺陷座号", str(top_seat), f"{top_seat_count} 个缺陷") if seat_concentrated: kpi9.metric("座号集中性", "⚠️ 存在集中", delta="需关注", delta_color="inverse") else: kpi9.metric("座号集中性", "✅ 正常分布") # --- 主标题 --- st.title("📊 屏幕缺陷集中性分析系统") st.markdown(f"**数据范围**: {start_date.strftime('%Y-%m-%d')} ~ {end_date.strftime('%Y-%m-%d')} | " f"**筛选后缺陷数**: {len(filtered_df)} 条 | " f"**涉及面板**: {filtered_df['panel_id'].nunique()} 块") st.divider() if filtered_df.empty: st.warning("当前筛选条件下没有缺陷记录,请放宽日期、批次、设备或缺陷类型筛选。") st.stop() # --- Tab 布局 (按角色动态) --- ALL_TABS = [ "🧭 诊断驾驶舱", "🗺️ 空间集中性", "📊 类型集中性 (帕累托)", "📈 时间集中性", "🏭 批次集中性", "🏗️ 设备座号集中性", "🔗 关联分析", "🧠 智能缺陷聚类 (DBSCAN)", "🚨 SPC 控制图与预警", "🔬 缺陷模式识别", "💚 设备健康与共性分析", "🔲 多层叠加分析" ] if current_config["tabs"] == "all": visible_tabs = ALL_TABS else: visible_tabs = [t for t in ALL_TABS if t in current_config["tabs"]] tab_containers = st.tabs(visible_tabs) tab_map = {name: container for name, container in zip(visible_tabs, tab_containers)} def get_tab(name): """获取指定 Tab 容器,如果不可见则返回 None""" return tab_map.get(name) # ========== Tab 0: 诊断驾驶舱 ========== _t = get_tab("🧭 诊断驾驶舱") if _t: with _t: dashboard = build_diagnostic_dashboard(filtered_df) industry_diagnosis = generate_industry_diagnosis(filtered_df, dashboard) level_colors = { "严重": ("#7f1d1d", "#fee2e2"), "关注": ("#92400e", "#fef3c7"), "正常": ("#14532d", "#dcfce7"), } level_fg, level_bg = level_colors.get(dashboard["severity_level"], ("#334155", "#e2e8f0")) st.markdown( """ """, unsafe_allow_html=True, ) st.markdown( f"""
当前诊断等级:{dashboard["severity_level"]}

缺陷诊断驾驶舱

{dashboard["primary_recommendation"]}

""", unsafe_allow_html=True, ) card1, card2, card3, card4 = st.columns(4) with card1: st.markdown( f"""
筛选后缺陷
{len(filtered_df)}
涉及 {filtered_df["panel_id"].nunique()} 块面板
""", unsafe_allow_html=True, ) with card2: st.markdown( f"""
主导缺陷类型
{dashboard["top_defect_type"]}
占全部缺陷 {dashboard["top_defect_share"]:.1%}
""", unsafe_allow_html=True, ) with card3: st.markdown( f"""
严重缺陷占比
{dashboard["serious_share"]:.1%}
高于 20% 建议立即复盘
""", unsafe_allow_html=True, ) with card4: top_root = dashboard["root_causes"].iloc[0] if len(dashboard["root_causes"]) else None root_name = top_root["根因候选"] if top_root is not None else "-" root_share = top_root["占比"] if top_root is not None else 0 root_lift = top_root["异常倍数"] if top_root is not None else 0 st.markdown( f"""
首要根因候选
{root_name}
贡献 {root_share:.1%} 缺陷,异常 {root_lift:.2f}x
""", unsafe_allow_html=True, ) st.markdown( f"""
3C 面板行业诊断结论
{industry_diagnosis["headline"]}
""", unsafe_allow_html=True, ) diag_col1, diag_col2 = st.columns([1, 1]) with diag_col1: st.subheader("识别到的缺陷模式") for pattern in industry_diagnosis["patterns"]: st.markdown(f"- {pattern}") with diag_col2: st.subheader("行业化排查建议") for idx, recommendation in enumerate(industry_diagnosis["recommendations"], 1): st.markdown(f"{idx}. {recommendation}") st.divider() left, right = st.columns([1.25, 1]) with left: st.subheader("交互式面板数字孪生") panel_w = float(df["panel_width_mm"].iloc[0]) panel_h = float(df["panel_height_mm"].iloc[0]) fig_map = go.Figure() fig_map.add_shape( type="rect", x0=0, y0=0, x1=panel_w, y1=panel_h, line=dict(color="#0f172a", width=2), fillcolor="#f8fafc", layer="below", ) fig_map.add_trace( go.Scatter( x=filtered_df["x_mm"], y=filtered_df["y_mm"], mode="markers", marker=dict( size=7, color=filtered_df["severity"].map({"轻微": 1, "中等": 2, "严重": 3}), colorscale=[[0, "#38bdf8"], [0.5, "#f59e0b"], [1, "#dc2626"]], showscale=True, colorbar=dict(title="严重度"), opacity=0.72, line=dict(width=0.4, color="#ffffff"), ), text=filtered_df["defect_id"], customdata=filtered_df[["defect_type", "severity", "equipment_id", "seat_id", "batch_id"]], hovertemplate=( "缺陷ID: %{text}
" "坐标: (%{x:.1f}, %{y:.1f}) mm
" "类型: %{customdata[0]}
" "严重度: %{customdata[1]}
" "设备/座号: %{customdata[2]} / %{customdata[3]}
" "批次: %{customdata[4]}" ), name="缺陷点", ) ) fig_map.add_vrect(x0=0, x1=panel_w * 0.1, fillcolor="#f97316", opacity=0.08, line_width=0) fig_map.add_vrect(x0=panel_w * 0.9, x1=panel_w, fillcolor="#f97316", opacity=0.08, line_width=0) fig_map.add_hrect(y0=panel_h * 0.72, y1=panel_h * 0.88, fillcolor="#14b8a6", opacity=0.09, line_width=0) fig_map.update_layout( height=560, margin=dict(l=18, r=18, t=30, b=18), plot_bgcolor="#ffffff", paper_bgcolor="#ffffff", xaxis=dict(title="X (mm)", range=[0, panel_w], showgrid=True, gridcolor="#e2e8f0"), yaxis=dict(title="Y (mm)", range=[0, panel_h], scaleanchor="x", scaleratio=1, showgrid=True, gridcolor="#e2e8f0"), title="按真实屏幕比例定位缺陷,橙色为边缘敏感区,青色为 FPC 关注区", ) st.plotly_chart(fig_map, use_container_width=True) fig_density = px.density_heatmap( filtered_df, x="x_mm", y="y_mm", nbinsx=28, nbinsy=42, color_continuous_scale="YlOrRd", title="密度热区视图", labels={"x_mm": "X (mm)", "y_mm": "Y (mm)"}, ) fig_density.update_layout(height=300, margin=dict(l=18, r=18, t=42, b=18)) st.plotly_chart(fig_density, use_container_width=True) with right: st.subheader("根因候选榜") root_causes = dashboard["root_causes"].copy() fig_root = px.bar( root_causes.sort_values("风险分", ascending=True), x="风险分", y="根因候选", orientation="h", color="异常倍数", color_continuous_scale="Tealrose", text="风险分", hover_data={ "缺陷数": True, "占比": ":.1%", "异常倍数": ":.2f", "涉及面板": True, "主要缺陷": True, "严重占比": ":.1%", "风险分": ":.1f", }, labels={"风险分": "风险分", "根因候选": ""}, ) fig_root.update_traces(texttemplate="%{text:.1f}", textposition="outside") fig_root.update_layout(height=360, margin=dict(l=8, r=20, t=20, b=20)) st.plotly_chart(fig_root, use_container_width=True) root_table = root_causes.copy() root_table["占比"] = root_table["占比"].map(lambda v: f"{v:.1%}") root_table["异常倍数"] = root_table["异常倍数"].map(lambda v: f"{v:.2f}x") root_table["严重占比"] = root_table["严重占比"].map(lambda v: f"{v:.1%}") st.dataframe(root_table, use_container_width=True, hide_index=True) st.caption("风险分 = 贡献规模 + 异常倍数 + 严重占比 + 涉及面板数。先查高贡献且高偏离的组合。") trend_col, pareto_col = st.columns([1, 1]) with trend_col: st.subheader("每日缺陷走势") daily_trend = dashboard["daily_trend"] fig_trend_dash = px.area( daily_trend, x="day", y="缺陷数", markers=True, color_discrete_sequence=["#0f766e"], labels={"day": "日期", "缺陷数": "缺陷数"}, ) fig_trend_dash.update_traces(line=dict(width=3), fillcolor="rgba(20, 184, 166, .22)") fig_trend_dash.update_layout(height=350, margin=dict(l=18, r=18, t=20, b=18)) st.plotly_chart(fig_trend_dash, use_container_width=True) with pareto_col: st.subheader("缺陷类型 Pareto") pareto = dashboard["pareto"].head(8) fig_pareto_dash = go.Figure() fig_pareto_dash.add_trace( go.Bar( x=pareto["缺陷类型"], y=pareto["缺陷数"], marker_color="#334155", name="缺陷数", hovertemplate="%{x}
缺陷数: %{y}", ) ) fig_pareto_dash.add_trace( go.Scatter( x=pareto["缺陷类型"], y=pareto["累计占比"], yaxis="y2", mode="lines+markers", line=dict(color="#dc2626", width=3), name="累计占比", hovertemplate="%{x}
累计占比: %{y:.1%}", ) ) fig_pareto_dash.update_layout( height=350, margin=dict(l=18, r=18, t=20, b=18), yaxis=dict(title="缺陷数"), yaxis2=dict(title="累计占比", overlaying="y", side="right", tickformat=".0%"), legend=dict(orientation="h", y=1.12), ) st.plotly_chart(fig_pareto_dash, use_container_width=True) # ========== Tab 1: 空间集中性 ========== _t = get_tab("🗺️ 空间集中性") if _t: with _t: st.header("缺陷空间分布热力图") col1, col2 = st.columns([2, 1]) with col1: # 热力图分辨率 grid_size = st.slider("热力图网格分辨率", min_value=5, max_value=50, value=20) fig, axes = plt.subplots(1, 2, figsize=(14, 6)) # 左图:2D 热力图 x_edges = np.linspace(0, df["panel_width_mm"].iloc[0], grid_size + 1) y_edges = np.linspace(0, df["panel_height_mm"].iloc[0], grid_size + 1) H, _, _ = np.histogram2d( filtered_df["x_mm"], filtered_df["y_mm"], bins=[x_edges, y_edges] ) im = axes[0].imshow( H.T, origin="lower", aspect="auto", extent=[0, df["panel_width_mm"].iloc[0], 0, df["panel_height_mm"].iloc[0]], cmap="YlOrRd" ) axes[0].set_title(f"缺陷密度热力图 (总 {len(filtered_df)} 个)") axes[0].set_xlabel("X (mm)") axes[0].set_ylabel("Y (mm)") plt.colorbar(im, ax=axes[0], label="缺陷数量") # 右图:散点图(叠加) axes[1].scatter( filtered_df["x_mm"], filtered_df["y_mm"], alpha=0.3, s=5, c="red", edgecolors="none" ) axes[1].set_title("缺陷位置散点图") axes[1].set_xlabel("X (mm)") axes[1].set_ylabel("Y (mm)") axes[1].set_aspect("equal") st.pyplot(fig) plt.close() with col2: st.subheader("区域统计") # 将面板分为 9 宫格 x_bins = pd.cut(filtered_df["x_mm"], bins=3, labels=["左", "中", "右"]) y_bins = pd.cut(filtered_df["y_mm"], bins=3, labels=["上", "中", "下"]) region_df = pd.DataFrame({"X区域": x_bins, "Y区域": y_bins}) region_counts = region_df.groupby(["X区域", "Y区域"], observed=False).size().unstack(fill_value=0) st.dataframe(region_counts, use_container_width=True) # 高频缺陷区域 TOP5 st.subheader("高频缺陷区域 TOP5") region_df["区域"] = region_df["X区域"].astype(str) + "-" + region_df["Y区域"].astype(str) top_regions = region_df["区域"].value_counts().head(5) for i, (region, count) in enumerate(top_regions.items(), 1): st.metric(f"#{i} {region}", f"{count} 个缺陷") # --- 模拟面板缺陷标注图 --- st.divider() st.subheader("🖼️ 模拟面板缺陷标注图") st.markdown("选择批次和面板,查看缺陷在面板上的实际分布标注(按缺陷类型用不同颜色/形状区分)") ann_col1, ann_col2, ann_col3 = st.columns(3) with ann_col1: ann_batch = st.selectbox("选择批次", options=sorted(filtered_df["batch_id"].unique()), key="ann_batch") with ann_col2: panels_in_batch = sorted(filtered_df[filtered_df["batch_id"] == ann_batch]["panel_id"].unique()) ann_panel = st.selectbox("选择面板", options=panels_in_batch, key="ann_panel") with ann_col3: ann_show_label = st.checkbox("显示缺陷标签", value=True) panel_defects = filtered_df[(filtered_df["batch_id"] == ann_batch) & (filtered_df["panel_id"] == ann_panel)] if len(panel_defects) == 0: st.warning(f"当前面板 **{ann_panel}** (批次 {ann_batch}) 在筛选条件下无缺陷记录,请调整筛选条件或选择其他面板") else: pw = df["panel_width_mm"].iloc[0] ph = df["panel_height_mm"].iloc[0] # 缺陷类型 → 颜色/形状映射 type_style = { "划痕": {"color": "red", "marker": "x", "size": 80}, "亮点": {"color": "yellow", "marker": "o", "size": 60}, "暗点": {"color": "black", "marker": "x", "size": 60}, "气泡": {"color": "cyan", "marker": "o", "size": 100}, "色差": {"color": "magenta", "marker": "s", "size": 70}, "漏光": {"color": "orange", "marker": "D", "size": 80}, "裂纹": {"color": "darkred", "marker": "v", "size": 90}, "异物": {"color": "green", "marker": "P", "size": 80}, } fig_ann, ax_ann = plt.subplots(figsize=(3.5, 5)) # 面板背景(模拟屏幕灰色渐变) ax_ann.add_patch(plt.Rectangle((0, 0), pw, ph, facecolor="#1a1a2e", edgecolor="#444", linewidth=2)) # 内框(模拟屏幕可视区域) margin = 8 ax_ann.add_patch(plt.Rectangle((margin, margin), pw - 2*margin, ph - 2*margin, facecolor="#16213e", edgecolor="#0f3460", linewidth=1.5)) # FPC绑定区域标注 fpc_y = ph * 0.7 ax_ann.axhline(y=fpc_y, color="#555", linestyle="--", alpha=0.4, linewidth=0.5) ax_ann.text(pw/2, fpc_y + 2, "FPC区", color="#666", fontsize=7, ha="center", alpha=0.5) # 绘制缺陷标注 for _, row in panel_defects.iterrows(): style = type_style.get(row["defect_type"], {"color": "white", "marker": "o", "size": 50}) severity_size = {"轻微": 0.7, "中等": 1.0, "严重": 1.4}.get(row["severity"], 1.0) ax_ann.scatter(row["x_mm"], row["y_mm"], c=style["color"], marker=style["marker"], s=style["size"] * severity_size, edgecolors="white", linewidth=0.3, alpha=0.85, zorder=3) if ann_show_label: ax_ann.annotate(row["defect_type"][:2], (row["x_mm"], row["y_mm"]), fontsize=5, color="white", ha="center", va="bottom", alpha=0.7, zorder=4) # 图例 legend_elements = [plt.Line2D([0], [0], marker=type_style[t]["marker"], color="w", markerfacecolor=type_style[t]["color"], markersize=8, label=t, markeredgewidth=0.5, markeredgecolor="white") for t in type_style] ax_ann.legend(handles=legend_elements, loc="upper right", fontsize=7, framealpha=0.7, facecolor="#222", edgecolor="#555") ax_ann.set_xlim(-5, pw + 5) ax_ann.set_ylim(-5, ph + 5) ax_ann.set_title(f"面板 {ann_panel} | 批次 {ann_batch} | {len(panel_defects)} 个缺陷", fontsize=11, pad=10) ax_ann.set_xlabel("X (mm)") ax_ann.set_ylabel("Y (mm)") ax_ann.set_aspect("equal") ax_ann.grid(True, alpha=0.1, color="gray") st.pyplot(fig_ann) plt.close() # ========== Tab 2: 帕累托分析 ========== _t = get_tab("📊 类型集中性 (帕累托)") if _t: with _t: st.header("缺陷类型帕累托分析") type_counts = filtered_df["defect_type"].value_counts().reset_index() type_counts.columns = ["缺陷类型", "数量"] type_counts = type_counts.sort_values("数量", ascending=False).reset_index(drop=True) type_counts["累计占比"] = type_counts["数量"].cumsum() / type_counts["数量"].sum() * 100 type_counts["占比"] = type_counts["数量"] / type_counts["数量"].sum() * 100 fig, ax1 = plt.subplots(figsize=(10, 5)) # 柱状图 bars = ax1.bar(type_counts["缺陷类型"], type_counts["数量"], color="steelblue", alpha=0.8) ax1.set_xlabel("缺陷类型") ax1.set_ylabel("数量", color="steelblue") ax1.set_title("帕累托图 - 缺陷类型分布") # 累计占比折线 ax2 = ax1.twinx() ax2.plot(type_counts["缺陷类型"], type_counts["累计占比"], color="red", marker="o", linewidth=2) ax2.axhline(y=80, color="green", linestyle="--", alpha=0.5, label="80%线") ax2.set_ylabel("累计占比 (%)", color="red") ax2.set_ylim(0, 110) # 标注数值 for bar, count in zip(bars, type_counts["数量"]): ax1.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 2, str(count), ha="center", va="bottom", fontsize=9) st.pyplot(fig) plt.close() # 数据表格 st.subheader("详细数据") st.dataframe(type_counts, use_container_width=True) # 严重程度分布 st.subheader("按严重程度分布") sev_counts = filtered_df["severity"].value_counts() fig2, ax = plt.subplots(figsize=(6, 4)) colors = {"轻微": "#4CAF50", "中等": "#FF9800", "严重": "#F44336"} sev_counts.plot(kind="bar", ax=ax, color=[colors.get(s, "gray") for s in sev_counts.index]) ax.set_title("缺陷严重程度分布") ax.set_ylabel("数量") st.pyplot(fig2) plt.close() # ========== Tab 3: 时间集中性 ========== _t = get_tab("📈 时间集中性") if _t: with _t: st.header("缺陷时间分布趋势") col1, col2 = st.columns(2) with col1: # 按天趋势 daily = filtered_df.groupby("day").size().reset_index(name="缺陷数") daily["day"] = pd.to_datetime(daily["day"]) fig1, ax1 = plt.subplots(figsize=(10, 4)) ax1.plot(daily["day"], daily["缺陷数"], marker="o", markersize=3, linewidth=1.5, color="steelblue") ax1.fill_between(daily["day"], daily["缺陷数"], alpha=0.2, color="steelblue") ax1.set_title("每日缺陷数量趋势") ax1.set_ylabel("缺陷数量") ax1.tick_params(axis="x", rotation=45) # 移动平均 if len(daily) > 3: daily["移动平均(3天)"] = daily["缺陷数"].rolling(window=3, min_periods=1).mean() ax1.plot(daily["day"], daily["移动平均(3天)"], color="red", linestyle="--", linewidth=2, alpha=0.7, label="3日移动平均") ax1.legend() st.pyplot(fig1) plt.close() with col2: # 按小时分布 hourly = filtered_df.groupby("hour").size().reindex(range(24), fill_value=0) fig2, ax2 = plt.subplots(figsize=(10, 4)) colors = ["#FF6B6B" if (h >= 17 or h < 8) else "#4ECDC4" for h in hourly.index] ax2.bar(hourly.index, hourly.values, color=colors, alpha=0.8) ax2.set_title("每小时缺陷分布 (红色=夜班)") ax2.set_xlabel("小时") ax2.set_ylabel("缺陷数量") st.pyplot(fig2) plt.close() # 班次对比 st.subheader("班次对比") shift_stats = filtered_df.groupby("shift").agg({ "defect_id": "count", "panel_id": "nunique" }).rename(columns={"defect_id": "缺陷数", "panel_id": "涉及面板数"}) st.dataframe(shift_stats, use_container_width=True) # 每周分布 st.subheader("按星期分布") filtered_df_copy = filtered_df.copy() filtered_df_copy["weekday"] = filtered_df_copy["timestamp"].dt.day_name() weekday_order = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"] weekday_cn = {"Monday": "周一", "Tuesday": "周二", "Wednesday": "周三", "Thursday": "周四", "Friday": "周五", "Saturday": "周六", "Sunday": "周日"} filtered_df_copy["星期"] = filtered_df_copy["weekday"].map(weekday_cn) weekday_counts = filtered_df_copy.groupby("星期").size().reindex( [weekday_cn[d] for d in weekday_order], fill_value=0 ) fig3, ax3 = plt.subplots(figsize=(8, 4)) ax3.bar(range(7), weekday_counts.values, color="steelblue", alpha=0.8) ax3.set_xticks(range(7)) ax3.set_xticklabels(weekday_counts.index) ax3.set_title("按星期分布") ax3.set_ylabel("缺陷数量") st.pyplot(fig3) plt.close() # ========== Tab 4: 批次集中性 ========== _t = get_tab("🏭 批次集中性") if _t: with _t: st.header("批次缺陷集中性分析") batch_stats = filtered_df.groupby("batch_id").agg({ "defect_id": "count", "panel_id": "nunique", "severity": lambda x: (x == "严重").sum() }).rename(columns={"defect_id": "缺陷数", "panel_id": "面板数", "severity": "严重缺陷数"}) batch_stats["缺陷率"] = batch_stats["缺陷数"] / batch_stats["面板数"] batch_stats = batch_stats.sort_index() col1, col2 = st.columns(2) with col1: fig1, ax1 = plt.subplots(figsize=(10, 4)) ax1.bar(range(len(batch_stats)), batch_stats["缺陷数"], color="steelblue", alpha=0.8) ax1.set_title("各批次缺陷数量") ax1.set_xlabel("批次") ax1.set_ylabel("缺陷数") ax1.set_xticks(range(len(batch_stats))) ax1.set_xticklabels(batch_stats.index, rotation=90, fontsize=7) st.pyplot(fig1) plt.close() with col2: fig2, ax2 = plt.subplots(figsize=(10, 4)) ax2.plot(range(len(batch_stats)), batch_stats["缺陷率"], marker="o", markersize=3, color="red", linewidth=1.5) ax2.axhline(y=batch_stats["缺陷率"].mean(), color="green", linestyle="--", label=f"平均缺陷率: {batch_stats['缺陷率'].mean():.2%}") ax2.set_title("各批次缺陷率趋势") ax2.set_xlabel("批次") ax2.set_ylabel("缺陷率") ax2.set_xticks(range(len(batch_stats))) ax2.set_xticklabels(batch_stats.index, rotation=90, fontsize=7) ax2.legend() st.pyplot(fig2) plt.close() # 异常批次 st.subheader("异常批次 (缺陷率 > 平均值 + 1倍标准差)") threshold = batch_stats["缺陷率"].mean() + batch_stats["缺陷率"].std() abnormal = batch_stats[batch_stats["缺陷率"] > threshold].sort_values("缺陷率", ascending=False) if len(abnormal) > 0: st.dataframe(abnormal, use_container_width=True) else: st.success("未发现异常批次") # ========== Tab 5: 设备座号集中性 ========== _t = get_tab("🏗️ 设备座号集中性") if _t: with _t: st.header("🏗️ 前贴附制程设备座号集中性分析") st.markdown( "分析缺陷是否集中在特定设备的特定座号(工位)。" "如果某个座号缺陷明显多于其他座号,说明该座号对应的设备局部存在问题(如吸嘴老化、加热不均、压力异常等)。" ) # --- 设备对比 --- st.subheader("设备级别对比") eq_stats = filtered_df.groupby("equipment_id").agg({ "defect_id": "count", "panel_id": "nunique", "severity": lambda x: (x == "严重").sum() }).rename(columns={"defect_id": "缺陷数", "panel_id": "面板数", "severity": "严重缺陷"}) eq_stats["缺陷率"] = eq_stats["缺陷数"] / eq_stats["面板数"] eq_stats = eq_stats.sort_values("缺陷数", ascending=False) col_eq1, col_eq2 = st.columns(2) with col_eq1: fig_eq1, ax_eq1 = plt.subplots(figsize=(8, 4)) bars1 = ax_eq1.bar(range(len(eq_stats)), eq_stats["缺陷数"], color=["#FF6B6B", "#4ECDC4", "#45B7D1"][:len(eq_stats)], alpha=0.8) ax_eq1.set_xticks(range(len(eq_stats))) ax_eq1.set_xticklabels(eq_stats.index, fontsize=10) ax_eq1.set_ylabel("缺陷数量") ax_eq1.set_title("各设备缺陷总数") for bar, count in zip(bars1, eq_stats["缺陷数"]): ax_eq1.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 3, str(count), ha="center", va="bottom", fontsize=10, fontweight="bold") st.pyplot(fig_eq1) plt.close() with col_eq2: fig_eq2, ax_eq2 = plt.subplots(figsize=(8, 4)) bars2 = ax_eq2.bar(range(len(eq_stats)), eq_stats["缺陷率"] * 100, color=["#FF6B6B", "#4ECDC4", "#45B7D1"][:len(eq_stats)], alpha=0.8) ax_eq2.set_xticks(range(len(eq_stats))) ax_eq2.set_xticklabels(eq_stats.index, fontsize=10) ax_eq2.set_ylabel("缺陷率 (%)") ax_eq2.set_title("各设备缺陷率") for bar, rate in zip(bars2, eq_stats["缺陷率"] * 100): ax_eq2.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.3, f"{rate:.1f}%", ha="center", va="bottom", fontsize=10, fontweight="bold") st.pyplot(fig_eq2) plt.close() st.dataframe(eq_stats, use_container_width=True) # --- 座号级别分析 --- st.divider() st.subheader("座号级别缺陷分布") # 选择设备查看座号 eq_for_seat = st.selectbox("选择设备查看座号分布", options=sorted(filtered_df["equipment_id"].unique()), key="eq_seat") eq_data = filtered_df[filtered_df["equipment_id"] == eq_for_seat] eq_info = None for eq_name, info in [("LAM-A01", {"rows": 4, "cols": 5}), ("LAM-A02", {"rows": 4, "cols": 5}), ("LAM-B01", {"rows": 5, "cols": 4})]: if eq_name == eq_for_seat: eq_info = info break seat_counts = eq_data.groupby("seat_id").size().reset_index(name="缺陷数") seat_counts = seat_counts.sort_values("缺陷数", ascending=False) if eq_info: # 网格热力图 grid = np.zeros((eq_info["rows"], eq_info["cols"])) seat_to_defects = eq_data.groupby("seat_id").size().to_dict() for r in range(1, eq_info["rows"] + 1): for c in range(1, eq_info["cols"] + 1): seat_name = f"R{r}C{c}" grid[r - 1, c - 1] = seat_to_defects.get(seat_name, 0) fig_grid, ax_grid = plt.subplots(figsize=(8, 6)) im = ax_grid.imshow(grid, cmap="YlOrRd", aspect="equal") ax_grid.set_title(f"{eq_for_seat} 座号缺陷热力图") ax_grid.set_xlabel("列号") ax_grid.set_ylabel("行号") ax_grid.set_xticks(range(eq_info["cols"])) ax_grid.set_xticklabels([f"C{i+1}" for i in range(eq_info["cols"])]) ax_grid.set_yticks(range(eq_info["rows"])) ax_grid.set_yticklabels([f"R{i+1}" for i in range(eq_info["rows"])]) # 标注数值 for r in range(eq_info["rows"]): for c in range(eq_info["cols"]): val = int(grid[r, c]) color = "white" if val > grid.max() * 0.7 else "black" ax_grid.text(c, r, str(val), ha="center", va="center", fontsize=10, color=color, fontweight="bold") plt.colorbar(im, ax=ax_grid, label="缺陷数量") st.pyplot(fig_grid) plt.close() else: fig_bar, ax_bar = plt.subplots(figsize=(10, 4)) ax_bar.bar(range(len(seat_counts)), seat_counts["缺陷数"], color="steelblue", alpha=0.8) ax_bar.set_xticks(range(len(seat_counts))) ax_bar.set_xticklabels(seat_counts["seat_id"], rotation=45, fontsize=8) ax_bar.set_ylabel("缺陷数量") ax_bar.set_title("座号缺陷分布") st.pyplot(fig_bar) plt.close() # 座号数据表格 st.dataframe(seat_counts, use_container_width=True) # --- 异常座号检测 --- st.divider() st.subheader("异常座号检测") all_seat_stats = filtered_df.groupby(["equipment_id", "seat_id"]).size().reset_index(name="缺陷数") overall_mean = all_seat_stats["缺陷数"].mean() overall_std = all_seat_stats["缺陷数"].std() threshold_1x = overall_mean + overall_std threshold_2x = overall_mean + 2 * overall_std st.info(f"📊 全局统计: 平均每个座号 **{overall_mean:.1f}** 个缺陷 | 标准差 **{overall_std:.1f}**") col_anom1, col_anom2 = st.columns(2) with col_anom1: st.markdown(f"**⚠️ 1σ 预警座号** (缺陷数 > {threshold_1x:.0f})") warning_seats = all_seat_stats[all_seat_stats["缺陷数"] > threshold_1x].sort_values("缺陷数", ascending=False) if len(warning_seats) > 0: st.dataframe(warning_seats.reset_index(drop=True), use_container_width=True) else: st.success("无预警座号") with col_anom2: st.markdown(f"**🔴 2σ 异常座号** (缺陷数 > {threshold_2x:.0f})") critical_seats = all_seat_stats[all_seat_stats["缺陷数"] > threshold_2x].sort_values("缺陷数", ascending=False) if len(critical_seats) > 0: st.dataframe(critical_seats.reset_index(drop=True), use_container_width=True) else: st.success("无异常座号") # --- 座号 × 缺陷类型 交叉分析 --- st.divider() st.subheader("座号 × 缺陷类型 交叉分析") st.markdown("识别哪些座号偏向产生特定类型的缺陷(如 R2C3 座号主要产生气泡 → 吸嘴问题)") if eq_info: eq_seat_type = eq_data.groupby(["seat_id", "defect_type"]).size().unstack(fill_value=0) fig_ct, ax_ct = plt.subplots(figsize=(10, 6)) sns.heatmap(eq_seat_type, annot=True, fmt="d", cmap="YlOrRd", ax=ax_ct, linewidths=0.5, linecolor="white") ax_ct.set_title(f"{eq_for_seat} 座号 × 缺陷类型 热力图") st.pyplot(fig_ct) plt.close() # ========== Tab 6: 关联分析 ========== _t = get_tab("🔗 关联分析") if _t: with _t: st.header("缺陷关联分析") col1, col2 = st.columns(2) with col1: # 缺陷类型 x 严重程度 交叉表 ct = pd.crosstab(filtered_df["defect_type"], filtered_df["severity"]) fig1, ax1 = plt.subplots(figsize=(8, 5)) sns.heatmap(ct, annot=True, fmt="d", cmap="YlOrRd", ax=ax1, linewidths=0.5, linecolor="white") ax1.set_title("缺陷类型 × 严重程度 热力图") st.pyplot(fig1) plt.close() with col2: # 缺陷类型 x 班次 交叉表 ct2 = pd.crosstab(filtered_df["defect_type"], filtered_df["shift"]) fig2, ax2 = plt.subplots(figsize=(8, 5)) sns.heatmap(ct2, annot=True, fmt="d", cmap="Blues", ax=ax2, linewidths=0.5, linecolor="white") ax2.set_title("缺陷类型 × 班次 热力图") st.pyplot(fig2) plt.close() # 面板缺陷 TOP10 st.subheader("缺陷最多的面板 TOP10") panel_defects = filtered_df.groupby("panel_id").agg({ "defect_id": "count", "defect_type": lambda x: x.mode().iloc[0] if len(x) > 0 else "N/A" }).rename(columns={"defect_id": "缺陷数", "defect_type": "主要缺陷类型"}) panel_defects = panel_defects.sort_values("缺陷数", ascending=False).head(10) st.dataframe(panel_defects, use_container_width=True) # 面板缺陷分布 fig3, ax3 = plt.subplots(figsize=(8, 4)) panel_counts = filtered_df.groupby("panel_id").size() ax3.hist(panel_counts, bins=20, color="steelblue", alpha=0.8, edgecolor="white") ax3.set_title("单面板缺陷数量分布") ax3.set_xlabel("缺陷数/面板") ax3.set_ylabel("面板数量") ax3.axvline(x=panel_counts.mean(), color="red", linestyle="--", label=f"平均: {panel_counts.mean():.1f}") ax3.legend() st.pyplot(fig3) plt.close() # --- 智能缺陷聚类 (DBSCAN + PCA) --- _t = get_tab("🧠 智能缺陷聚类 (DBSCAN)") if _t: with _t: st.header("🧠 DBSCAN 智能缺陷空间聚类") st.markdown( "**原理**: DBSCAN 是基于密度的空间聚类算法,能自动识别任意形状的缺陷聚集区域," "无需预设聚类数量,自动过滤随机散落的噪声缺陷。" "行业标准:半导体晶圆/面板缺陷模式识别首选算法。" ) col1, col2 = st.columns([2, 1]) with col1: # --- 参数控制 --- st.subheader("参数设置") p_col1, p_col2 = st.columns(2) with p_col1: eps = st.slider( "eps (邻域半径 mm)", min_value=5.0, max_value=100.0, value=25.0, step=5.0, help="两个点被视为'邻居'的最大距离。值越大,簇越大。" ) with p_col2: min_samples = st.slider( "min_samples (最小簇点数)", min_value=3, max_value=50, value=10, help="形成一个簇所需的最小点数。值越大,越严格的聚集才算簇。" ) # --- 执行聚类 --- coords = filtered_df[["x_mm", "y_mm"]].values scaler = StandardScaler() coords_scaled = scaler.fit_transform(coords) dbscan = DBSCAN(eps=eps / scaler.scale_[0], min_samples=min_samples) filtered_df["cluster"] = dbscan.fit_predict(coords_scaled) # 统计聚类结果 n_clusters = len(set(dbscan.labels_)) - (1 if -1 in dbscan.labels_ else 0) n_noise = list(dbscan.labels_).count(-1) st.info(f"📊 **聚类结果**: 发现 **{n_clusters}** 个缺陷聚集区域,**{n_noise}** 个噪声点(随机散落缺陷)") # --- 可视化 --- fig, axes = plt.subplots(1, 2, figsize=(14, 6)) # 左图:聚类结果(空间位置) labels = filtered_df["cluster"].values unique_labels = set(labels) colors = plt.cm.get_cmap("tab20", len(unique_labels) if len(unique_labels) > 0 else 1) for k in unique_labels: if k == -1: # 噪声点 xy = filtered_df[labels == k][["x_mm", "y_mm"]].values axes[0].scatter(xy[:, 0], xy[:, 1], c="lightgray", s=3, alpha=0.3, label="噪声") else: xy = filtered_df[labels == k][["x_mm", "y_mm"]].values axes[0].scatter(xy[:, 0], xy[:, 1], c=[colors(k)], s=15, alpha=0.7, label=f"簇 {k+1} ({len(xy)} 点)") axes[0].set_title(f"DBSCAN 空间聚类结果 (eps={eps}, min_samples={min_samples})") axes[0].set_xlabel("X (mm)") axes[0].set_ylabel("Y (mm)") axes[0].set_aspect("equal") axes[0].legend(fontsize=7, loc="upper right", ncol=2) # 右图:PCA 降维可视化(加入更多特征维度) if len(filtered_df) > 2: # 构建多维特征:x, y, hour, defect_type编码, severity编码 feature_df = filtered_df[["x_mm", "y_mm", "hour"]].copy() # 缺陷类型编码 type_map = {t: i for i, t in enumerate(filtered_df["defect_type"].unique())} feature_df["type_code"] = filtered_df["defect_type"].map(type_map).astype(float) # 严重程度编码 sev_map = {"轻微": 0, "中等": 1, "严重": 2} feature_df["sev_code"] = filtered_df["severity"].map(sev_map).astype(float) features = feature_df.values features_scaled = StandardScaler().fit_transform(features) # PCA 降维到 2D n_components = min(2, features_scaled.shape[1]) pca = PCA(n_components=n_components) pca_result = pca.fit_transform(features_scaled) explained_var = pca.explained_variance_ratio_ for k in unique_labels: mask_k = labels == k if k == -1: axes[1].scatter(pca_result[mask_k, 0], pca_result[mask_k, 1], c="lightgray", s=3, alpha=0.3, label="噪声") else: axes[1].scatter(pca_result[mask_k, 0], pca_result[mask_k, 1], c=[colors(k)], s=15, alpha=0.7, label=f"簇 {k+1}") axes[1].set_title( f"PCA 多维特征降维\n" f"PC1: {explained_var[0]*100:.1f}% | PC2: {explained_var[1]*100:.1f}%" ) axes[1].set_xlabel("主成分 1") axes[1].set_ylabel("主成分 2") axes[1].legend(fontsize=7, loc="upper right") st.pyplot(fig) plt.close() # --- 簇特征统计 --- if n_clusters > 0: st.divider() st.subheader("各簇特征分析") cluster_data = [] for k in sorted([c for c in unique_labels if c != -1]): cluster_df = filtered_df[labels == k] cluster_data.append({ "簇编号": k + 1, "缺陷数量": len(cluster_df), "占比": f"{len(cluster_df)/len(filtered_df)*100:.1f}%", "中心X(mm)": round(cluster_df["x_mm"].mean(), 1), "中心Y(mm)": round(cluster_df["y_mm"].mean(), 1), "X范围": f"{cluster_df['x_mm'].min():.0f}~{cluster_df['x_mm'].max():.0f}", "Y范围": f"{cluster_df['y_mm'].min():.0f}~{cluster_df['y_mm'].max():.0f}", "主要缺陷": cluster_df["defect_type"].mode().iloc[0] if len(cluster_df) > 0 else "-", "主要严重度": cluster_df["severity"].mode().iloc[0] if len(cluster_df) > 0 else "-", "涉及批次": cluster_df["batch_id"].nunique(), "涉及面板": cluster_df["panel_id"].nunique(), }) st.dataframe(pd.DataFrame(cluster_data), use_container_width=True) with col2: # --- 聚类结果说明 --- st.subheader("📖 结果解读") st.markdown( f""" **当前参数**: eps={eps}mm, min_samples={min_samples} **聚类统计**: - 缺陷聚集区域: {n_clusters} 个 - 随机散落噪声: {n_noise} 个 - 噪声占比: {n_noise/len(filtered_df)*100:.1f}% **参数调优建议**: - **eps 调大** → 簇数量减少,簇变大 - **eps 调小** → 簇数量增加,更精细 - **min_samples 调大** → 只有高度密集区域才算簇 - **min_samples 调小** → 更多区域被识别为簇 **工业应用**: - 每个"簇"代表一个**系统性缺陷源** (如某台设备、某道工序、某个物料批次) - "噪声"点是随机缺陷,通常无需特别关注 - 重点关注**缺陷数量多、涉及批次集中**的簇 """ ) # --- 簇分布饼图 --- if n_clusters > 0: st.subheader("簇规模分布") cluster_counts = filtered_df[labels >= 0]["cluster"].value_counts().sort_index() fig_pie, ax_pie = plt.subplots(figsize=(5, 5)) pie_labels = [f"簇{i+1}" for i in cluster_counts.index] ax_pie.pie(cluster_counts.values, labels=pie_labels, autopct="%1.1f%%", colors=plt.cm.tab20.colors[:len(cluster_counts)], startangle=90) ax_pie.set_title("各簇缺陷占比") st.pyplot(fig_pie) plt.close() # --- DBSCAN vs K-Means 对比 --- st.subheader("为什么选 DBSCAN?") st.markdown( """ | 维度 | DBSCAN | K-Means | |------|--------|---------| | 形状适应 | ✅ 任意形状 | ❌ 仅球形 | | 预设K值 | ❌ 不需要 | ✅ 必须 | | 噪声处理 | ✅ 自动过滤 | ❌ 干扰聚类 | | 环形/线形缺陷 | ✅ 能识别 | ❌ 识别不了 | """ ) # ========== Tab 8: SPC 控制图与预警 ========== _t = get_tab("🚨 SPC 控制图与预警") if _t: with _t: st.header("🚨 SPC 统计过程控制") st.markdown( "基于统计过程控制(SPC)方法,监控每日缺陷率是否在控制限内," "自动检测异常趋势并给出改善/恶化结论。" ) # --- 数据准备:按天计算缺陷率 --- # 需要知道每天检测了多少面板才能算缺陷率 # 用 batch_id 近似日期 spc_metrics = calculate_spc_metrics(df) daily_all = spc_metrics["daily"] if len(daily_all) < 2: st.warning("数据天数不足,无法生成控制图") else: # 控制限计算 p_bar = spc_metrics["p_bar"] sigma_p = spc_metrics["sigma_p"] UCL = spc_metrics["ucl"] LCL = spc_metrics["lcl"] UWL = spc_metrics["uwl"] LWL = spc_metrics["lwl"] # --- Western Electric 规则检测 --- we_violations = [] # 规则1: 单点超出 3σ 控制限 for i, row in daily_all.iterrows(): if row["defect_rate"] > UCL or row["defect_rate"] < LCL: we_violations.append({ "日期": row["day"].strftime("%Y-%m-%d"), "规则": "Rule 1: 超出3σ控制限", "值": f"{row['defect_rate']:.2%}" }) # 规则2: 连续7点上升或下降 rates = daily_all["defect_rate"].values if len(rates) >= 7: for i in range(len(rates) - 6): window = rates[i:i+7] if all(window[j] < window[j+1] for j in range(6)): we_violations.append({ "日期": daily_all.loc[i+6, "day"].strftime("%Y-%m-%d"), "规则": "Rule 2: 连续7点上升", "值": f"{rates[i]:.2%} → {rates[i+6]:.2%}" }) elif all(window[j] > window[j+1] for j in range(6)): we_violations.append({ "日期": daily_all.loc[i+6, "day"].strftime("%Y-%m-%d"), "规则": "Rule 2: 连续7点下降", "值": f"{rates[i]:.2%} → {rates[i+6]:.2%}" }) # 规则3: 连续7点在中心线同一侧 for i in range(len(rates) - 6): window = rates[i:i+7] if all(v > p_bar for v in window): we_violations.append({ "日期": daily_all.loc[i+6, "day"].strftime("%Y-%m-%d"), "规则": "Rule 3: 连续7点在CL上方", "值": f"持续偏高" }) elif all(v < p_bar for v in window): we_violations.append({ "日期": daily_all.loc[i+6, "day"].strftime("%Y-%m-%d"), "规则": "Rule 3: 连续7点在CL下方", "值": f"持续偏低" }) # --- 趋势分析 --- from numpy.polynomial import polynomial as P x = np.arange(len(daily_all)) coeffs = np.polyfit(x, rates, 1) slope = coeffs[0] daily_all["trend"] = np.polyval(coeffs, x) if abs(slope) < sigma_p * 0.1: trend_status = "稳定" trend_icon = "➡️" trend_color = "normal" elif slope > 0: trend_status = "恶化中" trend_icon = "📈" trend_color = "inverse" else: trend_status = "改善中" trend_icon = "📉" trend_color = "normal" # --- KPI 行 --- kpi_spc1, kpi_spc2, kpi_spc3, kpi_spc4 = st.columns(4) kpi_spc1.metric("平均缺陷率", f"{p_bar:.2%}") kpi_spc2.metric("控制限 (UCL/LCL)", f"{UCL:.2%} / {LCL:.2%}") kpi_spc3.metric("趋势判断", f"{trend_icon} {trend_status}", delta=f"斜率: {slope*100:.3f}%/天", delta_color=trend_color) kpi_spc4.metric("Western Electric 告警", f"{len(we_violations)} 次", delta="需关注" if len(we_violations) > 0 else "正常") # --- 控制图 --- st.divider() st.subheader("X-bar 控制图 (每日缺陷率)") fig_spc, ax_spc = plt.subplots(figsize=(14, 5)) # 数据点 ax_spc.plot(daily_all["day"], daily_all["defect_rate"], marker="o", markersize=4, linewidth=1.5, color="steelblue", label="日缺陷率") ax_spc.fill_between(daily_all["day"], daily_all["defect_rate"], alpha=0.15, color="steelblue") # 控制限线 ax_spc.axhline(y=p_bar, color="green", linestyle="-", linewidth=1.5, label=f"CL (中心线): {p_bar:.2%}") ax_spc.axhline(y=UCL, color="red", linestyle="--", linewidth=1, label=f"UCL: {UCL:.2%}") ax_spc.axhline(y=LCL, color="red", linestyle="--", linewidth=1, label=f"LCL: {LCL:.2%}") ax_spc.axhline(y=UWL, color="orange", linestyle=":", linewidth=1, alpha=0.6, label=f"UWL (2σ): {UWL:.2%}") ax_spc.axhline(y=LWL, color="orange", linestyle=":", linewidth=1, alpha=0.6, label=f"LWL (2σ): {LWL:.2%}") # 标注异常点 for v in we_violations: if "Rule 1" in v["规则"]: anomaly_date = pd.Timestamp(v["日期"]) val = float(v["值"].rstrip("%")) / 100 ax_spc.annotate("⚠️", (anomaly_date, val), fontsize=12, ha="center", va="bottom", color="red") ax_spc.set_title("SPC 控制图 - 每日缺陷率") ax_spc.set_ylabel("缺陷率") ax_spc.tick_params(axis="x", rotation=45) ax_spc.legend(fontsize=8, loc="upper right") ax_spc.grid(True, alpha=0.3) st.pyplot(fig_spc) plt.close() # --- 趋势图 --- st.subheader("缺陷率趋势 (含线性回归)") fig_trend, ax_trend = plt.subplots(figsize=(14, 4)) ax_trend.plot(daily_all["day"], daily_all["defect_rate"], marker="o", markersize=3, linewidth=1.5, color="steelblue", label="日缺陷率") ax_trend.plot(daily_all["day"], daily_all["trend"], color="red", linestyle="--", linewidth=2, label=f"趋势线 (斜率: {slope*100:.3f}%/天)") ax_trend.fill_between(daily_all["day"], daily_all["defect_rate"], alpha=0.1, color="steelblue") ax_trend.axhline(y=p_bar, color="green", linestyle="--", alpha=0.5, label=f"平均: {p_bar:.2%}") ax_trend.set_ylabel("缺陷率") ax_trend.tick_params(axis="x", rotation=45) ax_trend.legend(fontsize=8) ax_trend.grid(True, alpha=0.3) st.pyplot(fig_trend) plt.close() # --- 告警清单 --- st.divider() st.subheader("⚠️ Western Electric 规则告警清单") if we_violations: we_df = pd.DataFrame(we_violations) st.dataframe(we_df, use_container_width=True) st.warning(f"共发现 **{len(we_violations)}** 次统计异常,建议关注对应日期的工艺参数和人员排班") else: st.success("✅ 未触发 Western Electric 规则告警,过程处于统计控制状态") # --- 结论 --- st.divider() st.subheader("📋 过程能力结论") if trend_status == "改善中": st.success( f"**趋势改善中** 📉\n\n" f"每日缺陷率以平均 {abs(slope)*100:.3f}%/天 的速度下降。\n" f"当前平均缺陷率为 {p_bar:.2%},控制上限 {UCL:.2%}。\n" f"{'已触发' if we_violations else '未触发'} Western Electric 规则告警。" ) elif trend_status == "恶化中": st.error( f"**趋势恶化中** 📈\n\n" f"每日缺陷率以平均 {slope*100:.3f}%/天 的速度上升。\n" f"当前平均缺陷率为 {p_bar:.2%},控制上限 {UCL:.2%}。\n" f"{'已触发' if we_violations else '未触发'} Western Electric 规则告警。\n\n" f"建议:检查近期工艺参数变化、设备状态和原材料批次。" ) else: st.info( f"**过程稳定** ➡️\n\n" f"缺陷率趋势平稳,斜率 {slope*100:.3f}%/天,无显著上升或下降。\n" f"当前平均缺陷率为 {p_bar:.2%},控制限 [{LCL:.2%}, {UCL:.2%}]。\n" f"{'已触发' if we_violations else '未触发'} Western Electric 规则告警。" ) # ========== 重复缺陷坐标检测 ========== _t = get_tab("🗺️ 空间集中性") if _t: with _t: st.divider() st.subheader("🎯 重复缺陷坐标检测") st.markdown( "检测在不同面板上重复出现的缺陷坐标。随机缺陷不会在同一位置反复出现," "而设备硬伤(如吸嘴划伤、夹具压痕)会在相同位置持续产生缺陷。" "这是从'描述分析'跨入'根因诊断'的关键一步。" ) # 坐标分桶:将面板划分为网格,找出跨面板重复的缺陷桶 repeat_bin_size = st.slider("坐标分桶大小 (mm)", min_value=5, max_value=50, value=15, step=5, help="将坐标按此大小分桶,同一桶内出现于不同面板的缺陷视为'重复'") pw = df["panel_width_mm"].iloc[0] ph = df["panel_height_mm"].iloc[0] # 计算桶ID df_copy = filtered_df.copy() df_copy["x_bin"] = (df_copy["x_mm"] // repeat_bin_size).astype(int) df_copy["y_bin"] = (df_copy["y_mm"] // repeat_bin_size).astype(int) df_copy["bin_key"] = df_copy["x_bin"].astype(str) + "_" + df_copy["y_bin"].astype(str) # 统计每个桶出现在多少不同面板上 bin_panels = df_copy.groupby("bin_key").agg( panel_count=("panel_id", "nunique"), defect_count=("defect_id", "count"), x_center=("x_mm", "mean"), y_center=("y_mm", "mean"), dominant_type=("defect_type", lambda x: x.mode().iloc[0] if len(x) > 0 else "-"), dominant_severity=("severity", lambda x: x.mode().iloc[0] if len(x) > 0 else "-"), ).reset_index() repeat_threshold = st.slider("重复判定阈值 (跨面板数)", min_value=2, max_value=10, value=3) repeated_bins = bin_panels[bin_panels["panel_count"] >= repeat_threshold].sort_values("panel_count", ascending=False) col_repeat1, col_repeat2 = st.columns([1, 2]) with col_repeat1: st.metric("重复缺陷桶数", f"{len(repeated_bins)}", delta=f"阈值: ≥{repeat_threshold} 块面板") if len(repeated_bins) > 0: st.dataframe( repeated_bins[["panel_count", "defect_count", "x_center", "y_center", "dominant_type", "dominant_severity"]] .rename(columns={"panel_count": "涉及面板", "defect_count": "缺陷总数", "x_center": "中心X", "y_center": "中心Y", "dominant_type": "主要类型", "dominant_severity": "主要严重度"}), use_container_width=True, height=400 ) else: st.info(f"未发现跨 {repeat_threshold}+ 块面板的重复缺陷坐标") with col_repeat2: if len(repeated_bins) > 0: # 在面板图上标注重复缺陷桶 fig_repeat, ax_repeat = plt.subplots(figsize=(4, 6)) # 面板背景 ax_repeat.add_patch(plt.Rectangle((0, 0), pw, ph, facecolor="#1a1a2e", edgecolor="#444", linewidth=2)) ax_repeat.add_patch(plt.Rectangle((8, 8), pw-16, ph-16, facecolor="#16213e", edgecolor="#0f3460", linewidth=1.5)) # 所有缺陷散点(淡) ax_repeat.scatter(filtered_df["x_mm"], filtered_df["y_mm"], alpha=0.1, s=2, c="gray", edgecolors="none", zorder=1) # 重复缺陷桶标注重叠圈 max_count = repeated_bins["panel_count"].max() for _, row in repeated_bins.iterrows(): size = 100 + (row["panel_count"] / max_count) * 400 ax_repeat.scatter(row["x_center"], row["y_center"], s=size, c="red", alpha=0.3, edgecolors="red", linewidth=2, zorder=3) ax_repeat.text(row["x_center"], row["y_center"], str(row["panel_count"]), ha="center", va="center", fontsize=8, color="white", fontweight="bold", zorder=4) ax_repeat.set_xlim(-5, pw + 5) ax_repeat.set_ylim(-5, ph + 5) ax_repeat.set_title(f"重复缺陷坐标 (≥{repeat_threshold} 块面板)", fontsize=11) ax_repeat.set_xlabel("X (mm)") ax_repeat.set_ylabel("Y (mm)") ax_repeat.set_aspect("equal") ax_repeat.grid(True, alpha=0.1, color="gray") st.pyplot(fig_repeat) plt.close() else: st.info("调整分桶大小或阈值以检测重复缺陷") # ========== Tab 9: 缺陷模式识别 ========== _t = get_tab("🔬 缺陷模式识别") if _t: with _t: st.header("🔬 缺陷空间模式自动识别") st.markdown( "参考 WM811K 晶圆缺陷图谱分类标准,对每块面板的缺陷分布进行模式评分。" "不同模式对应不同的根因机制(如边缘型→贴合工艺,角落型→夹具应力," "中心型→压力不均,线条型→机械刮伤,随机型→来料污染)。" ) from scipy.spatial import ConvexHull from scipy.spatial.distance import cdist pw = df["panel_width_mm"].iloc[0] ph = df["panel_height_mm"].iloc[0] # 按面板分组,逐块分析模式 panel_groups = filtered_df.groupby("panel_id") patterns_results = [] for panel_id, panel_data in panel_groups: if len(panel_data) < 3: continue coords = panel_data[["x_mm", "y_mm"]].values # 归一化坐标到 [0,1] x_norm = panel_data["x_mm"].values / pw y_norm = panel_data["y_mm"].values / ph # --- 模式1: 边缘型 (缺陷靠近面板四边) --- # 计算每个点到最近边缘的距离比例 edge_dist = np.minimum(np.minimum(x_norm, 1 - x_norm), np.minimum(y_norm, 1 - y_norm)) edge_ratio = (edge_dist < 0.12).mean() # 12% 以内的点视为边缘点 edge_score = edge_ratio # --- 模式2: 角落型 (缺陷集中在四个角落) --- corner_threshold = 0.15 # 15% 范围 in_corner = ( ((x_norm < corner_threshold) & (y_norm < corner_threshold)) | # 左下 ((x_norm < corner_threshold) & (y_norm > 1 - corner_threshold)) | # 左上 ((x_norm > 1 - corner_threshold) & (y_norm < corner_threshold)) | # 右下 ((x_norm > 1 - corner_threshold) & (y_norm > 1 - corner_threshold)) # 右上 ) corner_score = in_corner.mean() # --- 模式3: 中心型 (缺陷集中在面板中心区域) --- center_x, center_y = 0.5, 0.5 dist_to_center = np.sqrt((x_norm - center_x)**2 + (y_norm - center_y)**2) center_radius = 0.18 # 18% 半径 center_score = (dist_to_center < center_radius).mean() # --- 模式4: 线条型 (缺陷沿一条线分布) --- # 用 PCA 第一主成分占比来判断线性程度 if len(coords) >= 3: from sklearn.decomposition import PCA pca = PCA(n_components=2) pca.fit(coords) linearity = pca.explained_variance_ratio_[0] # 第一主成分占比 line_score = linearity else: line_score = 0 # --- 模式5: 随机型 (均匀分布,无明显模式) --- # 用空间变异系数:将面板分为网格,计算各格缺陷数的变异系数 grid_n = 5 x_edges = np.linspace(0, pw, grid_n + 1) y_edges = np.linspace(0, ph, grid_n + 1) H, _, _ = np.histogram2d(panel_data["x_mm"].values, panel_data["y_mm"].values, bins=[x_edges, y_edges]) if H.sum() > 0 and H.std() > 0: cv = H.std() / H.mean() if H.mean() > 0 else 999 # cv 越小越均匀(随机) randomness_score = max(0, 1 - cv / 3) # 归一化到 [0,1] else: randomness_score = 0 # --- 主导模式判定 --- scores = { "边缘型": edge_score, "角落型": corner_score, "中心型": center_score, "线条型": line_score, "随机型": randomness_score, } dominant_pattern = max(scores, key=scores.get) patterns_results.append({ "面板ID": panel_id, "缺陷数": len(panel_data), "主导模式": dominant_pattern, "边缘型": round(edge_score, 2), "角落型": round(corner_score, 2), "中心型": round(center_score, 2), "线条型": round(line_score, 2), "随机型": round(randomness_score, 2), }) if patterns_results: pattern_df = pd.DataFrame(patterns_results) # --- 模式统计 --- col_pat1, col_pat2, col_pat3 = st.columns([1, 1, 2]) with col_pat1: pattern_counts = pattern_df["主导模式"].value_counts() fig_pat, ax_pat = plt.subplots(figsize=(8, 5)) colors_pat = {"边缘型": "#FF6B6B", "角落型": "#FFA500", "中心型": "#4ECDC4", "线条型": "#9B59B6", "随机型": "#95A5A6"} bars = ax_pat.bar(pattern_counts.index, pattern_counts.values, color=[colors_pat.get(p, "#888") for p in pattern_counts.index], alpha=0.8) for bar, count in zip(bars, pattern_counts.values): ax_pat.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.5, str(count), ha="center", va="bottom", fontsize=11, fontweight="bold") ax_pat.set_title("缺陷模式分布") ax_pat.set_ylabel("面板数量") st.pyplot(fig_pat) plt.close() with col_pat2: st.subheader("模式占比") total_panels = len(pattern_df) for pattern in ["边缘型", "角落型", "中心型", "线条型", "随机型"]: count = (pattern_df["主导模式"] == pattern).sum() pct = count / total_panels * 100 st.metric(pattern, f"{count} 块", f"{pct:.1f}%") with col_pat3: # --- 模式-根因映射 --- st.subheader("模式 → 可能根因") root_cause_map = { "边缘型": { "可能原因": "贴合工艺参数异常、边缘夹具压力不均、涂胶厚度不均", "建议排查": "检查贴合压力、边缘密封工艺、涂胶均匀性" }, "角落型": { "可能原因": "夹具应力集中、面板放置定位偏差、角落散热不良", "建议排查": "检查夹具对齐、面板定位精度、角落温度分布" }, "中心型": { "可能原因": "压力中心不均、FPC绑定区域工艺异常、中心温度过高", "建议排查": "检查压力分布曲线、FPC绑定参数、加热板温度" }, "线条型": { "可能原因": "机械刮伤、传送带划痕、清洗刷毛磨损、吸嘴移动轨迹", "建议排查": "检查传送带状态、清洗设备、吸嘴运动轨迹" }, "随机型": { "可能原因": "来料污染、环境尘埃、化学药液杂质", "建议排查": "检查洁净室等级、来料检验记录、药液过滤状态" }, } for pattern in ["边缘型", "角落型", "中心型", "线条型", "随机型"]: count = (pattern_df["主导模式"] == pattern).sum() if count == 0: continue rc = root_cause_map[pattern] with st.expander(f"{pattern} ({count} 块面板)"): st.markdown(f"**可能原因**: {rc['可能原因']}") st.markdown(f"**建议排查**: {rc['建议排查']}") # --- 详细数据表 --- st.divider() st.subheader("面板模式评分明细") st.dataframe(pattern_df, use_container_width=True, height=400) else: st.warning("当前筛选条件下无足够面板数据进行模式分析(需至少 3 个缺陷/面板)") # ========== Tab 10: 设备健康与共性分析 ========== _t = get_tab("💚 设备健康与共性分析") if _t: with _t: st.header("💚 设备健康评分 & 共性分析") st.markdown( "综合评估各台设备的健康状态,并在发现异常批次时自动分析其共性特征。" ) # --- 设备健康评分 --- st.subheader("设备健康评分 (0-100)") st.markdown("评分维度:缺陷率(40%) + 座号集中度(30%) + 严重度分布(30%)") health_data = [] for eq_id in sorted(df["equipment_id"].unique()): eq_all = df[df["equipment_id"] == eq_id] eq_filtered = filtered_df[filtered_df["equipment_id"] == eq_id] # 维度1: 缺陷率评分 (40%) eq_panels = eq_all["panel_id"].nunique() eq_defects = len(eq_all) eq_defect_rate = eq_defects / max(eq_panels, 1) # 缺陷率越低分越高,线性归一化 # 以 5 个缺陷/面板为最差(0分),0 为最好(100分) rate_score = max(0, 100 * (1 - eq_defect_rate / 5)) # 维度2: 座号集中度评分 (30%) # 座号分布越均匀分越高,集中分越低 eq_seat_counts = eq_all.groupby("seat_id").size() if len(eq_seat_counts) > 1: seat_cv = eq_seat_counts.std() / max(eq_seat_counts.mean(), 0.001) # cv 越小越均匀,得分越高 seat_score = max(0, 100 * (1 - seat_cv / 3)) else: seat_score = 50 # 维度3: 严重度评分 (30%) eq_sev = eq_all["severity"].value_counts() severe_ratio = eq_sev.get("严重", 0) / max(len(eq_all), 1) sev_score = max(0, 100 * (1 - severe_ratio * 3)) # 严重占比 33% 时为 0 分 # 综合得分 total_score = rate_score * 0.4 + seat_score * 0.3 + sev_score * 0.3 health_data.append({ "设备ID": eq_id, "缺陷总数": eq_defects, "缺陷率": f"{eq_defect_rate:.2f}", "座号集中度(CV)": f"{seat_cv:.2f}" if len(eq_seat_counts) > 1 else "N/A", "严重占比": f"{severe_ratio:.1%}", "缺陷率分(40%)": round(rate_score, 1), "座号分(30%)": round(seat_score, 1), "严重度分(30%)": round(sev_score, 1), "健康总分": round(total_score, 1), }) health_df = pd.DataFrame(health_data).sort_values("健康总分", ascending=False) # 显示健康评分 col_h1, col_h2 = st.columns([3, 2]) with col_h1: st.dataframe(health_df, use_container_width=True, hide_index=True) with col_h2: # 可视化排名 fig_health, ax_health = plt.subplots(figsize=(6, 4)) health_sorted = health_df.sort_values("健康总分", ascending=True) colors_health = ["#4CAF50" if s >= 70 else "#FF9800" if s >= 40 else "#F44336" for s in health_sorted["健康总分"]] bars = ax_health.barh(health_sorted["设备ID"], health_sorted["健康总分"], color=colors_health, alpha=0.8, height=0.5) for bar, score in zip(bars, health_sorted["健康总分"]): ax_health.text(bar.get_width() + 1, bar.get_y() + bar.get_height()/2, f"{score:.0f}", ha="left", va="center", fontsize=12, fontweight="bold") ax_health.set_xlabel("健康评分 (0-100)") ax_health.set_title("设备健康排名") ax_health.set_xlim(0, 110) st.pyplot(fig_health) plt.close() # --- 共性分析 --- st.divider() st.subheader("🔍 异常批次共性分析") st.markdown("选中异常批次后,自动分析这些批次的共同特征(设备/时段/座号/缺陷类型)。") # 自动检测异常批次(基于缺陷率) batch_stats = df.groupby("batch_id").agg( defects=("defect_id", "count"), panels=("panel_id", "nunique") ) batch_stats["defect_rate"] = batch_stats["defects"] / batch_stats["panels"] threshold = batch_stats["defect_rate"].mean() + batch_stats["defect_rate"].std() abnormal_batches = batch_stats[batch_stats["defect_rate"] > threshold].index.tolist() st.info(f"自动检测到的异常批次 (缺陷率 > {threshold:.2%}): **{len(abnormal_batches)}** 个") st.write(", ".join(abnormal_batches[:10])) if abnormal_batches: col_c1, col_c2 = st.columns(2) with col_c1: # 选择要分析的批次 selected_abnormal = st.multiselect( "选择要分析的异常批次", options=abnormal_batches, default=abnormal_batches[:3] if len(abnormal_batches) >= 3 else abnormal_batches, key="commonality_batch" ) if selected_abnormal: abnormal_df = df[df["batch_id"].isin(selected_abnormal)] normal_df = df[~df["batch_id"].isin(selected_abnormal)] st.divider() st.markdown(f"**分析对象**: {len(selected_abnormal)} 个异常批次, " f"{len(abnormal_df)} 条缺陷记录") # 共性分析:设备 st.subheader("共性特征 TOP3") col_common1, col_common2, col_common3 = st.columns(3) with col_common1: # 设备共性 abnormal_eq_rate = abnormal_df.groupby("equipment_id").size() / len(abnormal_df) normal_eq_rate = normal_df.groupby("equipment_id").size() / len(normal_df) eq_boost = {} for eq in abnormal_df["equipment_id"].unique(): a_rate = abnormal_eq_rate.get(eq, 0) n_rate = normal_eq_rate.get(eq, 0) if n_rate > 0: eq_boost[eq] = (a_rate - n_rate) / n_rate * 100 else: eq_boost[eq] = 999 eq_top = sorted(eq_boost.items(), key=lambda x: x[1], reverse=True)[:3] st.markdown("**设备共用性**") for eq, boost in eq_top: st.markdown(f"- {eq}: 异常占比 {abnormal_eq_rate.get(eq, 0):.1%}, " f"相对正常 **+{boost:.0f}%**") with col_common2: # 时段共性 abnormal_hour = abnormal_df.groupby("hour").size() / len(abnormal_df) normal_hour = normal_df.groupby("hour").size() / len(normal_df) # 按班次聚合 abnormal_shift = abnormal_df.groupby("shift").size() / len(abnormal_df) normal_shift = normal_df.groupby("shift").size() / len(normal_df) st.markdown("**时段共性**") for shift in ["白班", "夜班"]: a_rate = abnormal_shift.get(shift, 0) n_rate = normal_shift.get(shift, 0) if n_rate > 0: boost = (a_rate - n_rate) / n_rate * 100 else: boost = 999 st.markdown(f"- {shift}: 异常占比 {a_rate:.1%}, " f"相对正常 **{'+' if boost > 0 else ''}{boost:.0f}%**") with col_common3: # 座号共性 abnormal_seat = abnormal_df.groupby("seat_id").size() / len(abnormal_df) normal_seat = normal_df.groupby("seat_id").size() / len(normal_df) seat_boost = {} for seat in abnormal_df["seat_id"].unique(): a_rate = abnormal_seat.get(seat, 0) n_rate = normal_seat.get(seat, 0) if n_rate > 0: seat_boost[seat] = (a_rate - n_rate) / n_rate * 100 else: seat_boost[seat] = 999 seat_top = sorted(seat_boost.items(), key=lambda x: x[1], reverse=True)[:3] st.markdown("**座号共性**") for seat, boost in seat_top: st.markdown(f"- {seat}: 异常占比 {abnormal_seat.get(seat, 0):.1%}, " f"相对正常 **+{boost:.0f}%**") # --- 缺陷类型偏差 --- st.subheader("异常批次缺陷类型偏差") abnormal_type = abnormal_df.groupby("defect_type").size() / len(abnormal_df) normal_type = normal_df.groupby("defect_type").size() / len(normal_df) type_diff = [] for t in set(list(abnormal_type.index) + list(normal_type.index)): a_rate = abnormal_type.get(t, 0) n_rate = normal_type.get(t, 0) type_diff.append({ "缺陷类型": t, "异常占比": f"{a_rate:.1%}", "正常占比": f"{n_rate:.1%}", "偏差": f"{'+' if a_rate > n_rate else ''}{(a_rate - n_rate) / max(n_rate, 0.001) * 100:.0f}%", }) st.dataframe(pd.DataFrame(type_diff).sort_values("偏差", key=lambda x: x.str.rstrip("%").astype(float), ascending=False), use_container_width=True, hide_index=True) # ========== Tab 11: 多层叠加分析 ========== _t = get_tab("🔲 多层叠加分析") if _t: with _t: st.header("🔲 多层叠加分析") st.markdown( "将缺陷数据与面板物理区域、设备座号、时间维度叠加在同一视图上," "揭示单一维度看不到的深层关联。" ) pw = df["panel_width_mm"].iloc[0] ph = df["panel_height_mm"].iloc[0] # --- 自定义区域定义 --- st.subheader("📐 自定义区域缺陷统计") st.markdown("将面板划分为不同功能区域,统计各区域缺陷分布") # 定义区域:(名称, 判定函数) # 边缘区:距四边 < 15% # 中心区:距中心 < 20% 半径 # 角落区:四个角的 15% 范围 # FPC区:Y > 70% 高度 # 上半区/下半区 def classify_zone(x_norm, y_norm): """将每个缺陷点分类到区域""" zones = [] for i in range(len(x_norm)): zx, zy = x_norm[i], y_norm[i] zone_list = [] # 边缘区 if min(zx, 1 - zx, zy, 1 - zy) < 0.15: zone_list.append("边缘区") # 中心区 if np.sqrt((zx - 0.5)**2 + (zy - 0.5)**2) < 0.20: zone_list.append("中心区") # 角落区 if (zx < 0.15 or zx > 0.85) and (zy < 0.15 or zy > 0.85): zone_list.append("角落区") # FPC区 if zy > 0.70: zone_list.append("FPC区") # 上半区 if zy < 0.50: zone_list.append("上半区") # 下半区 if zy > 0.50: zone_list.append("下半区") if not zone_list: zone_list.append("其他区域") zones.append(", ".join(zone_list)) return zones # 计算每个缺陷的区域归属 x_norm_arr = filtered_df["x_mm"].values / pw y_norm_arr = filtered_df["y_mm"].values / ph filtered_df_copy = filtered_df.copy() filtered_df_copy["zone"] = classify_zone(x_norm_arr, y_norm_arr) # 统计各区域缺陷数 zone_counts = {} zone_types = ["边缘区", "中心区", "角落区", "FPC区", "上半区", "下半区", "其他区域"] for z in zone_types: count = filtered_df_copy["zone"].str.contains(z).sum() zone_counts[z] = count col_z1, col_z2 = st.columns([1, 2]) with col_z1: st.subheader("区域缺陷统计") for z in zone_types: count = zone_counts.get(z, 0) pct = count / max(len(filtered_df_copy), 1) * 100 bar_len = int(pct / 100 * 200) bar = "█" * max(bar_len, 0) st.markdown(f"{z} | {bar} **{count}** ({pct:.1f}%)") with col_z2: # 区域可视化 fig_zone, ax_zone = plt.subplots(figsize=(4, 6)) # 面板背景 ax_zone.add_patch(plt.Rectangle((0, 0), pw, ph, facecolor="#1a1a2e", edgecolor="#444", linewidth=2)) # 区域边界 # 边缘区 (15% 边界) margin_x = pw * 0.15 margin_y = ph * 0.15 ax_zone.add_patch(plt.Rectangle((0, 0), margin_x, ph, fill=False, edgecolor="yellow", linewidth=1, alpha=0.4, linestyle="--")) ax_zone.add_patch(plt.Rectangle((pw - margin_x, 0), margin_x, ph, fill=False, edgecolor="yellow", linewidth=1, alpha=0.4, linestyle="--")) ax_zone.add_patch(plt.Rectangle((0, 0), pw, margin_y, fill=False, edgecolor="yellow", linewidth=1, alpha=0.4, linestyle="--")) ax_zone.add_patch(plt.Rectangle((0, ph - margin_y), pw, margin_y, fill=False, edgecolor="yellow", linewidth=1, alpha=0.4, linestyle="--")) # 中心区 (20% 半径) center_r = 0.20 * max(pw, ph) / 2 circle = plt.Circle((pw/2, ph/2), center_r, fill=False, edgecolor="cyan", linewidth=1.5, alpha=0.5, linestyle="--") ax_zone.add_patch(circle) # FPC区 fpc_y = ph * 0.70 ax_zone.add_patch(plt.Rectangle((0, fpc_y), pw, ph - fpc_y, fill=False, edgecolor="magenta", linewidth=1.5, alpha=0.5, linestyle="--")) # 缺陷散点 scatter_colors = {"边缘区": "yellow", "中心区": "cyan", "角落区": "orange", "FPC区": "magenta", "上半区": "#4ECDC4", "下半区": "#45B7D1", "其他区域": "gray"} for z_name in zone_types: z_mask = filtered_df_copy["zone"].str.contains(z_name) if z_mask.sum() > 0: z_data = filtered_df_copy[z_mask] ax_zone.scatter(z_data["x_mm"], z_data["y_mm"], c=scatter_colors.get(z_name, "gray"), s=5, alpha=0.3, label=f"{z_name} ({z_mask.sum()})", edgecolors="none", zorder=2) ax_zone.set_xlim(-5, pw + 5) ax_zone.set_ylim(-5, ph + 5) ax_zone.set_title("缺陷区域叠加图 (虚线=区域边界)") ax_zone.set_xlabel("X (mm)") ax_zone.set_ylabel("Y (mm)") ax_zone.set_aspect("equal") ax_zone.legend(fontsize=7, loc="upper right", ncol=1, framealpha=0.7) st.pyplot(fig_zone) plt.close() # --- 跨批次同座号面板对比 --- st.divider() st.subheader("🔀 跨批次同座号面板对比") st.markdown( "选择一台设备和一个座号,查看该座号在不同批次生产的面板上缺陷分布的对比。" "如果同一座号持续在相同位置产生缺陷 → 该座号存在系统性问题。" ) col_cmp1, col_cmp2, col_cmp3 = st.columns(3) with col_cmp1: cmp_eq = st.selectbox("选择设备", options=sorted(df["equipment_id"].unique()), key="cmp_eq") with col_cmp2: eq_seats = sorted(df[(df["equipment_id"] == cmp_eq)]["seat_id"].unique()) cmp_seat = st.selectbox("选择座号", options=eq_seats, key="cmp_seat") with col_cmp3: # 找出有该设备座号缺陷的批次 eq_seat_batches = sorted(df[(df["equipment_id"] == cmp_eq) & (df["seat_id"] == cmp_seat)]["batch_id"].unique()) cmp_batches = st.multiselect("选择对比批次", options=eq_seat_batches, default=eq_seat_batches[:3] if len(eq_seat_batches) >= 3 else eq_seat_batches) if cmp_batches and len(cmp_batches) >= 2: n_cols = min(len(cmp_batches), 3) n_rows = (len(cmp_batches) + n_cols - 1) // n_cols fig_cmp, axes_cmp = plt.subplots(n_rows, n_cols, figsize=(3.5 * n_cols, 5 * n_rows)) axes_cmp = axes_cmp.flatten() if n_cols * n_rows > 1 else [axes_cmp] for i, batch in enumerate(cmp_batches): ax = axes_cmp[i] batch_data = df[(df["equipment_id"] == cmp_eq) & (df["seat_id"] == cmp_seat) & (df["batch_id"] == batch)] # 面板背景 ax.add_patch(plt.Rectangle((0, 0), pw, ph, facecolor="#1a1a2e", edgecolor="#444", linewidth=1)) if len(batch_data) > 0: # 按缺陷类型着色 type_colors = {"划痕": "red", "亮点": "yellow", "暗点": "black", "气泡": "cyan", "色差": "magenta", "漏光": "orange", "裂纹": "darkred", "异物": "green"} for _, row in batch_data.iterrows(): c = type_colors.get(row["defect_type"], "white") ax.scatter(row["x_mm"], row["y_mm"], c=c, s=30, alpha=0.7, edgecolors="white", linewidth=0.3, zorder=3) ax.set_xlim(-3, pw + 3) ax.set_ylim(-3, ph + 3) ax.set_title(f"{batch}\n{len(batch_data)} 缺陷", fontsize=9) ax.set_aspect("equal") ax.grid(True, alpha=0.1, color="gray") ax.tick_params(left=False, bottom=False, labelleft=False, labelbottom=False) # 隐藏多余子图 for j in range(len(cmp_batches), len(axes_cmp)): axes_cmp[j].set_visible(False) fig_cmp.suptitle(f"{cmp_eq} / {cmp_seat} 跨批次对比", fontsize=12, y=1.01) plt.tight_layout() st.pyplot(fig_cmp) plt.close() # 对比统计 st.subheader("对比统计") comp_stats = [] for batch in cmp_batches: batch_data = df[(df["equipment_id"] == cmp_eq) & (df["seat_id"] == cmp_seat) & (df["batch_id"] == batch)] comp_stats.append({ "批次": batch, "缺陷数": len(batch_data), "主要类型": batch_data["defect_type"].mode().iloc[0] if len(batch_data) > 0 else "-", "严重占比": f"{(batch_data['severity']=='严重').sum() / max(len(batch_data), 1):.0%}", "中心X": round(batch_data["x_mm"].mean(), 1) if len(batch_data) > 0 else "-", "中心Y": round(batch_data["y_mm"].mean(), 1) if len(batch_data) > 0 else "-", }) st.dataframe(pd.DataFrame(comp_stats), use_container_width=True, hide_index=True) # 趋势判断 if len(cmp_batches) >= 3: defect_counts = [len(df[(df["equipment_id"] == cmp_eq) & (df["seat_id"] == cmp_seat) & (df["batch_id"] == b)]) for b in cmp_batches] x_trend = np.arange(len(cmp_batches)) coeffs = np.polyfit(x_trend, defect_counts, 1) slope = coeffs[0] if slope > 0.5: st.warning(f"⚠️ **{cmp_eq}/{cmp_seat}** 缺陷数呈**上升趋势** (斜率: {slope:.1f}/批次),建议安排设备检修") elif slope < -0.5: st.success(f"✅ **{cmp_eq}/{cmp_seat}** 缺陷数呈**改善趋势** (斜率: {slope:.1f}/批次)") else: st.info(f"➡️ **{cmp_eq}/{cmp_seat}** 缺陷数**平稳** (斜率: {slope:.1f}/批次)") else: st.info("请选择至少 2 个批次进行对比") # --- 缺陷传播追踪 --- st.divider() st.subheader("📡 缺陷坐标传播追踪") st.markdown( "追踪同一坐标区域在时间轴上的缺陷演变,识别持续恶化的位置。" "如果某坐标的缺陷数量随时间递增 → 该位置存在渐进性损伤(如吸嘴持续磨损)。" ) # 坐标分桶 + 时间维度 prop_bin = st.slider("传播追踪分桶大小 (mm)", min_value=10, max_value=50, value=20, step=10) df_time = df.copy() df_time["x_bin"] = (df_time["x_mm"] // prop_bin).astype(int) df_time["y_bin"] = (df_time["y_mm"] // prop_bin).astype(int) # 按桶 + 日期聚合 prop_df = df_time.groupby(["x_bin", "y_bin", "day"]).size().reset_index(name="defect_count") # 找出至少有 3 天数据的桶 bucket_days = prop_df.groupby(["x_bin", "y_bin"])["day"].nunique() active_buckets = bucket_days[bucket_days >= 3].index.tolist() if active_buckets: # 选择要追踪的桶 bucket_options = [f"({bx},{by})" for bx, by in active_buckets] bucket_counts = prop_df.groupby(["x_bin", "y_bin"])["defect_count"].sum().sort_values(ascending=False) # 默认选缺陷最多的桶 default_top = bucket_counts.index[0] selected_bucket = st.selectbox( "选择要追踪的坐标桶", options=bucket_options, index=0, format_func=lambda x: f"{x} (总缺陷: {bucket_counts.loc[tuple(map(int, x.strip('()').split(',')))]:.0f})" ) bx, by = map(int, selected_bucket.strip("()").split(",")) bucket_timeline = prop_df[(prop_df["x_bin"] == bx) & (prop_df["y_bin"] == by)].sort_values("day") bucket_timeline["day"] = pd.to_datetime(bucket_timeline["day"]) # 传播趋势图 fig_prop, ax_prop = plt.subplots(figsize=(12, 4)) ax_prop.bar(bucket_timeline["day"], bucket_timeline["defect_count"], color="steelblue", alpha=0.7, width=0.8) # 趋势线 if len(bucket_timeline) >= 2: x_t = np.arange(len(bucket_timeline)) coeffs_p = np.polyfit(x_t, bucket_timeline["defect_count"].values, 1) slope_p = coeffs_p[0] trend_y = np.polyval(coeffs_p, x_t) ax_prop.plot(bucket_timeline["day"], trend_y, color="red", linestyle="--", linewidth=2, label=f"趋势 (斜率: {slope_p:.2f}/天)") if slope_p > 0.3: ax_prop.set_title(f"坐标桶 ({bx},{by}) — 缺陷数上升 (恶化趋势)") elif slope_p < -0.3: ax_prop.set_title(f"坐标桶 ({bx},{by}) — 缺陷数下降 (改善趋势)") else: ax_prop.set_title(f"坐标桶 ({bx},{by}) — 缺陷数平稳") else: ax_prop.set_title(f"坐标桶 ({bx},{by})") ax_prop.set_ylabel("缺陷数量") ax_prop.tick_params(axis="x", rotation=45) ax_prop.legend() ax_prop.grid(True, alpha=0.3, axis="y") st.pyplot(fig_prop) plt.close() # 该桶的缺陷类型演变 bucket_data = df_time[(df_time["x_bin"] == bx) & (df_time["y_bin"] == by)] st.markdown(f"**坐标桶 ({bx},{by}) 缺陷类型演变** (对应面板区域: X {bx*prop_bin}-{(bx+1)*prop_bin}mm, Y {by*prop_bin}-{(by+1)*prop_bin}mm)") bucket_type_timeline = bucket_data.groupby(["day", "defect_type"]).size().unstack(fill_value=0) bucket_type_timeline.index = pd.to_datetime(bucket_type_timeline.index) st.dataframe(bucket_type_timeline, use_container_width=True, height=300) else: st.info("当前数据中无足够多天数的连续缺陷坐标桶 (需 ≥3 天)") # --- 底部:数据导出 --- st.divider() if current_config["show_export"]: st.subheader("📥 数据导出") # 综合报告导出 st.subheader("📋 一键导出综合报告") st.markdown("包含所有分析模块的关键结论,适合汇报和存档。") report_parts = [] report_parts.append("# 缺陷集中性分析综合报告\n") report_parts.append(f"**生成时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") report_parts.append(f"**数据范围**: {start_date.strftime('%Y-%m-%d')} ~ {end_date.strftime('%Y-%m-%d')}") report_parts.append(f"**筛选后缺陷数**: {len(filtered_df)} 条") report_parts.append(f"**涉及面板**: {filtered_df['panel_id'].nunique()} 块") report_parts.append(f"**视图模式**: {view_mode}\n") # 1. KPI 摘要 report_parts.append("## 1. KPI 摘要\n") report_kpis = calculate_kpis(df, filtered_df) total_panels_inspected_r = report_kpis["total_panels_inspected"] defective_panels_r = report_kpis["defective_panels"] yield_rate_r = report_kpis["yield_rate"] report_parts.append(f"- 检测面板数: {total_panels_inspected_r} 块") defective_rate_r = defective_panels_r / max(total_panels_inspected_r, 1) * 100 report_parts.append(f"- 不良面板数: {defective_panels_r} 块 ({defective_rate_r:.1f}%)") report_parts.append(f"- 综合良率: {yield_rate_r:.1f}%") report_parts.append(f"- 缺陷总数: {len(filtered_df)} 个") report_parts.append(f"- 严重缺陷: {(filtered_df['severity']=='严重').sum()} 个\n") # 2. 缺陷类型 report_parts.append("## 2. 缺陷类型分布\n") type_counts_r = filtered_df["defect_type"].value_counts() for t, c in type_counts_r.items(): report_parts.append(f"- {t}: {c} ({c/len(filtered_df)*100:.1f}%)") report_parts.append("") # 3. 设备/座号 if "equipment_id" in filtered_df.columns: report_parts.append("## 3. 设备与座号分布\n") eq_counts = filtered_df["equipment_id"].value_counts() for e, c in eq_counts.items(): report_parts.append(f"- {e}: {c} 个缺陷") seat_top = filtered_df["seat_id"].value_counts().head(5) report_parts.append(f"\n**缺陷座号 TOP5**:") for i, (s, c) in enumerate(seat_top.items(), 1): report_parts.append(f" {i}. {s}: {c} 个") report_parts.append("") # 4. 趋势 report_parts.append("## 4. 趋势分析\n") daily_r = filtered_df.groupby("day").size() if len(daily_r) >= 2: x_r = np.arange(len(daily_r)) coeffs_r = np.polyfit(x_r, daily_r.values.astype(float), 1) slope_r = coeffs_r[0] if slope_r > 0: report_parts.append(f"- 缺陷数趋势: **上升** (斜率 {slope_r:.1f}/天)") else: report_parts.append(f"- 缺陷数趋势: **下降** (斜率 {slope_r:.1f}/天)") report_parts.append("") # 5. 异常座号 report_parts.append("## 5. 异常检测\n") if "seat_id" in filtered_df.columns: all_seat_stats_r = filtered_df.groupby(["equipment_id", "seat_id"]).size() mean_r = all_seat_stats_r.mean() std_r = all_seat_stats_r.std() threshold_2x_r = mean_r + 2 * std_r critical_r = all_seat_stats_r[all_seat_stats_r > threshold_2x_r] if len(critical_r) > 0: report_parts.append(f"- ⚠️ 2σ 异常座号: {len(critical_r)} 个") for (eq, seat), count in critical_r.items(): report_parts.append(f" - {eq}/{seat}: {count} 个缺陷") else: report_parts.append("- ✅ 无 2σ 异常座号") report_parts.append("") # 6. 建议 report_parts.append("## 6. 建议\n") top_type = type_counts_r.index[0] if len(type_counts_r) > 0 else "-" top_eq = eq_counts.index[0] if len(eq_counts) > 0 else "-" report_parts.append(f"- 重点关注缺陷类型: **{top_type}**") report_parts.append(f"- 重点关注设备: **{top_eq}**") report_parts.append("- 建议查看 SPC 控制图确认趋势状态") report_parts.append("- 建议检查设备健康评分\n") report_parts.append("---\n*本报告由缺陷集中性分析系统自动生成*") full_report = "\n".join(report_parts) col_exp1, col_exp2, col_exp3 = st.columns(3) with col_exp1: st.download_button( label="📥 综合报告 (MD)", data=full_report.encode("utf-8"), file_name=f"defect_report_{datetime.now().strftime('%Y%m%d')}.md", mime="text/markdown", use_container_width=True ) with col_exp2: csv_data = filtered_df.to_csv(index=False).encode("utf-8-sig") st.download_button( label="📥 筛选数据 (CSV)", data=csv_data, file_name=f"defect_data_{datetime.now().strftime('%Y%m%d')}.csv", mime="text/csv", use_container_width=True ) with col_exp3: # 精简版 TXT 报告 txt_lines = ["缺陷集中性分析报告", f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", f"缺陷数: {len(filtered_df)} | 面板: {filtered_df['panel_id'].nunique()}", f"良率: {yield_rate_r:.1f}%"] for t, c in type_counts_r.head(3).items(): txt_lines.append(f" TOP: {t} {c}个") txt_content = "\n".join(txt_lines) st.download_button( label="📥 精简报告 (TXT)", data=txt_content.encode("utf-8"), file_name=f"defect_summary_{datetime.now().strftime('%Y%m%d')}.txt", mime="text/plain", use_container_width=True )