app_utils.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. """缺陷分析页面的可测试业务逻辑。"""
  2. import numpy as np
  3. import pandas as pd
  4. def normalize_date_bounds(start_date, end_date):
  5. """把日期范围转换成左闭右开的时间边界,确保结束日期整天被包含。"""
  6. start_ts = pd.Timestamp(start_date).normalize()
  7. end_exclusive = pd.Timestamp(end_date).normalize() + pd.Timedelta(days=1)
  8. return start_ts, end_exclusive
  9. def apply_defect_filters(
  10. df,
  11. *,
  12. start_date,
  13. end_date,
  14. selected_types,
  15. selected_batches,
  16. selected_equipment,
  17. selected_seats,
  18. selected_shift="全部",
  19. selected_severity="全部",
  20. ):
  21. """应用页面筛选条件。"""
  22. start_ts, end_exclusive = normalize_date_bounds(start_date, end_date)
  23. mask = (
  24. (df["timestamp"] >= start_ts)
  25. & (df["timestamp"] < end_exclusive)
  26. & (df["defect_type"].isin(selected_types))
  27. & (df["batch_id"].isin(selected_batches))
  28. & (df["equipment_id"].isin(selected_equipment))
  29. )
  30. if selected_shift != "全部":
  31. mask &= df["shift"] == selected_shift
  32. if selected_severity != "全部":
  33. mask &= df["severity"] == selected_severity
  34. if selected_seats:
  35. mask &= df["seat_id"].isin(selected_seats)
  36. return df[mask].copy()
  37. def calculate_kpis(source_df, filtered_df):
  38. """基于当前筛选结果计算页面 KPI。"""
  39. total_panels_inspected = filtered_df["panel_id"].nunique()
  40. defective_panels = filtered_df["panel_id"].nunique()
  41. total_defects = len(filtered_df)
  42. critical_defects = int((filtered_df["severity"] == "严重").sum()) if total_defects else 0
  43. top_defect_type = filtered_df["defect_type"].mode().iloc[0] if total_defects else "-"
  44. yield_rate = (1 - defective_panels / max(total_panels_inspected, 1)) * 100
  45. return {
  46. "total_panels_inspected": int(total_panels_inspected),
  47. "defective_panels": int(defective_panels),
  48. "yield_rate": float(yield_rate),
  49. "total_defects": int(total_defects),
  50. "critical_defects": int(critical_defects),
  51. "top_defect_type": top_defect_type,
  52. }
  53. def calculate_spc_metrics(df):
  54. """计算 SPC 所需数据,防止模拟分母造成非法概率。"""
  55. daily = df.groupby("day").agg(
  56. total_defects=("defect_id", "count"),
  57. panels_with_defects=("panel_id", "nunique"),
  58. ).reset_index()
  59. daily["day"] = pd.to_datetime(daily["day"])
  60. daily = daily.sort_values("day").reset_index(drop=True)
  61. if len(daily) < 2:
  62. return {
  63. "daily": daily,
  64. "p_bar": 0.0,
  65. "ucl": 0.0,
  66. "lcl": 0.0,
  67. "uwl": 0.0,
  68. "lwl": 0.0,
  69. "sigma_p": 0.0,
  70. }
  71. total_days = (df["timestamp"].max() - df["timestamp"].min()).days + 1
  72. total_unique_panels = df["panel_id"].nunique()
  73. estimated = max(total_unique_panels // max(total_days // 7, 1), 1)
  74. daily["estimated_inspected"] = np.maximum(estimated, daily["panels_with_defects"])
  75. daily["defect_rate"] = (
  76. daily["panels_with_defects"] / daily["estimated_inspected"]
  77. ).clip(lower=0, upper=1)
  78. p_bar = float(np.clip(daily["defect_rate"].mean(), 0, 1))
  79. n_avg = float(daily["estimated_inspected"].mean())
  80. sigma_p = float(np.sqrt(max(p_bar * (1 - p_bar), 0) / n_avg)) if n_avg > 0 else 0.0
  81. return {
  82. "daily": daily,
  83. "p_bar": p_bar,
  84. "ucl": min(1.0, p_bar + 3 * sigma_p),
  85. "lcl": max(0.0, p_bar - 3 * sigma_p),
  86. "uwl": min(1.0, p_bar + 2 * sigma_p),
  87. "lwl": max(0.0, p_bar - 2 * sigma_p),
  88. "sigma_p": sigma_p,
  89. }
  90. def build_diagnostic_dashboard(df):
  91. """生成诊断驾驶舱需要的摘要、根因候选和趋势数据。"""
  92. total_defects = len(df)
  93. if total_defects == 0:
  94. return {
  95. "severity_level": "正常",
  96. "top_defect_type": "-",
  97. "top_defect_share": 0.0,
  98. "serious_share": 0.0,
  99. "root_causes": pd.DataFrame(),
  100. "daily_trend": pd.DataFrame(),
  101. "pareto": pd.DataFrame(),
  102. "primary_recommendation": "当前筛选条件下没有缺陷记录。",
  103. }
  104. type_counts = df["defect_type"].value_counts()
  105. top_defect_type = type_counts.index[0]
  106. top_defect_share = float(type_counts.iloc[0] / total_defects)
  107. serious_share = float((df["severity"] == "严重").sum() / total_defects)
  108. root_causes = (
  109. df.groupby(["equipment_id", "seat_id"])
  110. .agg(
  111. 缺陷数=("defect_id", "count"),
  112. 涉及面板=("panel_id", "nunique"),
  113. 主要缺陷=("defect_type", lambda s: s.mode().iloc[0]),
  114. 严重数=("severity", lambda s: int((s == "严重").sum())),
  115. )
  116. .reset_index()
  117. )
  118. root_causes["根因候选"] = root_causes["equipment_id"] + " / " + root_causes["seat_id"]
  119. root_causes["占比"] = root_causes["缺陷数"] / total_defects
  120. root_causes["严重占比"] = root_causes["严重数"] / root_causes["缺陷数"].clip(lower=1)
  121. equipment_totals = df.groupby("equipment_id")["defect_id"].count()
  122. equipment_seat_counts = df.groupby("equipment_id")["seat_id"].nunique().clip(lower=1)
  123. root_causes["期望缺陷数"] = root_causes["equipment_id"].map(
  124. equipment_totals / equipment_seat_counts
  125. ).clip(lower=0.001)
  126. root_causes["异常倍数"] = (root_causes["缺陷数"] / root_causes["期望缺陷数"]).round(2)
  127. count_score = root_causes["缺陷数"] / root_causes["缺陷数"].max()
  128. panel_score = root_causes["涉及面板"] / df["panel_id"].nunique()
  129. lift_score = (root_causes["异常倍数"] / 3).clip(upper=1)
  130. root_causes["风险分"] = (
  131. count_score * 55 + lift_score * 25 + root_causes["严重占比"] * 15 + panel_score * 5
  132. ).round(1)
  133. root_causes = root_causes.sort_values(["风险分", "缺陷数"], ascending=False).head(8)
  134. root_causes = root_causes[
  135. ["根因候选", "缺陷数", "占比", "异常倍数", "涉及面板", "主要缺陷", "严重占比", "风险分"]
  136. ].reset_index(drop=True)
  137. pareto = type_counts.rename_axis("缺陷类型").reset_index(name="缺陷数")
  138. pareto["占比"] = pareto["缺陷数"] / total_defects
  139. pareto["累计占比"] = pareto["占比"].cumsum()
  140. daily_trend = df.groupby("day").size().rename("缺陷数").reset_index()
  141. daily_trend["day"] = pd.to_datetime(daily_trend["day"])
  142. daily_trend = daily_trend.sort_values("day")
  143. if serious_share >= 0.2 or (len(root_causes) > 0 and root_causes.iloc[0]["占比"] >= 0.15):
  144. severity_level = "严重"
  145. elif serious_share >= 0.1 or top_defect_share >= 0.35:
  146. severity_level = "关注"
  147. else:
  148. severity_level = "正常"
  149. if len(root_causes) > 0:
  150. top_root = root_causes.iloc[0]
  151. primary_recommendation = (
  152. f"优先排查 {top_root['根因候选']},该组合贡献 {top_root['占比']:.1%} "
  153. f"缺陷,异常倍数 {top_root['异常倍数']:.2f}x,主要类型为 {top_root['主要缺陷']}。"
  154. )
  155. else:
  156. primary_recommendation = f"优先排查 {top_defect_type} 相关工艺参数。"
  157. return {
  158. "severity_level": severity_level,
  159. "top_defect_type": top_defect_type,
  160. "top_defect_share": top_defect_share,
  161. "serious_share": serious_share,
  162. "root_causes": root_causes,
  163. "daily_trend": daily_trend,
  164. "pareto": pareto,
  165. "primary_recommendation": primary_recommendation,
  166. }