ソースを参照

重构:抽离业务逻辑到 app_utils.py + 新增诊断驾驶舱

- 筛选逻辑 / KPI / SPC / 诊断 → app_utils.py(可测试、可复用)
- 新增"诊断驾驶舱" Tab:Hero 区域 + 4 张卡片 + Plotly 数字孪生面板
- 引入 plotly 替代部分 matplotlib 图表(hover / 缩放交互)
- SPC 分母修正:估算面板数取 max(estimated, panels_with_defects)
- build_diagnostic_dashboard:根因候选风险分排名 + 首要建议
- 批次筛选默认从 :5 改为全选
- 新增 requirements.txt + 单元测试 tests/test_app_utils.py(5 项全通过)
leod 1 週間 前
コミット
8d0fdbece8
5 ファイル変更632 行追加63 行削除
  1. 3 0
      .gitignore
  2. 313 63
      app.py
  3. 190 0
      app_utils.py
  4. 7 0
      requirements.txt
  5. 119 0
      tests/test_app_utils.py

+ 3 - 0
.gitignore

@@ -0,0 +1,3 @@
+__pycache__/
+*.pyc
+*.pyo

+ 313 - 63
app.py

@@ -10,11 +10,19 @@ import matplotlib.pyplot as plt
 import matplotlib.font_manager as fm
 import seaborn as sns
 import streamlit as st
+import plotly.express as px
+import plotly.graph_objects as go
 import os
 from datetime import datetime
 from sklearn.cluster import DBSCAN
 from sklearn.decomposition import PCA
 from sklearn.preprocessing import StandardScaler
+from app_utils import (
+    apply_defect_filters,
+    build_diagnostic_dashboard,
+    calculate_kpis,
+    calculate_spc_metrics,
+)
 
 # --- 中文字体设置 ---
 def setup_chinese_font():
@@ -46,21 +54,6 @@ st.set_page_config(
     initial_sidebar_state="expanded"
 )
 
-# --- 加载数据 ---
-@st.cache_data(ttl=300)
-def load_data():
-    """加载并缓存数据"""
-    if not os.path.exists("defect_data.csv"):
-        st.error("未找到 defect_data.csv,请先运行 generate_data.py 生成数据")
-        return None
-    df = pd.read_csv("defect_data.csv", parse_dates=["timestamp"])
-    df["timestamp"] = pd.to_datetime(df["timestamp"])
-    return df
-
-df = load_data()
-if df is None:
-    st.stop()
-
 # --- 侧边栏 ---
 st.sidebar.title("🔍 筛选条件")
 
@@ -136,7 +129,7 @@ view_mode = st.sidebar.selectbox(
 tab_visibility = {
     "操作员": {
         "tabs": ["🗺️ 空间集中性", "📊 类型集中性 (帕累托)", "📈 时间集中性",
-                 "🏗️ 设备座号集中性", "🔬 缺陷模式识别"],
+                 "🏗️ 设备座号集中性", "🔬 缺陷模式识别", "🧭 诊断驾驶舱"],
         "show_kpi": True,
         "show_export": True,
     },
@@ -147,7 +140,7 @@ tab_visibility = {
     },
     "管理者": {
         "tabs": ["🚨 SPC 控制图与预警", "🔬 缺陷模式识别", "💚 设备健康与共性分析",
-                 "📊 类型集中性 (帕累托)", "📈 时间集中性"],
+                 "📊 类型集中性 (帕累托)", "📈 时间集中性", "🧭 诊断驾驶舱"],
         "show_kpi": True,
         "show_export": True,
     },
@@ -182,7 +175,7 @@ selected_shift = st.sidebar.radio("班次", options=shift_options)
 
 # 批次
 all_batches = sorted(df["batch_id"].unique())
-selected_batches = st.sidebar.multiselect("批次", options=all_batches, default=all_batches[:5])
+selected_batches = st.sidebar.multiselect("批次", options=all_batches, default=all_batches)
 
 # 严重程度
 all_severities = ["全部", "轻微", "中等", "严重"]
@@ -199,30 +192,26 @@ if selected_equipment:
 else:
     selected_seats = []
 
