"""缺陷分析页面的可测试业务逻辑。"""
import base64
import html
import io
import os
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from matplotlib import font_manager as fm
import numpy as np
import pandas as pd
def _setup_chinese_font():
    """Configure matplotlib so Chinese text renders (kept in line with app.py).

    A fixed list of Windows font files is probed in priority order. The first
    file that exists is wrapped in ``FontProperties`` and its family name is
    written to ``font.family``. When none of the files exists,
    ``font.sans-serif`` is set to a list of common CJK family names instead.
    ``axes.unicode_minus`` is switched off on both paths.

    Returns:
        The ``FontProperties`` built from the font file that was found, or
        ``None`` when the family-name fallback was applied.
    """
    candidates = (
        r"C:\Windows\Fonts\msyh.ttc",
        r"C:\Windows\Fonts\simhei.ttf",
        r"C:\Windows\Fonts\simsun.ttc",
        r"C:\Windows\Fonts\malgun.ttf",
    )
    found = next((path for path in candidates if os.path.exists(path)), None)

    if found is None:
        # No known font file on disk: fall back to family names only.
        plt.rcParams["font.sans-serif"] = ["SimHei", "Microsoft YaHei", "Arial Unicode MS"]
        chosen = None
    else:
        chosen = fm.FontProperties(fname=found)
        plt.rcParams["font.family"] = chosen.get_name()

    plt.rcParams["axes.unicode_minus"] = False
    return chosen
# Apply the font configuration once at import time and keep the result:
# a FontProperties object when a font file was found, otherwise None.
_CHINESE_FONT_PROP = _setup_chinese_font()
from defect_analysis.ml.model_bundle import create_model_bundle
from defect_analysis.ml.predict import predict_key_factors
from defect_analysis.root_cause import EXTENDED_ROOT_CAUSE_DIMENSIONS, build_extended_root_causes
from defect_analysis.schemas import (
CORE_REQUIRED_COLUMNS,
INDUSTRY_OPTIONAL_COLUMNS,
TEMPLATE_COLUMNS,
get_missing_required_columns,
normalize_defect_schema,
)
# Recommended checks per defect type: maps a defect-type label to a list of
# recommendation strings. generate_industry_diagnosis looks up the most
# frequent defect type here and copies its entries into the recommendations.
DEFECT_SOP_RECOMMENDATIONS = {
"划痕": ["检查搬运轨道、吸嘴和治具接触面", "复核清洗滚刷与擦拭工位是否有硬质颗粒"],
"气泡": ["检查贴合压力、真空度、OCA 状态和贴合速度", "复核贴合前清洁与材料开封时长"],
"漏光": ["检查边缘贴合、背光组装、框胶和压合均匀性", "复核四角/边缘区应力与夹持状态"],
"色差": ["检查背光、偏光片批次、贴合应力和老化条件", "对比同批材料与相邻工艺参数"],
"异物": ["检查洁净度、清洗段、静电控制和材料暴露时间", "追溯同批材料与工位环境记录"],
"亮点": ["复核点灯/AOI 判定、TFT 像素缺陷和异物压伤", "抽查高发区域是否存在压接或污染"],
"暗点": ["复核点灯/AOI 判定、TFT 像素缺陷和异物压伤", "检查绑定/驱动相关区域异常"],
"裂纹": ["立即检查切割、搬运、夹持和跌落冲击风险", "对同批面板执行 Hold 与复检"],
}
def normalize_date_bounds(start_date, end_date):
    """Turn an inclusive date range into half-open ``[start, end)`` timestamps.

    Both inputs are converted with ``pd.Timestamp`` and truncated to midnight.
    The upper bound is pushed one day forward so that a ``<`` comparison
    still includes every moment of the end date.

    Returns:
        A ``(start, end_exclusive)`` tuple of ``pd.Timestamp`` values.
    """
    one_day = pd.Timedelta(days=1)
    lower = pd.Timestamp(start_date).normalize()
    last_day = pd.Timestamp(end_date).normalize()
    return lower, last_day + one_day
