Explorar o código

新增结构化机器学习模型框架

DESKTOP-74CLTRG\Leol hai 4 días
pai
achega
21d6cdf17e

+ 1 - 0
defect_analysis/ml/__init__.py

@@ -0,0 +1 @@
+"""机器学习与统计模型模块。"""

+ 22 - 0
defect_analysis/ml/datasets.py

@@ -0,0 +1,22 @@
+"""训练数据集构建。"""
+
+from defect_analysis.ml.features import build_feature_frame
+from defect_analysis.schemas import normalize_defect_schema
+
+
+def build_target_series(df, *, target_defect_type=None, target_severity=None):
+    """Build a 0/1 label series from the normalised defect frame.
+
+    A truthy ``target_defect_type`` wins and labels rows whose
+    ``defect_type`` equals it. Otherwise rows are labelled by ``severity``:
+    against ``target_severity`` when it is truthy, else against the
+    default value "严重".
+    """
+    frame = normalize_defect_schema(df)
+    if target_defect_type:
+        column, wanted = "defect_type", target_defect_type
+    else:
+        # A falsy severity falls back to the default severe label.
+        column, wanted = "severity", target_severity or "严重"
+    hits = frame[column] == wanted
+    return hits.astype(int)
+
+
+def build_supervised_dataset(df, *, target_defect_type=None, target_severity=None):
+    """Build the ``(X, y)`` pair used for supervised training.
+
+    Args:
+        df: Raw defect records; both helpers normalise it independently.
+        target_defect_type: Forwarded to ``build_target_series``.
+        target_severity: Forwarded to ``build_target_series``.
+
+    Returns:
+        A tuple ``(features, target)``. Both share a fresh positional index.
+    """
+    features = build_feature_frame(df)
+    target = build_target_series(
+        df,
+        target_defect_type=target_defect_type,
+        target_severity=target_severity,
+    )
+    # build_feature_frame resets its index, so do the same for the labels;
+    # otherwise a filtered input frame yields X and y with different index
+    # labels and label-based operations such as X[y == 1] misalign.
+    return features, target.reset_index(drop=True)

+ 46 - 0
defect_analysis/ml/features.py

@@ -0,0 +1,46 @@
+"""结构化 ML 特征工程。"""
+
+import pandas as pd
+
+from defect_analysis.schemas import normalize_defect_schema
+
+
+# Columns that build_feature_frame one-hot encodes when they are present in
+# the normalised frame; missing values are replaced by "" before encoding.
+CATEGORICAL_FEATURES = [
+    "equipment_id",
+    "seat_id",
+    "lam_equipment_id",
+    "lam_seat_id",
+    "lam_fixture_id",
+    "lam_nozzle_id",
+    "material_lot_oca",
+    "material_lot_glass",
+    "clean_equipment_id",
+    "bond_equipment_id",
+    "shift",
+    "defect_geometry_type",
+]
+
+# Columns that build_feature_frame coerces with pd.to_numeric when present;
+# values that cannot be parsed, or are missing, become 0.0.
+NUMERIC_FEATURES = [
+    "x_mm",
+    "y_mm",
+    "hour",
+    "width_mm",
+    "height_mm",
+    "length_mm",
+    "angle_deg",
+    "area_mm2",
+]
+
+
+def build_feature_frame(df):
+    """Turn normalised defect records into an all-numeric feature matrix.
+
+    Numeric columns listed in ``NUMERIC_FEATURES`` are coerced to numbers
+    (unparseable or missing values become 0.0). Columns listed in
+    ``CATEGORICAL_FEATURES`` are one-hot encoded into float columns named
+    ``<column>=<value>``. Only columns present in the frame are used, and
+    the result carries a fresh positional index.
+    """
+    frame = normalize_defect_schema(df)
+
+    numeric_cols = [name for name in NUMERIC_FEATURES if name in frame.columns]
+    numeric_part = frame[numeric_cols].copy()
+    for name in numeric_part.columns:
+        coerced = pd.to_numeric(numeric_part[name], errors="coerce")
+        numeric_part[name] = coerced.fillna(0.0)
+
+    categorical_cols = [name for name in CATEGORICAL_FEATURES if name in frame.columns]
+    labels = frame[categorical_cols].fillna("").astype(str)
+    one_hot = pd.get_dummies(labels, prefix_sep="=", dtype=float)
+
+    parts = [numeric_part.reset_index(drop=True), one_hot.reset_index(drop=True)]
+    return pd.concat(parts, axis=1).fillna(0.0)

