# petter2025's picture
# Update app.py
# abc61d7 verified
# app.py – ARF v4 API (FastAPI) with a Gradio frontend mounted under /api
import logging
import uuid
from datetime import datetime, timezone
from typing import Dict, Optional, List
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.openapi.docs import get_swagger_ui_html, get_redoc_html
from fastapi.responses import RedirectResponse
from pydantic import BaseModel
import gradio as gr
# ARF v4 imports
from agentic_reliability_framework.core.governance.risk_engine import RiskEngine
from agentic_reliability_framework.runtime.memory import create_faiss_index, RAGGraphMemory
from agentic_reliability_framework.runtime.memory.constants import MemoryConstants
# Additional imports for policy and cost
from agentic_reliability_framework.core.governance.policy_engine import PolicyEngine
from agentic_reliability_framework.core.governance.cost_estimator import CostEstimator
from agentic_reliability_framework.core.governance.intents import (
DeployConfigurationIntent,
Environment,
)
from agentic_reliability_framework.core.governance.healing_intent import (
HealingIntent,
RecommendedAction,
IntentStatus,
IntentSource,
)
# Module-level logger; the root logger is configured at INFO so messages
# emitted through it (including logger.exception calls below) are printed.
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
# ========================= FASTAPI APP =========================
fastapi_app = FastAPI(title="ARF v4 API")

# Cross-origin requests are accepted only from the hosted Vercel frontend;
# from that origin every HTTP method and request header is allowed.
fastapi_app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_origins=["https://arf-frontend-sandy.vercel.app"],
)
# ========================= ARF COMPONENTS =========================
# Module-level instances shared by every API endpoint and Gradio callback below.
risk_engine = RiskEngine()
# Vector index sized to MemoryConstants.VECTOR_DIM, wrapped by the RAG graph memory.
faiss_index = create_faiss_index(dim=MemoryConstants.VECTOR_DIM)
memory = RAGGraphMemory(faiss_index)
# Policy engine and cost estimator
# NOTE(review): both are built with no arguments; whether PolicyEngine ships
# default policies or needs policies loaded explicitly is not visible here.
policy_engine = PolicyEngine()  # You may need to load policies
cost_estimator = CostEstimator()  # Default estimator
# In-memory decision log for demo purposes (used by /v1/history, /v1/feedback
# and the Gradio UI). Not persisted and never trimmed in this file: it grows by
# one entry per risk snapshot and is lost on restart.
decision_history: List[Dict] = []
# ========================= PYDANTIC MODELS =========================
class EvaluateRequest(BaseModel):
    """Request body for ``POST /v1/incidents/evaluate``.

    The endpoint maps this onto a DeployConfigurationIntent: ``metrics``
    becomes the intent's ``configuration``; ``event_type`` and ``severity``
    are only recorded in its provenance.
    """

    service_name: str  # becomes the intent's service_name / HealingIntent component
    event_type: str  # recorded in the intent's provenance
    severity: str  # recorded in the intent's provenance
    # Pydantic copies mutable field defaults per instance, so the literal {} is not shared.
    metrics: Dict[str, float] = {}
class EvaluateResponse(BaseModel):
    """Flat risk-evaluation response schema.

    NOTE(review): not referenced by any endpoint in this file —
    ``/v1/incidents/evaluate`` returns ``HealingIntent.to_dict()`` instead.
    Confirm whether this model is still needed.
    """

    risk_score: float
    base_risk: float
    memory_risk: Optional[float] = None
    weight: float
    similar_events: list = []  # element type unspecified — TODO confirm schema
    confidence: float
# ========================= HELPER: Demo Intent =========================
class _DemoIntent:
    """Fixed stand-in intent used by the demo risk snapshot and feedback paths.

    All values are class-level constants: a ``"demo"`` service whose
    environment and deployment target are both ``"dev"``.
    """

    service_name = "demo"
    environment = deployment_target = "dev"
