Goshawk_Hedge_Pro / volume_analysis.py
GoshawkVortexAI's picture
Update volume_analysis.py
a461698 verified
"""
volume_analysis.py — Volume & order flow with absorption detection,
multi-bar breakout confirmation, and fake breakout identification.
Key fixes vs prior version:
- Absorption detection: high-volume small-body bars at resistance = institutional selling
- Multi-bar breakout confirmation (BREAKOUT_CONFIRMATION_BARS) before firing signal
- ATR buffer on breakout level (price must exceed level by N*ATR, not just 1 tick)
- OBV slope computed over configurable window, normalized vs rolling stddev
- Climax threshold lowered (3.0x) and now triggers a hard absorption check
- Failed retest detection: breakout that closes back below the level = fake
"""
from typing import Dict, Any
import numpy as np
import pandas as pd
from config import (
VOLUME_MA_PERIOD,
VOLUME_SPIKE_MULT,
VOLUME_CLIMAX_MULT,
VOLUME_WEAK_THRESHOLD,
BREAKOUT_LOOKBACK,
BREAKOUT_ATR_BUFFER,
BREAKOUT_CONFIRMATION_BARS,
BREAKOUT_RETEST_BARS,
ABSORPTION_WICK_RATIO,
ABSORPTION_VOL_MULT,
ABSORPTION_BODY_RATIO,
OBV_SLOPE_BARS,
ATR_PERIOD,
)
def compute_volume_ma(df: pd.DataFrame, period: int = VOLUME_MA_PERIOD) -> pd.Series:
    """
    Simple moving average of the "volume" column.

    Args:
        df: frame containing a "volume" column.
        period: rolling window length in bars.

    Returns:
        Series of rolling means. Positions whose window is not yet full are
        NaN, because the rolling call uses pandas' default min_periods.
    """
    volume = df["volume"]
    window = volume.rolling(period)
    return window.mean()
def detect_spikes(df: pd.DataFrame, vol_ma: pd.Series) -> pd.Series:
    """
    Flag bars whose volume is strictly above VOLUME_SPIKE_MULT times the
    volume moving average.

    Bars where vol_ma is NaN compare False, so they are never flagged.
    """
    spike_threshold = vol_ma * VOLUME_SPIKE_MULT
    return df["volume"] > spike_threshold
def detect_climax(df: pd.DataFrame, vol_ma: pd.Series) -> pd.Series:
    """
    Flag bars whose volume is strictly above VOLUME_CLIMAX_MULT times the
    volume moving average.

    Bars where vol_ma is NaN compare False, so they are never flagged.
    """
    climax_threshold = vol_ma * VOLUME_CLIMAX_MULT
    return df["volume"] > climax_threshold
def detect_absorption(df: pd.DataFrame, vol_ma: pd.Series) -> pd.Series:
    """
    Flag high-volume bars with a small real body, a large upper wick and a
    close in the lower half of the bar.

    A bar is flagged only when ALL of the following hold:
    - volume > ABSORPTION_VOL_MULT * vol_ma
    - |close - open| / (high - low) < ABSORPTION_BODY_RATIO
    - (high - max(open, close)) / (high - low) > ABSORPTION_WICK_RATIO
    - close < low + 0.5 * (high - low)

    NOTE: this function only inspects the shape and volume of each bar; it
    does not check where the bar sits relative to recent highs.

    Returns:
        Boolean Series, True on flagged bars.
    """
    # Zero-range bars become NaN so every ratio below is NaN and compares
    # False, i.e. a zero-range bar is never flagged.
    span = (df["high"] - df["low"]).replace(0, np.nan)
    body_top = df[["close", "open"]].max(axis=1)

    real_body_share = (df["close"] - df["open"]).abs() / span
    upper_wick_share = (df["high"] - body_top) / span
    bar_midpoint = df["low"] + span * 0.5

    heavy_volume = df["volume"] > vol_ma * ABSORPTION_VOL_MULT
    narrow_body = real_body_share < ABSORPTION_BODY_RATIO
    long_upper_wick = upper_wick_share > ABSORPTION_WICK_RATIO
    closed_low = df["close"] < bar_midpoint

    return heavy_volume & narrow_body & long_upper_wick & closed_low
def compute_obv(df: pd.DataFrame) -> pd.Series:
    """
    On-balance volume: running total of volume signed by the close-to-close
    change.

    Each bar contributes +volume when the close rose versus the previous bar,
    -volume when it fell, and 0 when it was unchanged. The first bar has no
    previous close and therefore contributes 0.
    """
    close_change = df["close"].diff()
    # The first bar's diff is NaN; treat it as "no direction".
    bar_direction = np.sign(close_change).fillna(0)
    signed_volume = df["volume"] * bar_direction
    return signed_volume.cumsum()