+ 22 - 0
defect_analysis/ml/image_models.py

@@ -0,0 +1,22 @@
+"""图像模型接口占位。
+
+当前项目尚未接入 AOI 图片路径、标注数据和深度学习运行环境,因此这里提供
+明确的接口与不可用错误,避免把 PyTorch/TensorFlow 作为硬依赖拖入主应用。
+"""
+
+
+class ImageModelUnavailable(RuntimeError):
+    """Raised when the image-model backend or model file is not available."""
+
+
+class ImageModelWrapper:
+    """Uniform wrapper interface reserved for a future AOI image model."""
+
+    def __init__(self, backend=None, model_path=None):
+        # Both values are only stored; nothing in this class reads them yet.
+        self.backend, self.model_path = backend, model_path
+
+    def predict(self, image_paths):
+        """Always raise ``ImageModelUnavailable``; no model is wired in yet."""
+        message = "图像模型尚未配置。请先接入 AOI 图片路径、标注数据、模型文件和 GPU/推理环境。"
+        raise ImageModelUnavailable(message)

+ 13 - 0
defect_analysis/ml/model_registry.py

@@ -0,0 +1,13 @@
+"""模型后端检测与注册信息。"""
+
+import importlib.util
+
+
+def detect_optional_model_backends():
+    """Report which optional ML backends can be imported.
+
+    The probe never imports the packages themselves and never raises, so a
+    missing or oddly installed backend cannot break application start-up.
+
+    Returns:
+        A dict mapping "xgboost", "lightgbm", "torch" and "tensorflow" to a
+        bool telling whether the package is available.
+    """
+    import sys
+
+    def _is_available(name):
+        try:
+            return importlib.util.find_spec(name) is not None
+        except ValueError:
+            # find_spec raises ValueError when the module is already in
+            # sys.modules but has no usable __spec__ (for example a stub
+            # injected by a test); such a module is still importable.
+            return name in sys.modules
+        except ImportError:
+            return False
+
+    backend_names = ("xgboost", "lightgbm", "torch", "tensorflow")
+    return {name: _is_available(name) for name in backend_names}

+ 48 - 0
defect_analysis/ml/predict.py

@@ -0,0 +1,48 @@
+"""统一 ML 推理入口。"""
+
+import pandas as pd
+
+from defect_analysis.ml.datasets import build_supervised_dataset
+from defect_analysis.ml.key_factors import find_key_factors
+from defect_analysis.ml.tabular_models import train_tabular_model
+
+
+def predict_key_factors(df, *, target_defect_type=None, target_severity=None, model_name="random_forest", top_n=20):
+    """Rank statistical key-factor candidates by a supervised model's output.
+
+    Candidates come from ``find_key_factors``. A model is then trained on
+    the full data set, and each candidate receives the mean predicted
+    positive-class probability of the rows whose one-hot column
+    ``<维度>=<因子值>`` equals 1. Candidates without such a column, or without
+    matching rows, score 0.0.
+
+    Returns:
+        The candidate frame with ``ml_probability`` and ``model_name``
+        columns added. An empty candidate frame is returned untouched. When
+        the target has fewer than two classes, no model is trained and
+        every probability is 0.0.
+    """
+    targets = {
+        "target_defect_type": target_defect_type,
+        "target_severity": target_severity,
+    }
+    key_factors = find_key_factors(df, top_n=top_n, **targets)
+    if key_factors.empty:
+        return key_factors
+
+    X, y = build_supervised_dataset(df, **targets)
+    if y.nunique() < 2:
+        # A single-class target cannot be modelled; report neutral scores.
+        key_factors["ml_probability"] = 0.0
+        key_factors["model_name"] = model_name
+        return key_factors
+
+    model = train_tabular_model(model_name, X, y)["model"]
+    positive_proba = model.predict_proba(X)[:, 1]
+    probabilities = pd.Series(positive_proba, index=X.index)
+
+    def _mean_probability(dimension, value):
+        # One-hot columns are named "<dimension>=<value>" by the feature builder.
+        column = f"{dimension}={value}"
+        if column not in X.columns:
+            return 0.0
+        mask = X[column] == 1
+        if not mask.any():
+            return 0.0
+        return float(probabilities.loc[mask].mean())
+
+    scored = key_factors.copy()
+    scored["ml_probability"] = [
+        _mean_probability(row["维度"], row["因子值"]) for _, row in scored.iterrows()
+    ]
+    scored["model_name"] = model_name
+    ranked = scored.sort_values(["ml_probability", "关键因子得分"], ascending=False)
+    return ranked.reset_index(drop=True)