def apply_defect_filters(
    df,
    *,
    start_date,
    end_date,
    selected_types,
    selected_batches,
    selected_equipment,
    selected_seats,
    selected_shift="全部",
    selected_severity="全部",
):
    """Return a copy of the rows of ``df`` that satisfy the page filters.

    Always applied:
        * ``timestamp`` inside the inclusive ``start_date``..``end_date`` range
          (via ``normalize_date_bounds``),
        * ``defect_type``, ``batch_id`` and ``equipment_id`` membership in the
          corresponding selections.

    Applied conditionally:
        * ``shift`` / ``severity`` equality, skipped when the selection is the
          "全部" (all) sentinel,
        * ``seat_id`` membership, skipped when ``selected_seats`` is falsy.
    """
    lower, upper = normalize_date_bounds(start_date, end_date)
    stamps = df["timestamp"]

    keep = (
        (stamps >= lower)
        & (stamps < upper)
        & df["defect_type"].isin(selected_types)
        & df["batch_id"].isin(selected_batches)
        & df["equipment_id"].isin(selected_equipment)
    )

    # Exact-match filters; the column is only touched when the filter is active.
    for column, wanted in (("shift", selected_shift), ("severity", selected_severity)):
        if wanted != "全部":
            keep = keep & (df[column] == wanted)

    # An empty seat selection means "do not filter by seat".
    if selected_seats:
        keep = keep & df["seat_id"].isin(selected_seats)

    return df.loc[keep].copy()
def classify_panel_zone(df):
    """Label every row with the panel zone(s) its coordinates fall into.

    Coordinates are normalised by the panel size. Missing size columns
    default to 155.0 (width) and 340.0 (height); a size of 0 is treated as
    missing. Missing coordinate columns default to the panel centre, and any
    normalised value that ends up NaN is treated as 0.5.

    Zone rules on the normalised ``(x, y)``:
        * edges: ``x <= 0.1`` left, ``x >= 0.9`` right, ``y <= 0.1`` bottom,
          ``y >= 0.9`` top,
        * corner: both axes within 0.12 of a border,
        * FPC/bonding: ``0.68 <= y <= 0.88`` and ``0.25 <= x <= 0.75``,
        * display centre: none of the above.

    Returns:
        A Series named ``panel_zone`` aligned to ``df.index``; multiple
        matching labels are joined with " / ".
    """
    default_width = pd.Series(155.0, index=df.index)
    default_height = pd.Series(340.0, index=df.index)
    width = df.get("panel_width_mm", default_width).replace(0, np.nan)
    height = df.get("panel_height_mm", default_height).replace(0, np.nan)

    rel_x = (df.get("x_mm", width * 0.5) / width).fillna(0.5)
    rel_y = (df.get("y_mm", height * 0.5) / height).fillna(0.5)

    def describe(px, py):
        # Rules are listed in the order their labels appear in the output.
        rules = (
            (px <= 0.1, "左边缘区"),
            (px >= 0.9, "右边缘区"),
            (py <= 0.1, "下边缘区"),
            (py >= 0.9, "上边缘区"),
            ((px <= 0.12 or px >= 0.88) and (py <= 0.12 or py >= 0.88), "角落区"),
            (0.68 <= py <= 0.88 and 0.25 <= px <= 0.75, "FPC/绑定区"),
        )
        matched = [label for hit, label in rules if hit]
        return " / ".join(matched or ["显示中心区"])

    labels = [describe(px, py) for px, py in zip(rel_x, rel_y)]
    return pd.Series(labels, index=df.index, name="panel_zone")
def calculate_kpis(source_df, filtered_df):
    """Compute the page KPIs for the current filter selection.

    The number of inspected panels is taken from ``source_df`` and the number
    of defective panels from ``filtered_df``. Previously both figures came
    from ``filtered_df``, which made the yield 0% for every non-empty
    selection and left ``source_df`` unused.

    Args:
        source_df: Unfiltered records; its distinct ``panel_id`` values form
            the inspected-panel denominator. When it is ``None`` or has no
            ``panel_id`` column the filtered panel count is used instead.
            NOTE(review): assumes source_df covers every inspected panel, not
            only panels that have a defect record — confirm with the caller.
        filtered_df: Records remaining after the page filters; must contain
            ``panel_id``, ``severity`` and ``defect_type``.

    Returns:
        A dict with ``total_panels_inspected``, ``defective_panels``,
        ``yield_rate`` (percent), ``total_defects``, ``critical_defects`` and
        ``top_defect_type`` ("-" when there is nothing to report).
    """
    total_defects = len(filtered_df)
    defective_panels = int(filtered_df["panel_id"].nunique())

    total_panels_inspected = defective_panels
    if source_df is not None and "panel_id" in source_df.columns:
        # Never let the denominator drop below the defective count, so the
        # yield stays inside [0, 100].
        total_panels_inspected = max(
            int(source_df["panel_id"].nunique()), defective_panels
        )

    critical_defects = (
        int((filtered_df["severity"] == "严重").sum()) if total_defects else 0
    )

    # mode() is empty when every defect_type is missing; fall back to "-".
    top_defect_type = "-"
    if total_defects:
        modes = filtered_df["defect_type"].mode()
        if not modes.empty:
            top_defect_type = modes.iloc[0]

    yield_rate = (1 - defective_panels / max(total_panels_inspected, 1)) * 100
    return {
        "total_panels_inspected": int(total_panels_inspected),
        "defective_panels": int(defective_panels),
        "yield_rate": float(yield_rate),
        "total_defects": int(total_defects),
        "critical_defects": int(critical_defects),
        "top_defect_type": top_defect_type,
    }