def compute_obv_slope(obv: pd.Series, bars: int = OBV_SLOPE_BARS) -> pd.Series:
    """
    Rolling linear-regression slope of OBV, scaled by the window's own
    standard deviation.

    For every full window of `bars` values, a first-degree polynomial is
    fitted against the bar positions 0..bars-1 and its slope is divided by
    np.std of that same window. A flat window (std == 0) yields 0.0.

    Args:
        obv: on-balance-volume series.
        bars: window length in bars.

    Returns:
        Series of normalised slopes; NaN until `bars` observations exist.
    """
    bar_positions = np.arange(bars)

    def _window_slope(values):
        # Defensive: a window shorter than `bars` cannot be fitted against
        # bar_positions.
        if len(values) < bars:
            return np.nan
        gradient = np.polyfit(bar_positions, values, 1)[0]
        spread = np.std(values)
        if spread > 0:
            return gradient / spread
        return 0.0

    return obv.rolling(bars).apply(_window_slope, raw=True)
def compute_delta_approx(df: pd.DataFrame) -> pd.Series:
    """
    Approximate per-bar volume delta (buy volume minus sell volume) from OHLCV.

    The buy share of a bar is (close - open) / (high - low), mapped from
    [-1, 1] onto [0, 1] and clipped to that interval. Zero-range bars get a
    50/50 split. The result is volume * buy_share - volume * (1 - buy_share).
    """
    signed_body = df["close"] - df["open"]
    # Zero-range bars become NaN here and fall back to the 0.5 split below.
    span = (df["high"] - df["low"]).replace(0, np.nan)

    buy_share = (signed_body / span) * 0.5 + 0.5
    buy_share = buy_share.clip(0.0, 1.0).fillna(0.5)
    sell_share = 1 - buy_share

    volume = df["volume"]
    return volume * buy_share - volume * sell_share
def compute_vwap_deviation(df: pd.DataFrame, period: int = VOLUME_MA_PERIOD) -> pd.Series:
    """
    Distance of the close from a rolling VWAP, expressed in units of the
    average bar range.

    Args:
        df: frame with "high", "low", "close" and "volume" columns.
        period: window length for the rolling VWAP.

    Returns:
        (close - rolling VWAP) / rolling mean of (high - low) over ATR_PERIOD
        bars. NaN where either window is incomplete, where the rolling volume
        sum is 0, or where the mean range is 0.
    """
    volume = df["volume"]
    typical_price = (df["high"] + df["low"] + df["close"]) / 3

    rolling_turnover = (typical_price * volume).rolling(period).sum()
    # A zero volume sum would divide by zero; turn it into NaN instead.
    rolling_volume = volume.rolling(period).sum().replace(0, np.nan)
    rolling_vwap = rolling_turnover / rolling_volume

    # High-low range average used as a cheap volatility scale.
    mean_range = (df["high"] - df["low"]).rolling(ATR_PERIOD).mean()
    mean_range = mean_range.replace(0, np.nan)

    distance = df["close"] - rolling_vwap
    return distance / mean_range