+ 86 - 0
defect_analysis/ml/tabular_models.py

@@ -0,0 +1,86 @@
+"""表格模型训练入口。"""
+
+from sklearn.ensemble import IsolationForest, RandomForestClassifier
+from sklearn.linear_model import LogisticRegression
+from sklearn.metrics import accuracy_score, roc_auc_score
+from sklearn.pipeline import make_pipeline
+from sklearn.preprocessing import StandardScaler
+
+
+def _classification_metrics(model, X, y):
+    """Compute in-sample metrics for a fitted classifier.
+
+    Always reports ``train_accuracy``. ``train_auc`` is added only when the
+    model exposes ``predict_proba`` and ``y`` holds more than one distinct
+    value.
+    """
+    accuracy = accuracy_score(y, model.predict(X))
+    report = {"train_accuracy": float(accuracy)}
+    can_score_auc = hasattr(model, "predict_proba") and len(set(y)) > 1
+    if not can_score_auc:
+        return report
+    positive_scores = model.predict_proba(X)[:, 1]
+    report["train_auc"] = float(roc_auc_score(y, positive_scores))
+    return report
+
+
+# Names of the supervised estimators accepted by train_tabular_model.
+_SUPERVISED_MODEL_NAMES = ("random_forest", "logistic_regression", "xgboost", "lightgbm")
+
+
+def _build_supervised_estimator(model_name, random_state):
+    """Return an unfitted classifier for one of the supervised model names.
+
+    Raises:
+        RuntimeError: The optional xgboost/lightgbm package is not installed.
+        ValueError: ``model_name`` is not a supervised model name.
+    """
+    if model_name == "random_forest":
+        return RandomForestClassifier(
+            n_estimators=100,
+            max_depth=8,
+            min_samples_leaf=2,
+            random_state=random_state,
+            class_weight="balanced",
+        )
+
+    if model_name == "logistic_regression":
+        return make_pipeline(
+            StandardScaler(with_mean=False),
+            LogisticRegression(max_iter=3000, class_weight="balanced", solver="liblinear"),
+        )
+
+    if model_name == "xgboost":
+        # Imported lazily so the package stays an optional dependency.
+        try:
+            from xgboost import XGBClassifier
+        except ImportError as exc:
+            raise RuntimeError("XGBoost 未安装,请安装 xgboost 后再启用该模型") from exc
+        return XGBClassifier(
+            n_estimators=100,
+            max_depth=4,
+            learning_rate=0.08,
+            eval_metric="logloss",
+            random_state=random_state,
+        )
+
+    if model_name == "lightgbm":
+        # Imported lazily so the package stays an optional dependency.
+        try:
+            from lightgbm import LGBMClassifier
+        except ImportError as exc:
+            raise RuntimeError("LightGBM 未安装,请安装 lightgbm 后再启用该模型") from exc
+        return LGBMClassifier(
+            n_estimators=100,
+            max_depth=4,
+            learning_rate=0.08,
+            random_state=random_state,
+            verbose=-1,
+        )
+
+    raise ValueError(f"不支持的模型: {model_name}")
+
+
+def train_tabular_model(model_name, X, y=None, *, random_state=42):
+    """Train a tabular model and return it with its training summary.
+
+    Supported names: random_forest, logistic_regression, xgboost and
+    lightgbm (supervised, ``y`` required) plus isolation_forest
+    (unsupervised, ``y`` ignored).
+
+    Args:
+        model_name: One of the supported model names.
+        X: Feature matrix passed to the estimator's ``fit``.
+        y: Labels; required for every supervised model.
+        random_state: Seed forwarded to estimators that accept one.
+
+    Returns:
+        For supervised models, a dict with ``model_name``, ``model`` and
+        ``metrics`` (in-sample metrics). For isolation_forest, a dict with
+        ``model_name``, ``model`` and ``anomaly_scores`` (the negated
+        decision function, so larger means more anomalous).
+
+    Raises:
+        ValueError: Unknown model name, or a supervised model without ``y``.
+        RuntimeError: xgboost/lightgbm requested but not installed.
+    """
+    if model_name == "isolation_forest":
+        model = IsolationForest(n_estimators=100, contamination="auto", random_state=random_state)
+        model.fit(X)
+        scores = -model.decision_function(X)
+        return {"model_name": model_name, "model": model, "anomaly_scores": scores}
+
+    if model_name not in _SUPERVISED_MODEL_NAMES:
+        raise ValueError(f"不支持的模型: {model_name}")
+
+    # Check labels before building the estimator so a missing y is reported
+    # even when the optional backend package is not installed.
+    if y is None:
+        raise ValueError(f"{model_name} 需要监督标签 y")
+
+    model = _build_supervised_estimator(model_name, random_state)
+    model.fit(X, y)
+    return {"model_name": model_name, "model": model, "metrics": _classification_metrics(model, X, y)}

