| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- """SQLite 持久化层。"""
import json
import sqlite3
from contextlib import closing
from pathlib import Path

import pandas as pd

from defect_analysis.schemas import TEMPLATE_COLUMNS, normalize_defect_schema
def _connect(db_path):
    """Open a SQLite connection to *db_path*, creating missing parent directories.

    Rows come back as ``sqlite3.Row`` so columns can be read by name
    (``row["col"]``) as well as by position.

    The caller owns the returned connection and is responsible for closing it:
    using it as a context manager (``with conn:``) only commits or rolls back
    the current transaction and does not close the connection.
    """
    target = Path(db_path)
    target.parent.mkdir(parents=True, exist_ok=True)
    connection = sqlite3.connect(target)
    connection.row_factory = sqlite3.Row
    return connection
def init_database(db_path):
    """Create the minimal schema in the SQLite database at *db_path*.

    Idempotent: every statement uses ``IF NOT EXISTS``, so this is safe to call
    before each read or write. Creates the tables ``import_batches``,
    ``defects``, ``root_cause_cases`` and ``audit_logs``, plus the indexes
    ``idx_defects_import_id`` and ``idx_audit_entity``.

    Args:
        db_path: Path to the SQLite database file. Missing parent directories
            are created by ``_connect``.
    """
    # closing() releases the connection on exit. The inner ``conn`` context only
    # commits on success / rolls back on error; it never closes the connection.
    with closing(_connect(db_path)) as conn, conn:
        # Import batches: one row per create_import_batch() call.
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS import_batches (
                import_id INTEGER PRIMARY KEY AUTOINCREMENT,
                source_name TEXT NOT NULL,
                row_count INTEGER NOT NULL,
                status TEXT NOT NULL DEFAULT 'IMPORTED',
                quality_score REAL,
                created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
            )
            """
        )
        # Defects: one row per defect_id; the full record is stored as JSON.
        # NOTE: SQLite enforces this FOREIGN KEY only when `PRAGMA foreign_keys = ON`
        # is issued on the connection, and _connect does not issue it.
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS defects (
                defect_id TEXT PRIMARY KEY,
                import_id INTEGER,
                payload_json TEXT NOT NULL,
                created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY(import_id) REFERENCES import_batches(import_id)
            )
            """
        )
        # Speeds up load_defects(import_id=...) filtering.
        conn.execute("CREATE INDEX IF NOT EXISTS idx_defects_import_id ON defects(import_id)")
        # Root-cause cases; status defaults to 'OPEN', closed_at is set when closed.
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS root_cause_cases (
                case_id INTEGER PRIMARY KEY AUTOINCREMENT,
                title TEXT NOT NULL,
                status TEXT NOT NULL DEFAULT 'OPEN',
                candidate_type TEXT NOT NULL,
                candidate_value TEXT NOT NULL,
                defect_type TEXT,
                panel_zone TEXT,
                owner TEXT,
                recommendation TEXT,
                created_by TEXT,
                created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
                updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
                closed_at TEXT
            )
            """
        )
        # Audit trail entries, looked up by (entity_type, entity_id).
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS audit_logs (
                audit_id INTEGER PRIMARY KEY AUTOINCREMENT,
                entity_type TEXT NOT NULL,
                entity_id INTEGER NOT NULL,
                action TEXT NOT NULL,
                actor TEXT,
                details TEXT,
                created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
            )
            """
        )
        conn.execute("CREATE INDEX IF NOT EXISTS idx_audit_entity ON audit_logs(entity_type, entity_id)")
def create_import_batch(db_path, *, source_name, row_count, quality_score=None, status="IMPORTED"):
    """Record a new import batch and return its ``import_id``.

    Args:
        db_path: Path to the SQLite database file.
        source_name: Name of the imported source (``import_batches.source_name``,
            NOT NULL).
        row_count: Number of rows in the batch; coerced with ``int()``.
        quality_score: Optional value stored in the REAL ``quality_score`` column.
        status: Batch status; defaults to ``"IMPORTED"``.

    Returns:
        The autoincrement ``import_id`` assigned to the new row.
    """
    # Like the other public helpers in this module, ensure the schema exists so
    # this also works against a brand-new database file.
    init_database(db_path)
    # closing() releases the connection; the inner ``conn`` context commits the
    # INSERT on success and rolls it back on error.
    with closing(_connect(db_path)) as conn, conn:
        cursor = conn.execute(
            """
            INSERT INTO import_batches (source_name, row_count, quality_score, status)
            VALUES (?, ?, ?, ?)
            """,
            (source_name, int(row_count), quality_score, status),
        )
        return int(cursor.lastrowid)
