# app.py – ARF v4 API with Gradio frontend (FastAPI mounted under /api) import logging import uuid from datetime import datetime, timezone from typing import Dict, Optional, List from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.openapi.docs import get_swagger_ui_html, get_redoc_html from fastapi.responses import RedirectResponse from pydantic import BaseModel import gradio as gr # ARF v4 imports from agentic_reliability_framework.core.governance.risk_engine import RiskEngine from agentic_reliability_framework.runtime.memory import create_faiss_index, RAGGraphMemory from agentic_reliability_framework.runtime.memory.constants import MemoryConstants # Additional imports for policy and cost from agentic_reliability_framework.core.governance.policy_engine import PolicyEngine from agentic_reliability_framework.core.governance.cost_estimator import CostEstimator from agentic_reliability_framework.core.governance.intents import ( DeployConfigurationIntent, Environment, ) from agentic_reliability_framework.core.governance.healing_intent import ( HealingIntent, RecommendedAction, IntentStatus, IntentSource, ) logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # ========================= FASTAPI APP ========================= fastapi_app = FastAPI(title="ARF v4 API") # Enable CORS for your frontend fastapi_app.add_middleware( CORSMiddleware, allow_origins=["https://arf-frontend-sandy.vercel.app"], allow_methods=["*"], allow_headers=["*"], ) # ========================= ARF COMPONENTS ========================= risk_engine = RiskEngine() faiss_index = create_faiss_index(dim=MemoryConstants.VECTOR_DIM) memory = RAGGraphMemory(faiss_index) # Policy engine and cost estimator policy_engine = PolicyEngine() # You may need to load policies cost_estimator = CostEstimator() # Default estimator # In‑memory storage for demo purposes (used by /v1/history and /v1/feedback) decision_history = [] # ========================= PYDANTIC MODELS ========================= class EvaluateRequest(BaseModel): service_name: str event_type: str severity: str metrics: Dict[str, float] = {} class EvaluateResponse(BaseModel): risk_score: float base_risk: float memory_risk: Optional[float] = None weight: float similar_events: list = [] confidence: float # ========================= HELPER: Demo Intent ========================= class _DemoIntent: environment = "dev" deployment_target = "dev" service_name = "demo" # ========================= API ENDPOINTS ========================= @fastapi_app.get("/") async def root(): """Root endpoint – returns a welcome message.""" return {"message": "ARF v4 API. See /docs for documentation."} @fastapi_app.get("/health") async def health(): return {"status": "ok", "version": "4.2.0"} @fastapi_app.get("/v1/get_risk") async def get_risk(): """Return the current demo risk.""" intent = _DemoIntent() risk_value, explanation, contributions = risk_engine.calculate_risk( intent=intent, cost_estimate=None, policy_violations=[], ) decision = "approve" if risk_value > 0.8: decision = "deny" elif risk_value > 0.2: decision = "escalate" decision_id = str(uuid.uuid4()) decision_history.append({ "decision_id": decision_id, "timestamp": datetime.now(timezone.utc).isoformat(), "risk_score": float(risk_value), "outcome": None, # will be filled when feedback is given }) return { "system_risk": float(risk_value), "status": "critical" if risk_value > 0.8 else "normal", "explanation": explanation, "contributions": contributions, "decision_id": decision_id, "decision": decision, "timestamp": datetime.now(timezone.utc).isoformat() } @fastapi_app.get("/v1/history") async def get_history(): """Return the last 10 decisions.""" return decision_history[-10:] @fastapi_app.post("/v1/incidents/evaluate") async def evaluate_incident(request: EvaluateRequest): """ Evaluate an incident by converting it into an infrastructure intent and running it through the full governance components. Returns a complete HealingIntent with risk assessment, similar incidents, and recommended actions. """ try: # Map the incident to a DeployConfigurationIntent (as an example) # You can change the mapping logic based on your needs. intent = DeployConfigurationIntent( service_name=request.service_name, change_scope="single_instance", # default deployment_target=Environment.DEV, # assume dev for now configuration=request.metrics, requester="system", provenance={"source": "incident_evaluation", "event_type": request.event_type, "severity": request.severity}, ) # 1. Evaluate policies policy_violations = policy_engine.evaluate_policies(intent) or [] # 2. Estimate cost cost_projection = cost_estimator.estimate_monthly_cost(intent) # 3. Compute risk score from risk engine risk_score, explanation, contributions = risk_engine.calculate_risk( intent=intent, cost_estimate=cost_projection, policy_violations=policy_violations, ) # 4. Retrieve similar incidents from memory similar_incidents = [] if memory and memory.has_historical_data(): # You need to embed the incident appropriately; for now, pass a dummy event # This is a placeholder – you'll need to adapt based on your memory module. # For simplicity, we'll leave it empty. pass # 5. Determine recommended action based on risk score if risk_score < 0.2: action = RecommendedAction.APPROVE elif risk_score > 0.8: action = RecommendedAction.DENY else: action = RecommendedAction.ESCALATE # 6. Build risk_factors from component contributions risk_factors = {} weights = contributions.get("weights", {}) if weights.get("conjugate", 0.0) > 0: conj_risk = contributions.get("conjugate_mean", risk_score) risk_factors["conjugate"] = weights["conjugate"] * conj_risk if weights.get("hyper", 0.0) > 0: hyper_risk = contributions.get("hyper_mean", risk_score) risk_factors["hyperprior"] = weights["hyper"] * hyper_risk if weights.get("hmc", 0.0) > 0: hmc_risk = contributions.get("hmc_prediction", risk_score) risk_factors["hmc"] = weights["hmc"] * hmc_risk # Fallback if no factors added if not risk_factors: risk_factors["conjugate"] = risk_score # 7. Build HealingIntent manually healing_intent = HealingIntent( action=action.value, component=intent.service_name, parameters={}, # You can add more parameters if needed justification=explanation, confidence=0.9, # Placeholder – could be derived from epistemic uncertainty incident_id="", # Not used in this context detected_at=datetime.now(timezone.utc).timestamp(), risk_score=risk_score, risk_factors=risk_factors, cost_projection=cost_projection, recommended_action=action, similar_incidents=similar_incidents, policy_violations=policy_violations, status=IntentStatus.OSS_ADVISORY_ONLY, source=IntentSource.INFRASTRUCTURE_ANALYSIS, requires_enterprise=True, execution_allowed=False, ) # Convert to dictionary for response response_dict = healing_intent.to_dict(include_oss_context=True) # Add computed fields expected by frontend # (These might already be in HealingIntent, but ensure they exist) if "epistemic_uncertainty" not in response_dict: response_dict["epistemic_uncertainty"] = 0.05 # default if "confidence_interval" not in response_dict: # Use a simple +/- 0.05 interval response_dict["confidence_interval"] = [ max(0.0, risk_score - 0.05), min(1.0, risk_score + 0.05), ] if "risk_contributions" not in response_dict: # Convert contributions to list format (keeping only factors) response_dict["risk_contributions"] = [ {"factor": k, "contribution": v} for k, v in contributions.items() if k not in ["weights", "conjugate_mean", "hmc_prediction"] ] return response_dict except Exception as e: logger.exception("Error in evaluate_incident") raise HTTPException(status_code=500, detail=str(e)) @fastapi_app.post("/v1/feedback") async def record_outcome(decision_id: str, success: bool): """Record the outcome of a decision (success/failure).""" for dec in decision_history: if dec["decision_id"] == decision_id: dec["outcome"] = "success" if success else "failure" # Update the risk engine (optional) intent = _DemoIntent() try: risk_engine.update_outcome(intent, success) except Exception as e: logger.exception("Outcome update failed") return {"status": "ok", "decision_id": decision_id, "outcome": dec["outcome"]} return {"error": "decision not found"} # ========================= NEW MEMORY STATS ENDPOINT ========================= @fastapi_app.get("/v1/memory/stats") async def get_memory_stats(): """Return current memory graph statistics.""" if memory: return memory.get_graph_stats() return {"error": "Memory not initialized"} # ========================= GRADIO UI ========================= def get_risk_snapshot(): try: intent = _DemoIntent() risk_value, explanation, contributions = risk_engine.calculate_risk( intent=intent, cost_estimate=None, policy_violations=[], ) decision = "approve" if risk_value > 0.8: decision = "deny" elif risk_value > 0.2: decision = "escalate" decision_id = str(uuid.uuid4()) decision_history.append({ "decision_id": decision_id, "timestamp": datetime.now(timezone.utc).isoformat(), "risk_score": float(risk_value), "outcome": None, }) # Build risk_factors for UI risk_factors = {} weights = contributions.get("weights", {}) if weights.get("conjugate", 0.0) > 0: conj_risk = contributions.get("conjugate_mean", risk_value) risk_factors["conjugate"] = weights["conjugate"] * conj_risk if weights.get("hyper", 0.0) > 0: hyper_risk = contributions.get("hyper_mean", risk_value) risk_factors["hyperprior"] = weights["hyper"] * hyper_risk if weights.get("hmc", 0.0) > 0: hmc_risk = contributions.get("hmc_prediction", risk_value) risk_factors["hmc"] = weights["hmc"] * hmc_risk if not risk_factors: risk_factors["conjugate"] = risk_value return { "risk": float(risk_value), "status": "critical" if risk_value > 0.8 else "normal", "explanation": explanation, "risk_factors": risk_factors, "decision_id": decision_id, "decision": decision, "timestamp": datetime.now(timezone.utc).isoformat() } except Exception as e: logger.exception("Failed to compute risk snapshot") return {"error": str(e)} def get_health_snapshot(): return {"status": "ok", "version": "4.2.0", "service": "ARF OSS API", "timestamp": datetime.now(timezone.utc).isoformat()} def get_memory_snapshot(): if memory.has_historical_data(): return {"status": "ok", "memory_stats": memory.get_graph_stats(), "timestamp": datetime.now(timezone.utc).isoformat()} return {"status": "empty", "memory_stats": "No historical memory yet.", "timestamp": datetime.now(timezone.utc).isoformat()} def record_outcome_ui(success: bool): if not decision_history: return {"error": "no decisions yet"} last = decision_history[-1] last["outcome"] = "success" if success else "failure" intent = _DemoIntent() try: risk_engine.update_outcome(intent, success) except Exception as e: logger.exception("Outcome update failed") return {"decision_id": last["decision_id"], "outcome": last["outcome"], "timestamp": datetime.now(timezone.utc).isoformat()} with gr.Blocks(title="ARF v4.2.0 Demo", theme=gr.themes.Soft()) as demo: gr.Markdown("# Agentic Reliability Framework v4.2.0") gr.Markdown("### Probabilistic Infrastructure Governance – [📚 API Docs](/api/docs) | [📦 GitHub](https://github.com/arf-foundation/agentic-reliability-framework) | [📅 Book a Call](https://calendly.com/petter2025us/30min)") gr.Markdown("---") with gr.Row(): health_output = gr.JSON(label="Health") risk_output = gr.JSON(label="Current Risk") with gr.Row(): memory_output = gr.JSON(label="Memory Stats") with gr.Row(): decision_output = gr.JSON(label="Recent Decisions") with gr.Row(): refresh_btn = gr.Button("Evaluate Intent") success_btn = gr.Button("Action Succeeded") fail_btn = gr.Button("Action Failed") refresh_btn.click(fn=get_risk_snapshot, outputs=risk_output) success_btn.click(fn=lambda: record_outcome_ui(True), outputs=decision_output) fail_btn.click(fn=lambda: record_outcome_ui(False), outputs=decision_output) with gr.Row(): health_btn = gr.Button("Refresh Health") memory_btn = gr.Button("Refresh Memory") history_btn = gr.Button("Show Decision History") health_btn.click(fn=get_health_snapshot, outputs=health_output) memory_btn.click(fn=get_memory_snapshot, outputs=memory_output) history_btn.click(fn=lambda: decision_history[-10:], outputs=decision_output) # ========================= Mount Gradio and Add Documentation Routes ========================= app = gr.mount_gradio_app(fastapi_app, demo, path="/api") # Add documentation routes at "/docs" @app.get("/docs", include_in_schema=False) async def swagger_ui(): return get_swagger_ui_html( openapi_url="/openapi.json", title="ARF API Docs" ) @app.get("/redoc", include_in_schema=False) async def redoc_ui(): return get_redoc_html( openapi_url="/openapi.json", title="ARF API ReDoc" ) @app.get("/openapi.json", include_in_schema=False) async def openapi(): return fastapi_app.openapi() @app.get("/api/docs", include_in_schema=False) async def redirect_docs(): return RedirectResponse(url="/docs")