Przeglądaj źródła

修复生产级审核问题

DESKTOP-74CLTRG\Leol 4 dni temu
rodzic
commit
01d417fa70

+ 25 - 9
defect_analysis/cases.py

@@ -1,6 +1,7 @@
 """异常 Case 闭环与审计日志。"""
 """异常 Case 闭环与审计日志。"""
 
 
 import sqlite3
 import sqlite3
+from contextlib import closing
 from pathlib import Path
 from pathlib import Path
 
 
 import pandas as pd
 import pandas as pd
@@ -9,10 +10,18 @@ from defect_analysis.database import init_database
 
 
 
 
 VALID_CASE_STATUSES = {"OPEN", "IN_PROGRESS", "IMPROVED", "CLOSED", "REJECTED"}
 VALID_CASE_STATUSES = {"OPEN", "IN_PROGRESS", "IMPROVED", "CLOSED", "REJECTED"}
+VALID_CASE_TRANSITIONS = {
+    "OPEN": {"IN_PROGRESS", "CLOSED", "REJECTED"},
+    "IN_PROGRESS": {"IMPROVED", "CLOSED", "REJECTED"},
+    "IMPROVED": {"CLOSED", "IN_PROGRESS"},
+    "CLOSED": set(),
+    "REJECTED": set(),
+}
 
 
 
 
 def _connect(db_path):
 def _connect(db_path):
     conn = sqlite3.connect(Path(db_path))
     conn = sqlite3.connect(Path(db_path))
+    conn.execute("PRAGMA foreign_keys = ON")
     conn.row_factory = sqlite3.Row
     conn.row_factory = sqlite3.Row
     return conn
     return conn
 
 
@@ -41,7 +50,7 @@ def create_root_cause_case(
 ):
 ):
     """从根因候选创建异常 Case。"""
     """从根因候选创建异常 Case。"""
     init_database(db_path)
     init_database(db_path)