def compute_confirmed_breakout(
    df: pd.DataFrame,
    atr_series: pd.Series,
    vol_ma: pd.Series,
    lookback: int = BREAKOUT_LOOKBACK,
    confirm_bars: int = BREAKOUT_CONFIRMATION_BARS,
    atr_buffer: float = BREAKOUT_ATR_BUFFER,
) -> pd.Series:
    """
    Emit a directional breakout signal only after multi-bar confirmation.

    The reference levels are the rolling `lookback`-bar max of "high" and min
    of "low", each shifted back by `lookback` bars (so the window that defines
    the level ends `lookback` bars before the bar being evaluated).

    A bar is signalled when, over the last `confirm_bars` bars (inclusive):
    1. every close is beyond the level by more than atr_buffer * ATR,
    2. at least one bar is a volume spike (see detect_spikes),
    3. no bar is flagged by detect_absorption.

    Args:
        df: OHLCV frame.
        atr_series: ATR values used to size the buffer.
        vol_ma: volume moving average used for spike/absorption detection.
        lookback: window length that defines the reference level.
        confirm_bars: number of consecutive bars that must hold the level.
        atr_buffer: buffer size as a multiple of ATR.

    Returns:
        Integer Series on df's index: +1 bullish, -1 bearish, 0 otherwise.
        If both directions qualify on the same bar, -1 wins because it is
        assigned last.
    """
    level_high = df["high"].rolling(lookback).max().shift(lookback)
    level_low = df["low"].rolling(lookback).min().shift(lookback)
    buffer = atr_series * atr_buffer

    # Per-bar test: close beyond the level plus/minus the ATR buffer.
    closed_above = df["close"] > level_high + buffer
    closed_below = df["close"] < level_low - buffer

    # Rolling min over booleans == "all True in the window"; incomplete
    # windows (NaN) count as not confirmed.
    bull_held = closed_above.rolling(confirm_bars).min().fillna(0).astype(bool)
    bear_held = closed_below.rolling(confirm_bars).min().fillna(0).astype(bool)

    # Rolling max over booleans == "any True in the window".
    spikes = detect_spikes(df, vol_ma)
    spike_in_window = spikes.rolling(confirm_bars).max().fillna(0).astype(bool)

    # Incomplete windows are treated as absorption-free; the *_held masks
    # are False there anyway.
    absorbed = detect_absorption(df, vol_ma)
    clean_window = (~absorbed).rolling(confirm_bars).min().fillna(1).astype(bool)

    bull_mask = bull_held & spike_in_window & clean_window
    bear_mask = bear_held & spike_in_window & clean_window

    result = pd.Series(0, index=df.index)
    result.loc[bull_mask] = 1
    result.loc[bear_mask] = -1
    return result
def detect_failed_breakout(
    df: pd.DataFrame,
    breakout_series: pd.Series,
    atr_series: pd.Series,
    retest_bars: int = BREAKOUT_RETEST_BARS,
) -> pd.Series:
    """
    Flag bars on which a recent breakout signal has been invalidated.

    A bar is flagged when either:
    - breakout_series was positive on any of the previous `retest_bars` bars
      (current bar excluded) and the current close is below the reference
      high, or
    - breakout_series was negative on any of those bars and the current close
      is above the reference low.

    The reference high/low are the BREAKOUT_LOOKBACK-bar rolling extremes
    shifted by BREAKOUT_LOOKBACK, read as they stood `retest_bars` bars ago.

    NOTE(review): the level is sampled at a fixed `retest_bars` offset, not
    at the bar where the breakout actually fired -- confirm this is intended.
    NOTE(review): `atr_series` is accepted but not used here.

    Returns:
        Boolean Series, True where a prior breakout is considered failed.
    """
    level_high = df["high"].rolling(BREAKOUT_LOOKBACK).max().shift(BREAKOUT_LOOKBACK)
    level_low = df["low"].rolling(BREAKOUT_LOOKBACK).min().shift(BREAKOUT_LOOKBACK)

    # shift(1) excludes the current bar from the "recent breakout" window.
    earlier_signals = breakout_series.shift(1)
    bull_recent = earlier_signals.rolling(retest_bars).max().fillna(0) > 0
    bear_recent = earlier_signals.rolling(retest_bars).min().fillna(0) < 0

    # Price has crossed back through the level it had broken.
    fell_back = df["close"] < level_high.shift(retest_bars)
    rose_back = df["close"] > level_low.shift(retest_bars)

    return (bull_recent & fell_back) | (bear_recent & rose_back)
