| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091 |
- """可持久化的 ML 模型包。"""
- from datetime import datetime, timezone
- import joblib
- import pandas as pd
- from defect_analysis.ml.datasets import build_supervised_dataset
- from defect_analysis.ml.features import build_feature_frame
- from defect_analysis.ml.model_registry import detect_optional_model_backends
- from defect_analysis.ml.tabular_models import train_tabular_model
- from defect_analysis.schemas import normalize_defect_schema
# Format version stamped into every bundle under the "bundle_version" key by
# create_model_bundle; load_model_bundle rejects bundles whose stored value
# differs from this constant.
MODEL_BUNDLE_VERSION = 1
def _target_config(target_defect_type=None, target_severity=None):
    """Describe which target the model was configured for.

    Args:
        target_defect_type: Defect type chosen as the target, or ``None``.
        target_severity: Severity chosen as the target, or ``None``.

    Returns:
        A dict echoing both arguments under ``"defect_type"`` and
        ``"severity"``, plus ``"default"``, which is ``True`` only when
        neither argument was supplied (both are ``None``).
    """
    uses_default = target_defect_type is None and target_severity is None
    config = {"defect_type": target_defect_type, "severity": target_severity}
    config["default"] = uses_default
    return config
def _align_features(features, feature_columns):
    """Align new data to the feature signature recorded at training time.

    This keeps one-hot column drift between training and inference from
    breaking prediction.

    Args:
        features: Feature frame built from the new data.
        feature_columns: Column names, in order, that the result must have.

    Returns:
        A frame whose columns are exactly ``feature_columns``: columns missing
        from ``features`` are added and filled with ``0.0``, columns not
        listed are dropped, and everything is cast to ``float``.
    """
    return features.reindex(columns=feature_columns, fill_value=0.0).astype(float)
def create_model_bundle(
    df,
    *,
    model_name="random_forest",
    target_defect_type=None,
    target_severity=None,
    random_state=42,
):
    """Train a model and package it as a bundle that can be saved.

    Args:
        df: Raw defect data; passed through ``normalize_defect_schema`` first.
        model_name: Model identifier forwarded to ``train_tabular_model``.
        target_defect_type: Optional target selector forwarded to
            ``build_supervised_dataset``.
        target_severity: Optional target selector forwarded to
            ``build_supervised_dataset``.
        random_state: Seed forwarded to ``train_tabular_model``.

    Returns:
        A dict with the keys ``bundle_version``, ``created_at`` (UTC ISO-8601
        timestamp), ``model_name``, ``target``, ``feature_columns``,
        ``metrics`` (empty dict when the trainer reports none),
        ``optional_backends`` and ``model``.

    Raises:
        ValueError: If the target labels contain fewer than two distinct
            values.
    """
    schema_df = normalize_defect_schema(df)
    features, labels = build_supervised_dataset(
        schema_df,
        target_defect_type=target_defect_type,
        target_severity=target_severity,
    )

    # A supervised model cannot be fitted on a single-class target.
    if labels.nunique() < 2:
        raise ValueError("目标标签只有一个类别,无法训练监督模型")

    fit_result = train_tabular_model(
        model_name, features, labels, random_state=random_state
    )

    bundle = {"bundle_version": MODEL_BUNDLE_VERSION}
    bundle["created_at"] = datetime.now(timezone.utc).isoformat()
    bundle["model_name"] = model_name
    bundle["target"] = _target_config(target_defect_type, target_severity)
    # Column order is stored so inference can rebuild the same feature layout.
    bundle["feature_columns"] = list(features.columns)
    bundle["metrics"] = fit_result.get("metrics", {})
    bundle["optional_backends"] = detect_optional_model_backends()
    bundle["model"] = fit_result["model"]
    return bundle
def save_model_bundle(bundle, path):
    """Write a model bundle to disk with ``joblib.dump``.

    Args:
        bundle: The bundle object to persist.
        path: Destination handed to ``joblib.dump``.

    Returns:
        ``path``, unchanged, so the call can be chained.
    """
    joblib.dump(value=bundle, filename=path)
    return path
def load_model_bundle(path):
    """Load a model bundle from disk and verify that it is usable.

    Args:
        path: Location handed to ``joblib.load``.

    Returns:
        The loaded bundle dict.

    Raises:
        ValueError: If the loaded object is not a dict, or if its
            ``"bundle_version"`` differs from ``MODEL_BUNDLE_VERSION``.
    """
    # SECURITY: joblib.load unpickles the file, which can execute arbitrary
    # code. Only load bundles that come from a trusted source.
    bundle = joblib.load(path)

    # A joblib file can hold any picklable object. Reject non-dict payloads
    # explicitly rather than failing with AttributeError on ``.get`` below.
    if not isinstance(bundle, dict):
        raise ValueError("模型包格式无效")  # "invalid model bundle format"

    if bundle.get("bundle_version") != MODEL_BUNDLE_VERSION:
        raise ValueError("模型包版本不兼容")
    return bundle
def predict_with_bundle(bundle, df):
    """Score new data with a model bundle.

    Args:
        bundle: Bundle dict; the keys ``"feature_columns"``, ``"model"`` and
            ``"model_name"`` are read.
        df: Raw defect data to score.

    Returns:
        A copy of the normalized data (index reset to 0..n-1) with three
        added columns: ``ml_prediction``, ``ml_probability`` and
        ``model_name``. ``ml_probability`` is ``pd.NA`` when the model has no
        ``predict_proba`` method.
    """
    records = normalize_defect_schema(df).reset_index(drop=True)
    # Rebuild features, then force them into the training-time column layout.
    design = _align_features(build_feature_frame(records), bundle["feature_columns"])
    estimator = bundle["model"]

    result = records.copy()
    result["ml_prediction"] = estimator.predict(design)
    supports_proba = hasattr(estimator, "predict_proba")
    # Column 1 of predict_proba is taken as the score; presumably this is the
    # positive class of a binary target. TODO confirm against the trainer.
    result["ml_probability"] = (
        estimator.predict_proba(design)[:, 1] if supports_proba else pd.NA
    )
    result["model_name"] = bundle["model_name"]
    return result
|