database.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. """SQLite 持久化层。"""
  2. import json
  3. import sqlite3
  4. from pathlib import Path
  5. import pandas as pd
  6. from defect_analysis.schemas import TEMPLATE_COLUMNS, normalize_defect_schema
  7. def _connect(db_path):
  8. path = Path(db_path)
  9. path.parent.mkdir(parents=True, exist_ok=True)
  10. conn = sqlite3.connect(path)
  11. conn.row_factory = sqlite3.Row
  12. return conn
  13. def init_database(db_path):
  14. """初始化生产级最小数据库结构。"""
  15. with _connect(db_path) as conn:
  16. conn.execute(
  17. """
  18. CREATE TABLE IF NOT EXISTS import_batches (
  19. import_id INTEGER PRIMARY KEY AUTOINCREMENT,
  20. source_name TEXT NOT NULL,
  21. row_count INTEGER NOT NULL,
  22. status TEXT NOT NULL DEFAULT 'IMPORTED',
  23. quality_score REAL,
  24. created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
  25. )
  26. """
  27. )
  28. conn.execute(
  29. """
  30. CREATE TABLE IF NOT EXISTS defects (
  31. defect_id TEXT PRIMARY KEY,
  32. import_id INTEGER,
  33. payload_json TEXT NOT NULL,
  34. created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
  35. FOREIGN KEY(import_id) REFERENCES import_batches(import_id)
  36. )
  37. """
  38. )
  39. conn.execute("CREATE INDEX IF NOT EXISTS idx_defects_import_id ON defects(import_id)")
  40. conn.execute(
  41. """
  42. CREATE TABLE IF NOT EXISTS root_cause_cases (
  43. case_id INTEGER PRIMARY KEY AUTOINCREMENT,
  44. title TEXT NOT NULL,
  45. status TEXT NOT NULL DEFAULT 'OPEN',
  46. candidate_type TEXT NOT NULL,
  47. candidate_value TEXT NOT NULL,
  48. defect_type TEXT,
  49. panel_zone TEXT,
  50. owner TEXT,
  51. recommendation TEXT,
  52. created_by TEXT,
  53. created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
  54. updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
  55. closed_at TEXT
  56. )
  57. """
  58. )
  59. conn.execute(
  60. """
  61. CREATE TABLE IF NOT EXISTS audit_logs (
  62. audit_id INTEGER PRIMARY KEY AUTOINCREMENT,
  63. entity_type TEXT NOT NULL,
  64. entity_id INTEGER NOT NULL,
  65. action TEXT NOT NULL,
  66. actor TEXT,
  67. details TEXT,
  68. created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
  69. )
  70. """
  71. )
  72. conn.execute("CREATE INDEX IF NOT EXISTS idx_audit_entity ON audit_logs(entity_type, entity_id)")
  73. def create_import_batch(db_path, *, source_name, row_count, quality_score=None, status="IMPORTED"):
  74. """创建导入批次记录,返回 import_id。"""
  75. with _connect(db_path) as conn:
  76. cursor = conn.execute(
  77. """
  78. INSERT INTO import_batches (source_name, row_count, quality_score, status)
  79. VALUES (?, ?, ?, ?)
  80. """,
  81. (source_name, int(row_count), quality_score, status),
  82. )
  83. return int(cursor.lastrowid)
  84. def list_import_batches(db_path):
  85. """列出导入批次。"""
  86. init_database(db_path)
  87. with _connect(db_path) as conn:
  88. rows = conn.execute(
  89. """
  90. SELECT import_id, source_name, row_count, status, quality_score, created_at
  91. FROM import_batches
  92. ORDER BY import_id
  93. """
  94. ).fetchall()
  95. return pd.DataFrame([dict(row) for row in rows])
  96. def insert_defects(db_path, df, *, import_id=None):
  97. """幂等写入缺陷记录,按 defect_id 去重。"""
  98. init_database(db_path)
  99. normalized = normalize_defect_schema(df)
  100. inserted = 0
  101. with _connect(db_path) as conn:
  102. for _, row in normalized.iterrows():
  103. payload = row.to_dict()
  104. payload["timestamp"] = str(payload.get("timestamp", ""))
  105. cursor = conn.execute(
  106. """
  107. INSERT OR IGNORE INTO defects (defect_id, import_id, payload_json)
  108. VALUES (?, ?, ?)
  109. """,
  110. (str(row["defect_id"]), import_id, json.dumps(payload, ensure_ascii=False, default=str)),
  111. )
  112. inserted += cursor.rowcount
  113. return inserted
  114. def load_defects(db_path, *, import_id=None):
  115. """读取缺陷记录为标准化 DataFrame。"""
  116. init_database(db_path)
  117. params = []
  118. where = ""
  119. if import_id is not None:
  120. where = "WHERE import_id = ?"
  121. params.append(import_id)
  122. with _connect(db_path) as conn:
  123. rows = conn.execute(
  124. f"SELECT import_id, payload_json FROM defects {where} ORDER BY defect_id",
  125. params,
  126. ).fetchall()
  127. records = []
  128. for row in rows:
  129. payload = json.loads(row["payload_json"])
  130. payload["import_id"] = row["import_id"]
  131. records.append(payload)
  132. if not records:
  133. return pd.DataFrame(columns=TEMPLATE_COLUMNS + ["import_id"])
  134. loaded = normalize_defect_schema(pd.DataFrame(records))
  135. if "timestamp" in loaded.columns:
  136. loaded["timestamp"] = pd.to_datetime(loaded["timestamp"])
  137. return loaded