def list_import_batches(db_path):
    """Return every import batch as a DataFrame ordered by ``import_id``.

    The result always has the columns ``import_id``, ``source_name``,
    ``row_count``, ``status``, ``quality_score`` and ``created_at``, even
    when no batch has been recorded yet, so callers can select them without
    special-casing an empty database.
    """
    columns = ["import_id", "source_name", "row_count", "status", "quality_score", "created_at"]
    init_database(db_path)
    # closing() is required: a sqlite3 connection's own context manager does
    # not close it.
    with closing(_connect(db_path)) as conn:
        rows = conn.execute(
            """
            SELECT import_id, source_name, row_count, status, quality_score, created_at
            FROM import_batches
            ORDER BY import_id
            """
        ).fetchall()
    return pd.DataFrame([dict(row) for row in rows], columns=columns)
def insert_defects(db_path, df, *, import_id=None):
    """Insert defect records idempotently, de-duplicating on ``defect_id``.

    ``df`` is passed through ``normalize_defect_schema`` first. Each record is
    serialized to JSON into ``defects.payload_json``. A row whose
    ``defect_id`` already exists is skipped (``INSERT OR IGNORE``), so
    re-importing the same data inserts nothing.

    Args:
        db_path: Path to the SQLite database file.
        df: DataFrame of defect records; after normalization it must contain
            a ``defect_id`` column.
        import_id: Optional ``import_batches.import_id`` stored with each row.

    Returns:
        Number of rows actually inserted (skipped duplicates are not counted).
    """
    init_database(db_path)
    normalized = normalize_defect_schema(df)
    inserted = 0
    # closing() releases the connection; the inner ``conn`` context commits all
    # inserts together on success and rolls them back on error.
    with closing(_connect(db_path)) as conn, conn:
        # to_dict("records") keeps each column's dtype. iterrows() builds one
        # Series per row and can upcast values (e.g. int -> float).
        for payload in normalized.to_dict(orient="records"):
            # Timestamps are not JSON-native; store their string form, which
            # load_defects parses back with pd.to_datetime.
            payload["timestamp"] = str(payload.get("timestamp", ""))
            cursor = conn.execute(
                """
                INSERT OR IGNORE INTO defects (defect_id, import_id, payload_json)
                VALUES (?, ?, ?)
                """,
                (
                    str(payload["defect_id"]),
                    import_id,
                    json.dumps(payload, ensure_ascii=False, default=str),
                ),
            )
            # rowcount is 1 for a new row and 0 when the defect_id already existed.
            inserted += cursor.rowcount
    return inserted
def load_defects(db_path, *, import_id=None):
    """Load stored defect records as a normalized DataFrame.

    Args:
        db_path: Path to the SQLite database file.
        import_id: If given, only return defects stored with this import id.

    Returns:
        A DataFrame produced by ``normalize_defect_schema`` from the stored JSON
        payloads, with each row's ``import_id`` taken from the ``defects`` table
        and ``timestamp`` (when present) parsed with ``pd.to_datetime``. Rows are
        ordered by ``defect_id``. When nothing matches, an empty DataFrame with
        the template columns plus ``import_id`` is returned.
    """
    init_database(db_path)
    params = []
    where = ""
    if import_id is not None:
        # Only this fixed clause is interpolated into the SQL; the value is bound.
        where = "WHERE import_id = ?"
        params.append(import_id)
    # closing() is required: a sqlite3 connection's own context manager does
    # not close it.
    with closing(_connect(db_path)) as conn:
        rows = conn.execute(
            f"SELECT import_id, payload_json FROM defects {where} ORDER BY defect_id",
            params,
        ).fetchall()
    records = []
    for row in rows:
        payload = json.loads(row["payload_json"])
        # The defects.import_id column is authoritative and overrides any
        # import_id key inside the JSON payload.
        payload["import_id"] = row["import_id"]
        records.append(payload)
    if not records:
        # Unpacking works whether TEMPLATE_COLUMNS is a list, tuple or Index.
        return pd.DataFrame(columns=[*TEMPLATE_COLUMNS, "import_id"])
    loaded = normalize_defect_schema(pd.DataFrame(records))
    if "timestamp" in loaded.columns:
        # insert_defects stores timestamps as strings; restore datetime dtype.
        loaded["timestamp"] = pd.to_datetime(loaded["timestamp"])
    return loaded
|