database.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. """SQLite 持久化层。"""
  2. import json
  3. import sqlite3
  4. from contextlib import closing
  5. from pathlib import Path
  6. import pandas as pd
  7. from defect_analysis.schemas import TEMPLATE_COLUMNS, normalize_defect_schema
  8. def _connect(db_path):
  9. path = Path(db_path)
  10. path.parent.mkdir(parents=True, exist_ok=True)
  11. conn = sqlite3.connect(path)
  12. conn.execute("PRAGMA foreign_keys = ON")
  13. conn.row_factory = sqlite3.Row
  14. return conn
  15. def init_database(db_path):
  16. """初始化生产级最小数据库结构。"""
  17. with closing(_connect(db_path)) as conn:
  18. conn.execute(
  19. """
  20. CREATE TABLE IF NOT EXISTS import_batches (
  21. import_id INTEGER PRIMARY KEY AUTOINCREMENT,
  22. source_name TEXT NOT NULL,
  23. row_count INTEGER NOT NULL,
  24. status TEXT NOT NULL DEFAULT 'IMPORTED',
  25. quality_score REAL,
  26. created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
  27. )
  28. """
  29. )
  30. conn.execute(
  31. """
  32. CREATE TABLE IF NOT EXISTS defects (
  33. defect_id TEXT PRIMARY KEY,
  34. import_id INTEGER,
  35. payload_json TEXT NOT NULL,
  36. created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
  37. FOREIGN KEY(import_id) REFERENCES import_batches(import_id)
  38. )
  39. """
  40. )
  41. conn.execute("CREATE INDEX IF NOT EXISTS idx_defects_import_id ON defects(import_id)")
  42. conn.execute(
  43. """
  44. CREATE TABLE IF NOT EXISTS root_cause_cases (
  45. case_id INTEGER PRIMARY KEY AUTOINCREMENT,
  46. title TEXT NOT NULL,
  47. status TEXT NOT NULL DEFAULT 'OPEN',
  48. candidate_type TEXT NOT NULL,
  49. candidate_value TEXT NOT NULL,
  50. defect_type TEXT,
  51. panel_zone TEXT,
  52. owner TEXT,
  53. recommendation TEXT,
  54. created_by TEXT,
  55. created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
  56. updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
  57. closed_at TEXT
  58. )
  59. """
  60. )
  61. conn.execute(
  62. """
  63. CREATE TABLE IF NOT EXISTS audit_logs (
  64. audit_id INTEGER PRIMARY KEY AUTOINCREMENT,
  65. entity_type TEXT NOT NULL,
  66. entity_id INTEGER NOT NULL,
  67. action TEXT NOT NULL,
  68. actor TEXT,
  69. details TEXT,
  70. created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
  71. )
  72. """
  73. )
  74. conn.execute("CREATE INDEX IF NOT EXISTS idx_audit_entity ON audit_logs(entity_type, entity_id)")
  75. conn.commit()
  76. def create_import_batch(db_path, *, source_name, row_count, quality_score=None, status="IMPORTED"):
  77. """创建导入批次记录,返回 import_id。"""
  78. with closing(_connect(db_path)) as conn:
  79. cursor = conn.execute(
  80. """
  81. INSERT INTO import_batches (source_name, row_count, quality_score, status)
  82. VALUES (?, ?, ?, ?)
  83. """,
  84. (source_name, int(row_count), quality_score, status),
  85. )
  86. import_id = int(cursor.lastrowid)
  87. conn.commit()
  88. return import_id
  89. def list_import_batches(db_path):
  90. """列出导入批次。"""
  91. init_database(db_path)
  92. with closing(_connect(db_path)) as conn:
  93. rows = conn.execute(
  94. """
  95. SELECT import_id, source_name, row_count, status, quality_score, created_at
  96. FROM import_batches
  97. ORDER BY import_id
  98. """
  99. ).fetchall()
  100. return pd.DataFrame([dict(row) for row in rows])
  101. def insert_defects(db_path, df, *, import_id=None):
  102. """幂等写入缺陷记录,按 defect_id 去重。"""
  103. init_database(db_path)
  104. normalized = normalize_defect_schema(df)
  105. records = []
  106. for _, row in normalized.iterrows():
  107. payload = row.to_dict()
  108. payload["timestamp"] = str(payload.get("timestamp", ""))
  109. records.append(
  110. (str(row["defect_id"]), import_id, json.dumps(payload, ensure_ascii=False, default=str))
  111. )
  112. with closing(_connect(db_path)) as conn:
  113. before = conn.total_changes
  114. conn.executemany(
  115. """
  116. INSERT OR IGNORE INTO defects (defect_id, import_id, payload_json)
  117. VALUES (?, ?, ?)
  118. """,
  119. records,
  120. )
  121. inserted = conn.total_changes - before
  122. conn.commit()
  123. return inserted
  124. def load_defects(db_path, *, import_id=None):
  125. """读取缺陷记录为标准化 DataFrame。"""
  126. init_database(db_path)
  127. params = []
  128. where = ""
  129. if import_id is not None:
  130. where = "WHERE import_id = ?"
  131. params.append(import_id)
  132. with closing(_connect(db_path)) as conn:
  133. rows = conn.execute(
  134. f"SELECT import_id, payload_json FROM defects {where} ORDER BY defect_id",
  135. params,
  136. ).fetchall()
  137. records = []
  138. for row in rows:
  139. payload = json.loads(row["payload_json"])
  140. payload["import_id"] = row["import_id"]
  141. records.append(payload)
  142. if not records:
  143. return pd.DataFrame(columns=TEMPLATE_COLUMNS + ["import_id"])
  144. loaded = normalize_defect_schema(pd.DataFrame(records))
  145. if "timestamp" in loaded.columns:
  146. loaded["timestamp"] = pd.to_datetime(loaded["timestamp"])
  147. return loaded