# ========================= API ENDPOINTS =========================
@fastapi_app.get("/")
async def root():
    """Root endpoint – returns a welcome message."""
    # Static pointer to the interactive docs; no state is read.
    welcome = "ARF v4 API. See /docs for documentation."
    return dict(message=welcome)
@fastapi_app.get("/health")
async def health():
    # Static health payload; no dependency is checked.
    payload = {"status": "ok"}
    payload["version"] = "4.2.0"
    return payload
@fastapi_app.get("/v1/get_risk")
async def get_risk():
    """Return the current demo risk."""
    # Scores the fixed demo intent (no cost estimate, no policy violations),
    # logs the resulting decision in decision_history and returns it.
    # Decision thresholds: <= 0.2 approve, (0.2, 0.8] escalate, > 0.8 deny.
    intent = _DemoIntent()
    risk_value, explanation, contributions = risk_engine.calculate_risk(
        intent=intent,
        cost_estimate=None,
        policy_violations=[],
    )
    risk = float(risk_value)

    if risk > 0.8:
        decision = "deny"
    elif risk > 0.2:
        decision = "escalate"
    else:
        decision = "approve"

    # One timestamp for both the stored record and the response, so the entry
    # later returned by /v1/history matches what this caller received.
    decision_id = str(uuid.uuid4())
    timestamp = datetime.now(timezone.utc).isoformat()
    decision_history.append({
        "decision_id": decision_id,
        "timestamp": timestamp,
        "risk_score": risk,
        "outcome": None,  # filled in when feedback is recorded
    })
    return {
        "system_risk": risk,
        "status": "critical" if risk > 0.8 else "normal",
        "explanation": explanation,
        "contributions": contributions,
        "decision_id": decision_id,
        "decision": decision,
        "timestamp": timestamp,
    }
@fastapi_app.get("/v1/history")
async def get_history():
    """Return the last 10 decisions."""
    # Slicing yields a new list (the entry dicts themselves are shared);
    # a history shorter than 10 is returned in full.
    start = max(len(decision_history) - 10, 0)
    return decision_history[start:]
