"""SQLite 持久化层。""" import json import sqlite3 from contextlib import closing from pathlib import Path import pandas as pd from defect_analysis.schemas import TEMPLATE_COLUMNS, normalize_defect_schema def _connect(db_path): path = Path(db_path) path.parent.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(path) conn.execute("PRAGMA foreign_keys = ON") conn.row_factory = sqlite3.Row return conn def init_database(db_path): """初始化生产级最小数据库结构。""" with closing(_connect(db_path)) as conn: conn.execute( """ CREATE TABLE IF NOT EXISTS import_batches ( import_id INTEGER PRIMARY KEY AUTOINCREMENT, source_name TEXT NOT NULL, row_count INTEGER NOT NULL, status TEXT NOT NULL DEFAULT 'IMPORTED', quality_score REAL, created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP ) """ ) conn.execute( """ CREATE TABLE IF NOT EXISTS defects ( defect_id TEXT PRIMARY KEY, import_id INTEGER, payload_json TEXT NOT NULL, created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY(import_id) REFERENCES import_batches(import_id) ) """ ) conn.execute("CREATE INDEX IF NOT EXISTS idx_defects_import_id ON defects(import_id)") conn.execute( """ CREATE TABLE IF NOT EXISTS root_cause_cases ( case_id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'OPEN', candidate_type TEXT NOT NULL, candidate_value TEXT NOT NULL, defect_type TEXT, panel_zone TEXT, owner TEXT, recommendation TEXT, created_by TEXT, created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, closed_at TEXT ) """ ) conn.execute( """ CREATE TABLE IF NOT EXISTS audit_logs ( audit_id INTEGER PRIMARY KEY AUTOINCREMENT, entity_type TEXT NOT NULL, entity_id INTEGER NOT NULL, action TEXT NOT NULL, actor TEXT, details TEXT, created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP ) """ ) conn.execute("CREATE INDEX IF NOT EXISTS idx_audit_entity ON audit_logs(entity_type, entity_id)") conn.commit() def create_import_batch(db_path, *, source_name, row_count, quality_score=None, status="IMPORTED"): """创建导入批次记录,返回 import_id。""" with closing(_connect(db_path)) as conn: cursor = conn.execute( """ INSERT INTO import_batches (source_name, row_count, quality_score, status) VALUES (?, ?, ?, ?) """, (source_name, int(row_count), quality_score, status), ) import_id = int(cursor.lastrowid) conn.commit() return import_id def list_import_batches(db_path): """列出导入批次。""" init_database(db_path) with closing(_connect(db_path)) as conn: rows = conn.execute( """ SELECT import_id, source_name, row_count, status, quality_score, created_at FROM import_batches ORDER BY import_id """ ).fetchall() return pd.DataFrame([dict(row) for row in rows]) def insert_defects(db_path, df, *, import_id=None): """幂等写入缺陷记录,按 defect_id 去重。""" init_database(db_path) normalized = normalize_defect_schema(df) records = [] for _, row in normalized.iterrows(): payload = row.to_dict() payload["timestamp"] = str(payload.get("timestamp", "")) records.append( (str(row["defect_id"]), import_id, json.dumps(payload, ensure_ascii=False, default=str)) ) with closing(_connect(db_path)) as conn: before = conn.total_changes conn.executemany( """ INSERT OR IGNORE INTO defects (defect_id, import_id, payload_json) VALUES (?, ?, ?) """, records, ) inserted = conn.total_changes - before conn.commit() return inserted def load_defects(db_path, *, import_id=None): """读取缺陷记录为标准化 DataFrame。""" init_database(db_path) params = [] where = "" if import_id is not None: where = "WHERE import_id = ?" params.append(import_id) with closing(_connect(db_path)) as conn: rows = conn.execute( f"SELECT import_id, payload_json FROM defects {where} ORDER BY defect_id", params, ).fetchall() records = [] for row in rows: payload = json.loads(row["payload_json"]) payload["import_id"] = row["import_id"] records.append(payload) if not records: return pd.DataFrame(columns=TEMPLATE_COLUMNS + ["import_id"]) loaded = normalize_defect_schema(pd.DataFrame(records)) if "timestamp" in loaded.columns: loaded["timestamp"] = pd.to_datetime(loaded["timestamp"]) return loaded