def calculate_spc_metrics(df):
    """Compute the daily figures and control limits for the SPC chart.

    Rows are grouped by ``day`` to count defects (``defect_id``) and distinct
    defective panels (``panel_id``). With fewer than two days the daily table
    is returned together with all-zero limits. Otherwise a per-day inspected
    count is estimated as distinct panels divided by the number of whole
    weeks spanned by ``timestamp`` (at least 1), and is never allowed to be
    smaller than that day's defective panels, so the resulting rate is a
    valid probability.

    Returns:
        A dict with ``daily`` (DataFrame), ``p_bar``, ``ucl``, ``lcl``,
        ``uwl``, ``lwl`` and ``sigma_p``. Limits are ``p_bar`` +/- 3 sigma
        (control) and +/- 2 sigma (warning), bounded to ``[0, 1]``.
    """
    per_day = (
        df.groupby("day")
        .agg(
            total_defects=("defect_id", "count"),
            panels_with_defects=("panel_id", "nunique"),
        )
        .reset_index()
    )
    per_day["day"] = pd.to_datetime(per_day["day"])
    per_day = per_day.sort_values("day").reset_index(drop=True)

    if len(per_day) < 2:
        zeroed = dict.fromkeys(("p_bar", "ucl", "lcl", "uwl", "lwl", "sigma_p"), 0.0)
        return {"daily": per_day, **zeroed}

    span = df["timestamp"].max() - df["timestamp"].min()
    whole_weeks = max((span.days + 1) // 7, 1)
    estimated = max(df["panel_id"].nunique() // whole_weeks, 1)

    per_day["estimated_inspected"] = np.maximum(estimated, per_day["panels_with_defects"])
    raw_rate = per_day["panels_with_defects"] / per_day["estimated_inspected"]
    per_day["defect_rate"] = raw_rate.clip(lower=0, upper=1)

    p_bar = float(np.clip(per_day["defect_rate"].mean(), 0, 1))
    n_avg = float(per_day["estimated_inspected"].mean())
    if n_avg > 0:
        sigma_p = float(np.sqrt(max(p_bar * (1 - p_bar), 0) / n_avg))
    else:
        sigma_p = 0.0

    def above(k):
        return min(1.0, p_bar + k * sigma_p)

    def below(k):
        return max(0.0, p_bar - k * sigma_p)

    return {
        "daily": per_day,
        "p_bar": p_bar,
        "ucl": above(3),
        "lcl": below(3),
        "uwl": above(2),
        "lwl": below(2),
        "sigma_p": sigma_p,
    }
def build_diagnostic_dashboard(df):
    """Assemble the summary, root-cause candidates and trend data for the dashboard.

    For an empty ``df`` a placeholder result is returned (note that it has no
    ``top_zone``, ``top_zone_share`` or ``zone_distribution`` keys).

    Otherwise the result contains:
        * the most frequent defect type and panel zone with their shares,
        * the share of rows whose severity is "严重",
        * up to eight equipment/seat combinations ranked by a 0-100 risk
          score: 55 points for relative defect count, 25 for lift over the
          per-seat average of the same equipment (saturating at 3x), 15 for
          the serious share and 5 for the share of panels involved,
        * a Pareto table by defect type, a per-day defect count and the
          output of ``build_extended_root_causes``,
        * an overall severity level and a one-sentence recommendation.
    """
    n_defects = len(df)
    if n_defects == 0:
        return {
            "severity_level": "正常",
            "top_defect_type": "-",
            "top_defect_share": 0.0,
            "serious_share": 0.0,
            "root_causes": pd.DataFrame(),
            "extended_root_causes": pd.DataFrame(),
            "daily_trend": pd.DataFrame(),
            "pareto": pd.DataFrame(),
            "primary_recommendation": "当前筛选条件下没有缺陷记录。",
        }

    serious_label = "严重"

    # --- headline shares -------------------------------------------------
    by_type = df["defect_type"].value_counts()
    by_zone = classify_panel_zone(df).value_counts()
    leading_type = by_type.index[0]
    leading_type_share = float(by_type.iloc[0] / n_defects)
    leading_zone = by_zone.index[0]
    leading_zone_share = float(by_zone.iloc[0] / n_defects)
    serious_share = float((df["severity"] == serious_label).sum() / n_defects)

    # --- equipment / seat candidates --------------------------------------
    ranked = (
        df.groupby(["equipment_id", "seat_id"])
        .agg(
            缺陷数=("defect_id", "count"),
            涉及面板=("panel_id", "nunique"),
            主要缺陷=("defect_type", lambda s: s.mode().iloc[0]),
            严重数=("severity", lambda s: int((s == serious_label).sum())),
        )
        .reset_index()
    )
    ranked["根因候选"] = ranked["equipment_id"] + " / " + ranked["seat_id"]
    ranked["占比"] = ranked["缺陷数"] / n_defects
    ranked["严重占比"] = ranked["严重数"] / ranked["缺陷数"].clip(lower=1)

    # Expected count per seat = equipment total spread evenly over its seats.
    per_equipment = df.groupby("equipment_id")
    expected_per_seat = (
        per_equipment["defect_id"].count()
        / per_equipment["seat_id"].nunique().clip(lower=1)
    )
    ranked["期望缺陷数"] = ranked["equipment_id"].map(expected_per_seat).clip(lower=0.001)
    ranked["异常倍数"] = (ranked["缺陷数"] / ranked["期望缺陷数"]).round(2)

    count_score = ranked["缺陷数"] / ranked["缺陷数"].max()
    panel_score = ranked["涉及面板"] / df["panel_id"].nunique()
    lift_score = (ranked["异常倍数"] / 3).clip(upper=1)
    ranked["风险分"] = (
        count_score * 55 + lift_score * 25 + ranked["严重占比"] * 15 + panel_score * 5
    ).round(1)

    shown_columns = [
        "根因候选", "缺陷数", "占比", "异常倍数", "涉及面板", "主要缺陷", "严重占比", "风险分",
    ]
    ranked = (
        ranked.sort_values(["风险分", "缺陷数"], ascending=False)
        .head(8)[shown_columns]
        .reset_index(drop=True)
    )

    # --- supporting tables -------------------------------------------------
    pareto = by_type.rename_axis("缺陷类型").reset_index(name="缺陷数")
    pareto["占比"] = pareto["缺陷数"] / n_defects
    pareto["累计占比"] = pareto["占比"].cumsum()

    trend = df.groupby("day").size().rename("缺陷数").reset_index()
    trend["day"] = pd.to_datetime(trend["day"])
    trend = trend.sort_values("day")

    extended = build_extended_root_causes(df)

    # --- overall level and recommendation ---------------------------------
    top_root = ranked.iloc[0] if len(ranked) > 0 else None

    if serious_share >= 0.2 or (top_root is not None and top_root["占比"] >= 0.15):
        level = "严重"
    elif serious_share >= 0.1 or leading_type_share >= 0.35:
        level = "关注"
    else:
        level = "正常"

    if top_root is None:
        recommendation = f"优先排查 {leading_type} 相关工艺参数。"
    else:
        recommendation = (
            f"优先排查 {top_root['根因候选']},该组合贡献 {top_root['占比']:.1%} "
            f"缺陷,异常倍数 {top_root['异常倍数']:.2f}x,主要类型为 {top_root['主要缺陷']}。"
        )

    return {
        "severity_level": level,
        "top_defect_type": leading_type,
        "top_defect_share": leading_type_share,
        "top_zone": leading_zone,
        "top_zone_share": leading_zone_share,
        "zone_distribution": by_zone.rename_axis("区域").reset_index(name="缺陷数"),
        "serious_share": serious_share,
        "root_causes": ranked,
        "extended_root_causes": extended,
        "daily_trend": trend,
        "pareto": pareto,
        "primary_recommendation": recommendation,
    }
def detect_industry_patterns(df):
    """Detect common spatial and batch defect patterns.

    Checks, in order:
        1. Zone concentration: a zone other than the display centre holds at
           least 35% of the rows. The zone that triggered the check is the
           one reported (previously the overall top zone was printed, which
           could be the display centre).
        2. Repeated coordinates: one 5 mm x 5 mm bin is hit on several panels
           (at least 3, or 2 when fewer than 3 panels are present).
        3. Linear layout: |corr(x, y)| >= 0.85 with at least 6 located rows
           and 3 distinct values per axis.
        4. Batch concentration: one batch holds at least 50% of the rows and
           more than one batch is present.

    Rows with missing coordinates are ignored by checks 2 and 3, and the
    coordinate and batch checks are skipped when their columns are absent,
    instead of raising.

    Returns:
        A list of pattern descriptions; a single "random scatter" entry when
        no check fires; an empty list for an empty ``df``.
    """
    if df.empty:
        return []

    patterns = []

    # value_counts sorts by share descending, so the first hit is the
    # largest qualifying zone.
    zone_share = classify_panel_zone(df).value_counts(normalize=True)
    hot_zone = next(
        (
            (zone, share)
            for zone, share in zone_share.items()
            if zone != "显示中心区" and share >= 0.35
        ),
        None,
    )
    if hot_zone is not None:
        patterns.append(f"区域集中: {hot_zone[0]} 占比 {hot_zone[1]:.1%}")

    if {"x_mm", "y_mm"}.issubset(df.columns):
        # NaN coordinates cannot be cast to integer bins, so drop them first.
        located = df.dropna(subset=["x_mm", "y_mm"])
        if not located.empty:
            x_bin = (located["x_mm"] // 5).astype(int).rename("x_bin")
            y_bin = (located["y_mm"] // 5).astype(int).rename("y_bin")
            repeat = located.groupby([x_bin, y_bin])["panel_id"].nunique().max()
            if repeat >= min(3, max(2, df["panel_id"].nunique())):
                patterns.append("跨面板重复坐标: 疑似治具、吸嘴、压头或固定接触点异常")

            enough_points = (
                located["x_mm"].nunique() >= 3
                and located["y_mm"].nunique() >= 3
                and len(located) >= 6
            )
            if enough_points:
                corr = abs(located["x_mm"].corr(located["y_mm"]))
                if pd.notna(corr) and corr >= 0.85:
                    patterns.append("线状分布: 疑似搬运划伤、滚轮轨迹或线性压伤")

    if "batch_id" in df.columns:
        batch_share = df["batch_id"].value_counts(normalize=True)
        # len > 1 also covers the case where every batch id is missing.
        if len(batch_share) > 1 and batch_share.iloc[0] >= 0.5:
            patterns.append(
                f"批次集中: {batch_share.index[0]} 占比 {batch_share.iloc[0]:.1%}"
            )

    return patterns or ["随机点状分布: 更偏向材料、环境尘埃或偶发检出"]
def generate_industry_diagnosis(df, dashboard):
    """Build the diagnosis headline, detected patterns and recommended checks.

    Args:
        df: Filtered defect records.
        dashboard: Result dict as produced by ``build_diagnostic_dashboard``;
            ``top_defect_type``, ``root_causes`` and ``serious_share`` are
            required, ``top_zone`` is optional.

    Returns:
        A dict with ``headline`` (str), ``patterns`` (list of str) and
        ``recommendations`` (at most five unique strings, in the order they
        were collected).
    """
    if df.empty:
        return {
            "headline": "当前筛选条件下没有可诊断缺陷。",
            "patterns": [],
            "recommendations": ["放宽筛选条件或上传更多检测记录后再诊断。"],
        }

    top_type = dashboard["top_defect_type"]

    # Classify zones only when the dashboard does not already carry the top
    # zone; a dict.get default would be computed on every call.
    top_zone = dashboard.get("top_zone")
    if top_zone is None:
        top_zone = classify_panel_zone(df).value_counts().index[0]

    root_causes = dashboard["root_causes"]
    top_root = root_causes.iloc[0]["根因候选"] if len(root_causes) else "当前筛选范围"

    patterns = detect_industry_patterns(df)

    recommendations = list(DEFECT_SOP_RECOMMENDATIONS.get(top_type, ()))
    if "边缘" in top_zone or "角落" in top_zone:
        recommendations.append("优先复核边缘贴合、切割/搬运夹持、吸附接触面和四角应力状态")
    if "FPC" in top_zone or "绑定" in top_zone:
        recommendations.append("重点检查绑定压力、FPC/COF 区域异物、压接参数和 AOI 复判样本")
    if any("跨面板重复" in p for p in patterns):
        recommendations.append("对高发座号对应治具、吸嘴、压头做点检,并抽查同坐标复现样本")
    if dashboard["serious_share"] >= 0.2:
        recommendations.append("严重缺陷占比较高,建议对相关批次执行 Hold、复检或加严抽样")

    # dict.fromkeys removes duplicates while keeping first-seen order.
    unique_recommendations = list(dict.fromkeys(recommendations))

    headline = (
        f"{top_zone} 的 {top_type} 最突出,首要候选为 {top_root}。"
        "建议按工序链路优先排查材料、贴合/搬运接触面和对应治具状态。"
    )
    return {
        "headline": headline,
        "patterns": patterns,
        "recommendations": unique_recommendations[:5],
    }
def build_ml_factor_insights(
    df,
    *,
    target_defect_type=None,
    target_severity=None,
    model_name="random_forest",
    top_n=10,
):
    """Collect ML key factors, metrics and feature importances for display.

    ``df`` is first passed through ``normalize_defect_schema``. When no
    target defect type is given, the most frequent ``defect_type`` of the
    normalised data is used. ``predict_key_factors`` and
    ``create_model_bundle`` are then called; a ``RuntimeError`` or
    ``ValueError`` from either is reported through the ``error`` field
    instead of being raised.

    Returns:
        A dict with ``target_defect_type``, ``target_severity``,
        ``model_name``, ``key_factors``, ``metrics``, ``validation_metrics``,
        ``feature_importance`` and ``error`` (``None`` on success).
    """
    normalized = normalize_defect_schema(df)
    has_rows = not normalized.empty

    resolved_type = target_defect_type
    if resolved_type is None and has_rows:
        resolved_type = normalized["defect_type"].mode().iloc[0]

    insights = {
        "target_defect_type": resolved_type,
        "target_severity": target_severity,
        "model_name": model_name,
        "key_factors": pd.DataFrame(),
        "metrics": {},
        "validation_metrics": {},
        "feature_importance": [],
        "error": None,
    }

    if not has_rows:
        insights["error"] = "当前筛选条件下没有可训练数据。"
        return insights

    try:
        # key_factors is stored before the bundle is built, so it is kept
        # even if bundle creation fails.
        insights["key_factors"] = predict_key_factors(
            normalized,
            target_defect_type=resolved_type,
            target_severity=target_severity,
            model_name=model_name,
            top_n=top_n,
        )
        bundle = create_model_bundle(
            normalized,
            model_name=model_name,
            target_defect_type=resolved_type,
            target_severity=target_severity,
        )
    except (RuntimeError, ValueError) as exc:
        insights["error"] = str(exc)
        return insights

    for field in ("metrics", "validation_metrics", "feature_importance"):
        insights[field] = bundle[field]
    return insights
def _fig_to_base64(fig, *, dpi=120):
    """Render a matplotlib Figure as a base64 PNG data URI and close it.

    Args:
        fig: The figure to render.
        dpi: Resolution passed to ``Figure.savefig``.

    Returns:
        A ``data:image/png;base64,...`` string.

    The figure is closed even when rendering raises, so a failed chart does
    not leave an open figure registered with pyplot.
    """
    try:
        with io.BytesIO() as buf:
            fig.savefig(buf, format="png", dpi=dpi, bbox_inches="tight", facecolor="white")
            png_bytes = buf.getvalue()
    finally:
        plt.close(fig)
    encoded = base64.b64encode(png_bytes).decode("utf-8")
    return f"data:image/png;base64,{encoded}"
def generate_report_charts(filtered_df, *, daily_trend_df=None):
    """Render the charts embedded in the report as base64 PNG data URIs.

    Up to four charts are produced, each only when its data is available:
        * ``type_distribution``: horizontal bars of the ten most frequent
          defect types,
        * ``daily_trend``: line chart of ``daily_trend_df`` (needs the columns
          ``day`` and ``缺陷数``),
        * ``equipment_distribution``: bars of the eight most frequent
          ``equipment_id`` values,
        * ``severity_pie``: pie chart of the ``severity`` values.

    Returns:
        A dict mapping the chart keys above to data URI strings; charts
        without data are left out.
    """
    def hide_top_right(axes):
        for side in ("top", "right"):
            axes.spines[side].set_visible(False)

    title_style = {"fontsize": 13, "fontweight": "bold"}
    images = {}

    # --- 1. Defect-type distribution (horizontal bars) ---
    top_types = filtered_df["defect_type"].value_counts().head(10)
    if not top_types.empty:
        palette = ["#0f766e", "#14b8a6", "#22d3ee", "#38bdf8", "#60a5fa",
                   "#a78bfa", "#c084fc", "#e879f9", "#f472b6", "#fb7185"]
        slots = range(len(top_types))
        fig, ax = plt.subplots(figsize=(7, 3.5))
        bars = ax.barh(slots, top_types.values, color=palette[: len(top_types)])
        ax.set_yticks(slots)
        ax.set_yticklabels(top_types.index, fontsize=11)
        ax.invert_yaxis()
        label_gap = max(top_types.values) * 0.01
        for bar, count in zip(bars, top_types.values):
            ax.text(
                bar.get_width() + label_gap,
                bar.get_y() + bar.get_height() / 2,
                str(count),
                va="center",
                fontsize=10,
                fontweight="bold",
            )
        ax.set_title("缺陷类型 TOP 10", pad=12, **title_style)
        hide_top_right(ax)
        ax.set_xlabel("缺陷数")
        images["type_distribution"] = _fig_to_base64(fig)

    # --- 2. Daily trend (line chart) ---
    if daily_trend_df is not None and not daily_trend_df.empty:
        trend = daily_trend_df.copy()
        trend["day"] = pd.to_datetime(trend["day"])
        trend = trend.sort_values("day")
        fig, ax = plt.subplots(figsize=(7, 3))
        ax.plot(trend["day"], trend["缺陷数"], marker="o", linewidth=2,
                markersize=5, color="#0f766e")
        ax.fill_between(trend["day"], trend["缺陷数"], alpha=0.15, color="#0f766e")
        ax.set_title("每日缺陷数趋势", pad=10, **title_style)
        hide_top_right(ax)
        ax.tick_params(axis="x", rotation=30)
        images["daily_trend"] = _fig_to_base64(fig)

    # --- 3. Defects per equipment (vertical bars) ---
    equipment_column = filtered_df.get("equipment_id")
    if equipment_column is not None:
        top_equipment = equipment_column.value_counts().head(8)
        if not top_equipment.empty:
            bar_colors = ["#1e3a5f", "#2563eb", "#3b82f6", "#60a5fa",
                          "#93c5fd", "#0d9488", "#14b8a6", "#2dd4bf"]
            slots = range(len(top_equipment))
            fig, ax = plt.subplots(figsize=(7, 3))
            ax.bar(slots, top_equipment.values, color=bar_colors[: len(top_equipment)])
            ax.set_xticks(slots)
            ax.set_xticklabels(top_equipment.index, rotation=25, ha="right", fontsize=10)
            ax.set_title("设备缺陷分布 TOP 8", pad=10, **title_style)
            hide_top_right(ax)
            ax.set_ylabel("缺陷数")
            label_lift = max(top_equipment.values) * 0.02
            for position, count in enumerate(top_equipment.values):
                ax.text(position, count + label_lift, str(count),
                        ha="center", fontsize=9, fontweight="bold")
            images["equipment_distribution"] = _fig_to_base64(fig)

    # --- 4. Severity share (pie chart) ---
    if "severity" in filtered_df.columns and not filtered_df.empty:
        by_severity = filtered_df["severity"].value_counts()
        if not by_severity.empty:
            known_colors = {"轻微": "#22c55e", "一般": "#f59e0b", "严重": "#ef4444"}
            # Severities outside the known set are drawn in grey.
            wedge_colors = [known_colors.get(level, "#94a3b8") for level in by_severity.index]
            fig, ax = plt.subplots(figsize=(4.5, 3.5))
            _, _, percent_labels = ax.pie(
                by_severity.values,
                labels=by_severity.index,
                autopct="%1.1f%%",
                colors=wedge_colors,
                startangle=90,
                textprops={"fontsize": 11},
            )
            for label in percent_labels:
                label.set_fontweight("bold")
            ax.set_title("严重程度占比", pad=10, **title_style)
            images["severity_pie"] = _fig_to_base64(fig)

    return images
def _escape(value):
    """Return ``value`` as text with HTML special characters (quotes included) escaped."""
    text = str(value)
    return html.escape(text, quote=True)
def _series_rows(series):
    """Return the ``(key, value)`` pairs of ``series`` as a list; ``None`` gives ``[]``."""
    return [] if series is None else [*series.items()]
def build_html_report(
*,
generated_at,
date_range_text,
view_mode,
defect_count,
panel_count,
kpis,
type_counts,
equipment_counts=None,
seat_top=None,
trend_summary="-",
anomaly_rows=None,
recommendations=None,
charts=None,
):
"""Build a self-contained HTML report that can be opened directly in a browser.

All arguments are keyword-only.

Args:
defect_count: Number of defects after filtering; rendered via ``int()``.
kpis: Mapping read with ``.get(..., 0)`` for the keys ``yield_rate``,
``critical_defects``, ``total_panels_inspected`` and ``defective_panels``.
type_counts, equipment_counts, seat_top: Objects exposing ``.items()``
(name -> count), or ``None`` for no rows.
trend_summary: Text for the trend section; HTML-escaped.
anomaly_rows: Iterable of mappings with the keys ``equipment``, ``seat``
and ``count``; a falsy value means no rows.
recommendations: Iterable of items rendered HTML-escaped; a falsy value
means none.
charts: Mapping whose keys ``daily_trend``, ``type_distribution``,
``equipment_distribution`` and ``severity_pie`` switch the matching
report fragments on; a falsy value means no charts.

Returns:
The report as a single string.

NOTE(review): ``generated_at``, ``date_range_text``, ``view_mode`` and
``panel_count`` are not referenced by the template text as it appears here,
and several string literals below span multiple lines — confirm whether
markup was lost from the templates.
"""
# Replace missing/falsy optional inputs with empty containers.
anomaly_rows = anomaly_rows or []
recommendations = recommendations or []
charts = charts or {}
# Turn the count inputs into lists of (name, count) pairs.
type_rows = _series_rows(type_counts)
equipment_rows = _series_rows(equipment_counts)
seat_rows = _series_rows(seat_top)
# Denominator for the per-type share; at least 1 to avoid division by zero.
type_total = max(sum(int(count) for _, count in type_rows), 1)
type_items = "\n".join(
f"""
| {_escape(name)} |
{int(count)} |
{count / type_total:.1%} |
"""
for name, count in type_rows
) or '| 暂无数据 |
'
equipment_items = "\n".join(
f"| {_escape(name)} | {int(count)} |
"
for name, count in equipment_rows
) or '| 暂无数据 |
'
seat_items = "\n".join(
f"| {_escape(name)} | {int(count)} |
"
for name, count in seat_rows
) or '| 暂无数据 |
'
anomaly_items = "\n".join(
f"| {_escape(row['equipment'])} | {_escape(row['seat'])} | {int(row['count'])} |
"
for row in anomaly_rows
) or '| 无 2σ 异常座号 |
'
recommendation_items = "\n".join(
f"{_escape(item)}" for item in recommendations
) or "暂无建议"
return f"""
缺陷集中性分析综合报告
筛选后缺陷数
{int(defect_count)}
综合良率
{float(kpis.get('yield_rate', 0)):.1f}%
严重缺陷
{int(kpis.get('critical_defects', 0))}
1. KPI 摘要
| 指标 | 数值 |
| 检测面板数 | {int(kpis.get('total_panels_inspected', 0))} 块 |
| 不良面板数 | {int(kpis.get('defective_panels', 0))} 块 |
| 严重缺陷 | {int(kpis.get('critical_defects', 0))} 个 |
2. 趋势分析
{_escape(trend_summary)}
{('

') if "daily_trend" in charts else ""}
建议结合 SPC 控制图确认是否越过预警线或控制线。
3. 缺陷类型分布
{('
') if "type_distribution" in charts else ""}
4. 设备分布
{('

') if "equipment_distribution" in charts else ""}
{('') if "severity_pie" in charts else ""}
{"6. 异常检测" if "severity_pie" not in charts else "7. 异常检测"}
{"7. 排查建议" if "severity_pie" not in charts else "8. 排查建议"}
本报告由缺陷集中性分析系统自动生成,可直接归档、邮件发送或浏览器打印为 PDF。
"""