@fastapi_app.post("/v1/incidents/evaluate")
async def evaluate_incident(request: EvaluateRequest):
    """
    Evaluate an incident by converting it into an infrastructure intent
    and running it through the full governance components. Returns a complete
    HealingIntent with risk assessment, similar incidents, and recommended actions.
    """
    # The docstring above is published as the OpenAPI description and is kept
    # verbatim. Implementation notes:
    #   * request.metrics becomes the intent's configuration; event_type and
    #     severity are only recorded in provenance.
    #   * similar_incidents is currently always empty (memory lookup not wired).
    #   * Any exception is logged and surfaced as HTTP 500 with its message.
    try:
        # Map the incident to a DeployConfigurationIntent (as an example);
        # adapt the mapping logic as needed.
        intent = DeployConfigurationIntent(
            service_name=request.service_name,
            change_scope="single_instance",  # default
            deployment_target=Environment.DEV,  # assume dev for now
            configuration=request.metrics,
            requester="system",
            provenance={
                "source": "incident_evaluation",
                "event_type": request.event_type,
                "severity": request.severity,
            },
        )

        # 1. Evaluate policies (a falsy result is normalised to an empty list).
        policy_violations = policy_engine.evaluate_policies(intent) or []
        # 2. Estimate cost
        cost_projection = cost_estimator.estimate_monthly_cost(intent)
        # 3. Compute risk score from risk engine
        risk_score, explanation, contributions = risk_engine.calculate_risk(
            intent=intent,
            cost_estimate=cost_projection,
            policy_violations=policy_violations,
        )

        # 4. Similar incidents.
        # TODO: embed the incident and query `memory` once the memory module
        # exposes a suitable lookup; until then nothing is retrieved.
        similar_incidents = []

        # 5. Recommended action from the risk score.
        # NOTE(review): a score of exactly 0.2 escalates here, while /v1/get_risk
        # approves it (it escalates only for > 0.2); confirm which is intended.
        if risk_score < 0.2:
            action = RecommendedAction.APPROVE
        elif risk_score > 0.8:
            action = RecommendedAction.DENY
        else:
            action = RecommendedAction.ESCALATE

        # 6. Weighted per-model risk factors.
        # Each row: (weight key, per-model estimate key, reported factor name).
        factor_sources = (
            ("conjugate", "conjugate_mean", "conjugate"),
            ("hyper", "hyper_mean", "hyperprior"),
            ("hmc", "hmc_prediction", "hmc"),
        )
        weights = contributions.get("weights", {})
        risk_factors = {}
        for weight_key, estimate_key, factor_name in factor_sources:
            weight = weights.get(weight_key, 0.0)
            if weight > 0:
                # A missing per-model estimate falls back to the combined score.
                risk_factors[factor_name] = weight * contributions.get(estimate_key, risk_score)
        # Fallback if no model carried positive weight.
        if not risk_factors:
            risk_factors["conjugate"] = risk_score

        # 7. Build the HealingIntent: advisory only, execution is never allowed here.
        healing_intent = HealingIntent(
            action=action.value,
            component=intent.service_name,
            parameters={},  # You can add more parameters if needed
            justification=explanation,
            confidence=0.9,  # Placeholder – could be derived from epistemic uncertainty
            incident_id="",  # Not used in this context
            detected_at=datetime.now(timezone.utc).timestamp(),
            risk_score=risk_score,
            risk_factors=risk_factors,
            cost_projection=cost_projection,
            recommended_action=action,
            similar_incidents=similar_incidents,
            policy_violations=policy_violations,
            status=IntentStatus.OSS_ADVISORY_ONLY,
            source=IntentSource.INFRASTRUCTURE_ANALYSIS,
            requires_enterprise=True,
            execution_allowed=False,
        )

        # Convert to dictionary for response
        response_dict = healing_intent.to_dict(include_oss_context=True)

        # Backfill fields the frontend expects if to_dict() did not supply them.
        if "epistemic_uncertainty" not in response_dict:
            response_dict["epistemic_uncertainty"] = 0.05  # placeholder default
        if "confidence_interval" not in response_dict:
            # Simple +/-0.05 band around the score, clamped to [0, 1].
            response_dict["confidence_interval"] = [
                max(0.0, risk_score - 0.05),
                min(1.0, risk_score + 0.05),
            ]
        if "risk_contributions" not in response_dict:
            # Expose only genuine factor entries: drop the weights table and
            # every per-model estimate consumed in step 6 (hyper_mean included).
            internal_keys = {"weights"} | {key for _, key, _ in factor_sources}
            response_dict["risk_contributions"] = [
                {"factor": k, "contribution": v}
                for k, v in contributions.items()
                if k not in internal_keys
            ]
        return response_dict
    except Exception as e:
        logger.exception("Error in evaluate_incident")
        raise HTTPException(status_code=500, detail=str(e)) from e
@fastapi_app.post("/v1/feedback")
async def record_outcome(decision_id: str, success: bool):
    """Record the outcome of a decision (success/failure)."""
    # First history entry carrying this id, or None when it was never issued.
    entry = next(
        (record for record in decision_history if record["decision_id"] == decision_id),
        None,
    )
    if entry is None:
        return {"error": "decision not found"}

    entry["outcome"] = "success" if success else "failure"
    # Feed the outcome back to the risk engine (optional, best-effort:
    # a failure is logged and the recorded outcome still stands).
    intent = _DemoIntent()
    try:
        risk_engine.update_outcome(intent, success)
    except Exception:
        logger.exception("Outcome update failed")
    return {"status": "ok", "decision_id": decision_id, "outcome": entry["outcome"]}
