"""可持久化的 ML 模型包。""" import warnings from datetime import datetime, timezone import joblib import pandas as pd from sklearn.model_selection import train_test_split from defect_analysis.ml.datasets import build_supervised_dataset from defect_analysis.ml.features import build_feature_frame from defect_analysis.ml.model_registry import detect_optional_model_backends from defect_analysis.ml.tabular_models import classification_metrics, extract_feature_importance, train_tabular_model from defect_analysis.schemas import normalize_defect_schema MODEL_BUNDLE_VERSION = 1 def _target_config(target_defect_type=None, target_severity=None): return { "defect_type": target_defect_type, "severity": target_severity, "default": target_defect_type is None and target_severity is None, } def _align_features(features, feature_columns): """按训练时特征签名对齐新数据,避免 one-hot 列漂移导致推理失败。""" aligned = features.reindex(columns=feature_columns, fill_value=0.0) return aligned.astype(float) def create_model_bundle( df, *, model_name="random_forest", target_defect_type=None, target_severity=None, random_state=42, test_size=0.25, ): """训练并创建可保存的模型包。""" normalized = normalize_defect_schema(df) X, y = build_supervised_dataset( normalized, target_defect_type=target_defect_type, target_severity=target_severity, ) if y.nunique() < 2: raise ValueError("目标标签只有一个类别,无法训练监督模型") min_count = int(y.value_counts().min()) if min_count < 2: warnings.warn( f"最小类别仅 {min_count} 个样本,已关闭分层抽样。验证集可能不包含少数类别。", UserWarning, ) stratify = y if min_count >= 2 else None X_train, X_valid, y_train, y_valid = train_test_split( X, y, test_size=test_size, random_state=random_state, stratify=stratify, ) validation_model = train_tabular_model(model_name, X_train, y_train, random_state=random_state)["model"] validation_metrics = classification_metrics(validation_model, X_valid, y_valid, prefix="validation") trained = train_tabular_model(model_name, X, y, random_state=random_state) feature_importance = extract_feature_importance(trained["model"], X.columns) return { "bundle_version": MODEL_BUNDLE_VERSION, "created_at": datetime.now(timezone.utc).isoformat(), "model_name": model_name, "target": _target_config(target_defect_type, target_severity), "feature_columns": list(X.columns), "metrics": trained.get("metrics", {}), "validation_metrics": validation_metrics, "feature_importance": feature_importance, "optional_backends": detect_optional_model_backends(), "model": trained["model"], } def save_model_bundle(bundle, path): """保存模型包。""" joblib.dump(bundle, path) return path def load_model_bundle(path): """加载模型包。""" bundle = joblib.load(path) if bundle.get("bundle_version") != MODEL_BUNDLE_VERSION: raise ValueError("模型包版本不兼容") return bundle def predict_with_bundle(bundle, df): """使用模型包对新数据打分。""" normalized = normalize_defect_schema(df).reset_index(drop=True) features = build_feature_frame(normalized) X = _align_features(features, bundle["feature_columns"]) model = bundle["model"] scored = normalized.copy() scored["ml_prediction"] = model.predict(X) if hasattr(model, "predict_proba"): scored["ml_probability"] = model.predict_proba(X)[:, 1] else: scored["ml_probability"] = pd.NA scored["model_name"] = bundle["model_name"] return scored