""" volume_analysis.py — Volume & order flow with absorption detection, multi-bar breakout confirmation, and fake breakout identification. Key fixes vs prior version: - Absorption detection: high-volume small-body bars at resistance = institutional selling - Multi-bar breakout confirmation (BREAKOUT_CONFIRMATION_BARS) before firing signal - ATR buffer on breakout level (price must exceed level by N*ATR, not just 1 tick) - OBV slope computed over configurable window, normalized vs rolling stddev - Climax threshold lowered (3.0x) and now triggers a hard absorption check - Failed retest detection: breakout that closes back below the level = fake """ from typing import Dict, Any import numpy as np import pandas as pd from config import ( VOLUME_MA_PERIOD, VOLUME_SPIKE_MULT, VOLUME_CLIMAX_MULT, VOLUME_WEAK_THRESHOLD, BREAKOUT_LOOKBACK, BREAKOUT_ATR_BUFFER, BREAKOUT_CONFIRMATION_BARS, BREAKOUT_RETEST_BARS, ABSORPTION_WICK_RATIO, ABSORPTION_VOL_MULT, ABSORPTION_BODY_RATIO, OBV_SLOPE_BARS, ATR_PERIOD, ) def compute_volume_ma(df: pd.DataFrame, period: int = VOLUME_MA_PERIOD) -> pd.Series: return df["volume"].rolling(period).mean() def detect_spikes(df: pd.DataFrame, vol_ma: pd.Series) -> pd.Series: return df["volume"] > vol_ma * VOLUME_SPIKE_MULT def detect_climax(df: pd.DataFrame, vol_ma: pd.Series) -> pd.Series: return df["volume"] > vol_ma * VOLUME_CLIMAX_MULT def detect_absorption(df: pd.DataFrame, vol_ma: pd.Series) -> pd.Series: """ Absorption = high-volume bar with small body and large upper wick, occurring near recent highs (institutional supply absorbing retail demand). Conditions (all must be true): - Volume > ABSORPTION_VOL_MULT * MA - Body / range < ABSORPTION_BODY_RATIO (small real body) - Upper wick / range > ABSORPTION_WICK_RATIO (large upper wick) - Close is in lower half of the bar's range (sellers won the bar) """ bar_range = (df["high"] - df["low"]).replace(0, np.nan) body = (df["close"] - df["open"]).abs() upper_wick = df["high"] - df[["close", "open"]].max(axis=1) body_ratio = body / bar_range wick_ratio = upper_wick / bar_range close_in_lower_half = df["close"] < (df["low"] + bar_range * 0.5) high_volume = df["volume"] > vol_ma * ABSORPTION_VOL_MULT small_body = body_ratio < ABSORPTION_BODY_RATIO large_wick = wick_ratio > ABSORPTION_WICK_RATIO return high_volume & small_body & large_wick & close_in_lower_half def compute_obv(df: pd.DataFrame) -> pd.Series: direction = np.sign(df["close"].diff()).fillna(0) return (df["volume"] * direction).cumsum() def compute_obv_slope(obv: pd.Series, bars: int = OBV_SLOPE_BARS) -> pd.Series: """ OBV slope normalized by rolling stddev of OBV to make it comparable across different price scales. Values > 1 = strong upward flow. """ x = np.arange(bars) def slope_normalized(window): if len(window) < bars: return np.nan s = np.polyfit(x, window, 1)[0] std = np.std(window) return s / std if std > 0 else 0.0 return obv.rolling(bars).apply(slope_normalized, raw=True) def compute_delta_approx(df: pd.DataFrame) -> pd.Series: body = df["close"] - df["open"] wick = (df["high"] - df["low"]).replace(0, np.nan) buy_ratio = ((body / wick) * 0.5 + 0.5).clip(0.0, 1.0).fillna(0.5) return df["volume"] * buy_ratio - df["volume"] * (1 - buy_ratio) def compute_vwap_deviation(df: pd.DataFrame, period: int = VOLUME_MA_PERIOD) -> pd.Series: typical = (df["high"] + df["low"] + df["close"]) / 3 cum_vp = (typical * df["volume"]).rolling(period).sum() cum_vol = df["volume"].rolling(period).sum().replace(0, np.nan) vwap = cum_vp / cum_vol atr_approx = (df["high"] - df["low"]).rolling(ATR_PERIOD).mean().replace(0, np.nan) return (df["close"] - vwap) / atr_approx def compute_confirmed_breakout( df: pd.DataFrame, atr_series: pd.Series, vol_ma: pd.Series, lookback: int = BREAKOUT_LOOKBACK, confirm_bars: int = BREAKOUT_CONFIRMATION_BARS, atr_buffer: float = BREAKOUT_ATR_BUFFER, ) -> pd.Series: """ Genuine breakout requires ALL of: 1. Close exceeds prior N-bar high/low by at least atr_buffer * ATR 2. Close holds above/below that level for confirm_bars consecutive bars 3. Volume spike on at least one of the confirmation bars 4. No absorption signal on the breakout bar or confirmation bars Returns: +1 confirmed bull breakout, -1 confirmed bear, 0 none """ prior_high = df["high"].rolling(lookback).max().shift(lookback) prior_low = df["low"].rolling(lookback).min().shift(lookback) spike = detect_spikes(df, vol_ma) absorption = detect_absorption(df, vol_ma) # Level cleared with buffer cleared_up = df["close"] > prior_high + atr_series * atr_buffer cleared_dn = df["close"] < prior_low - atr_series * atr_buffer # Rolling confirmation: all bars in last confirm_bars cleared the level held_up = cleared_up.rolling(confirm_bars).min().fillna(0).astype(bool) held_dn = cleared_dn.rolling(confirm_bars).min().fillna(0).astype(bool) # Volume spike in confirmation window vol_ok = spike.rolling(confirm_bars).max().fillna(0).astype(bool) # No absorption in confirmation window no_absorption = (~absorption).rolling(confirm_bars).min().fillna(1).astype(bool) signal = pd.Series(0, index=df.index) signal[held_up & vol_ok & no_absorption] = 1 signal[held_dn & vol_ok & no_absorption] = -1 return signal def detect_failed_breakout( df: pd.DataFrame, breakout_series: pd.Series, atr_series: pd.Series, retest_bars: int = BREAKOUT_RETEST_BARS, ) -> pd.Series: """ A breakout that closes back below/above the breakout level within retest_bars is flagged as a failed (fake) breakout. Returns: True where a prior confirmed breakout has since failed. """ prior_high = df["high"].rolling(BREAKOUT_LOOKBACK).max().shift(BREAKOUT_LOOKBACK) prior_low = df["low"].rolling(BREAKOUT_LOOKBACK).min().shift(BREAKOUT_LOOKBACK) had_bull_bo = breakout_series.shift(1).rolling(retest_bars).max().fillna(0) > 0 had_bear_bo = breakout_series.shift(1).rolling(retest_bars).min().fillna(0) < 0 # Failed: price returned below the breakout level bull_failed = had_bull_bo & (df["close"] < prior_high.shift(retest_bars)) bear_failed = had_bear_bo & (df["close"] > prior_low.shift(retest_bars)) return bull_failed | bear_failed def analyze_volume(df: pd.DataFrame, atr_series: pd.Series = None) -> Dict[str, Any]: if atr_series is None: # Fallback: compute simple ATR if not provided high, low, prev_close = df["high"], df["low"], df["close"].shift(1) tr = pd.concat( [high - low, (high - prev_close).abs(), (low - prev_close).abs()], axis=1, ).max(axis=1) atr_series = tr.ewm(alpha=1.0 / ATR_PERIOD, adjust=False).mean() vol_ma = compute_volume_ma(df, VOLUME_MA_PERIOD) spike_series = detect_spikes(df, vol_ma) climax_series = detect_climax(df, vol_ma) absorption_series = detect_absorption(df, vol_ma) obv = compute_obv(df) obv_slope_series = compute_obv_slope(obv, OBV_SLOPE_BARS) delta = compute_delta_approx(df) vwap_dev = compute_vwap_deviation(df, VOLUME_MA_PERIOD) breakout_series = compute_confirmed_breakout( df, atr_series, vol_ma, lookback=BREAKOUT_LOOKBACK, confirm_bars=BREAKOUT_CONFIRMATION_BARS, atr_buffer=BREAKOUT_ATR_BUFFER, ) failed_breakout_series = detect_failed_breakout(df, breakout_series, atr_series) last_vol = float(df["volume"].iloc[-1]) last_vol_ma = float(vol_ma.iloc[-1]) if not np.isnan(vol_ma.iloc[-1]) else 1.0 last_spike = bool(spike_series.iloc[-1]) last_climax = bool(climax_series.iloc[-1]) last_absorption = bool(absorption_series.iloc[-1]) last_breakout = int(breakout_series.iloc[-1]) last_failed_bo = bool(failed_breakout_series.iloc[-1]) last_obv_slope = float(obv_slope_series.iloc[-1]) if not np.isnan(obv_slope_series.iloc[-1]) else 0.0 last_vwap_dev = float(vwap_dev.iloc[-1]) if not np.isnan(vwap_dev.iloc[-1]) else 0.0 vol_ratio = last_vol / last_vol_ma if last_vol_ma > 0 else 1.0 weak_vol = vol_ratio < VOLUME_WEAK_THRESHOLD delta_5 = float(delta.iloc[-5:].sum()) delta_sign = 1 if delta_5 > 0 else -1 # Recent failed breakout count (rolling 10 bars) — context for trust level recent_failed = int(failed_breakout_series.iloc[-10:].sum()) # Score construction if last_absorption: # Absorption at high: bearish signal masquerading as bullish base_score = 0.15 elif last_climax: base_score = 0.25 elif last_breakout != 0 and not last_failed_bo: base_score = 1.0 elif last_breakout != 0 and last_failed_bo: base_score = 0.20 elif last_spike and not last_absorption: base_score = 0.60 elif vol_ratio >= 1.2: base_score = 0.45 elif vol_ratio >= 0.8: base_score = 0.30 else: base_score = 0.10 # OBV slope bonus/penalty (normalized) obv_bonus = float(np.clip(last_obv_slope * 0.08, -0.12, 0.12)) # VWAP deviation bonus for on-side entries vwap_bonus = 0.05 if (last_vwap_dev > 0 and last_breakout == 1) else 0.0 vwap_bonus += 0.05 if (last_vwap_dev < 0 and last_breakout == -1) else 0.0 # Penalty for recent failed breakouts (trust decay) fake_penalty = min(0.20, recent_failed * 0.05) volume_score = float(np.clip(base_score + obv_bonus + vwap_bonus - fake_penalty, 0.0, 1.0)) return { "vol_ratio": round(vol_ratio, 3), "spike": last_spike, "climax": last_climax, "absorption": last_absorption, "weak": weak_vol, "breakout": last_breakout, "failed_breakout": last_failed_bo, "recent_failed_count": recent_failed, "obv_slope_norm": round(last_obv_slope, 4), "delta_sum_5": round(delta_5, 2), "delta_sign": delta_sign, "vwap_deviation": round(last_vwap_dev, 4), "volume_score": round(volume_score, 4), "spike_series": spike_series, "climax_series": climax_series, "absorption_series": absorption_series, "breakout_series": breakout_series, "failed_breakout_series": failed_breakout_series, }