# ========================= NEW MEMORY STATS ENDPOINT =========================
@fastapi_app.get("/v1/memory/stats")
async def get_memory_stats():
    """Return current memory graph statistics."""
    # NOTE(review): this is a truthiness test; if RAGGraphMemory defines
    # __len__/__bool__, an empty memory would also report "not initialized".
    # `memory is None` may be the intended check — confirm.
    if not memory:
        return {"error": "Memory not initialized"}
    return memory.get_graph_stats()
# ========================= GRADIO UI =========================
def get_risk_snapshot():
    """Score the demo intent, log the decision and return a UI-friendly summary.

    On success a new entry is appended to ``decision_history``. Any exception
    is logged and returned as ``{"error": message}`` so the Gradio JSON panel
    displays it instead of the callback failing.

    Returns:
        dict with ``risk``, ``status``, ``explanation``, ``risk_factors``,
        ``decision_id``, ``decision`` and ``timestamp`` keys, or an
        ``{"error": ...}`` dict.
    """
    try:
        intent = _DemoIntent()
        risk_value, explanation, contributions = risk_engine.calculate_risk(
            intent=intent,
            cost_estimate=None,
            policy_violations=[],
        )
        risk = float(risk_value)

        # Thresholds: <= 0.2 approve, (0.2, 0.8] escalate, > 0.8 deny.
        if risk > 0.8:
            decision = "deny"
        elif risk > 0.2:
            decision = "escalate"
        else:
            decision = "approve"

        # One timestamp for both the stored record and the returned summary.
        decision_id = str(uuid.uuid4())
        timestamp = datetime.now(timezone.utc).isoformat()
        decision_history.append({
            "decision_id": decision_id,
            "timestamp": timestamp,
            "risk_score": risk,
            "outcome": None,
        })

        # Weighted per-model risk factors for the UI.
        # Each row: (weight key, per-model estimate key, reported factor name).
        weights = contributions.get("weights", {})
        risk_factors = {}
        for weight_key, estimate_key, factor_name in (
            ("conjugate", "conjugate_mean", "conjugate"),
            ("hyper", "hyper_mean", "hyperprior"),
            ("hmc", "hmc_prediction", "hmc"),
        ):
            weight = weights.get(weight_key, 0.0)
            if weight > 0:
                # A missing per-model estimate falls back to the combined score.
                risk_factors[factor_name] = weight * contributions.get(estimate_key, risk_value)
        if not risk_factors:
            risk_factors["conjugate"] = risk_value

        return {
            "risk": risk,
            "status": "critical" if risk > 0.8 else "normal",
            "explanation": explanation,
            "risk_factors": risk_factors,
            "decision_id": decision_id,
            "decision": decision,
            "timestamp": timestamp,
        }
    except Exception as e:
        logger.exception("Failed to compute risk snapshot")
        return {"error": str(e)}
def get_health_snapshot():
    """Build the static health payload for the Gradio "Health" panel, stamped in UTC."""
    snapshot = dict(status="ok", version="4.2.0", service="ARF OSS API")
    snapshot["timestamp"] = datetime.now(timezone.utc).isoformat()
    return snapshot