+ 96 - 0
tests/test_ml_platform.py

@@ -0,0 +1,96 @@
+import unittest
+
+import pandas as pd
+
+from defect_analysis.ml.datasets import build_supervised_dataset
+from defect_analysis.ml.features import build_feature_frame
+from defect_analysis.ml.image_models import ImageModelUnavailable, ImageModelWrapper
+from defect_analysis.ml.model_registry import detect_optional_model_backends
+from defect_analysis.ml.predict import predict_key_factors
+from defect_analysis.ml.tabular_models import train_tabular_model
+from defect_analysis.schemas import normalize_defect_schema
+
+
+class MLPlatformTest(unittest.TestCase):
+    """Smoke tests for the structured ML helpers on a 40-row synthetic frame."""
+
+    def setUp(self):
+        def make_row(i):
+            # The first 24 rows form the "hot" group sharing one equipment,
+            # seat, fixture, OCA lot and defect type.
+            hot = i < 24
+            return {
+                "defect_id": f"D{i}",
+                "panel_id": f"P{i}",
+                "batch_id": "B1",
+                "equipment_id": "LAM-A01" if hot else "LAM-B01",
+                "seat_id": "R1C1" if hot else "R2C2",
+                "inspection_station": "AOI-1",
+                "timestamp": pd.Timestamp("2026-04-01 08:00:00"),
+                "defect_type": "气泡" if hot else "划痕",
+                "severity": "严重" if i % 5 == 0 else "轻微",
+                "x_mm": 10.0 + i,
+                "y_mm": 20.0,
+                "panel_width_mm": 155.0,
+                "panel_height_mm": 340.0,
+                "hour": 8,
+                "shift": "白班",
+                "day": "2026-04-01",
+                "lam_fixture_id": "FIX-HOT" if hot else "FIX-OK",
+                "material_lot_oca": "OCA-HOT" if hot else "OCA-OK",
+            }
+
+        raw = pd.DataFrame([make_row(i) for i in range(40)])
+        self.df = normalize_defect_schema(raw)
+
+    def test_build_feature_frame_creates_numeric_matrix(self):
+        matrix = build_feature_frame(self.df)
+
+        self.assertEqual(len(self.df), len(matrix))
+        # Every column must have a boolean/integer/float/complex dtype.
+        self.assertTrue(all(dtype.kind in "biufc" for dtype in matrix.dtypes))
+        self.assertTrue(any(name.startswith("equipment_id=") for name in matrix.columns))
+
+    def test_build_supervised_dataset_targets_defect_type(self):
+        features, labels = build_supervised_dataset(self.df, target_defect_type="气泡")
+
+        self.assertEqual(len(self.df), len(features))
+        self.assertEqual(24, int(labels.sum()))
+
+    def test_train_random_forest_and_logistic_regression(self):
+        features, labels = build_supervised_dataset(self.df, target_defect_type="气泡")
+
+        forest = train_tabular_model("random_forest", features, labels)
+        logistic = train_tabular_model("logistic_regression", features, labels)
+
+        self.assertIn("model", forest)
+        self.assertIn("metrics", forest)
+        self.assertIn("model", logistic)
+        self.assertGreaterEqual(forest["metrics"]["train_accuracy"], 0.5)
+
+    def test_train_isolation_forest_outputs_anomaly_scores(self):
+        features = build_feature_frame(self.df)
+
+        outcome = train_tabular_model("isolation_forest", features)
+
+        self.assertIn("anomaly_scores", outcome)
+        self.assertEqual(len(self.df), len(outcome["anomaly_scores"]))
+
+    def test_predict_key_factors_returns_model_scores(self):
+        ranked = predict_key_factors(self.df, target_defect_type="气泡")
+
+        self.assertFalse(ranked.empty)
+        self.assertIn("ml_probability", ranked.columns)
+        self.assertIn("model_name", ranked.columns)
+
+    def test_optional_backends_are_reported_without_import_failure(self):
+        report = detect_optional_model_backends()
+
+        self.assertIn("xgboost", report)
+        self.assertIn("lightgbm", report)
+
+    def test_image_model_wrapper_is_explicitly_unavailable_without_backend(self):
+        placeholder = ImageModelWrapper()
+
+        with self.assertRaises(ImageModelUnavailable):
+            placeholder.predict([])
+
+
+# Run the test suite when this module is executed directly as a script.
+if __name__ == "__main__":
+    unittest.main()

