# app.py - Nexari G1 (Advanced Intent Analysis & Confidence Gating)
import os
import json
import logging
import asyncio
from datetime import datetime
import pytz
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import Any, Dict, List

# Local model modules
import coder_model
import chat_model
# === SAFE IMPORT FOR NEW LIBRARIES ===
try:
    from sentence_transformers import SentenceTransformer, util
    from duckduckgo_search import DDGS
    NEURAL_AVAILABLE = True
except ImportError:
    NEURAL_AVAILABLE = False
    print("⚠️ WARNING: sentence-transformers or duckduckgo-search not found.")
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("nexari.app")

app = FastAPI()

MODEL_DIR = "./models"
NEURAL_DIR = os.path.join(MODEL_DIR, "neural")

# === CONFIGURATION ===
NEURAL_MODEL_NAME = "all-MiniLM-L6-v2"
neural_classifier = None
encoded_anchors = {}
MAX_HISTORY_MESSAGES = 6
# Optimized Anchors for better Vector Separation
INTENT_ANCHORS = {
    "coding": ["write python code", "fix bug", "create function", "script", "debug", "sql query", "html css", "java code"],
    "reasoning": ["solve math", "explain logic", "why", "prove that", "analyze", "physics", "chemistry"],
    "search": ["latest news", "price of gold", "weather today", "who is the ceo", "current stock price", "search google", "find info"],
    "time": ["what time is it", "current time", "date today", "clock", "day is today"],
    # New category to pull "Identity" questions away from Search
    "identity": ["what is your name", "who are you", "who created you", "tell me about yourself", "are you ai"],
}
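
# Routing illustration: each incoming message is embedded once and compared
# against every anchor group above; the highest cosine similarity picks the
# intent (see analyze_deep_intent below). E.g. "fix this bug in my script"
# should land closest to the "coding" anchors.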
def ensure_model_dir_or_fail():
    try:
        os.makedirs(MODEL_DIR, exist_ok=True)
        os.makedirs(NEURAL_DIR, exist_ok=True)
    except Exception as e:
        logger.critical("Unable to create model dir: %s", e)
        raise  # fail fast, as the name promises: nothing can load without these dirs
# === LOADERS ===
def load_neural_network():
    global neural_classifier, encoded_anchors
    if not NEURAL_AVAILABLE:
        return
    try:
        logger.info("⏳ Loading Neural Intent Model...")
        model = SentenceTransformer(NEURAL_MODEL_NAME, cache_folder=NEURAL_DIR, device="cpu")
        anchors = {}
        for intent, texts in INTENT_ANCHORS.items():
            anchors[intent] = model.encode(texts, convert_to_tensor=True)
        neural_classifier = model
        encoded_anchors = anchors
        logger.info("✅ Neural Intent Classifier Ready!")
    except Exception as e:
        logger.error(f"❌ Failed to load Neural Network: {e}")
async def load_neural_async():
    await asyncio.to_thread(load_neural_network)
@app.on_event("startup")
async def startup_event():
    ensure_model_dir_or_fail()
    coder_model.BASE_DIR = os.path.join(MODEL_DIR, "coder")
    chat_model.BASE_DIR = os.path.join(MODEL_DIR, "chat")
    # Load all models in the background. Keep references on app.state so the
    # tasks are not garbage-collected mid-load; the chat endpoint below
    # already handles the "still warming up" case.
    app.state.load_tasks = [
        asyncio.create_task(coder_model.load_model_async()),
        asyncio.create_task(chat_model.load_model_async()),
        asyncio.create_task(load_neural_async()),
    ]
    logger.info("🚀 Server Startup Complete")
class Message(BaseModel):
    role: str
    content: str

class ChatRequest(BaseModel):
    messages: list[Message]
    stream: bool = True
    temperature: float = 0.7
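
# Illustrative request body:
#   {"messages": [{"role": "user", "content": "what time is it"}],
#    "stream": true, "temperature": 0.7}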
# === TOOLS ===
def get_real_time():
    try:
        ist = pytz.timezone('Asia/Kolkata')
        return datetime.now(ist).strftime("%A, %d %B %Y, %I:%M %p (IST)")
    except Exception:
        return str(datetime.now())
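
# Example get_real_time() output shape: "Monday, 01 January 2025, 05:30 PM (IST)"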
def search_sync(query: str):
    logger.info(f"🔍 Executing Search for: {query}")
    try:
        with DDGS() as ddgs:
            results = list(ddgs.text(query, max_results=4))
        if not results:
            return None
        formatted_res = ""
        for r in results:
            formatted_res += f"Source: {r['title']}\nSnippet: {r['body']}\nLink: {r['href']}\n\n"
        return formatted_res
    except Exception as e:
        logger.error(f"DDGS Error: {e}")
        return None
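
# Each hit is flattened to three lines, e.g.:
#   Source: <title>
#   Snippet: <body>
#   Link: <href>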
async def perform_web_search(query: str):
    if not NEURAL_AVAILABLE:
        return None
    return await asyncio.to_thread(search_sync, query)
# === ADVANCED INTENT LOGIC (2025 Technique) ===
def analyze_deep_intent(text: str):
    """
    Combines Neural Similarity with Confidence Gating & Token Chain Analysis.
    Returns: (intent_name, confidence_score)
    """
    # 1. Low-Level Token Analysis (The Chain Reaction Check)
    text_lower = text.lower()
    tokens = text_lower.split()

    # GUARDRAIL: Self-Reference Override
    # If the user asks about "your name", "you", "yourself" -> force Chat/Identity
    self_tokens = {"your", "you", "yourself", "created", "made"}
    if "name" in tokens and len(tokens) < 7 and any(t in tokens for t in self_tokens):
        return "identity", 0.99

    if not neural_classifier:
        return "chat", 0.0

    try:
        # 2. Neural Vector Search
        user_embedding = neural_classifier.encode(text, convert_to_tensor=True)
        scores = {}
        for intent, anchor_embeddings in encoded_anchors.items():
            cosine_scores = util.cos_sim(user_embedding, anchor_embeddings)
            # We take the MAX score from the anchor group
            scores[intent] = float(cosine_scores.max())
        best_intent = max(scores, key=scores.get)
        best_score = scores[best_intent]

        # 3. Confidence Gating
        # Search needs HIGH confidence to trigger (prevents false positives)
        if best_intent == "search":
            if best_score < 0.45:  # Strict threshold for search
                logger.info(f"⚠️ Search detected but low confidence ({best_score:.2f}). Fallback to chat.")
                return "chat", best_score
        # Identity maps to chat logic internally
        if best_intent == "identity":
            return "chat", best_score
        # General threshold
        if best_score < 0.30:
            return "chat", best_score
        return best_intent, best_score
    except Exception as e:
        logger.error(f"Intent Error: {e}")
        return "chat", 0.0
def sanitize_chunk(chunk: Any) -> Dict[str, Any]:
    if isinstance(chunk, dict):
        return {k: v for k, v in chunk.items() if k != "status"}
    return {"text": str(chunk)}
SYSTEM_PREFIX = (
    "You are Nexari G1, an advanced AI created by Piyush, the CEO of Nexari AI. "
    "Your core directive is to be helpful and accurate. "
    "Always respond naturally and clearly, with a friendly, positive tone, regardless of context. "
    "You have internet access, web search ability, and server tools. Never deny these capabilities. "
    # --- NEW FOCUS INSTRUCTION START ---
    "ATTENTION PROTOCOL: "
    "1. Always prioritize the user's latest message above everything else. "
    "2. Use previous messages only for context (names, continuity, or details). "
    "3. If the user changes the topic, immediately switch and ignore the old topic. "
    "4. Do not repeat previous answers unless the user clearly asks for repetition. "
    # --- NEW FOCUS INSTRUCTION END ---
    "Use emojis to make the conversation lively. "
)
def limit_context(messages: List[Dict]) -> List[Dict]:
    if not messages:
        return []
    sys = messages[0] if messages[0].get("role") == "system" else None
    start = 1 if sys else 0
    rem = messages[start:]
    if len(rem) > MAX_HISTORY_MESSAGES:
        rem = rem[-MAX_HISTORY_MESSAGES:]
    final = []
    if sys:
        final.append(sys)
    final.extend(rem)
    return final
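
# Example: given a system message plus 10 chat turns, limit_context keeps the
# system message and only the 6 most recent turns (MAX_HISTORY_MESSAGES).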
# NOTE: the route path "/chat" is an assumption; adjust it to whatever your client calls.
@app.post("/chat")
async def chat_endpoint(request: ChatRequest):
    raw_msgs = [m.dict() for m in request.messages] if request.messages else []
    if not raw_msgs:
        return {"error": "Empty messages"}
    last_msg_text = raw_msgs[-1]['content']

    # === ANALYZE INTENT ===
    intent, confidence = analyze_deep_intent(last_msg_text)
    logger.info(f"🧠 Analysis: Text='{last_msg_text}' | Intent='{intent}' | Conf={confidence:.2f}")

    selected_model = chat_model.model
    sys_msg = SYSTEM_PREFIX
    status = "Thinking..."
    injected_context = ""

    # === ROUTING ===
    if intent == "coding" and getattr(coder_model, "model", None):
        selected_model = coder_model.model
        sys_msg += " You are an Expert Coder. Provide clean, working code."
        status = "Coding..."
    elif intent == "reasoning" and getattr(chat_model, "model", None):
        selected_model = chat_model.model
        sys_msg += " Think step-by-step."
        status = "Reasoning..."
    elif intent == "time":
        t = get_real_time()
        injected_context = f"CURRENT DATE & TIME: {t}"
        status = "Checking Time..."
    elif intent == "search":
        status = "Searching Web..."
        clean_query = last_msg_text.replace("search", "").replace("google", "").strip()
        search_q = clean_query if len(clean_query) > 2 else last_msg_text
        res = await perform_web_search(search_q)
        if res:
            injected_context = (
                f"### SEARCH RESULTS (REAL-TIME DATA):\n{res}\n"
                "### INSTRUCTION:\n"
                "Answer the user's question using ONLY the Search Results above."
            )
        else:
            # Silent fallback if search fails
            injected_context = ""
            status = "Thinking..."

    # === CONSTRUCT MESSAGE ===
    if raw_msgs[0].get("role") != "system":
        raw_msgs.insert(0, {"role": "system", "content": sys_msg})
    else:
        raw_msgs[0]["content"] = sys_msg

    if injected_context:
        new_content = (
            f"{injected_context}\n\n"
            f"### USER QUESTION:\n{last_msg_text}"
        )
        raw_msgs[-1]['content'] = new_content

    if not selected_model:
        if chat_model.model:
            selected_model = chat_model.model
        elif coder_model.model:
            selected_model = coder_model.model
        else:
            return {"error": "System warming up..."}

    final_msgs = limit_context(raw_msgs)
    def iter_response():
        try:
            yield f"event: status\ndata: {json.dumps({'status': status})}\n\n"
            # ":" alone is an SSE comment line, used here as an initial keep-alive
            yield ":\n\n"
            stream = selected_model.create_chat_completion(
                messages=final_msgs, temperature=request.temperature, stream=True
            )
            for chunk in stream:
                yield f"data: {json.dumps(sanitize_chunk(chunk))}\n\n"
            yield "data: [DONE]\n\n"
        except Exception as e:
            logger.error(f"Stream error: {e}")
            yield f"data: {json.dumps({'error': str(e)})}\n\n"

    return StreamingResponse(iter_response(), media_type="text/event-stream")
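
# Minimal local entrypoint, a sketch: assumes `uvicorn` is installed and that
# port 7860 (the usual Hugging Face Spaces port) suits your deployment.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)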