"""可持久化的 ML 模型包。""" from datetime import datetime, timezone import joblib import pandas as pd from defect_analysis.ml.datasets import build_supervised_dataset from defect_analysis.ml.features import build_feature_frame from defect_analysis.ml.model_registry import detect_optional_model_backends from defect_analysis.ml.tabular_models import train_tabular_model from defect_analysis.schemas import normalize_defect_schema MODEL_BUNDLE_VERSION = 1 def _target_config(target_defect_type=None, target_severity=None): return { "defect_type": target_defect_type, "severity": target_severity, "default": target_defect_type is None and target_severity is None, } def _align_features(features, feature_columns): """按训练时特征签名对齐新数据,避免 one-hot 列漂移导致推理失败。""" aligned = features.reindex(columns=feature_columns, fill_value=0.0) return aligned.astype(float) def create_model_bundle( df, *, model_name="random_forest", target_defect_type=None, target_severity=None, random_state=42, ): """训练并创建可保存的模型包。""" normalized = normalize_defect_schema(df) X, y = build_supervised_dataset( normalized, target_defect_type=target_defect_type, target_severity=target_severity, ) if y.nunique() < 2: raise ValueError("目标标签只有一个类别,无法训练监督模型") trained = train_tabular_model(model_name, X, y, random_state=random_state) return { "bundle_version": MODEL_BUNDLE_VERSION, "created_at": datetime.now(timezone.utc).isoformat(), "model_name": model_name, "target": _target_config(target_defect_type, target_severity), "feature_columns": list(X.columns), "metrics": trained.get("metrics", {}), "optional_backends": detect_optional_model_backends(), "model": trained["model"], } def save_model_bundle(bundle, path): """保存模型包。""" joblib.dump(bundle, path) return path def load_model_bundle(path): """加载模型包。""" bundle = joblib.load(path) if bundle.get("bundle_version") != MODEL_BUNDLE_VERSION: raise ValueError("模型包版本不兼容") return bundle def predict_with_bundle(bundle, df): """使用模型包对新数据打分。""" normalized = normalize_defect_schema(df).reset_index(drop=True) features = build_feature_frame(normalized) X = _align_features(features, bundle["feature_columns"]) model = bundle["model"] scored = normalized.copy() scored["ml_prediction"] = model.predict(X) if hasattr(model, "predict_proba"): scored["ml_probability"] = model.predict_proba(X)[:, 1] else: scored["ml_probability"] = pd.NA scored["model_name"] = bundle["model_name"] return scored