"""统一 ML 推理入口。""" import pandas as pd from defect_analysis.ml.datasets import build_supervised_dataset from defect_analysis.ml.key_factors import find_key_factors from defect_analysis.ml.tabular_models import train_tabular_model def predict_key_factors(df, *, target_defect_type=None, target_severity=None, model_name="random_forest", top_n=20): """用统计关键因子 + 监督模型概率输出候选关键因子。""" key_factors = find_key_factors( df, target_defect_type=target_defect_type, target_severity=target_severity, top_n=top_n, ) if key_factors.empty: return key_factors X, y = build_supervised_dataset( df, target_defect_type=target_defect_type, target_severity=target_severity, ) if y.nunique() < 2: key_factors["ml_probability"] = 0.0 key_factors["model_name"] = model_name return key_factors trained = train_tabular_model(model_name, X, y) model = trained["model"] probabilities = pd.Series(model.predict_proba(X)[:, 1], index=X.index) scored = key_factors.copy() # 向量化:把 key_factors 的维度/因子值映射为 one-hot 列名后取概率均值 dimension = scored["维度"].astype(str) value = scored["因子值"].astype(str) column_names = dimension + "=" + value ml_scores = [] for col in column_names: if col in X.columns: ml_scores.append(float(probabilities.loc[X[col] == 1].mean()) if X[col].any() else 0.0) else: ml_scores.append(0.0) scored["ml_probability"] = ml_scores scored["model_name"] = model_name return scored.sort_values(["ml_probability", "关键因子得分"], ascending=False).reset_index(drop=True)