-# 应用筛选
-mask = (
-    (df["timestamp"] >= start_date) &
-    (df["timestamp"] <= end_date) &
-    (df["defect_type"].isin(selected_types)) &
-    (df["batch_id"].isin(selected_batches)) &
-    (df["equipment_id"].isin(selected_equipment))
+filtered_df = apply_defect_filters(
+    df,
+    start_date=start_date,
+    end_date=end_date,
+    selected_types=selected_types,
+    selected_batches=selected_batches,
+    selected_equipment=selected_equipment,
+    selected_seats=selected_seats,
+    selected_shift=selected_shift,
+    selected_severity=selected_severity,
 )
-if selected_shift != "全部":
-    mask &= (df["shift"] == selected_shift)
-if selected_severity != "全部":
-    mask &= (df["severity"] == selected_severity)
-if selected_seats:
-    mask &= (df["seat_id"].isin(selected_seats))
-
-filtered_df = df[mask].copy()
 
 # ========== KPI 看板 ==========
-total_panels_inspected = df[df["timestamp"] >= start_date]["panel_id"].nunique()
-defective_panels = filtered_df["panel_id"].nunique()
-yield_rate = (1 - defective_panels / max(total_panels_inspected, 1)) * 100
-total_defects = len(filtered_df)
-critical_defects = (filtered_df["severity"] == "严重").sum()
-top_defect_type = filtered_df["defect_type"].mode().iloc[0] if len(filtered_df) > 0 else "-"
+kpis = calculate_kpis(df, filtered_df)
+total_panels_inspected = kpis["total_panels_inspected"]
+defective_panels = kpis["defective_panels"]
+yield_rate = kpis["yield_rate"]
+total_defects = kpis["total_defects"]
+critical_defects = kpis["critical_defects"]
+top_defect_type = kpis["top_defect_type"]
 
 kpi1, kpi2, kpi3, kpi4, kpi5, kpi6 = st.columns(6)
 kpi1.metric("检测面板数", f"{total_panels_inspected} 块")
@@ -271,8 +260,13 @@ st.markdown(f"**数据范围**: {start_date.strftime('%Y-%m-%d')} ~ {end_date.st
 
 st.divider()
 
+if filtered_df.empty:
+    st.warning("当前筛选条件下没有缺陷记录,请放宽日期、批次、设备或缺陷类型筛选。")
+    st.stop()
+
 # --- Tab 布局 (按角色动态) ---
 ALL_TABS = [
+    "🧭 诊断驾驶舱",
     "🗺️ 空间集中性",
     "📊 类型集中性 (帕累托)",
     "📈 时间集中性",
@@ -298,6 +292,272 @@ def get_tab(name):
     """获取指定 Tab 容器,如果不可见则返回 None"""
     return tab_map.get(name)
 
+# ========== Tab 0: 诊断驾驶舱 ==========
+_t = get_tab("🧭 诊断驾驶舱")
+if _t:
+    with _t:
+        dashboard = build_diagnostic_dashboard(filtered_df)
+        level_colors = {
+            "严重": ("#7f1d1d", "#fee2e2"),
+            "关注": ("#92400e", "#fef3c7"),
+            "正常": ("#14532d", "#dcfce7"),
+        }
+        level_fg, level_bg = level_colors.get(dashboard["severity_level"], ("#334155", "#e2e8f0"))
+
+        st.markdown(
+            """
+            <style>
+            .diag-hero {
+                padding: 24px 28px;
+                border-radius: 24px;
+                background:
+                    radial-gradient(circle at 15% 15%, rgba(20, 184, 166, .18), transparent 28%),
+                    linear-gradient(135deg, #0f172a 0%, #12343b 52%, #294936 100%);
+                color: #f8fafc;
+                box-shadow: 0 18px 45px rgba(15, 23, 42, .18);
+                margin-bottom: 18px;
+            }
+            .diag-hero h2 { margin: 0 0 8px 0; font-size: 30px; letter-spacing: .02em; }
+            .diag-hero p { margin: 0; color: #cbd5e1; font-size: 15px; }
+            .diag-badge {
+                display: inline-flex;
+                align-items: center;
+                padding: 6px 12px;
+                border-radius: 999px;
+                font-weight: 700;
+                margin-bottom: 12px;
+            }
+            .diag-card {
+                padding: 18px 18px;
+                border-radius: 18px;
+                border: 1px solid #dbe4e7;
+                background: linear-gradient(180deg, #ffffff 0%, #f8fafc 100%);
+                min-height: 128px;
+            }
+            .diag-card .label { color: #64748b; font-size: 13px; margin-bottom: 8px; }
+            .diag-card .value { color: #0f172a; font-size: 26px; font-weight: 800; line-height: 1.1; }
+            .diag-card .hint { color: #475569; font-size: 13px; margin-top: 10px; }
+            </style>
+            """,
+            unsafe_allow_html=True,
+        )
+
+        st.markdown(
+            f"""
+            <div class="diag-hero">
+                <div class="diag-badge" style="color:{level_fg}; background:{level_bg};">
+                    当前诊断等级:{dashboard["severity_level"]}
+                </div>
+                <h2>缺陷诊断驾驶舱</h2>
+                <p>{dashboard["primary_recommendation"]}</p>
+            </div>
+            """,
+            unsafe_allow_html=True,
+        )
+
+        card1, card2, card3, card4 = st.columns(4)
+        with card1:
+            st.markdown(
+                f"""
+                <div class="diag-card">
+                    <div class="label">筛选后缺陷</div>
+                    <div class="value">{len(filtered_df)}</div>
+                    <div class="hint">涉及 {filtered_df["panel_id"].nunique()} 块面板</div>
+                </div>
+                """,
+                unsafe_allow_html=True,
+            )
+        with card2:
+            st.markdown(
+                f"""
+                <div class="diag-card">
+                    <div class="label">主导缺陷类型</div>
+                    <div class="value">{dashboard["top_defect_type"]}</div>
+                    <div class="hint">占全部缺陷 {dashboard["top_defect_share"]:.1%}</div>
+                </div>
+                """,
+                unsafe_allow_html=True,
+            )
+        with card3:
+            st.markdown(
+                f"""
+                <div class="diag-card">
+                    <div class="label">严重缺陷占比</div>
+                    <div class="value">{dashboard["serious_share"]:.1%}</div>
+                    <div class="hint">高于 20% 建议立即复盘</div>
+                </div>
+                """,
+                unsafe_allow_html=True,
+            )
+        with card4:
+            top_root = dashboard["root_causes"].iloc[0] if len(dashboard["root_causes"]) else None
+            root_name = top_root["根因候选"] if top_root is not None else "-"
+            root_share = top_root["占比"] if top_root is not None else 0
+            root_lift = top_root["异常倍数"] if top_root is not None else 0
+            st.markdown(
+                f"""
+                <div class="diag-card">
+                    <div class="label">首要根因候选</div>
+                    <div class="value" style="font-size:22px;">{root_name}</div>
+                    <div class="hint">贡献 {root_share:.1%} 缺陷,异常 {root_lift:.2f}x</div>
+                </div>
+                """,
+                unsafe_allow_html=True,
+            )
+
+        st.divider()
+        left, right = st.columns([1.25, 1])
+        with left:
+            st.subheader("交互式面板数字孪生")
+            panel_w = float(df["panel_width_mm"].iloc[0])
+            panel_h = float(df["panel_height_mm"].iloc[0])
+            fig_map = go.Figure()
+            fig_map.add_shape(
+                type="rect",
+                x0=0,
+                y0=0,
+                x1=panel_w,
+                y1=panel_h,
+                line=dict(color="#0f172a", width=2),
+                fillcolor="#f8fafc",
+                layer="below",
+            )
+            fig_map.add_trace(
+                go.Scatter(
+                    x=filtered_df["x_mm"],
+                    y=filtered_df["y_mm"],
+                    mode="markers",
+                    marker=dict(
+                        size=7,
+                        color=filtered_df["severity"].map({"轻微": 1, "中等": 2, "严重": 3}),
+                        colorscale=[[0, "#38bdf8"], [0.5, "#f59e0b"], [1, "#dc2626"]],
+                        showscale=True,
+                        colorbar=dict(title="严重度"),
+                        opacity=0.72,
+                        line=dict(width=0.4, color="#ffffff"),
+                    ),
+                    text=filtered_df["defect_id"],
+                    customdata=filtered_df[["defect_type", "severity", "equipment_id", "seat_id", "batch_id"]],
+                    hovertemplate=(
+                        "缺陷ID: %{text}<br>"
+                        "坐标: (%{x:.1f}, %{y:.1f}) mm<br>"
+                        "类型: %{customdata[0]}<br>"
+                        "严重度: %{customdata[1]}<br>"
+                        "设备/座号: %{customdata[2]} / %{customdata[3]}<br>"
+                        "批次: %{customdata[4]}<extra></extra>"
+                    ),
+                    name="缺陷点",
+                )
+            )
+            fig_map.add_vrect(x0=0, x1=panel_w * 0.1, fillcolor="#f97316", opacity=0.08, line_width=0)
+            fig_map.add_vrect(x0=panel_w * 0.9, x1=panel_w, fillcolor="#f97316", opacity=0.08, line_width=0)
+            fig_map.add_hrect(y0=panel_h * 0.72, y1=panel_h * 0.88, fillcolor="#14b8a6", opacity=0.09, line_width=0)
+            fig_map.update_layout(
+                height=560,
+                margin=dict(l=18, r=18, t=30, b=18),
+                plot_bgcolor="#ffffff",
+                paper_bgcolor="#ffffff",
+                xaxis=dict(title="X (mm)", range=[0, panel_w], showgrid=True, gridcolor="#e2e8f0"),
+                yaxis=dict(title="Y (mm)", range=[0, panel_h], scaleanchor="x", scaleratio=1, showgrid=True, gridcolor="#e2e8f0"),
+                title="按真实屏幕比例定位缺陷,橙色为边缘敏感区,青色为 FPC 关注区",
+            )
+            st.plotly_chart(fig_map, use_container_width=True)
+
+            fig_density = px.density_heatmap(
+                filtered_df,
+                x="x_mm",
+                y="y_mm",
+                nbinsx=28,
+                nbinsy=42,
+                color_continuous_scale="YlOrRd",
+                title="密度热区视图",
+                labels={"x_mm": "X (mm)", "y_mm": "Y (mm)"},
+            )
+            fig_density.update_layout(height=300, margin=dict(l=18, r=18, t=42, b=18))
+            st.plotly_chart(fig_density, use_container_width=True)
+
+        with right:
+            st.subheader("根因候选榜")
+            root_causes = dashboard["root_causes"].copy()
+            fig_root = px.bar(
+                root_causes.sort_values("风险分", ascending=True),
+                x="风险分",
+                y="根因候选",
+                orientation="h",
+                color="异常倍数",
+                color_continuous_scale="Tealrose",
+                text="风险分",
+                hover_data={
+                    "缺陷数": True,
+                    "占比": ":.1%",
+                    "异常倍数": ":.2f",
+                    "涉及面板": True,
+                    "主要缺陷": True,
+                    "严重占比": ":.1%",
+                    "风险分": ":.1f",
+                },
+                labels={"风险分": "风险分", "根因候选": ""},
+            )
+            fig_root.update_traces(texttemplate="%{text:.1f}", textposition="outside")
+            fig_root.update_layout(height=360, margin=dict(l=8, r=20, t=20, b=20))
+            st.plotly_chart(fig_root, use_container_width=True)
+
+            root_table = root_causes.copy()
+            root_table["占比"] = root_table["占比"].map(lambda v: f"{v:.1%}")
+            root_table["异常倍数"] = root_table["异常倍数"].map(lambda v: f"{v:.2f}x")
+            root_table["严重占比"] = root_table["严重占比"].map(lambda v: f"{v:.1%}")
+            st.dataframe(root_table, use_container_width=True, hide_index=True)
+            st.caption("风险分 = 贡献规模 + 异常倍数 + 严重占比 + 涉及面板数。先查高贡献且高偏离的组合。")
+
+        trend_col, pareto_col = st.columns([1, 1])
+        with trend_col:
+            st.subheader("每日缺陷走势")
+            daily_trend = dashboard["daily_trend"]
+            fig_trend_dash = px.area(
+                daily_trend,
+                x="day",
+                y="缺陷数",
+                markers=True,
+                color_discrete_sequence=["#0f766e"],
+                labels={"day": "日期", "缺陷数": "缺陷数"},
+            )
+            fig_trend_dash.update_traces(line=dict(width=3), fillcolor="rgba(20, 184, 166, .22)")
+            fig_trend_dash.update_layout(height=350, margin=dict(l=18, r=18, t=20, b=18))
+            st.plotly_chart(fig_trend_dash, use_container_width=True)
+
+        with pareto_col:
+            st.subheader("缺陷类型 Pareto")
+            pareto = dashboard["pareto"].head(8)
+            fig_pareto_dash = go.Figure()
+            fig_pareto_dash.add_trace(
+                go.Bar(
+                    x=pareto["缺陷类型"],
+                    y=pareto["缺陷数"],
+                    marker_color="#334155",
+                    name="缺陷数",
+                    hovertemplate="%{x}<br>缺陷数: %{y}<extra></extra>",
+                )
+            )
+            fig_pareto_dash.add_trace(
+                go.Scatter(
+                    x=pareto["缺陷类型"],
+                    y=pareto["累计占比"],
+                    yaxis="y2",
+                    mode="lines+markers",
+                    line=dict(color="#dc2626", width=3),
+                    name="累计占比",
+                    hovertemplate="%{x}<br>累计占比: %{y:.1%}<extra></extra>",
+                )
+            )
+            fig_pareto_dash.update_layout(
+                height=350,
+                margin=dict(l=18, r=18, t=20, b=18),
+                yaxis=dict(title="缺陷数"),
+                yaxis2=dict(title="累计占比", overlaying="y", side="right", tickformat=".0%"),
+                legend=dict(orientation="h", y=1.12),
+            )
+            st.plotly_chart(fig_pareto_dash, use_container_width=True)
+
 # ========== Tab 1: 空间集中性 ==========
 _t = get_tab("🗺️ 空间集中性")
 if _t:
@@ -1016,31 +1276,19 @@ if _t:
     # --- 数据准备:按天计算缺陷率 ---
     # 需要知道每天检测了多少面板才能算缺陷率
     # 用 batch_id 近似日期
-        daily_all = df.groupby("day").agg(
-            total_defects=("defect_id", "count"),
-            panels_with_defects=("panel_id", "nunique")
-        ).reset_index()
-        daily_all["day"] = pd.to_datetime(daily_all["day"])
-        daily_all = daily_all.sort_values("day").reset_index(drop=True)
+        spc_metrics = calculate_spc_metrics(df)
+        daily_all = spc_metrics["daily"]
 
         if len(daily_all) < 2:
             st.warning("数据天数不足,无法生成控制图")
         else:
-        # 估算每天检测总数:用总面板数 / 总天数近似
-            total_days = (df["timestamp"].max() - df["timestamp"].min()).days + 1
-            total_unique_panels = df["panel_id"].nunique()
-            daily_all["estimated_inspected"] = max(total_unique_panels // max(total_days // 7, 1), 1)  # 按工作日估算
-            daily_all["defect_rate"] = daily_all["panels_with_defects"] / daily_all["estimated_inspected"]
-
         # 控制限计算
-            p_bar = daily_all["defect_rate"].mean()
-            n_avg = daily_all["estimated_inspected"].mean()
-            sigma_p = np.sqrt(p_bar * (1 - p_bar) / n_avg) if n_avg > 0 and p_bar > 0 else 0
-
-            UCL = p_bar + 3 * sigma_p  # 上控制限
-            LCL = max(0, p_bar - 3 * sigma_p)  # 下控制限
-            UWL = p_bar + 2 * sigma_p  # 上警告限
-            LWL = max(0, p_bar - 2 * sigma_p)  # 下警告限
+            p_bar = spc_metrics["p_bar"]
+            sigma_p = spc_metrics["sigma_p"]
+            UCL = spc_metrics["ucl"]
+            LCL = spc_metrics["lcl"]
+            UWL = spc_metrics["uwl"]
+            LWL = spc_metrics["lwl"]
 
         # --- Western Electric 规则检测 ---
             we_violations = []
@@ -1986,11 +2234,13 @@ if current_config["show_export"]:
 
     # 1. KPI 摘要
     report_parts.append("## 1. KPI 摘要\n")
-    total_panels_inspected_r = df[df["timestamp"] >= start_date]["panel_id"].nunique()
-    defective_panels_r = filtered_df["panel_id"].nunique()
-    yield_rate_r = (1 - defective_panels_r / max(total_panels_inspected_r, 1)) * 100
+    report_kpis = calculate_kpis(df, filtered_df)
+    total_panels_inspected_r = report_kpis["total_panels_inspected"]
+    defective_panels_r = report_kpis["defective_panels"]
+    yield_rate_r = report_kpis["yield_rate"]
     report_parts.append(f"- 检测面板数: {total_panels_inspected_r} 块")
-    report_parts.append(f"- 不良面板数: {defective_panels_r} 块 ({defective_panels_r/total_panels_inspected_r*100:.1f}%)")
+    defective_rate_r = defective_panels_r / max(total_panels_inspected_r, 1) * 100
+    report_parts.append(f"- 不良面板数: {defective_panels_r} 块 ({defective_rate_r:.1f}%)")
     report_parts.append(f"- 综合良率: {yield_rate_r:.1f}%")
     report_parts.append(f"- 缺陷总数: {len(filtered_df)} 个")
     report_parts.append(f"- 严重缺陷: {(filtered_df['severity']=='严重').sum()} 个\n")

+ 190 - 0
app_utils.py

@@ -0,0 +1,190 @@
+"""缺陷分析页面的可测试业务逻辑。"""
+
+import numpy as np
+import pandas as pd
+
+
+def normalize_date_bounds(start_date, end_date):
+    """把日期范围转换成左闭右开的时间边界,确保结束日期整天被包含。"""
+    start_ts = pd.Timestamp(start_date).normalize()
+    end_exclusive = pd.Timestamp(end_date).normalize() + pd.Timedelta(days=1)
+    return start_ts, end_exclusive
+
+
+def apply_defect_filters(
+    df,
+    *,
+    start_date,
+    end_date,
+    selected_types,
+    selected_batches,
+    selected_equipment,
+    selected_seats,
+    selected_shift="全部",
+    selected_severity="全部",
+):
+    """应用页面筛选条件。"""
+    start_ts, end_exclusive = normalize_date_bounds(start_date, end_date)
+    mask = (
+        (df["timestamp"] >= start_ts)
+        & (df["timestamp"] < end_exclusive)
+        & (df["defect_type"].isin(selected_types))
+        & (df["batch_id"].isin(selected_batches))
+        & (df["equipment_id"].isin(selected_equipment))
+    )
+    if selected_shift != "全部":
+        mask &= df["shift"] == selected_shift
+    if selected_severity != "全部":
+        mask &= df["severity"] == selected_severity
+    if selected_seats:
+        mask &= df["seat_id"].isin(selected_seats)
+
+    return df[mask].copy()
+
+
+def calculate_kpis(source_df, filtered_df):
+    """基于当前筛选结果计算页面 KPI。"""
+    total_panels_inspected = filtered_df["panel_id"].nunique()
+    defective_panels = filtered_df["panel_id"].nunique()
+    total_defects = len(filtered_df)
+    critical_defects = int((filtered_df["severity"] == "严重").sum()) if total_defects else 0
+    top_defect_type = filtered_df["defect_type"].mode().iloc[0] if total_defects else "-"
+    yield_rate = (1 - defective_panels / max(total_panels_inspected, 1)) * 100
+
+    return {
+        "total_panels_inspected": int(total_panels_inspected),
+        "defective_panels": int(defective_panels),
+        "yield_rate": float(yield_rate),
+        "total_defects": int(total_defects),
+        "critical_defects": int(critical_defects),
+        "top_defect_type": top_defect_type,
+    }
+
+
+def calculate_spc_metrics(df):
+    """计算 SPC 所需数据,防止模拟分母造成非法概率。"""
+    daily = df.groupby("day").agg(
+        total_defects=("defect_id", "count"),
+        panels_with_defects=("panel_id", "nunique"),
+    ).reset_index()
+    daily["day"] = pd.to_datetime(daily["day"])
+    daily = daily.sort_values("day").reset_index(drop=True)
+
+    if len(daily) < 2:
+        return {
+            "daily": daily,
+            "p_bar": 0.0,
+            "ucl": 0.0,
+            "lcl": 0.0,
+            "uwl": 0.0,
+            "lwl": 0.0,
+            "sigma_p": 0.0,
+        }
+
+    total_days = (df["timestamp"].max() - df["timestamp"].min()).days + 1
+    total_unique_panels = df["panel_id"].nunique()
+    estimated = max(total_unique_panels // max(total_days // 7, 1), 1)
+    daily["estimated_inspected"] = np.maximum(estimated, daily["panels_with_defects"])
+    daily["defect_rate"] = (
+        daily["panels_with_defects"] / daily["estimated_inspected"]
+    ).clip(lower=0, upper=1)
+
+    p_bar = float(np.clip(daily["defect_rate"].mean(), 0, 1))
+    n_avg = float(daily["estimated_inspected"].mean())
+    sigma_p = float(np.sqrt(max(p_bar * (1 - p_bar), 0) / n_avg)) if n_avg > 0 else 0.0
+
+    return {
+        "daily": daily,
+        "p_bar": p_bar,
+        "ucl": min(1.0, p_bar + 3 * sigma_p),
+        "lcl": max(0.0, p_bar - 3 * sigma_p),
+        "uwl": min(1.0, p_bar + 2 * sigma_p),
+        "lwl": max(0.0, p_bar - 2 * sigma_p),
+        "sigma_p": sigma_p,
+    }
+
+
+def build_diagnostic_dashboard(df):
+    """生成诊断驾驶舱需要的摘要、根因候选和趋势数据。"""
+    total_defects = len(df)
+    if total_defects == 0:
+        return {
+            "severity_level": "正常",
+            "top_defect_type": "-",
+            "top_defect_share": 0.0,
+            "serious_share": 0.0,
+            "root_causes": pd.DataFrame(),
+            "daily_trend": pd.DataFrame(),
+            "pareto": pd.DataFrame(),
+            "primary_recommendation": "当前筛选条件下没有缺陷记录。",
+        }
+
+    type_counts = df["defect_type"].value_counts()
+    top_defect_type = type_counts.index[0]
+    top_defect_share = float(type_counts.iloc[0] / total_defects)
+    serious_share = float((df["severity"] == "严重").sum() / total_defects)
+
+    root_causes = (
+        df.groupby(["equipment_id", "seat_id"])
+        .agg(
+            缺陷数=("defect_id", "count"),
+            涉及面板=("panel_id", "nunique"),
+            主要缺陷=("defect_type", lambda s: s.mode().iloc[0]),
+            严重数=("severity", lambda s: int((s == "严重").sum())),
+        )
+        .reset_index()
+    )
+    root_causes["根因候选"] = root_causes["equipment_id"] + " / " + root_causes["seat_id"]
+    root_causes["占比"] = root_causes["缺陷数"] / total_defects
+    root_causes["严重占比"] = root_causes["严重数"] / root_causes["缺陷数"].clip(lower=1)
+    equipment_totals = df.groupby("equipment_id")["defect_id"].count()
+    equipment_seat_counts = df.groupby("equipment_id")["seat_id"].nunique().clip(lower=1)
+    root_causes["期望缺陷数"] = root_causes["equipment_id"].map(
+        equipment_totals / equipment_seat_counts
+    ).clip(lower=0.001)
+    root_causes["异常倍数"] = (root_causes["缺陷数"] / root_causes["期望缺陷数"]).round(2)
+    count_score = root_causes["缺陷数"] / root_causes["缺陷数"].max()
+    panel_score = root_causes["涉及面板"] / df["panel_id"].nunique()
+    lift_score = (root_causes["异常倍数"] / 3).clip(upper=1)
+    root_causes["风险分"] = (
+        count_score * 55 + lift_score * 25 + root_causes["严重占比"] * 15 + panel_score * 5
+    ).round(1)
+    root_causes = root_causes.sort_values(["风险分", "缺陷数"], ascending=False).head(8)
+    root_causes = root_causes[
+        ["根因候选", "缺陷数", "占比", "异常倍数", "涉及面板", "主要缺陷", "严重占比", "风险分"]
+    ].reset_index(drop=True)
+
+    pareto = type_counts.rename_axis("缺陷类型").reset_index(name="缺陷数")
+    pareto["占比"] = pareto["缺陷数"] / total_defects
+    pareto["累计占比"] = pareto["占比"].cumsum()
+
+    daily_trend = df.groupby("day").size().rename("缺陷数").reset_index()
+    daily_trend["day"] = pd.to_datetime(daily_trend["day"])
+    daily_trend = daily_trend.sort_values("day")
+
+    if serious_share >= 0.2 or (len(root_causes) > 0 and root_causes.iloc[0]["占比"] >= 0.15):
+        severity_level = "严重"
+    elif serious_share >= 0.1 or top_defect_share >= 0.35:
+        severity_level = "关注"
+    else:
+        severity_level = "正常"
+
+    if len(root_causes) > 0:
+        top_root = root_causes.iloc[0]
+        primary_recommendation = (
+            f"优先排查 {top_root['根因候选']},该组合贡献 {top_root['占比']:.1%} "
+            f"缺陷,异常倍数 {top_root['异常倍数']:.2f}x,主要类型为 {top_root['主要缺陷']}。"
+        )
+    else:
+        primary_recommendation = f"优先排查 {top_defect_type} 相关工艺参数。"
+
+    return {
+        "severity_level": severity_level,
+        "top_defect_type": top_defect_type,
+        "top_defect_share": top_defect_share,
+        "serious_share": serious_share,
+        "root_causes": root_causes,
+        "daily_trend": daily_trend,
+        "pareto": pareto,
+        "primary_recommendation": primary_recommendation,
+    }

+ 7 - 0
requirements.txt

@@ -0,0 +1,7 @@
+streamlit
+pandas
+numpy
+matplotlib
+seaborn
+scikit-learn
+plotly

+ 119 - 0
tests/test_app_utils.py

@@ -0,0 +1,119 @@
+import math
+import os
+import sys
+import unittest
+
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+import pandas as pd
+
+from app_utils import (
+    apply_defect_filters,
+    build_diagnostic_dashboard,
+    calculate_kpis,
+    calculate_spc_metrics,
+)
+
+
+class AppUtilsTest(unittest.TestCase):
+    def setUp(self):
+        self.df = pd.DataFrame(
+            {
+                "defect_id": ["D1", "D2", "D3", "D4"],
+                "panel_id": ["P1", "P2", "P2", "P3"],
+                "batch_id": ["B1", "B1", "B2", "B2"],
+                "equipment_id": ["E1", "E1", "E2", "E2"],
+                "seat_id": ["S1", "S2", "S1", "S2"],
+                "timestamp": pd.to_datetime(
+                    [
+                        "2026-04-01 00:00:00",
+                        "2026-04-01 23:59:59",
+                        "2026-04-02 12:00:00",
+                        "2026-04-03 00:00:01",
+                    ]
+                ),
+                "defect_type": ["划痕", "亮点", "划痕", "暗点"],
+                "severity": ["严重", "轻微", "中等", "严重"],
+                "shift": ["白班", "夜班", "白班", "白班"],
+                "day": ["2026-04-01", "2026-04-01", "2026-04-02", "2026-04-03"],
+            }
+        )
+
+    def test_date_filter_includes_full_end_date(self):
+        filtered = apply_defect_filters(
+            self.df,
+            start_date=pd.Timestamp("2026-04-01"),
+            end_date=pd.Timestamp("2026-04-01"),
+            selected_types=["划痕", "亮点", "暗点"],
+            selected_batches=["B1", "B2"],
+            selected_equipment=["E1", "E2"],
+            selected_seats=["S1", "S2"],
+            selected_shift="全部",
+            selected_severity="全部",
+        )
+
+        self.assertEqual(["D1", "D2"], filtered["defect_id"].tolist())
+
+    def test_kpis_use_same_filter_scope_for_total_panels(self):
+        filtered = apply_defect_filters(
+            self.df,
+            start_date=pd.Timestamp("2026-04-01"),
+            end_date=pd.Timestamp("2026-04-02"),
+            selected_types=["划痕"],
+            selected_batches=["B1", "B2"],
+            selected_equipment=["E1", "E2"],
+            selected_seats=["S1"],
+            selected_shift="全部",
+            selected_severity="全部",
+        )
+
+        kpis = calculate_kpis(self.df, filtered)
+
+        self.assertEqual(2, kpis["total_panels_inspected"])
+        self.assertEqual(2, kpis["defective_panels"])
+        self.assertEqual(0.0, kpis["yield_rate"])
+
+    def test_spc_metrics_clamp_estimated_rate_to_valid_probability(self):
+        metrics = calculate_spc_metrics(self.df)
+
+        self.assertTrue(math.isfinite(metrics["p_bar"]))
+        self.assertTrue(math.isfinite(metrics["ucl"]))
+        self.assertTrue(math.isfinite(metrics["lcl"]))
+        self.assertLessEqual(metrics["daily"]["defect_rate"].max(), 1.0)
+
+    def test_diagnostic_dashboard_ranks_root_cause_candidates(self):
+        dashboard = build_diagnostic_dashboard(self.df)
+
+        self.assertEqual("严重", dashboard["severity_level"])
+        self.assertEqual("E1 / S1", dashboard["root_causes"].iloc[0]["根因候选"])
+        self.assertEqual("划痕", dashboard["top_defect_type"])
+        self.assertIn("优先排查", dashboard["primary_recommendation"])
+
+    def test_diagnostic_dashboard_reports_baseline_lift(self):
+        rows = []
+        for i in range(10):
+            rows.append(
+                {
+                    "defect_id": f"D{i}",
+                    "panel_id": f"P{i}",
+                    "batch_id": "B1",
+                    "equipment_id": "E1",
+                    "seat_id": "S-hot" if i < 8 else "S-cold",
+                    "timestamp": pd.Timestamp("2026-04-01"),
+                    "defect_type": "气泡",
+                    "severity": "严重" if i < 2 else "轻微",
+                    "shift": "白班",
+                    "day": "2026-04-01",
+                }
+            )
+        df = pd.DataFrame(rows)
+
+        dashboard = build_diagnostic_dashboard(df)
+        top = dashboard["root_causes"].iloc[0]
+
+        self.assertEqual("E1 / S-hot", top["根因候选"])
+        self.assertGreater(top["异常倍数"], 1.0)
+
+
+if __name__ == "__main__":
+    unittest.main()