"""
feature_builder.py — Converts raw rule-engine output dicts into a clean
feature vector for the ML model. Single responsibility: no model logic here.

Design decisions:
- All bool features are cast to int (0/1) — LGBM handles bools natively, but
  this keeps the matrix dtype homogeneous.
- Engineered interaction terms are computed here, not in the regime/volume
  modules, to keep those modules free of ML concerns.
- Returns a dict (for inference) or DataFrame row (for training).
- FEATURE_COLUMNS from ml_config defines the canonical order — any missing
  feature raises KeyError immediately rather than silently producing NaN.
"""
| |
|
import math
from typing import Any, Callable, Dict, Optional, Sequence

import numpy as np
import pandas as pd

from ml_config import FEATURE_COLUMNS
| |
|
| |
|
# Exception families that float()/int()/bool() can raise on a bad input value.
_COERCION_ERRORS = (TypeError, ValueError, OverflowError)


def _flag(value: Any) -> int:
    """Collapse any truthy/falsy value to the int 1 or 0."""
    return int(bool(value))


def _coerce_input(
    source: Dict[str, Any],
    key: str,
    default: Any,
    cast: Callable[[Any], Any],
) -> Any:
    """Read ``source[key]`` (``default`` when the key is absent) and apply ``cast``.

    A failed conversion is re-raised as the same builtin exception family
    (TypeError, ValueError or OverflowError) so existing ``except`` clauses
    still match, but with a message naming the offending key and value.
    """
    value = source.get(key, default)
    try:
        return cast(value)
    except _COERCION_ERRORS as exc:
        family = next(t for t in _COERCION_ERRORS if isinstance(exc, t))
        raise family(
            f"Cannot convert rule-engine value {key!r}={value!r}: {exc}"
        ) from exc


def build_feature_dict(
    regime_data: Dict[str, Any],
    volume_data: Dict[str, Any],
    scores: Dict[str, Any],
    *,
    columns: Optional[Sequence[str]] = None,
) -> Dict[str, float]:
    """
    Build the canonical feature dict from rule-engine outputs.

    All values are Python floats or ints — no pandas/numpy scalars.

    Args:
        regime_data: Regime outputs; keys read are ``adx``, ``di_plus``,
            ``di_minus``, ``atr_pct``, ``vol_ratio``, ``vol_compressed``,
            ``vol_expanding``, ``vol_expanding_from_base``, ``dist_atr`` and
            ``regime_confidence``.
        volume_data: Volume outputs; keys read are ``absorption``,
            ``failed_breakout``, ``recent_failed_count``, ``obv_slope_norm``,
            ``delta_sign``, ``spike`` and ``climax``.
        scores: Score outputs; keys read are ``regime_score``,
            ``volume_score``, ``structure_score``, ``confidence_score`` and
            ``total_score``.
        columns: Feature names to return, in output order. Defaults to
            ``FEATURE_COLUMNS``.

    Returns:
        A dict holding exactly the requested features, in ``columns`` order.
        An absent input key falls back to a neutral default (``1.0`` for
        ``vol_ratio``, zero/False for everything else).

    Raises:
        KeyError: If ``columns`` names a feature this builder does not produce.
        TypeError, ValueError, OverflowError: If an input value cannot be
            converted to the expected numeric type (e.g. it is ``None``).
    """
    if columns is None:
        columns = FEATURE_COLUMNS

    # Directional-movement inputs and their derived spread / share.
    adx = _coerce_input(regime_data, "adx", 0.0, float)
    di_plus = _coerce_input(regime_data, "di_plus", 0.0, float)
    di_minus = _coerce_input(regime_data, "di_minus", 0.0, float)
    di_sum = di_plus + di_minus + 1e-9  # epsilon avoids division by zero
    di_diff = di_plus - di_minus
    di_ratio = di_plus / di_sum

    # Volatility inputs; boolean flags become 0/1 ints.
    atr_pct = _coerce_input(regime_data, "atr_pct", 0.0, float)
    vol_ratio = _coerce_input(regime_data, "vol_ratio", 1.0, float)
    vol_compressed = _coerce_input(regime_data, "vol_compressed", False, _flag)
    vol_expanding = _coerce_input(regime_data, "vol_expanding", False, _flag)
    vol_expanding_from_base = _coerce_input(
        regime_data, "vol_expanding_from_base", False, _flag
    )

    # Volume-module inputs.
    absorption = _coerce_input(volume_data, "absorption", False, _flag)
    failed_breakout = _coerce_input(volume_data, "failed_breakout", False, _flag)
    recent_failed_count = _coerce_input(volume_data, "recent_failed_count", 0, int)
    obv_slope_norm = _coerce_input(volume_data, "obv_slope_norm", 0.0, float)
    delta_sign = _coerce_input(volume_data, "delta_sign", 0, int)
    spike = _coerce_input(volume_data, "spike", False, _flag)
    climax = _coerce_input(volume_data, "climax", False, _flag)

    # Signed distance and its magnitude.
    dist_atr = _coerce_input(regime_data, "dist_atr", 0.0, float)
    dist_atr_abs = abs(dist_atr)

    # Rule-engine confidence and scores.
    regime_confidence = _coerce_input(regime_data, "regime_confidence", 0.0, float)
    regime_score = _coerce_input(scores, "regime_score", 0.0, float)
    volume_score = _coerce_input(scores, "volume_score", 0.0, float)
    structure_score = _coerce_input(scores, "structure_score", 0.0, float)
    confidence_score = _coerce_input(scores, "confidence_score", 0.0, float)
    total_score = _coerce_input(scores, "total_score", 0.0, float)

    # Engineered interaction terms.
    adx_x_regime = adx * regime_score
    vol_x_obv = vol_ratio * obv_slope_norm
    score_x_conf = total_score * regime_confidence

    raw = {
        "adx": adx,
        "di_plus": di_plus,
        "di_minus": di_minus,
        "di_diff": di_diff,
        "di_ratio": di_ratio,
        "atr_pct": atr_pct,
        "vol_ratio": vol_ratio,
        "vol_compressed": vol_compressed,
        "vol_expanding": vol_expanding,
        "vol_expanding_from_base": vol_expanding_from_base,
        "absorption": absorption,
        "failed_breakout": failed_breakout,
        "recent_failed_count": recent_failed_count,
        "obv_slope_norm": obv_slope_norm,
        "delta_sign": delta_sign,
        "spike": spike,
        "climax": climax,
        "dist_atr": dist_atr,
        "dist_atr_abs": dist_atr_abs,
        "regime_confidence": regime_confidence,
        "regime_score": regime_score,
        "volume_score": volume_score,
        "structure_score": structure_score,
        "confidence_score": confidence_score,
        "total_score": total_score,
        "adx_x_regime": adx_x_regime,
        "vol_x_obv": vol_x_obv,
        "score_x_conf": score_x_conf,
    }

    # Fail loudly if the requested schema names a feature not built above.
    missing = set(columns) - raw.keys()
    if missing:
        raise KeyError(f"Missing features: {missing}")

    # Emit only the requested features, in the requested order.
    return {name: raw[name] for name in columns}
