cases.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. """异常 Case 闭环与审计日志。"""
  2. import sqlite3
  3. from contextlib import closing
  4. from pathlib import Path
  5. import pandas as pd
  6. from defect_analysis.database import init_database
  7. VALID_CASE_STATUSES = {"OPEN", "IN_PROGRESS", "IMPROVED", "CLOSED", "REJECTED"}
  8. VALID_CASE_TRANSITIONS = {
  9. "OPEN": {"IN_PROGRESS", "CLOSED", "REJECTED"},
  10. "IN_PROGRESS": {"IMPROVED", "CLOSED", "REJECTED"},
  11. "IMPROVED": {"CLOSED", "IN_PROGRESS"},
  12. "CLOSED": set(),
  13. "REJECTED": set(),
  14. }
  15. def _connect(db_path):
  16. conn = sqlite3.connect(Path(db_path))
  17. conn.execute("PRAGMA foreign_keys = ON")
  18. conn.row_factory = sqlite3.Row
  19. return conn
  20. def _write_audit_log(conn, *, entity_type, entity_id, action, actor, details):
  21. conn.execute(
  22. """
  23. INSERT INTO audit_logs (entity_type, entity_id, action, actor, details)
  24. VALUES (?, ?, ?, ?, ?)
  25. """,
  26. (entity_type, int(entity_id), action, actor, details),
  27. )
  28. def create_root_cause_case(
  29. db_path,
  30. *,
  31. title,
  32. candidate_type,
  33. candidate_value,
  34. defect_type,
  35. panel_zone,
  36. owner,
  37. created_by,
  38. recommendation,
  39. ):
  40. """从根因候选创建异常 Case。"""
  41. init_database(db_path)
  42. with closing(_connect(db_path)) as conn:
  43. cursor = conn.execute(
  44. """
  45. INSERT INTO root_cause_cases (
  46. title, status, candidate_type, candidate_value, defect_type,
  47. panel_zone, owner, recommendation, created_by
  48. )
  49. VALUES (?, 'OPEN', ?, ?, ?, ?, ?, ?, ?)
  50. """,
  51. (
  52. title,
  53. candidate_type,
  54. candidate_value,
  55. defect_type,
  56. panel_zone,
  57. owner,
  58. recommendation,
  59. created_by,
  60. ),
  61. )
  62. case_id = int(cursor.lastrowid)
  63. _write_audit_log(
  64. conn,
  65. entity_type="case",
  66. entity_id=case_id,
  67. action="CREATE_CASE",
  68. actor=created_by,
  69. details=f"创建 Case: {title}; 建议: {recommendation}",
  70. )
  71. conn.commit()
  72. return case_id
  73. def update_case_status(db_path, *, case_id, status, actor, note=""):
  74. """更新 Case 状态并记录审计日志。"""
  75. if status not in VALID_CASE_STATUSES:
  76. raise ValueError(f"无效 Case 状态: {status}")
  77. init_database(db_path)
  78. with closing(_connect(db_path)) as conn:
  79. current = conn.execute(
  80. "SELECT status FROM root_cause_cases WHERE case_id = ?",
  81. (int(case_id),),
  82. ).fetchone()
  83. if current is None:
  84. raise ValueError(f"未找到 Case: {case_id}")
  85. current_status = current["status"]
  86. if status not in VALID_CASE_TRANSITIONS.get(current_status, set()):
  87. raise ValueError(f"不允许的 Case 状态流转: {current_status} -> {status}")
  88. closed_at = pd.Timestamp.utcnow().strftime("%Y-%m-%d %H:%M:%S") if status == "CLOSED" else None
  89. conn.execute(
  90. """
  91. UPDATE root_cause_cases
  92. SET status = ?,
  93. updated_at = CURRENT_TIMESTAMP,
  94. closed_at = COALESCE(?, closed_at)
  95. WHERE case_id = ?
  96. """,
  97. (status, closed_at, int(case_id)),
  98. )
  99. _write_audit_log(
  100. conn,
  101. entity_type="case",
  102. entity_id=case_id,
  103. action="UPDATE_STATUS",
  104. actor=actor,
  105. details=f"{current_status} -> {status}; {note}",
  106. )
  107. conn.commit()
  108. def list_cases(db_path, *, status=None):
  109. """列出异常 Case。"""
  110. init_database(db_path)
  111. params = []
  112. where = ""
  113. if status is not None:
  114. where = "WHERE status = ?"
  115. params.append(status)
  116. with closing(_connect(db_path)) as conn:
  117. rows = conn.execute(
  118. f"""
  119. SELECT case_id, title, status, candidate_type, candidate_value,
  120. defect_type, panel_zone, owner, recommendation,
  121. created_by, created_at, updated_at, closed_at
  122. FROM root_cause_cases
  123. {where}
  124. ORDER BY case_id
  125. """,
  126. params,
  127. ).fetchall()
  128. return pd.DataFrame([dict(row) for row in rows])
  129. def get_audit_logs(db_path, *, entity_type=None, entity_id=None):
  130. """读取审计日志。"""
  131. init_database(db_path)
  132. clauses = []
  133. params = []
  134. if entity_type is not None:
  135. clauses.append("entity_type = ?")
  136. params.append(entity_type)
  137. if entity_id is not None:
  138. clauses.append("entity_id = ?")
  139. params.append(int(entity_id))
  140. where = "WHERE " + " AND ".join(clauses) if clauses else ""
  141. with closing(_connect(db_path)) as conn:
  142. rows = conn.execute(
  143. f"""
  144. SELECT audit_id, entity_type, entity_id, action, actor, details, created_at
  145. FROM audit_logs
  146. {where}
  147. ORDER BY audit_id
  148. """,
  149. params,
  150. ).fetchall()
  151. return pd.DataFrame([dict(row) for row in rows])