def get_memory_snapshot():
    """Summarise the RAG graph memory for the Gradio "Memory Stats" panel.

    Reports ``status="ok"`` with ``memory.get_graph_stats()`` once memory holds
    historical data, otherwise ``status="empty"`` with a placeholder message.
    """
    if memory.has_historical_data():
        status, stats = "ok", memory.get_graph_stats()
    else:
        status, stats = "empty", "No historical memory yet."
    return {
        "status": status,
        "memory_stats": stats,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
def record_outcome_ui(success: bool):
    """Attach a success/failure outcome to the most recent decision.

    Backs the "Action Succeeded" / "Action Failed" buttons. The risk-engine
    update is best-effort: a failure is logged and the outcome still recorded.
    """
    if not decision_history:
        return {"error": "no decisions yet"}

    latest = decision_history[-1]
    latest["outcome"] = "success" if success else "failure"

    intent = _DemoIntent()
    try:
        risk_engine.update_outcome(intent, success)
    except Exception:
        logger.exception("Outcome update failed")

    return {
        "decision_id": latest["decision_id"],
        "outcome": latest["outcome"],
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
# Gradio dashboard. Components render in creation order, so the statement
# order below defines the page layout.
with gr.Blocks(title="ARF v4.2.0 Demo", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# Agentic Reliability Framework v4.2.0")
    gr.Markdown("### Probabilistic Infrastructure Governance – [📚 API Docs](/api/docs) | [📦 GitHub](https://github.com/arf-foundation/agentic-reliability-framework) | [📅 Book a Call](https://calendly.com/petter2025us/30min)")
    gr.Markdown("---")
    # Read-only JSON panels, populated by the button handlers wired below.
    with gr.Row():
        health_output = gr.JSON(label="Health")
        risk_output = gr.JSON(label="Current Risk")
    with gr.Row():
        memory_output = gr.JSON(label="Memory Stats")
    with gr.Row():
        decision_output = gr.JSON(label="Recent Decisions")
    # "Evaluate Intent" creates a new decision; the outcome buttons label the
    # most recent decision (record_outcome_ui uses decision_history[-1]).
    with gr.Row():
        refresh_btn = gr.Button("Evaluate Intent")
        success_btn = gr.Button("Action Succeeded")
        fail_btn = gr.Button("Action Failed")
    refresh_btn.click(fn=get_risk_snapshot, outputs=risk_output)
    success_btn.click(fn=lambda: record_outcome_ui(True), outputs=decision_output)
    fail_btn.click(fn=lambda: record_outcome_ui(False), outputs=decision_output)
    with gr.Row():
        health_btn = gr.Button("Refresh Health")
        memory_btn = gr.Button("Refresh Memory")
        history_btn = gr.Button("Show Decision History")
    health_btn.click(fn=get_health_snapshot, outputs=health_output)
    memory_btn.click(fn=get_memory_snapshot, outputs=memory_output)
    # Same last-10 view as the /v1/history endpoint.
    history_btn.click(fn=lambda: decision_history[-10:], outputs=decision_output)
# ========================= Documentation Routes and Gradio Mount =========================
# All routes are registered BEFORE Gradio is mounted. Starlette matches routes
# in registration order, and the Mount at "/api" would otherwise swallow
# "/api/docs", making the redirect (the "API Docs" link in the Gradio header)
# unreachable.
#
# NOTE(review): FastAPI(title=...) keeps its default docs_url="/docs",
# redoc_url="/redoc" and openapi_url="/openapi.json". Those routes are created
# when the app is constructed and so are matched before the three custom
# routes below. Pass docs_url=None, redoc_url=None, openapi_url=None to the
# FastAPI constructor if the custom titles are meant to be served.


@fastapi_app.get("/docs", include_in_schema=False)
async def swagger_ui():
    """Swagger UI titled "ARF API Docs", reading the schema from /openapi.json."""
    return get_swagger_ui_html(
        openapi_url="/openapi.json",
        title="ARF API Docs"
    )


@fastapi_app.get("/redoc", include_in_schema=False)
async def redoc_ui():
    """ReDoc UI titled "ARF API ReDoc", reading the schema from /openapi.json."""
    return get_redoc_html(
        openapi_url="/openapi.json",
        title="ARF API ReDoc"
    )


@fastapi_app.get("/openapi.json", include_in_schema=False)
async def openapi():
    """Serve the OpenAPI schema generated for fastapi_app."""
    return fastapi_app.openapi()


@fastapi_app.get("/api/docs", include_in_schema=False)
async def redirect_docs():
    """Redirect the Gradio header's "API Docs" link to /docs."""
    return RedirectResponse(url="/docs")


# mount_gradio_app mounts the Blocks under /api and returns the same app
# object, which is the ASGI entry point served as `app`.
app = gr.mount_gradio_app(fastapi_app, demo, path="/api")