| |
|
| |
|
def feature_dict_to_row(
    feat: Dict[str, float],
    *,
    columns: Optional[Sequence[str]] = None,
) -> pd.Series:
    """Convert a feature dict to a pandas Series with canonical column order.

    Args:
        feat: Feature name -> value mapping; keys not listed in ``columns``
            are ignored.
        columns: Feature names to include, in index order. Defaults to
            ``FEATURE_COLUMNS``.

    Returns:
        A Series indexed by feature name, in ``columns`` order.

    Raises:
        KeyError: If ``feat`` lacks one of the requested features.
    """
    if columns is None:
        columns = FEATURE_COLUMNS
    return pd.Series({name: feat[name] for name in columns})
| |
|
| |
|
def feature_dict_to_matrix(
    feat: Dict[str, float],
    *,
    columns: Optional[Sequence[str]] = None,
) -> np.ndarray:
    """
    Convert a single feature dict to a (1, n_features) numpy array for inference.

    Args:
        feat: Feature name -> value mapping; keys not listed in ``columns``
            are ignored.
        columns: Feature names to include, in column order. Defaults to
            ``FEATURE_COLUMNS``.

    Returns:
        A float64 array with one row and one column per requested feature,
        in ``columns`` order.

    Raises:
        KeyError: If ``feat`` lacks one of the requested features.
    """
    if columns is None:
        columns = FEATURE_COLUMNS
    return np.array([[feat[name] for name in columns]], dtype=np.float64)
| |
|
| |
|
def validate_features(
    feat: Dict[str, float],
    *,
    columns: Optional[Sequence[str]] = None,
) -> bool:
    """Return True only if every expected feature is present and finite.

    Args:
        feat: Feature name -> value mapping to check.
        columns: Feature names to require. Defaults to ``FEATURE_COLUMNS``.

    Returns:
        False as soon as a feature is absent, ``None``, NaN, infinite, or not
        convertible to a float; True otherwise.
    """
    if columns is None:
        columns = FEATURE_COLUMNS
    for name in columns:
        value = feat.get(name)
        if value is None:
            return False
        try:
            # math.isfinite accepts any real-number-like value, so NaN/inf in
            # numerics that are not ``float`` subclasses (e.g. numpy.float32)
            # is caught too, not only in builtin floats.
            if not math.isfinite(value):
                return False
        except (TypeError, ValueError, OverflowError):
            # Not convertible to a float, so it is not a usable feature value.
            return False
    return True
| |
|