-    with _connect(db_path) as conn:
+    with closing(_connect(db_path)) as conn:
         cursor = conn.execute(
         cursor = conn.execute(
             """
             """
             INSERT INTO root_cause_cases (
             INSERT INTO root_cause_cases (
@@ -70,6 +79,7 @@ def create_root_cause_case(
             actor=created_by,
             actor=created_by,
             details=f"创建 Case: {title}; 建议: {recommendation}",
             details=f"创建 Case: {title}; 建议: {recommendation}",
         )
         )
+        conn.commit()
         return case_id
         return case_id
 
 
 
 
@@ -78,21 +88,26 @@ def update_case_status(db_path, *, case_id, status, actor, note=""):
     if status not in VALID_CASE_STATUSES:
     if status not in VALID_CASE_STATUSES:
         raise ValueError(f"无效 Case 状态: {status}")
         raise ValueError(f"无效 Case 状态: {status}")
     init_database(db_path)
     init_database(db_path)
-    with _connect(db_path) as conn:
+    with closing(_connect(db_path)) as conn:
         current = conn.execute(
         current = conn.execute(
             "SELECT status FROM root_cause_cases WHERE case_id = ?",
             "SELECT status FROM root_cause_cases WHERE case_id = ?",
             (int(case_id),),
             (int(case_id),),
         ).fetchone()
         ).fetchone()
         if current is None:
         if current is None:
             raise ValueError(f"未找到 Case: {case_id}")
             raise ValueError(f"未找到 Case: {case_id}")
-        closed_at_expr = "CURRENT_TIMESTAMP" if status == "CLOSED" else "closed_at"
+        current_status = current["status"]
+        if status not in VALID_CASE_TRANSITIONS.get(current_status, set()):
+            raise ValueError(f"不允许的 Case 状态流转: {current_status} -> {status}")
+        closed_at = pd.Timestamp.utcnow().strftime("%Y-%m-%d %H:%M:%S") if status == "CLOSED" else None
         conn.execute(
         conn.execute(
-            f"""
+            """
             UPDATE root_cause_cases
             UPDATE root_cause_cases
-            SET status = ?, updated_at = CURRENT_TIMESTAMP, closed_at = {closed_at_expr}
+            SET status = ?,
+                updated_at = CURRENT_TIMESTAMP,
+                closed_at = COALESCE(?, closed_at)
             WHERE case_id = ?
             WHERE case_id = ?
             """,
             """,
-            (status, int(case_id)),
+            (status, closed_at, int(case_id)),
         )
         )
         _write_audit_log(
         _write_audit_log(
             conn,
             conn,
@@ -100,8 +115,9 @@ def update_case_status(db_path, *, case_id, status, actor, note=""):
             entity_id=case_id,
             entity_id=case_id,
             action="UPDATE_STATUS",
             action="UPDATE_STATUS",
             actor=actor,
             actor=actor,
-            details=f"{current['status']} -> {status}; {note}",
+            details=f"{current_status} -> {status}; {note}",
         )
         )
+        conn.commit()
 
 
 
 
 def list_cases(db_path, *, status=None):
 def list_cases(db_path, *, status=None):
@@ -112,7 +128,7 @@ def list_cases(db_path, *, status=None):
     if status is not None:
     if status is not None:
         where = "WHERE status = ?"
         where = "WHERE status = ?"
         params.append(status)
         params.append(status)
-    with _connect(db_path) as conn:
+    with closing(_connect(db_path)) as conn:
         rows = conn.execute(
         rows = conn.execute(
             f"""
             f"""
             SELECT case_id, title, status, candidate_type, candidate_value,
             SELECT case_id, title, status, candidate_type, candidate_value,
@@ -139,7 +155,7 @@ def get_audit_logs(db_path, *, entity_type=None, entity_id=None):
         clauses.append("entity_id = ?")
         clauses.append("entity_id = ?")
         params.append(int(entity_id))
         params.append(int(entity_id))
     where = "WHERE " + " AND ".join(clauses) if clauses else ""
     where = "WHERE " + " AND ".join(clauses) if clauses else ""
-    with _connect(db_path) as conn:
+    with closing(_connect(db_path)) as conn:
         rows = conn.execute(
         rows = conn.execute(
             f"""
             f"""
             SELECT audit_id, entity_type, entity_id, action, actor, details, created_at
             SELECT audit_id, entity_type, entity_id, action, actor, details, created_at

+ 15 - 8
defect_analysis/data_quality.py

@@ -35,15 +35,20 @@ def build_data_quality_report(df):
             "issues": ["数据为空"],
             "issues": ["数据为空"],
         }
         }
 
 
+    missing_columns = [column for column in CORE_REQUIRED_COLUMNS if column not in df.columns]
     required_complete_rate = _non_empty_rate(df, CORE_REQUIRED_COLUMNS)
     required_complete_rate = _non_empty_rate(df, CORE_REQUIRED_COLUMNS)
-    coordinate_valid = (
-        (df["x_mm"] >= 0)
-        & (df["x_mm"] <= df["panel_width_mm"])
-        & (df["y_mm"] >= 0)
-        & (df["y_mm"] <= df["panel_height_mm"])
-    )
-    coordinate_valid_rate = float(coordinate_valid.mean())
-    enum_valid_rate = float(df["defect_type"].isin(VALID_DEFECT_TYPES).mean())
+    coordinate_columns = ["x_mm", "y_mm", "panel_width_mm", "panel_height_mm"]
+    if all(column in df.columns for column in coordinate_columns):
+        coordinate_valid = (
+            (df["x_mm"] >= 0)
+            & (df["x_mm"] <= df["panel_width_mm"])
+            & (df["y_mm"] >= 0)
+            & (df["y_mm"] <= df["panel_height_mm"])
+        )
+        coordinate_valid_rate = float(coordinate_valid.mean())
+    else:
+        coordinate_valid_rate = 0.0
+    enum_valid_rate = float(df["defect_type"].isin(VALID_DEFECT_TYPES).mean()) if "defect_type" in df.columns else 0.0
     traceability_rate = _non_empty_rate(df, TRACEABILITY_COLUMNS)
     traceability_rate = _non_empty_rate(df, TRACEABILITY_COLUMNS)
     duplicate_defect_rate = float(df["defect_id"].duplicated().mean())
     duplicate_defect_rate = float(df["defect_id"].duplicated().mean())
 
 
@@ -58,6 +63,8 @@ def build_data_quality_report(df):
     issues = []
     issues = []
     if required_complete_rate < 1:
     if required_complete_rate < 1:
         issues.append("必填字段存在空值")
         issues.append("必填字段存在空值")
+    if missing_columns:
+        issues.append("缺少必填字段: " + ", ".join(missing_columns))
     if coordinate_valid_rate < 1:
     if coordinate_valid_rate < 1:
         issues.append("坐标存在超出面板范围的数据")
         issues.append("坐标存在超出面板范围的数据")
     if enum_valid_rate < 1:
     if enum_valid_rate < 1:

+ 29 - 19
defect_analysis/database.py

@@ -2,6 +2,7 @@
 
 
 import json
 import json
 import sqlite3
 import sqlite3
+from contextlib import closing
 from pathlib import Path
 from pathlib import Path
 
 
 import pandas as pd
 import pandas as pd
@@ -13,13 +14,14 @@ def _connect(db_path):
     path = Path(db_path)
     path = Path(db_path)
     path.parent.mkdir(parents=True, exist_ok=True)
     path.parent.mkdir(parents=True, exist_ok=True)
     conn = sqlite3.connect(path)
     conn = sqlite3.connect(path)
+    conn.execute("PRAGMA foreign_keys = ON")
     conn.row_factory = sqlite3.Row
     conn.row_factory = sqlite3.Row
     return conn
     return conn
 
 
 
 
 def init_database(db_path):
 def init_database(db_path):
     """初始化生产级最小数据库结构。"""
     """初始化生产级最小数据库结构。"""
-    with _connect(db_path) as conn:
+    with closing(_connect(db_path)) as conn:
         conn.execute(
         conn.execute(
             """
             """
             CREATE TABLE IF NOT EXISTS import_batches (
             CREATE TABLE IF NOT EXISTS import_batches (
@@ -77,11 +79,12 @@ def init_database(db_path):
             """
             """
         )
         )
         conn.execute("CREATE INDEX IF NOT EXISTS idx_audit_entity ON audit_logs(entity_type, entity_id)")
         conn.execute("CREATE INDEX IF NOT EXISTS idx_audit_entity ON audit_logs(entity_type, entity_id)")
+        conn.commit()
 
 
 
 
 def create_import_batch(db_path, *, source_name, row_count, quality_score=None, status="IMPORTED"):
 def create_import_batch(db_path, *, source_name, row_count, quality_score=None, status="IMPORTED"):
     """创建导入批次记录,返回 import_id。"""
     """创建导入批次记录,返回 import_id。"""
-    with _connect(db_path) as conn:
+    with closing(_connect(db_path)) as conn:
         cursor = conn.execute(
         cursor = conn.execute(
             """
             """
             INSERT INTO import_batches (source_name, row_count, quality_score, status)
             INSERT INTO import_batches (source_name, row_count, quality_score, status)
@@ -89,13 +92,15 @@ def create_import_batch(db_path, *, source_name, row_count, quality_score=None,
             """,
             """,
             (source_name, int(row_count), quality_score, status),
             (source_name, int(row_count), quality_score, status),
         )
         )
-        return int(cursor.lastrowid)
+        import_id = int(cursor.lastrowid)
+        conn.commit()
+        return import_id
 
 
 
 
 def list_import_batches(db_path):
 def list_import_batches(db_path):
     """列出导入批次。"""
     """列出导入批次。"""
     init_database(db_path)
     init_database(db_path)
-    with _connect(db_path) as conn:
+    with closing(_connect(db_path)) as conn:
         rows = conn.execute(
         rows = conn.execute(
             """
             """
             SELECT import_id, source_name, row_count, status, quality_score, created_at
             SELECT import_id, source_name, row_count, status, quality_score, created_at
@@ -110,20 +115,25 @@ def insert_defects(db_path, df, *, import_id=None):
     """幂等写入缺陷记录,按 defect_id 去重。"""
     """幂等写入缺陷记录,按 defect_id 去重。"""
     init_database(db_path)
     init_database(db_path)
     normalized = normalize_defect_schema(df)
     normalized = normalize_defect_schema(df)
-    inserted = 0
-    with _connect(db_path) as conn:
-        for _, row in normalized.iterrows():
-            payload = row.to_dict()
-            payload["timestamp"] = str(payload.get("timestamp", ""))
-            cursor = conn.execute(
-                """
-                INSERT OR IGNORE INTO defects (defect_id, import_id, payload_json)
-                VALUES (?, ?, ?)
-                """,
-                (str(row["defect_id"]), import_id, json.dumps(payload, ensure_ascii=False, default=str)),
-            )
-            inserted += cursor.rowcount
-    return inserted
+    records = []
+    for _, row in normalized.iterrows():
+        payload = row.to_dict()
+        payload["timestamp"] = str(payload.get("timestamp", ""))
+        records.append(
+            (str(row["defect_id"]), import_id, json.dumps(payload, ensure_ascii=False, default=str))
+        )
+    with closing(_connect(db_path)) as conn:
+        before = conn.total_changes
+        conn.executemany(
+            """
+            INSERT OR IGNORE INTO defects (defect_id, import_id, payload_json)
+            VALUES (?, ?, ?)
+            """,
+            records,
+        )
+        inserted = conn.total_changes - before
+        conn.commit()
+        return inserted
 
 
 
 
 def load_defects(db_path, *, import_id=None):
 def load_defects(db_path, *, import_id=None):
@@ -135,7 +145,7 @@ def load_defects(db_path, *, import_id=None):
         where = "WHERE import_id = ?"
         where = "WHERE import_id = ?"
         params.append(import_id)
         params.append(import_id)
 
 
-    with _connect(db_path) as conn:
+    with closing(_connect(db_path)) as conn:
         rows = conn.execute(
         rows = conn.execute(
             f"SELECT import_id, payload_json FROM defects {where} ORDER BY defect_id",
             f"SELECT import_id, payload_json FROM defects {where} ORDER BY defect_id",
             params,
             params,

+ 2 - 2
defect_analysis/root_cause.py

@@ -20,7 +20,7 @@ EXTENDED_ROOT_CAUSE_DIMENSIONS = [
 
 
 def build_extended_root_causes(df, dimensions=None):
 def build_extended_root_causes(df, dimensions=None):
     """按治具、吸嘴、材料批次等行业维度生成扩展根因候选。"""
     """按治具、吸嘴、材料批次等行业维度生成扩展根因候选。"""
-    dimensions = dimensions or EXTENDED_ROOT_CAUSE_DIMENSIONS
+    dimensions = EXTENDED_ROOT_CAUSE_DIMENSIONS if dimensions is None else dimensions
     total_defects = max(len(df), 1)
     total_defects = max(len(df), 1)
     rows = []
     rows = []
     for dimension in dimensions:
     for dimension in dimensions:
@@ -33,7 +33,7 @@ def build_extended_root_causes(df, dimensions=None):
         counts = valid.groupby(dimension).agg(
         counts = valid.groupby(dimension).agg(
             缺陷数=("defect_id", "count"),
             缺陷数=("defect_id", "count"),
             涉及面板=("panel_id", "nunique"),
             涉及面板=("panel_id", "nunique"),
-            主要缺陷=("defect_type", lambda s: s.mode().iloc[0]),
+            主要缺陷=("defect_type", lambda s: s.mode().iloc[0] if not s.mode().empty else "-"),
             严重数=("severity", lambda s: int((s == "严重").sum())),
             严重数=("severity", lambda s: int((s == "严重").sum())),
         ).reset_index()
         ).reset_index()
         expected = len(valid) / max(valid[dimension].nunique(), 1)
         expected = len(valid) / max(valid[dimension].nunique(), 1)

+ 1 - 1
defect_analysis/schemas.py

@@ -105,7 +105,7 @@ def normalize_defect_schema(df):
             normalized[column] = value
             normalized[column] = value
 
 
     if "timestamp" in normalized.columns:
     if "timestamp" in normalized.columns:
-        normalized["timestamp"] = pd.to_datetime(normalized["timestamp"])
+        normalized["timestamp"] = pd.to_datetime(normalized["timestamp"], errors="coerce")
         if "hour" in normalized.columns:
         if "hour" in normalized.columns:
             normalized["hour"] = normalized["hour"].fillna(normalized["timestamp"].dt.hour)
             normalized["hour"] = normalized["hour"].fillna(normalized["timestamp"].dt.hour)
         if "day" in normalized.columns:
         if "day" in normalized.columns:

+ 1 - 1
import_to_database.py

@@ -10,7 +10,7 @@ from defect_analysis.schemas import normalize_defect_schema
 
 
 
 
 def import_csv_to_database(csv_path, db_path, source_name=None):
 def import_csv_to_database(csv_path, db_path, source_name=None):
-    df = pd.read_csv(csv_path, parse_dates=["timestamp"])
+    df = pd.read_csv(csv_path, parse_dates=["timestamp"], encoding="utf-8-sig")
     df = normalize_defect_schema(df)
     df = normalize_defect_schema(df)
     quality_report = build_data_quality_report(df)
     quality_report = build_data_quality_report(df)
 
 

+ 2 - 2
manage_cases.py

@@ -2,7 +2,7 @@
 
 
 import argparse
 import argparse
 
 
-from defect_analysis.cases import create_root_cause_case, list_cases, update_case_status
+from defect_analysis.cases import VALID_CASE_STATUSES, create_root_cause_case, list_cases, update_case_status
 
 
 
 
 def main():
 def main():
@@ -23,7 +23,7 @@ def main():
     update_parser = subparsers.add_parser("update", help="更新 Case 状态")
     update_parser = subparsers.add_parser("update", help="更新 Case 状态")
     update_parser.add_argument("--db", default="defect_analysis.db")
     update_parser.add_argument("--db", default="defect_analysis.db")
     update_parser.add_argument("--case-id", type=int, required=True)
     update_parser.add_argument("--case-id", type=int, required=True)
-    update_parser.add_argument("--status", required=True)
+    update_parser.add_argument("--status", required=True, choices=sorted(VALID_CASE_STATUSES))
     update_parser.add_argument("--actor", default="system")
     update_parser.add_argument("--actor", default="system")
     update_parser.add_argument("--note", default="")
     update_parser.add_argument("--note", default="")
 
 

+ 17 - 0
tests/test_cases.py

@@ -77,6 +77,23 @@ class CasesTest(unittest.TestCase):
         self.assertEqual(["CREATE_CASE", "UPDATE_STATUS", "UPDATE_STATUS"], logs["action"].tolist())
         self.assertEqual(["CREATE_CASE", "UPDATE_STATUS", "UPDATE_STATUS"], logs["action"].tolist())
         self.assertIn("已更换治具", logs.iloc[-1]["details"])
         self.assertIn("已更换治具", logs.iloc[-1]["details"])
 
 
+    def test_closed_case_cannot_reopen(self):
+        case_id = create_root_cause_case(
+            self.db_path,
+            title="治具划痕异常",
+            candidate_type="lam_fixture_id",
+            candidate_value="FIX-A01-03",
+            defect_type="划痕",
+            panel_zone="左边缘区",
+            owner="工程师B",
+            created_by="tester",
+            recommendation="点检治具接触面",
+        )
+        update_case_status(self.db_path, case_id=case_id, status="CLOSED", actor="工程师B")
+
+        with self.assertRaises(ValueError):
+            update_case_status(self.db_path, case_id=case_id, status="OPEN", actor="工程师B")
+
 
 
 if __name__ == "__main__":
 if __name__ == "__main__":
     unittest.main()
     unittest.main()

+ 7 - 0
tests/test_database.py

@@ -1,6 +1,7 @@
 import tempfile
 import tempfile
 import unittest
 import unittest
 from pathlib import Path
 from pathlib import Path
+import sqlite3
 
 
 import pandas as pd
 import pandas as pd
 
 
@@ -76,6 +77,12 @@ class DatabaseTest(unittest.TestCase):
         self.assertEqual(0, second)
         self.assertEqual(0, second)
         self.assertEqual(2, len(load_defects(self.db_path)))
         self.assertEqual(2, len(load_defects(self.db_path)))
 
 
+    def test_foreign_keys_are_enforced(self):
+        init_database(self.db_path)
+
+        with self.assertRaises(sqlite3.IntegrityError):
+            insert_defects(self.db_path, self.df, import_id=999)
+
 
 
 if __name__ == "__main__":
 if __name__ == "__main__":
     unittest.main()
     unittest.main()

+ 36 - 0
tests/test_production_modules.py

@@ -78,6 +78,13 @@ class ProductionModulesTest(unittest.TestCase):
         self.assertLess(report["traceability_rate"], 1.0)
         self.assertLess(report["traceability_rate"], 1.0)
         self.assertTrue(any("坐标" in issue for issue in report["issues"]))
         self.assertTrue(any("坐标" in issue for issue in report["issues"]))
 
 
+    def test_data_quality_report_handles_missing_columns(self):
+        report = build_data_quality_report(pd.DataFrame({"defect_id": ["D1"]}))
+
+        self.assertLess(report["score"], 100)
+        self.assertEqual(0.0, report["coordinate_valid_rate"])
+        self.assertTrue(report["issues"])
+
     def test_root_cause_module_returns_extended_candidates(self):
     def test_root_cause_module_returns_extended_candidates(self):
         rows = []
         rows = []
         for i in range(10):
         for i in range(10):
@@ -110,6 +117,35 @@ class ProductionModulesTest(unittest.TestCase):
         self.assertEqual("FIX-HOT", candidates.iloc[0]["候选值"])
         self.assertEqual("FIX-HOT", candidates.iloc[0]["候选值"])
         self.assertGreater(candidates.iloc[0]["异常倍数"], 1.0)
         self.assertGreater(candidates.iloc[0]["异常倍数"], 1.0)
 
 
+    def test_root_cause_empty_dimensions_do_not_fallback_to_defaults(self):
+        df = normalize_defect_schema(
+            pd.DataFrame(
+                {
+                    "defect_id": ["D1"],
+                    "panel_id": ["P1"],
+                    "batch_id": ["B1"],
+                    "equipment_id": ["LAM-A01"],
+                    "seat_id": ["R1C1"],
+                    "inspection_station": ["AOI-1"],
+                    "timestamp": [pd.Timestamp("2026-04-01")],
+                    "defect_type": ["划痕"],
+                    "severity": ["严重"],
+                    "x_mm": [10.0],
+                    "y_mm": [20.0],
+                    "panel_width_mm": [155.0],
+                    "panel_height_mm": [340.0],
+                    "hour": [8],
+                    "shift": ["白班"],
+                    "day": ["2026-04-01"],
+                    "lam_fixture_id": ["FIX-1"],
+                }
+            )
+        )
+
+        candidates = build_extended_root_causes(df, dimensions=[])
+
+        self.assertTrue(candidates.empty)
+
 
 
 if __name__ == "__main__":
 if __name__ == "__main__":
     unittest.main()
     unittest.main()