"""统一 ML 推理入口。""" import pandas as pd from defect_analysis.ml.datasets import build_supervised_dataset from defect_analysis.ml.key_factors import find_key_factors from defect_analysis.ml.tabular_models import train_tabular_model def predict_key_factors(df, *, target_defect_type=None, target_severity=None, model_name="random_forest", top_n=20): """用统计关键因子 + 监督模型概率输出候选关键因子。""" key_factors = find_key_factors( df, target_defect_type=target_defect_type, target_severity=target_severity, top_n=top_n, ) if key_factors.empty: return key_factors X, y = build_supervised_dataset( df, target_defect_type=target_defect_type, target_severity=target_severity, ) if y.nunique() < 2: key_factors["ml_probability"] = 0.0 key_factors["model_name"] = model_name return key_factors trained = train_tabular_model(model_name, X, y) model = trained["model"] probabilities = pd.Series(model.predict_proba(X)[:, 1], index=X.index) scored = key_factors.copy() ml_scores = [] for _, row in scored.iterrows(): dimension = row["维度"] value = row["因子值"] column = f"{dimension}={value}" if column in X.columns: mask = X[column] == 1 ml_scores.append(float(probabilities.loc[mask].mean()) if mask.any() else 0.0) else: ml_scores.append(0.0) scored["ml_probability"] = ml_scores scored["model_name"] = model_name return scored.sort_values(["ml_probability", "关键因子得分"], ascending=False).reset_index(drop=True)