"""异常 Case 闭环与审计日志。""" import sqlite3 from contextlib import closing from pathlib import Path import pandas as pd from defect_analysis.database import init_database VALID_CASE_STATUSES = {"OPEN", "IN_PROGRESS", "IMPROVED", "CLOSED", "REJECTED"} VALID_CASE_TRANSITIONS = { "OPEN": {"IN_PROGRESS", "CLOSED", "REJECTED"}, "IN_PROGRESS": {"IMPROVED", "CLOSED", "REJECTED"}, "IMPROVED": {"CLOSED", "IN_PROGRESS"}, "CLOSED": set(), "REJECTED": set(), } def _connect(db_path): conn = sqlite3.connect(Path(db_path)) conn.execute("PRAGMA foreign_keys = ON") conn.row_factory = sqlite3.Row return conn def _write_audit_log(conn, *, entity_type, entity_id, action, actor, details): conn.execute( """ INSERT INTO audit_logs (entity_type, entity_id, action, actor, details) VALUES (?, ?, ?, ?, ?) """, (entity_type, int(entity_id), action, actor, details), ) def create_root_cause_case( db_path, *, title, candidate_type, candidate_value, defect_type, panel_zone, owner, created_by, recommendation, ): """从根因候选创建异常 Case。""" init_database(db_path) with closing(_connect(db_path)) as conn: cursor = conn.execute( """ INSERT INTO root_cause_cases ( title, status, candidate_type, candidate_value, defect_type, panel_zone, owner, recommendation, created_by ) VALUES (?, 'OPEN', ?, ?, ?, ?, ?, ?, ?) """, ( title, candidate_type, candidate_value, defect_type, panel_zone, owner, recommendation, created_by, ), ) case_id = int(cursor.lastrowid) _write_audit_log( conn, entity_type="case", entity_id=case_id, action="CREATE_CASE", actor=created_by, details=f"创建 Case: {title}; 建议: {recommendation}", ) conn.commit() return case_id def update_case_status(db_path, *, case_id, status, actor, note=""): """更新 Case 状态并记录审计日志。""" if status not in VALID_CASE_STATUSES: raise ValueError(f"无效 Case 状态: {status}") init_database(db_path) with closing(_connect(db_path)) as conn: current = conn.execute( "SELECT status FROM root_cause_cases WHERE case_id = ?", (int(case_id),), ).fetchone() if current is None: raise ValueError(f"未找到 Case: {case_id}") current_status = current["status"] if status not in VALID_CASE_TRANSITIONS.get(current_status, set()): raise ValueError(f"不允许的 Case 状态流转: {current_status} -> {status}") closed_at = pd.Timestamp.utcnow().strftime("%Y-%m-%d %H:%M:%S") if status == "CLOSED" else None conn.execute( """ UPDATE root_cause_cases SET status = ?, updated_at = CURRENT_TIMESTAMP, closed_at = COALESCE(?, closed_at) WHERE case_id = ? """, (status, closed_at, int(case_id)), ) _write_audit_log( conn, entity_type="case", entity_id=case_id, action="UPDATE_STATUS", actor=actor, details=f"{current_status} -> {status}; {note}", ) conn.commit() def list_cases(db_path, *, status=None): """列出异常 Case。""" init_database(db_path) params = [] where = "" if status is not None: where = "WHERE status = ?" params.append(status) with closing(_connect(db_path)) as conn: rows = conn.execute( f""" SELECT case_id, title, status, candidate_type, candidate_value, defect_type, panel_zone, owner, recommendation, created_by, created_at, updated_at, closed_at FROM root_cause_cases {where} ORDER BY case_id """, params, ).fetchall() return pd.DataFrame([dict(row) for row in rows]) def get_audit_logs(db_path, *, entity_type=None, entity_id=None): """读取审计日志。""" init_database(db_path) clauses = [] params = [] if entity_type is not None: clauses.append("entity_type = ?") params.append(entity_type) if entity_id is not None: clauses.append("entity_id = ?") params.append(int(entity_id)) where = "WHERE " + " AND ".join(clauses) if clauses else "" with closing(_connect(db_path)) as conn: rows = conn.execute( f""" SELECT audit_id, entity_type, entity_id, action, actor, details, created_at FROM audit_logs {where} ORDER BY audit_id """, params, ).fetchall() return pd.DataFrame([dict(row) for row in rows])