# app.py — Nexari G1 (Advanced Intent Analysis & Confidence Gating) import os import json import logging import asyncio from datetime import datetime import pytz from fastapi import FastAPI from fastapi.responses import StreamingResponse from pydantic import BaseModel from typing import Any, Dict, List # Local model modules import coder_model import chat_model # === SAFE IMPORT FOR NEW LIBRARIES === try: from sentence_transformers import SentenceTransformer, util from duckduckgo_search import DDGS NEURAL_AVAILABLE = True except ImportError: NEURAL_AVAILABLE = False print("⚠️ WARNING: sentence-transformers or duckduckgo-search not found.") logging.basicConfig(level=logging.INFO) logger = logging.getLogger("nexari.app") app = FastAPI() MODEL_DIR = "./models" NEURAL_DIR = os.path.join(MODEL_DIR, "neural") # === CONFIGURATION === NEURAL_MODEL_NAME = "all-MiniLM-L6-v2" neural_classifier = None encoded_anchors = {} MAX_HISTORY_MESSAGES = 6 # Optimized Anchors for better Vector Separation INTENT_ANCHORS = { "coding": ["write python code", "fix bug", "create function", "script", "debug", "sql query", "html css", "java code"], "reasoning": ["solve math", "explain logic", "why", "prove that", "analyze", "physics", "chemistry"], "search": ["latest news", "price of gold", "weather today", "who is the ceo", "current stock price", "search google", "find info"], "time": ["what time is it", "current time", "date today", "clock", "day is today"], # New category to pull "Identity" questions away from Search "identity": ["what is your name", "who are you", "who created you", "tell me about yourself", "are you ai"] } def ensure_model_dir_or_fail(): try: os.makedirs(MODEL_DIR, exist_ok=True) os.makedirs(NEURAL_DIR, exist_ok=True) except Exception as e: logger.critical("Unable to create model dir: %s", e) # === LOADERS === def load_neural_network(): global neural_classifier, encoded_anchors if not NEURAL_AVAILABLE: return try: logger.info("⏳ Loading Neural Intent Model...") model = SentenceTransformer(NEURAL_MODEL_NAME, cache_folder=NEURAL_DIR, device="cpu") anchors = {} for intent, texts in INTENT_ANCHORS.items(): anchors[intent] = model.encode(texts, convert_to_tensor=True) neural_classifier = model encoded_anchors = anchors logger.info("✅ Neural Intent Classifier Ready!") except Exception as e: logger.error(f"❌ Failed to load Neural Network: {e}") async def load_neural_async(): await asyncio.to_thread(load_neural_network) @app.on_event("startup") async def startup_event(): ensure_model_dir_or_fail() coder_model.BASE_DIR = os.path.join(MODEL_DIR, "coder") chat_model.BASE_DIR = os.path.join(MODEL_DIR, "chat") tasks = [ asyncio.create_task(coder_model.load_model_async()), asyncio.create_task(chat_model.load_model_async()), asyncio.create_task(load_neural_async()), ] asyncio.gather(*tasks, return_exceptions=True) logger.info("🚀 Server Startup Complete") class Message(BaseModel): role: str content: str class ChatRequest(BaseModel): messages: list[Message] stream: bool = True temperature: float = 0.7 # === TOOLS === def get_real_time(): try: ist = pytz.timezone('Asia/Kolkata') return datetime.now(ist).strftime("%A, %d %B %Y, %I:%M %p (IST)") except Exception: return str(datetime.now()) def search_sync(query: str): logger.info(f"🔎 Executing Search for: {query}") try: with DDGS() as ddgs: results = list(ddgs.text(query, max_results=4)) if not results: return None formatted_res = "" for r in results: formatted_res += f"Source: {r['title']}\nSnippet: {r['body']}\nLink: {r['href']}\n\n" return formatted_res except Exception as e: logger.error(f"DDGS Error: {e}") return None async def perform_web_search(query: str): if not NEURAL_AVAILABLE: return None return await asyncio.to_thread(search_sync, query) # === ADVANCED INTENT LOGIC (2025 Technique) === def analyze_deep_intent(text: str): """ Combines Neural Similarity with Confidence Gating & Token Chain Analysis. Returns: (intent_name, confidence_score) """ # 1. Low-Level Token Analysis (The Chain Reaction Check) text_lower = text.lower() tokens = text_lower.split() # GUARDRAIL: Self-Reference Override # If user asks about "your name", "you", "yourself" -> Force Chat/Identity self_tokens = {"your", "you", "yourself", "created", "made"} if "name" in tokens and len(tokens) < 7 and any(t in tokens for t in self_tokens): return "identity", 0.99 if not neural_classifier: return "chat", 0.0 try: # 2. Neural Vector Search user_embedding = neural_classifier.encode(text, convert_to_tensor=True) scores = {} for intent, anchor_embeddings in encoded_anchors.items(): cosine_scores = util.cos_sim(user_embedding, anchor_embeddings) # We take the MAX score from the anchor group scores[intent] = float(cosine_scores.max()) best_intent = max(scores, key=scores.get) best_score = scores[best_intent] # 3. Confidence Gating # Search needs HIGH confidence to trigger (prevent false positives) if best_intent == "search": if best_score < 0.45: # Strict threshold for search logger.info(f"⚠️ Search detected but low confidence ({best_score:.2f}). Fallback to chat.") return "chat", best_score # Identity maps to chat logic internally if best_intent == "identity": return "chat", best_score # General Threshold if best_score < 0.30: return "chat", best_score return best_intent, best_score except Exception as e: logger.error(f"Intent Error: {e}") return "chat", 0.0 def sanitize_chunk(chunk: Any) -> Dict[str, Any]: if isinstance(chunk, dict): return {k: v for k, v in chunk.items() if k != "status"} return {"text": str(chunk)} SYSTEM_PREFIX = ( "You are Nexari G1, an advanced AI created by Piyush, the CEO of Nexari AI. " "Your core directive is to be helpful, and accurate. " "Always respond naturally, clarity, and Always respond with a friendly, positive tone, regardless of context. " "You have internet access, web search ability, and server tools. Never deny these capabilities. " # --- NEW FOCUS INSTRUCTION START --- "ATTENTION PROTOCOL: " "1. Always prioritize the user’s latest message above everything else. " "2. Use previous messages only for context (names, continuity, or details). " "3. If the user changes the topic, immediately switch and ignore the old topic. " "4. Do not repeat previous answers unless the user clearly asks for repetition. " # --- NEW FOCUS INSTRUCTION END --- "Use emojis to make the conversation lively. " ) def limit_context(messages: List[Dict]) -> List[Dict]: if not messages: return [] sys = messages[0] if messages[0].get("role") == "system" else None start = 1 if sys else 0 rem = messages[start:] if len(rem) > MAX_HISTORY_MESSAGES: rem = rem[-MAX_HISTORY_MESSAGES:] final = [] if sys: final.append(sys) final.extend(rem) return final @app.post("/v1/chat/completions") async def chat_endpoint(request: ChatRequest): raw_msgs = [m.dict() for m in request.messages] if request.messages else [] if not raw_msgs: return {"error": "Empty messages"} last_msg_text = raw_msgs[-1]['content'] # === ANALYZE INTENT === intent, confidence = analyze_deep_intent(last_msg_text) logger.info(f"🧠 Analysis: Text='{last_msg_text}' | Intent='{intent}' | Conf={confidence:.2f}") selected_model = chat_model.model sys_msg = SYSTEM_PREFIX status = "Thinking..." injected_context = "" # === ROUTING === if intent == "coding" and getattr(coder_model, "model", None): selected_model = coder_model.model sys_msg += " You are an Expert Coder. Provide clean, working code." status = "Coding..." elif intent == "reasoning" and getattr(chat_model, "model", None): selected_model = chat_model.model sys_msg += " Think step-by-step." status = "Reasoning..." elif intent == "time": t = get_real_time() injected_context = f"CURRENT DATE & TIME: {t}" status = "Checking Time..." elif intent == "search": status = "Searching Web..." clean_query = last_msg_text.replace("search", "").replace("google", "").strip() search_q = clean_query if len(clean_query) > 2 else last_msg_text res = await perform_web_search(search_q) if res: injected_context = ( f"### SEARCH RESULTS (REAL-TIME DATA):\n{res}\n" "### INSTRUCTION:\n" "Answer the user's question using ONLY the Search Results above." ) else: # Silent fallback if search fails injected_context = "" status = "Thinking..." # === CONSTRUCT MESSAGE === if raw_msgs[0].get("role") != "system": raw_msgs.insert(0, {"role":"system","content": sys_msg}) else: raw_msgs[0]["content"] = sys_msg if injected_context: new_content = ( f"{injected_context}\n\n" f"### USER QUESTION:\n{last_msg_text}" ) raw_msgs[-1]['content'] = new_content if not selected_model: if chat_model.model: selected_model = chat_model.model elif coder_model.model: selected_model = coder_model.model else: return {"error": "System warming up..."} final_msgs = limit_context(raw_msgs) def iter_response(): try: yield f"event: status\ndata: {json.dumps({'status': status})}\n\n" yield ":\n\n" stream = selected_model.create_chat_completion( messages=final_msgs, temperature=request.temperature, stream=True ) for chunk in stream: yield f"data: {json.dumps(sanitize_chunk(chunk))}\n\n" yield "data: [DONE]\n\n" except Exception as e: logger.error(f"Stream error: {e}") yield f"data: {json.dumps({'error': str(e)})}\n\n" return StreamingResponse(iter_response(), media_type="text/event-stream")