| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748 |
- """统一 ML 推理入口。"""
- import pandas as pd
- from defect_analysis.ml.datasets import build_supervised_dataset
- from defect_analysis.ml.key_factors import find_key_factors
- from defect_analysis.ml.tabular_models import train_tabular_model
def _with_constant_scores(key_factors, model_name):
    """Return a copy of *key_factors* with ``ml_probability`` = 0.0 and ``model_name`` set."""
    # Copy first so the frame handed back by find_key_factors is never mutated.
    scored = key_factors.copy()
    scored["ml_probability"] = 0.0
    scored["model_name"] = model_name
    return scored


def predict_key_factors(df, *, target_defect_type=None, target_severity=None, model_name="random_forest", top_n=20):
    """Rank candidate key factors by combining statistical scores with model probabilities.

    The statistical candidates come from ``find_key_factors``. A tabular model
    is then trained on the dataset built by ``build_supervised_dataset`` and
    each candidate is given the mean predicted probability of the rows whose
    ``"<dimension>=<value>"`` feature column equals 1.

    Args:
        df: Input data forwarded unchanged to ``find_key_factors`` and
            ``build_supervised_dataset``.
        target_defect_type: Optional filter forwarded to both helpers.
        target_severity: Optional filter forwarded to both helpers.
        model_name: Name passed to ``train_tabular_model`` and recorded in
            the ``model_name`` output column.
        top_n: Maximum number of candidates requested from
            ``find_key_factors``.

    Returns:
        A new DataFrame holding the ``find_key_factors`` columns plus
        ``ml_probability`` and ``model_name``. When a model was trained, rows
        are sorted by ``ml_probability`` and then ``关键因子得分``, both
        descending, with a fresh 0..n-1 index. When there are no candidates,
        or the target has fewer than two distinct values, the candidate frame
        is returned in its original order with ``ml_probability`` set to 0.0.
    """
    key_factors = find_key_factors(
        df,
        target_defect_type=target_defect_type,
        target_severity=target_severity,
        top_n=top_n,
    )
    if key_factors.empty:
        # Keep the output schema identical on every return path so callers can
        # always read "ml_probability" / "model_name", even with no candidates.
        return _with_constant_scores(key_factors, model_name)

    X, y = build_supervised_dataset(
        df,
        target_defect_type=target_defect_type,
        target_severity=target_severity,
    )
    if y.nunique() < 2:
        # A classifier cannot be fitted on a single class; fall back to zeros.
        return _with_constant_scores(key_factors, model_name)

    trained = train_tabular_model(model_name, X, y)
    model = trained["model"]
    # NOTE(review): column 1 of predict_proba is taken as the positive-class
    # probability, which assumes a binary target whose positive label sorts
    # second - confirm against build_supervised_dataset.
    # The probabilities are in-sample: the model is scored on the same X it
    # was trained on.
    probabilities = pd.Series(model.predict_proba(X)[:, 1], index=X.index)

    scored = key_factors.copy()
    ml_scores = []
    # Iterate the two needed columns directly instead of iterrows(), which
    # materialises a full Series for every row.
    for dimension, value in zip(scored["维度"], scored["因子值"]):
        column = f"{dimension}={value}"
        if column not in X.columns:
            # The factor has no matching feature column in the dataset.
            ml_scores.append(0.0)
            continue
        mask = X[column] == 1
        # Guard against an all-False mask: the mean of an empty selection is NaN.
        ml_scores.append(float(probabilities.loc[mask].mean()) if mask.any() else 0.0)

    scored["ml_probability"] = ml_scores
    scored["model_name"] = model_name
    return scored.sort_values(["ml_probability", "关键因子得分"], ascending=False).reset_index(drop=True)
|