def analyze_volume(df: pd.DataFrame, atr_series: pd.Series = None) -> Dict[str, Any]:
    """
    Run the full volume / order-flow analysis and summarise the latest bar.

    Args:
        df: OHLCV frame; the "open", "high", "low", "close" and "volume"
            columns are read.
        atr_series: optional ATR series (presumably on df's index). When None,
            it is computed from the true range smoothed with an exponentially
            weighted mean (alpha = 1 / ATR_PERIOD).

    Returns:
        Dict with last-bar scalars ("vol_ratio", "spike", "climax",
        "absorption", "weak", "breakout", "failed_breakout",
        "recent_failed_count", "obv_slope_norm", "delta_sum_5", "delta_sign",
        "vwap_deviation", "volume_score") and the full per-bar series
        ("spike_series", "climax_series", "absorption_series",
        "breakout_series", "failed_breakout_series").

    Raises:
        IndexError: if df is empty (last-bar values are read with iloc[-1]).
    """

    def _last_or(series: pd.Series, default: float) -> float:
        # Last value of the series, or `default` when that value is NaN.
        value = series.iloc[-1]
        return default if np.isnan(value) else float(value)

    if atr_series is None:
        # Fallback: compute ATR from the true range when none is supplied.
        high, low, prev_close = df["high"], df["low"], df["close"].shift(1)
        true_range = pd.concat(
            [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
            axis=1,
        ).max(axis=1)
        atr_series = true_range.ewm(alpha=1.0 / ATR_PERIOD, adjust=False).mean()

    vol_ma = compute_volume_ma(df, VOLUME_MA_PERIOD)
    spike_series = detect_spikes(df, vol_ma)
    climax_series = detect_climax(df, vol_ma)
    absorption_series = detect_absorption(df, vol_ma)
    obv = compute_obv(df)
    obv_slope_series = compute_obv_slope(obv, OBV_SLOPE_BARS)
    delta = compute_delta_approx(df)
    vwap_dev = compute_vwap_deviation(df, VOLUME_MA_PERIOD)
    breakout_series = compute_confirmed_breakout(
        df, atr_series, vol_ma,
        lookback=BREAKOUT_LOOKBACK,
        confirm_bars=BREAKOUT_CONFIRMATION_BARS,
        atr_buffer=BREAKOUT_ATR_BUFFER,
    )
    failed_breakout_series = detect_failed_breakout(df, breakout_series, atr_series)

    last_vol = float(df["volume"].iloc[-1])
    last_vol_ma = vol_ma.iloc[-1]
    last_spike = bool(spike_series.iloc[-1])
    last_climax = bool(climax_series.iloc[-1])
    last_absorption = bool(absorption_series.iloc[-1])
    last_breakout = int(breakout_series.iloc[-1])
    last_failed_bo = bool(failed_breakout_series.iloc[-1])
    last_obv_slope = _last_or(obv_slope_series, 0.0)
    last_vwap_dev = _last_or(vwap_dev, 0.0)

    # Without a usable moving average (NaN during warm-up, or non-positive)
    # there is no baseline, so report the neutral ratio 1.0. Substituting 1.0
    # for the MA itself would make the ratio equal to the raw volume and
    # inflate the score tiers below.
    if np.isnan(last_vol_ma) or last_vol_ma <= 0:
        vol_ratio = 1.0
    else:
        vol_ratio = last_vol / float(last_vol_ma)
    weak_vol = vol_ratio < VOLUME_WEAK_THRESHOLD

    delta_5 = float(delta.iloc[-5:].sum())
    # Two-valued by design of the original contract: a zero net delta maps
    # to -1, there is no neutral value.
    delta_sign = 1 if delta_5 > 0 else -1

    # Recent failed breakout count (last 10 bars) -- context for trust level.
    recent_failed = int(failed_breakout_series.iloc[-10:].sum())

    # Base score: the first matching condition wins, so absorption and climax
    # override a breakout on the same bar.
    if last_absorption:
        base_score = 0.15
    elif last_climax:
        base_score = 0.25
    elif last_breakout != 0:
        base_score = 0.20 if last_failed_bo else 1.0
    elif last_spike:
        base_score = 0.60
    elif vol_ratio >= 1.2:
        base_score = 0.45
    elif vol_ratio >= 0.8:
        base_score = 0.30
    else:
        base_score = 0.10

    # OBV slope bonus/penalty (normalised slope, capped at +/-0.12).
    obv_bonus = float(np.clip(last_obv_slope * 0.08, -0.12, 0.12))

    # VWAP bonus only when price sits on the breakout's side of VWAP.
    on_side = (last_breakout == 1 and last_vwap_dev > 0) or (
        last_breakout == -1 and last_vwap_dev < 0
    )
    vwap_bonus = 0.05 if on_side else 0.0

    # Penalty for recent failed breakouts (trust decay), capped at 0.20.
    fake_penalty = min(0.20, recent_failed * 0.05)

    volume_score = float(
        np.clip(base_score + obv_bonus + vwap_bonus - fake_penalty, 0.0, 1.0)
    )

    return {
        "vol_ratio": round(vol_ratio, 3),
        "spike": last_spike,
        "climax": last_climax,
        "absorption": last_absorption,
        "weak": weak_vol,
        "breakout": last_breakout,
        "failed_breakout": last_failed_bo,
        "recent_failed_count": recent_failed,
        "obv_slope_norm": round(last_obv_slope, 4),
        "delta_sum_5": round(delta_5, 2),
        "delta_sign": delta_sign,
        "vwap_deviation": round(last_vwap_dev, 4),
        "volume_score": round(volume_score, 4),
        "spike_series": spike_series,
        "climax_series": climax_series,
        "absorption_series": absorption_series,
        "breakout_series": breakout_series,
        "failed_breakout_series": failed_breakout_series,
    }