+ 67 - 0
train_ml_models.py

@@ -0,0 +1,67 @@
+"""训练和验证结构化 ML 模型。"""
+
+import argparse
+
+import pandas as pd
+
+from defect_analysis.ml.datasets import build_supervised_dataset
+from defect_analysis.ml.features import build_feature_frame
+from defect_analysis.ml.model_registry import detect_optional_model_backends
+from defect_analysis.ml.predict import predict_key_factors
+from defect_analysis.ml.tabular_models import train_tabular_model
+from defect_analysis.schemas import normalize_defect_schema
+
+
+def load_defect_csv(csv_path):
+    """Read a defect CSV and return it with the normalised defect schema.
+
+    The file is decoded as ``utf-8-sig`` and the ``timestamp`` column is
+    parsed as dates.
+    """
+    raw = pd.read_csv(csv_path, parse_dates=["timestamp"], encoding="utf-8-sig")
+    return normalize_defect_schema(raw)
+
+
+def main():
+    """Command-line entry point: train one model and print its summary.
+
+    ``isolation_forest`` prints anomaly-score statistics and stops. Every
+    other model prints its training metrics followed by the ranked
+    key-factor candidates.
+    """
+    cli = argparse.ArgumentParser(description="训练/运行不良分析 ML 模型")
+    cli.add_argument("--csv", default="defect_data.csv")
+    model_choices = ["random_forest", "logistic_regression", "isolation_forest", "xgboost", "lightgbm"]
+    cli.add_argument("--model", default="random_forest", choices=model_choices)
+    cli.add_argument("--target-defect-type")
+    cli.add_argument("--target-severity")
+    cli.add_argument("--top-n", type=int, default=10)
+    cli.add_argument("--show-backends", action="store_true")
+    options = cli.parse_args()
+
+    if options.show_backends:
+        print(detect_optional_model_backends())
+
+    defects = load_defect_csv(options.csv)
+
+    if options.model == "isolation_forest":
+        # Unsupervised path: no target is needed, only the feature matrix.
+        outcome = train_tabular_model("isolation_forest", build_feature_frame(defects))
+        scores = pd.Series(outcome["anomaly_scores"])
+        print(f"IsolationForest 完成: 样本数={len(scores)}, 最高异常分={scores.max():.4f}, 平均异常分={scores.mean():.4f}")
+        return
+
+    targets = {
+        "target_defect_type": options.target_defect_type,
+        "target_severity": options.target_severity,
+    }
+    features, labels = build_supervised_dataset(defects, **targets)
+    outcome = train_tabular_model(options.model, features, labels)
+    print(f"{options.model} 训练完成: {outcome['metrics']}")
+
+    ranked = predict_key_factors(
+        defects,
+        model_name=options.model,
+        top_n=options.top_n,
+        **targets,
+    )
+    if ranked.empty:
+        print("未找到关键因子候选。")
+        return
+
+    shown = ["维度", "因子值", "目标数", "异常倍数", "关键因子得分", "ml_probability", "model_name"]
+    print(ranked[shown].to_string(index=False))
+
+
+# Run the command-line entry point only when executed as a script.
+if __name__ == "__main__":
+    main()